- Timestamp:
- Mar 9, 2007 11:25:11 PM (17 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjlib-util/src/pjstun-srv-test/turn_usage.c
r1049 r1052 22 22 23 23 #define MAX_CLIENTS 8000 24 #define MAX_PEER_PER_CLIENT S1624 #define MAX_PEER_PER_CLIENT 16 25 25 #define START_PORT 2000 26 26 #define END_PORT 65530 27 27 28 static void tu_on_rx_data(pj_stun_usage *usage, 29 void *pkt, 30 pj_size_t pkt_size, 31 const pj_sockaddr_t *src_addr, 32 unsigned src_addr_len); 33 static void tu_on_destroy(pj_stun_usage *usage); 34 35 static pj_status_t sess_on_send_msg(pj_stun_session *sess, 36 const void *pkt, 37 pj_size_t pkt_size, 38 const pj_sockaddr_t *dst_addr, 39 unsigned addr_len); 40 static pj_status_t sess_on_rx_request(pj_stun_session *sess, 41 const pj_uint8_t *pkt, 42 unsigned pkt_len, 43 const pj_stun_msg *msg, 44 const pj_sockaddr_t *src_addr, 45 unsigned src_addr_len); 28 /* 29 * Forward declarations. 30 */ 31 struct turn_usage; 32 struct turn_client; 33 34 static void tu_on_rx_data(pj_stun_usage *usage, 35 void *pkt, 36 pj_size_t pkt_size, 37 const pj_sockaddr_t *src_addr, 38 unsigned src_addr_len); 39 static void tu_on_destroy(pj_stun_usage *usage); 40 static pj_status_t tu_sess_on_send_msg(pj_stun_session *sess, 41 const void *pkt, 42 pj_size_t pkt_size, 43 const pj_sockaddr_t *dst_addr, 44 unsigned addr_len); 45 static pj_status_t tu_sess_on_rx_request(pj_stun_session *sess, 46 const pj_uint8_t *pkt, 47 unsigned pkt_len, 48 const pj_stun_msg *msg, 49 const pj_sockaddr_t *src_addr, 50 unsigned src_addr_len); 51 52 static pj_status_t client_create(struct turn_usage *tu, 53 const pj_sockaddr_t *src_addr, 54 unsigned src_addr_len, 55 struct turn_client **p_client); 56 static pj_status_t client_destroy(struct turn_client *client, 57 pj_status_t reason); 58 static pj_status_t client_handle_stun_msg(struct turn_client *client, 59 const pj_stun_msg *msg, 60 const pj_sockaddr_t *src_addr, 61 unsigned src_addr_len); 46 62 47 63 … … 51 67 pj_stun_endpoint *endpt; 52 68 pj_ioqueue_t *ioqueue; 69 pj_timer_heap_t *timer_heap; 53 70 pj_pool_t *pool; 71 pj_mutex_t *mutex; 54 72 pj_stun_usage *usage; 55 73 int type; 56 74 pj_stun_session *default_session; 57 75 pj_hash_table_t *client_htable; 58 pj_hash_table_t *peer_htable;59 76 60 77 unsigned max_bw_kbps; … … 68 85 struct turn_client 69 86 { 87 char obj_name[PJ_MAX_OBJ_NAME]; 70 88 struct turn_usage *tu; 71 89 pj_pool_t *pool; 72 pj_sockaddr_in addr;73 90 pj_stun_session *session; 74 91 pj_mutex_t *mutex; 92 int sock_type; 93 94 /* Socket and socket address of the allocated port */ 95 pj_sock_t sock; 96 pj_ioqueue_key_t *key; 97 pj_sockaddr_in client_addr; 98 99 /* Allocation properties */ 75 100 unsigned bw_kbps; 76 101 unsigned lifetime; 77 78 pj_sock_t sock; 79 pj_ioqueue_key_t *key; 80 char packet[4000]; 81 pj_sockaddr_in src_addr; 82 int src_addr_len; 83 pj_ioqueue_op_key_t read_key; 84 pj_ioqueue_op_key_t write_key; 85 86 pj_bool_t has_ad; 87 pj_bool_t ad_is_pending; 88 pj_sockaddr_in ad; 102 pj_timer_entry expiry_timer; 103 104 105 /* Hash table to keep all peers, key-ed by their address */ 106 pj_hash_table_t *peer_htable; 107 108 /* Active destination, or sin_addr.s_addr will be zero if 109 * no active destination is set. 110 */ 111 struct peer *active_peer; 112 113 /* Current packet received/sent from/to the allocated port */ 114 pj_uint8_t pkt[4000]; 115 pj_sockaddr_in pkt_src_addr; 116 int pkt_src_addr_len; 117 pj_ioqueue_op_key_t pkt_read_key; 118 pj_ioqueue_op_key_t pkt_write_key; 89 119 }; 90 120 … … 102 132 103 133 134 /* 135 * This is the only public API, to create and start the TURN usage. 136 */ 104 137 PJ_DEF(pj_status_t) pj_stun_turn_usage_create(pj_stun_server *srv, 105 138 int type, … … 127 160 tu->pf = si->pf; 128 161 tu->endpt = si->endpt; 162 tu->timer_heap = si->timer_heap; 129 163 tu->next_port = START_PORT; 130 164 … … 149 183 /* Init hash tables */ 150 184 tu->client_htable = pj_hash_create(tu->pool, MAX_CLIENTS); 151 tu->peer_htable = pj_hash_create(tu->pool, MAX_CLIENTS);152 185 153 186 /* Create default session */ 154 187 pj_bzero(&sess_cb, sizeof(sess_cb)); 155 sess_cb.on_send_msg = & sess_on_send_msg;156 sess_cb.on_rx_request = & sess_on_rx_request;188 sess_cb.on_send_msg = &tu_sess_on_send_msg; 189 sess_cb.on_rx_request = &tu_sess_on_rx_request; 157 190 status = pj_stun_session_create(si->endpt, "turns%p", &sess_cb, PJ_FALSE, 158 191 &tu->default_session); … … 166 199 pj_stun_session_set_user_data(tu->default_session, sd); 167 200 201 /* Create mutex */ 202 status = pj_mutex_create_recursive(pool, "turn%p", &tu->mutex); 203 if (status != PJ_SUCCESS) { 204 pj_stun_usage_destroy(tu->usage); 205 return status; 206 } 207 168 208 *p_bu = tu->usage; 169 209 … … 172 212 173 213 214 /* 215 * This is a callback called by usage.c when the particular STUN usage 216 * is to be destroyed. 217 */ 174 218 static void tu_on_destroy(pj_stun_usage *usage) 175 219 { 176 220 struct turn_usage *tu; 221 pj_hash_iterator_t hit, *it; 177 222 178 223 tu = (struct turn_usage*) pj_stun_usage_get_user_data(usage); 179 PJ_TODO(DESTROY_ALL_CLIENTS); 224 225 /* Destroy all clients */ 226 it = pj_hash_first(tu->client_htable, &hit); 227 while (it) { 228 struct turn_client *client; 229 230 client = (struct turn_client *)pj_hash_this(tu->client_htable, it); 231 client_destroy(client, PJ_SUCCESS); 232 233 it = pj_hash_next(tu->client_htable, it); 234 } 235 236 pj_stun_session_destroy(tu->default_session); 237 pj_mutex_destroy(tu->mutex); 180 238 pj_pool_release(tu->pool); 181 239 } 182 240 183 241 242 /* 243 * This is a callback called by the usage.c to notify the TURN usage, 244 * that incoming packet (may or may not be a STUN packet) is received 245 * on the port where the TURN usage is listening. 246 */ 184 247 static void tu_on_rx_data(pj_stun_usage *usage, 185 248 void *pkt, … … 189 252 { 190 253 struct turn_usage *tu; 191 pj_stun_session *session;192 254 struct turn_client *client; 193 255 unsigned flags; … … 201 263 src_addr_len, NULL); 202 264 203 if (client == NULL) { 204 session = tu->default_session; 205 } else { 206 session = client->session; 207 } 208 209 /* Handle packet to session */ 210 flags = PJ_STUN_CHECK_PACKET; 265 /* STUN message decoding flag */ 266 flags = 0; 211 267 if (tu->type == PJ_SOCK_DGRAM) 212 268 flags |= PJ_STUN_IS_DATAGRAM; 213 214 status = pj_stun_session_on_rx_pkt(session, (pj_uint8_t*)pkt, pkt_size, 215 flags, NULL, src_addr, src_addr_len); 216 if (status != PJ_SUCCESS) { 217 pj_stun_perror(THIS_FILE, "Error handling incoming packet", status); 218 return; 219 } 220 } 221 222 269 270 271 if (client) { 272 status = pj_stun_msg_check(pkt, pkt_size, flags); 273 274 if (status == PJ_SUCCESS) { 275 /* Received STUN message */ 276 status = pj_stun_session_on_rx_pkt(client->session, 277 (pj_uint8_t*)pkt, pkt_size, 278 flags, NULL, 279 src_addr, src_addr_len); 280 } else if (client->active_peer) { 281 /* Received non-STUN message and client has active destination */ 282 pj_ssize_t sz = pkt_size; 283 pj_ioqueue_sendto(client->key, &client->pkt_write_key, 284 pkt, &sz, 0, 285 &client->active_peer->addr, 286 sizeof(client->active_peer->addr)); 287 } else { 288 /* Received non-STUN message and client doesn't have active 289 * destination. 290 */ 291 /* Ignore */ 292 } 293 294 } else { 295 /* Received packet (could be STUN or no) from new source */ 296 flags |= PJ_STUN_CHECK_PACKET; 297 pj_stun_session_on_rx_pkt(tu->default_session, (pj_uint8_t*)pkt, 298 pkt_size, flags, NULL, 299 src_addr, src_addr_len); 300 } 301 } 302 303 304 /* 305 * This is a utility function provided by TU (Turn Usage) to reserve 306 * or allocate internal port/socket. The allocation needs to be 307 * coordinated to minimize bind() collissions. 308 */ 223 309 static pj_status_t tu_alloc_port(struct turn_usage *tu, 224 310 int type, … … 236 322 if (req_addr && req_addr->sin_port != 0) { 237 323 238 *err_code = PJ_STUN_S TATUS_INVALID_PORT;324 *err_code = PJ_STUN_SC_INVALID_PORT; 239 325 240 326 /* Allocate specific port */ … … 255 341 256 342 } else { 257 *err_code = PJ_STUN_STATUS_INSUFFICIENT_CAPACITY; 343 status = -1; 344 *err_code = PJ_STUN_SC_INSUFFICIENT_CAPACITY; 258 345 259 346 if (req_addr && req_addr->sin_addr.s_addr) { 260 *err_code = PJ_STUN_S TATUS_INVALID_IP_ADDR;347 *err_code = PJ_STUN_SC_INVALID_IP_ADDR; 261 348 pj_memcpy(&addr, req_addr, sizeof(pj_sockaddr_in)); 262 349 } else { … … 309 396 } 310 397 398 399 /* 400 * This callback is called by the TU's STUN session when it receives 401 * a valid STUN message. This is called from tu_on_rx_data above. 402 */ 403 static pj_status_t tu_sess_on_rx_request(pj_stun_session *sess, 404 const pj_uint8_t *pkt, 405 unsigned pkt_len, 406 const pj_stun_msg *msg, 407 const pj_sockaddr_t *src_addr, 408 unsigned src_addr_len) 409 { 410 struct session_data *sd; 411 struct turn_client *client; 412 pj_stun_tx_data *tdata; 413 pj_status_t status; 414 415 PJ_UNUSED_ARG(pkt); 416 PJ_UNUSED_ARG(pkt_len); 417 418 sd = (struct session_data*) pj_stun_session_get_user_data(sess); 419 420 pj_assert(sd->client == NULL); 421 422 if (msg->hdr.type != PJ_STUN_ALLOCATE_REQUEST) { 423 if (PJ_STUN_IS_REQUEST(msg->hdr.type)) { 424 status = pj_stun_session_create_response(sess, msg, 425 PJ_STUN_SC_NO_BINDING, 426 NULL, &tdata); 427 if (status==PJ_SUCCESS) { 428 status = pj_stun_session_send_msg(sess, PJ_FALSE, 429 src_addr, src_addr_len, 430 tdata); 431 } 432 } else { 433 PJ_LOG(4,(THIS_FILE, 434 "Received %s %s without matching Allocation, " 435 "ignored", pj_stun_get_method_name(msg->hdr.type), 436 pj_stun_get_class_name(msg->hdr.type))); 437 } 438 return PJ_SUCCESS; 439 } 440 441 status = client_create(sd->tu, src_addr, src_addr_len, &client); 442 if (status != PJ_SUCCESS) { 443 pj_stun_perror(THIS_FILE, "Error creating new TURN client", 444 status); 445 return status; 446 } 447 448 449 /* Hand over message to client */ 450 pj_mutex_lock(client->mutex); 451 status = client_handle_stun_msg(client, msg, src_addr, src_addr_len); 452 pj_mutex_unlock(client->mutex); 453 454 return status; 455 } 456 457 458 /* 459 * This callback is called by STUN session when it needs to send packet 460 * to the network. 461 */ 462 static pj_status_t tu_sess_on_send_msg(pj_stun_session *sess, 463 const void *pkt, 464 pj_size_t pkt_size, 465 const pj_sockaddr_t *dst_addr, 466 unsigned addr_len) 467 { 468 struct session_data *sd; 469 470 sd = (struct session_data*) pj_stun_session_get_user_data(sess); 471 472 if (sd->tu->type == PJ_SOCK_DGRAM) { 473 return pj_stun_usage_sendto(sd->tu->usage, pkt, pkt_size, 0, 474 dst_addr, addr_len); 475 } else { 476 return PJ_ENOTSUP; 477 } 478 } 479 480 311 481 /****************************************************************************/ 312 482 /* 483 * TURN client operations. 484 */ 485 486 /* Function prototypes */ 487 static pj_status_t client_create_relay(struct turn_client *client); 488 static pj_status_t client_destroy_relay(struct turn_client *client); 489 static void client_on_expired(pj_timer_heap_t *th, pj_timer_entry *e); 490 static void client_on_read_complete(pj_ioqueue_key_t *key, 491 pj_ioqueue_op_key_t *op_key, 492 pj_ssize_t bytes_read); 493 static pj_status_t client_respond(struct turn_client *client, 494 const pj_stun_msg *msg, 495 int err_code, 496 const char *err_msg, 497 const pj_sockaddr_t *dst_addr, 498 int dst_addr_len); 499 static struct peer* client_get_peer(struct turn_client *client, 500 const pj_sockaddr_in *peer_addr, 501 pj_uint32_t *hval); 502 static struct peer* client_add_peer(struct turn_client *client, 503 const pj_sockaddr_in *peer_addr, 504 pj_uint32_t hval); 505 506 507 /* 508 * This callback is called when incoming STUN message is received 509 * in the TURN usage. This is called from by tu_on_rx_data() when 510 * the packet is handed over to the client. 511 */ 512 static pj_status_t client_sess_on_rx_request(pj_stun_session *sess, 513 const pj_uint8_t *pkt, 514 unsigned pkt_len, 515 const pj_stun_msg *msg, 516 const pj_sockaddr_t *src_addr, 517 unsigned src_addr_len) 518 { 519 struct session_data *sd; 520 521 PJ_UNUSED_ARG(pkt); 522 PJ_UNUSED_ARG(pkt_len); 523 524 sd = (struct session_data*) pj_stun_session_get_user_data(sess); 525 pj_assert(sd->client != PJ_SUCCESS); 526 527 return client_handle_stun_msg(sd->client, msg, src_addr, src_addr_len); 528 } 529 530 531 /* 532 * This callback is called by client's STUN session to send outgoing 533 * STUN packet. It's called when client calls pj_stun_session_send_msg() 534 * function. 535 */ 536 static pj_status_t client_sess_on_send_msg(pj_stun_session *sess, 537 const void *pkt, 538 pj_size_t pkt_size, 539 const pj_sockaddr_t *dst_addr, 540 unsigned addr_len) 541 { 542 struct session_data *sd; 543 544 sd = (struct session_data*) pj_stun_session_get_user_data(sess); 545 546 if (sd->tu->type == PJ_SOCK_DGRAM) { 547 return pj_stun_usage_sendto(sd->tu->usage, pkt, pkt_size, 0, 548 dst_addr, addr_len); 549 } else { 550 return PJ_ENOTSUP; 551 } 552 } 553 554 555 /* 556 * Create a new TURN client for the specified source address. 557 */ 313 558 static pj_status_t client_create(struct turn_usage *tu, 314 const pj_stun_msg *msg,315 559 const pj_sockaddr_t *src_addr, 316 560 unsigned src_addr_len, … … 323 567 pj_status_t status; 324 568 325 PJ_UNUSED_ARG(msg);326 327 569 pool = pj_pool_create(tu->pf, "turnc%p", 4000, 4000, NULL); 328 570 client = PJ_POOL_ZALLOC_T(pool, struct turn_client); … … 331 573 client->sock = PJ_INVALID_SOCKET; 332 574 575 if (src_addr) { 576 const pj_sockaddr_in *a4 = (const pj_sockaddr_in *)src_addr; 577 pj_ansi_snprintf(client->obj_name, sizeof(client->obj_name), 578 "%s:%d", 579 pj_inet_ntoa(a4->sin_addr), 580 (int)pj_ntohs(a4->sin_port)); 581 client->obj_name[sizeof(client->obj_name)-1] = '\0'; 582 } 583 333 584 /* Create session */ 334 585 pj_bzero(&sess_cb, sizeof(sess_cb)); 335 sess_cb.on_send_msg = & sess_on_send_msg;336 sess_cb.on_rx_request = & sess_on_rx_request;586 sess_cb.on_send_msg = &client_sess_on_send_msg; 587 sess_cb.on_rx_request = &client_sess_on_rx_request; 337 588 status = pj_stun_session_create(tu->endpt, "turnc%p", &sess_cb, PJ_FALSE, 338 589 &client->session); … … 347 598 pj_stun_session_set_user_data(client->session, sd); 348 599 600 /* Mutex */ 601 status = pj_mutex_create_recursive(client->pool, pool->obj_name, 602 &client->mutex); 603 if (status != PJ_SUCCESS) { 604 client_destroy(client, status); 605 return status; 606 } 607 608 /* Create hash table */ 609 client->peer_htable = pj_hash_create(client->pool, MAX_PEER_PER_CLIENT); 610 if (client->peer_htable == NULL) { 611 client_destroy(client, status); 612 return PJ_ENOMEM; 613 } 614 615 /* Init timer entry */ 616 client->expiry_timer.user_data = client; 617 client->expiry_timer.cb = &client_on_expired; 618 client->expiry_timer.id = 0; 619 349 620 /* Register to hash table */ 621 pj_mutex_lock(tu->mutex); 350 622 pj_hash_set(pool, tu->client_htable, src_addr, src_addr_len, 0, client); 351 623 pj_mutex_unlock(tu->mutex); 624 625 /* Done */ 352 626 *p_client = client; 627 628 PJ_LOG(4,(THIS_FILE, "TURN client %s created", client->obj_name)); 629 353 630 return PJ_SUCCESS; 354 631 } 355 632 356 static pj_status_t client_destroy(struct turn_client *client) 357 { 358 } 359 360 static void client_on_read_complete(pj_ioqueue_key_t *key, 361 pj_ioqueue_op_key_t *op_key, 362 pj_ssize_t bytes_read) 363 { 364 } 365 366 static void client_on_write_complete(pj_ioqueue_key_t *key, 367 pj_ioqueue_op_key_t *op_key, 368 pj_ssize_t bytes_sent) 369 { 370 } 371 633 634 /* 635 * Destroy TURN client. 636 */ 637 static pj_status_t client_destroy(struct turn_client *client, 638 pj_status_t reason) 639 { 640 struct turn_usage *tu = client->tu; 641 char name[PJ_MAX_OBJ_NAME]; 642 643 pj_assert(sizeof(name)==sizeof(client->obj_name)); 644 pj_memcpy(name, client->obj_name, sizeof(name)); 645 646 /* Kill timer if it's active */ 647 if (client->expiry_timer.id != 0) { 648 pj_timer_heap_cancel(tu->timer_heap, &client->expiry_timer); 649 client->expiry_timer.id = PJ_FALSE; 650 } 651 652 /* Destroy relay */ 653 client_destroy_relay(client); 654 655 /* Unregister client from hash table */ 656 pj_mutex_lock(tu->mutex); 657 pj_hash_set(NULL, tu->client_htable, 658 &client->client_addr, sizeof(client->client_addr), 0, NULL); 659 pj_mutex_unlock(tu->mutex); 660 661 /* Destroy STUN session */ 662 if (client->session) { 663 pj_stun_session_destroy(client->session); 664 client->session = NULL; 665 } 666 667 /* Mutex */ 668 if (client->mutex) { 669 pj_mutex_destroy(client->mutex); 670 client->mutex = NULL; 671 } 672 673 /* Finally destroy pool */ 674 if (client->pool) { 675 pj_pool_t *pool = client->pool; 676 client->pool = NULL; 677 pj_pool_release(pool); 678 } 679 680 if (reason == PJ_SUCCESS) { 681 PJ_LOG(4,(THIS_FILE, "TURN client %s destroyed", name)); 682 } 683 684 return PJ_SUCCESS; 685 } 686 687 688 /* 689 * This utility function is used to setup relay (with ioqueue) after 690 * socket has been allocated for the TURN client. 691 */ 372 692 static pj_status_t client_create_relay(struct turn_client *client) 373 693 { … … 378 698 pj_bzero(&client_ioq_cb, sizeof(client_ioq_cb)); 379 699 client_ioq_cb.on_read_complete = &client_on_read_complete; 380 client_ioq_cb.on_write_complete = &client_on_write_complete;381 700 status = pj_ioqueue_register_sock(client->pool, client->tu->ioqueue, 382 701 client->sock, client, … … 388 707 } 389 708 390 pj_ioqueue_op_key_init(&client->read_key, sizeof(client->read_key)); 391 pj_ioqueue_op_key_init(&client->write_key, sizeof(client->write_key)); 709 pj_ioqueue_op_key_init(&client->pkt_read_key, 710 sizeof(client->pkt_read_key)); 711 pj_ioqueue_op_key_init(&client->pkt_write_key, 712 sizeof(client->pkt_write_key)); 392 713 393 714 /* Trigger the first read */ 394 client_on_read_complete(client->key, &client-> read_key, 0);715 client_on_read_complete(client->key, &client->pkt_read_key, 0); 395 716 396 717 return PJ_SUCCESS; 397 718 } 398 719 720 721 /* 722 * This utility function is used to destroy the port allocated for 723 * the TURN client. 724 */ 725 static pj_status_t client_destroy_relay(struct turn_client *client) 726 { 727 /* Close socket */ 728 if (client->key) { 729 pj_ioqueue_unregister(client->key); 730 client->key = NULL; 731 client->sock = PJ_INVALID_SOCKET; 732 } else if (client->sock && client->sock != PJ_INVALID_SOCKET) { 733 pj_sock_close(client->sock); 734 client->sock = PJ_INVALID_SOCKET; 735 } 736 737 PJ_LOG(4,(THIS_FILE, "TURN client %s: relay allocation %s:%d destroyed", 738 client->obj_name, 739 pj_inet_ntoa(client->client_addr.sin_addr), 740 (int)pj_ntohs(client->client_addr.sin_port))); 741 return PJ_SUCCESS; 742 } 743 744 745 /* 746 * From the source packet address, get the peer instance from hash table. 747 */ 748 static struct peer* client_get_peer(struct turn_client *client, 749 const pj_sockaddr_in *peer_addr, 750 pj_uint32_t *hval) 751 { 752 return (struct peer*) 753 pj_hash_get(client->peer_htable, peer_addr, sizeof(*peer_addr), hval); 754 } 755 756 757 /* 758 * Add a peer instance to the peer hash table. 759 */ 760 static struct peer* client_add_peer(struct turn_client *client, 761 const pj_sockaddr_in *peer_addr, 762 unsigned hval) 763 { 764 struct peer *peer; 765 766 peer = PJ_POOL_ZALLOC_T(client->pool, struct peer); 767 peer->client = client; 768 pj_memcpy(&peer->addr, peer_addr, sizeof(*peer_addr)); 769 770 pj_hash_set(client->pool, client->peer_htable, 771 peer_addr, sizeof(*peer_addr), hval, peer); 772 773 PJ_LOG(4,(THIS_FILE, "TURN client %s: peer %s:%s:%d added", 774 client->obj_name, "udp", pj_inet_ntoa(peer_addr->sin_addr), 775 (int)pj_ntohs(peer_addr->sin_port))); 776 777 return peer; 778 } 779 780 781 /* 782 * Utility to send STUN response message (normally to send error response). 783 */ 784 static pj_status_t client_respond(struct turn_client *client, 785 const pj_stun_msg *msg, 786 int err_code, 787 const char *custom_msg, 788 const pj_sockaddr_t *dst_addr, 789 int dst_addr_len) 790 { 791 pj_str_t err_msg; 792 pj_str_t *p_err_msg = NULL; 793 pj_stun_tx_data *response; 794 pj_status_t status; 795 796 if (custom_msg) 797 pj_cstr(&err_msg, custom_msg), p_err_msg = &err_msg; 798 799 status = pj_stun_session_create_response(client->session, msg, 800 err_code, p_err_msg, 801 &response); 802 if (status == PJ_SUCCESS) 803 status = pj_stun_session_send_msg(client->session, PJ_TRUE, 804 dst_addr, dst_addr_len, response); 805 806 return status; 807 } 808 809 810 /* 811 * Handle incoming initial or subsequent Allocate Request. 812 * This function is called by client_handle_stun_msg() below. 813 */ 399 814 static pj_status_t client_handle_allocate_req(struct turn_client *client, 400 const pj_uint8_t *pkt,401 unsigned pkt_len,402 815 const pj_stun_msg *msg, 403 816 const pj_sockaddr_t *src_addr, … … 412 825 pj_sockaddr_in req_addr; 413 826 int addr_len; 414 unsigned type;415 827 unsigned rpp_bits; 828 pj_time_val timeout; 416 829 pj_status_t status; 417 830 … … 432 845 /* Process BANDWIDTH attribute */ 433 846 if (a_bw && a_bw->value > client->tu->max_bw_kbps) { 434 status = pj_stun_session_create_response(client->session, msg, 435 PJ_STUN_STATUS_INSUFFICIENT_CAPACITY, 436 NULL, &response); 437 if (status == PJ_SUCCESS && response) { 438 pj_stun_session_send_msg(client->session, PJ_TRUE, 439 src_addr, src_addr_len, response); 440 } 441 return -1; 847 client_respond(client, msg, PJ_STUN_SC_INSUFFICIENT_CAPACITY, NULL, 848 src_addr, src_addr_len); 849 return PJ_SUCCESS; 442 850 } else if (a_bw) { 443 851 client->bw_kbps = a_bw->value; … … 448 856 /* Process REQUESTED-TRANSPORT attribute */ 449 857 if (a_rt && a_rt->value != 0) { 450 status = pj_stun_session_create_response(client->session, msg, 451 PJ_STUN_STATUS_UNSUPP_TRANSPORT_PROTO, 452 NULL, &response); 453 if (status == PJ_SUCCESS && response) { 454 pj_stun_session_send_msg(client->session, PJ_TRUE, 455 src_addr, src_addr_len, response); 456 } 457 return -1; 858 client_respond(client, msg, PJ_STUN_SC_UNSUPP_TRANSPORT_PROTO, NULL, 859 src_addr, src_addr_len); 860 return PJ_SUCCESS; 458 861 } else if (a_rt) { 459 type = a_rt->value ? PJ_SOCK_STREAM : PJ_SOCK_DGRAM;862 client->sock_type = a_rt->value ? PJ_SOCK_STREAM : PJ_SOCK_DGRAM; 460 863 } else { 461 type = client->tu->type;;864 client->sock_type = client->tu->type;; 462 865 } 463 866 464 867 /* Process REQUESTED-IP attribute */ 465 868 if (a_rip && a_rip->addr.addr.sa_family != PJ_AF_INET) { 466 status = pj_stun_session_create_response(client->session, msg, 467 PJ_STUN_STATUS_INVALID_IP_ADDR, 468 NULL, &response); 469 if (status == PJ_SUCCESS && response) { 470 pj_stun_session_send_msg(client->session, PJ_TRUE, 471 src_addr, src_addr_len, response); 472 } 473 return -1; 869 client_respond(client, msg, PJ_STUN_SC_INVALID_IP_ADDR, NULL, 870 src_addr, src_addr_len); 871 return PJ_SUCCESS; 474 872 475 873 } else if (a_rip) { … … 498 896 499 897 /* Allocate socket if we don't have one */ 500 if (client-> sock == PJ_INVALID_SOCKET) {898 if (client->key == NULL) { 501 899 int err_code; 502 900 503 status = tu_alloc_port(client->tu, type, rpp_bits, &req_addr, 504 &client->sock, &err_code); 901 PJ_LOG(4,(THIS_FILE, "TURN client %s: received initial Allocate " 902 "request, requested type:addr:port=%d:%s:%d, rpp " 903 "bits=%d", 904 client->obj_name, client->sock_type, 905 pj_inet_ntoa(req_addr.sin_addr), pj_ntohs(req_addr.sin_port), 906 rpp_bits)); 907 908 status = tu_alloc_port(client->tu, client->sock_type, rpp_bits, 909 &req_addr, &client->sock, &err_code); 505 910 if (status != PJ_SUCCESS) { 506 507 status = pj_stun_session_create_response(client->session, msg, 508 err_code, NULL, 509 &response); 510 if (status == PJ_SUCCESS && response) { 511 pj_stun_session_send_msg(client->session, PJ_TRUE, 512 src_addr, src_addr_len, response); 513 } 514 return -1; 911 char errmsg[PJ_ERR_MSG_SIZE]; 912 913 pj_strerror(status, errmsg, sizeof(errmsg)); 914 PJ_LOG(4,(THIS_FILE, "TURN client %s: error allocating relay port" 915 ": %s", 916 client->obj_name, errmsg)); 917 918 client_respond(client, msg, err_code, NULL, 919 src_addr, src_addr_len); 920 921 return status; 515 922 } 516 923 517 924 status = client_create_relay(client); 518 925 if (status != PJ_SUCCESS) { 519 status = pj_stun_session_create_response(client->session, msg, 520 PJ_STUN_STATUS_SERVER_ERROR, 521 NULL, &response); 522 if (status == PJ_SUCCESS && response) { 523 pj_stun_session_send_msg(client->session, PJ_TRUE, 524 src_addr, src_addr_len, response); 525 } 526 return -1; 926 client_respond(client, msg, PJ_STUN_SC_SERVER_ERROR, NULL, 927 src_addr, src_addr_len); 928 return status; 527 929 } 528 930 } else { 529 931 /* Otherwise check if the port parameter stays the same */ 530 932 /* TODO */ 531 } 933 PJ_LOG(4,(THIS_FILE, "TURN client %s: received Allocate refresh", 934 client->obj_name)); 935 } 936 937 /* Refresh timer */ 938 if (client->expiry_timer.id != PJ_FALSE) { 939 pj_timer_heap_cancel(client->tu->timer_heap, &client->expiry_timer); 940 client->expiry_timer.id = PJ_FALSE; 941 } 942 timeout.sec = client->lifetime; 943 timeout.msec = 0; 944 pj_timer_heap_schedule(client->tu->timer_heap, &client->expiry_timer, &timeout); 945 532 946 533 947 /* Done successfully, create and send success response */ … … 535 949 0, NULL, &response); 536 950 if (status != PJ_SUCCESS) { 537 return -1;951 return status; 538 952 } 539 953 … … 555 969 &req_addr, addr_len); 556 970 971 PJ_LOG(4,(THIS_FILE, "TURN client %s: relay allocated or refreshed, " 972 "internal address is %s:%d", 973 client->obj_name, 974 pj_inet_ntoa(req_addr.sin_addr), 975 (int)pj_ntohs(req_addr.sin_port))); 976 557 977 return pj_stun_session_send_msg(client->session, PJ_TRUE, 558 978 src_addr, src_addr_len, response); 559 979 } 560 980 981 982 /* 983 * client handling incoming STUN Set Active Destination request 984 * This function is called by client_handle_stun_msg() below. 985 */ 986 static pj_status_t client_handle_sad(struct turn_client *client, 987 const pj_stun_msg *msg, 988 const pj_sockaddr_t *src_addr, 989 unsigned src_addr_len) 990 { 991 pj_stun_remote_addr_attr *a_raddr; 992 993 a_raddr = (pj_stun_remote_addr_attr*) 994 pj_stun_msg_find_attr(msg, PJ_STUN_ATTR_REMOTE_ADDR, 0); 995 if (!a_raddr) { 996 /* Remote active destination needs to be cleared */ 997 client->active_peer = NULL; 998 999 } else if (a_raddr->addr.addr.sa_family != PJ_AF_INET) { 1000 /* Bad request (not IPv4) */ 1001 client_respond(client, msg, PJ_STUN_SC_BAD_REQUEST, NULL, 1002 src_addr, src_addr_len); 1003 return PJ_SUCCESS; 1004 1005 } else if (client->active_peer) { 1006 /* Client tries to set new active destination without clearing 1007 * it first. Reject with 439. 1008 */ 1009 client_respond(client, msg, PJ_STUN_SC_TRANSITIONING, NULL, 1010 src_addr, src_addr_len); 1011 return PJ_SUCCESS; 1012 1013 } else { 1014 struct peer *peer; 1015 pj_uint32_t hval = 0; 1016 1017 /* Add a new peer/permission if we don't have one for this address */ 1018 peer = client_get_peer(client, &a_raddr->addr.ipv4, &hval); 1019 if (peer==NULL) { 1020 peer = client_add_peer(client, &a_raddr->addr.ipv4, hval); 1021 } 1022 1023 /* Set active destination */ 1024 client->active_peer = peer; 1025 } 1026 1027 PJ_LOG(4,(THIS_FILE, "TURN client %s: active destination set to %s:%d", 1028 client->obj_name, 1029 pj_inet_ntoa(client->active_peer->addr.sin_addr), 1030 (int)pj_ntohs(client->active_peer->addr.sin_port))); 1031 1032 /* Respond with successful response */ 1033 client_respond(client, msg, 0, NULL, src_addr, src_addr_len); 1034 1035 return PJ_SUCCESS; 1036 } 1037 1038 1039 /* 1040 * client handling incoming STUN Send Indication 1041 * This function is called by client_handle_stun_msg() below. 1042 */ 561 1043 static pj_status_t client_handle_send_ind(struct turn_client *client, 562 const pj_uint8_t *pkt, 563 unsigned pkt_len, 1044 const pj_stun_msg *msg) 1045 { 1046 pj_stun_remote_addr_attr *a_raddr; 1047 pj_stun_data_attr *a_data; 1048 pj_uint32_t hval = 0; 1049 const pj_uint8_t *data; 1050 pj_ssize_t datalen; 1051 1052 /* Get REMOTE-ADDRESS attribute */ 1053 a_raddr = (pj_stun_remote_addr_attr*) 1054 pj_stun_msg_find_attr(msg, PJ_STUN_ATTR_REMOTE_ADDR, 0); 1055 if (!a_raddr) { 1056 /* REMOTE-ADDRESS not present, discard packet */ 1057 return PJ_SUCCESS; 1058 1059 } else if (a_raddr->addr.addr.sa_family != PJ_AF_INET) { 1060 /* REMOTE-ADDRESS present but not IPv4, discard packet */ 1061 return PJ_SUCCESS; 1062 1063 } 1064 1065 /* Get the DATA attribute */ 1066 a_data = (pj_stun_data_attr*) 1067 pj_stun_msg_find_attr(msg, PJ_STUN_ATTR_DATA, 0); 1068 if (a_data) { 1069 data = (const pj_uint8_t *)a_data->data; 1070 datalen = a_data->length; 1071 1072 } else if (client->sock_type == PJ_SOCK_STREAM) { 1073 /* Discard if no Data and Allocation type is TCP */ 1074 return PJ_SUCCESS; 1075 1076 } else { 1077 data = (const pj_uint8_t *)""; 1078 datalen = 0; 1079 } 1080 1081 /* Add to peer table if necessary */ 1082 if (client_get_peer(client, &a_raddr->addr.ipv4, &hval)==NULL) 1083 client_add_peer(client, &a_raddr->addr.ipv4, hval); 1084 1085 /* Send the packet */ 1086 pj_ioqueue_sendto(client->key, &client->pkt_write_key, 1087 data, &datalen, 0, 1088 &a_raddr->addr.ipv4, sizeof(a_raddr->addr.ipv4)); 1089 1090 return PJ_SUCCESS; 1091 } 1092 1093 1094 /* 1095 * client handling unknown incoming STUN message. 1096 * This function is called by client_handle_stun_msg() below. 1097 */ 1098 static pj_status_t client_handle_unknown_msg(struct turn_client *client, 1099 const pj_stun_msg *msg, 1100 const pj_sockaddr_t *src_addr, 1101 unsigned src_addr_len) 1102 { 1103 PJ_LOG(4,(THIS_FILE, "TURN client %s: unhandled %s %s", 1104 client->obj_name, pj_stun_get_method_name(msg->hdr.type), 1105 pj_stun_get_class_name(msg->hdr.type))); 1106 1107 if (PJ_STUN_IS_REQUEST(msg->hdr.type)) { 1108 return client_respond(client, msg, PJ_STUN_SC_BAD_REQUEST, NULL, 1109 src_addr, src_addr_len); 1110 } else { 1111 /* Ignore */ 1112 return PJ_SUCCESS; 1113 } 1114 } 1115 1116 1117 /* 1118 * Main entry for handling STUN messages arriving on the main TURN port, 1119 * for this client 1120 */ 1121 static pj_status_t client_handle_stun_msg(struct turn_client *client, 564 1122 const pj_stun_msg *msg, 565 1123 const pj_sockaddr_t *src_addr, 566 1124 unsigned src_addr_len) 567 1125 { 568 }569 570 static pj_status_t client_handle_unknown_msg(struct turn_client *client,571 const pj_uint8_t *pkt,572 unsigned pkt_len,573 const pj_stun_msg *msg,574 const pj_sockaddr_t *src_addr,575 unsigned src_addr_len)576 {577 }578 579 static pj_status_t client_handle_stun_msg(struct turn_client *client,580 const pj_uint8_t *pkt,581 unsigned pkt_len,582 const pj_stun_msg *msg,583 const pj_sockaddr_t *src_addr,584 unsigned src_addr_len)585 {586 1126 pj_status_t status; 587 1127 588 1128 switch (msg->hdr.type) { 1129 case PJ_STUN_SEND_INDICATION: 1130 status = client_handle_send_ind(client, msg); 1131 1132 case PJ_STUN_SET_ACTIVE_DESTINATION_REQUEST: 1133 status = client_handle_sad(client, msg, 1134 src_addr, src_addr_len); 589 1135 case PJ_STUN_ALLOCATE_REQUEST: 590 return client_handle_allocate_req(client, pkt, pkt_len, msg, 591 src_addr, src_addr_len); 592 593 case PJ_STUN_SEND_INDICATION: 594 return client_handle_send_ind(client, pkt, pkt_len, msg, 595 src_addr, src_addr_len); 1136 status = client_handle_allocate_req(client, msg, 1137 src_addr, src_addr_len); 596 1138 597 1139 default: 598 return client_handle_unknown_msg(client, pkt, pkt_len, msg, 599 src_addr, src_addr_len); 600 } 601 } 602 603 604 static pj_status_t sess_on_rx_request(pj_stun_session *sess, 605 const pj_uint8_t *pkt, 606 unsigned pkt_len, 607 const pj_stun_msg *msg, 608 const pj_sockaddr_t *src_addr, 609 unsigned src_addr_len) 610 { 611 struct session_data *sd; 1140 status = client_handle_unknown_msg(client, msg, 1141 src_addr, src_addr_len); 1142 } 1143 1144 return status; 1145 } 1146 1147 1148 PJ_INLINE(pj_uint32_t) GET_VAL32(const pj_uint8_t *pdu, unsigned pos) 1149 { 1150 return (pdu[pos+0] << 24) + 1151 (pdu[pos+1] << 16) + 1152 (pdu[pos+2] << 8) + 1153 (pdu[pos+3]); 1154 } 1155 1156 1157 /* 1158 * Handle incoming data from peer 1159 * This function is called by client_on_read_complete() below. 1160 */ 1161 static void client_handle_peer_data(struct turn_client *client, 1162 unsigned bytes_read) 1163 { 1164 struct peer *peer; 1165 pj_bool_t has_magic_cookie; 1166 pj_status_t status; 1167 1168 /* Has the sender been registered as peer? */ 1169 peer = client_get_peer(client, &client->pkt_src_addr, NULL); 1170 if (peer == NULL) { 1171 /* Nope. Discard packet */ 1172 return; 1173 } 1174 1175 /* Check if packet has STUN magic cookie */ 1176 has_magic_cookie = (GET_VAL32(client->pkt, 4) == PJ_STUN_MAGIC); 1177 1178 /* If this is the Active Destination and the packet doesn't have 1179 * STUN magic cookie, send the packet to client as is. 1180 */ 1181 if (peer == client->active_peer && !has_magic_cookie) { 1182 pj_stun_usage_sendto(client->tu->usage, client->pkt, bytes_read, 0, 1183 &client->pkt_src_addr, client->pkt_src_addr_len); 1184 } else { 1185 /* Otherwise wrap in Data Indication */ 1186 pj_stun_tx_data *data_ind; 1187 1188 status = pj_stun_session_create_ind(client->session, 1189 PJ_STUN_DATA_INDICATION, 1190 &data_ind); 1191 if (status != PJ_SUCCESS) 1192 return; 1193 1194 pj_stun_msg_add_ip_addr_attr(data_ind->pool, data_ind->msg, 1195 PJ_STUN_ATTR_REMOTE_ADDR, PJ_FALSE, 1196 &client->pkt_src_addr, 1197 client->pkt_src_addr_len); 1198 pj_stun_msg_add_binary_attr(data_ind->pool, data_ind->msg, 1199 PJ_STUN_ATTR_DATA, 1200 client->pkt, bytes_read); 1201 1202 1203 pj_stun_session_send_msg(client->session, PJ_FALSE, 1204 &client->pkt_src_addr, 1205 client->pkt_src_addr_len, 1206 data_ind); 1207 } 1208 } 1209 1210 1211 /* 1212 * This callback is called by the ioqueue when read operation has 1213 * completed on the allocated relay port. 1214 */ 1215 static void client_on_read_complete(pj_ioqueue_key_t *key, 1216 pj_ioqueue_op_key_t *op_key, 1217 pj_ssize_t bytes_read) 1218 { 1219 enum { MAX_LOOP = 10 }; 612 1220 struct turn_client *client; 613 pj_stun_tx_data *tdata;1221 unsigned count; 614 1222 pj_status_t status; 615 1223 616 sd = (struct session_data*) pj_stun_session_get_user_data(sess); 617 618 if (sd->client == NULL) { 619 /* No client is associated with this source address. Create a new 620 * one if this is an Allocate request. 621 */ 622 if (msg->hdr.type != PJ_STUN_ALLOCATE_REQUEST) { 623 PJ_LOG(4,(THIS_FILE, "Received first packet not Allocate request")); 624 return PJ_SUCCESS; 1224 PJ_UNUSED_ARG(op_key); 1225 1226 client = pj_ioqueue_get_user_data(key); 1227 1228 /* Lock client */ 1229 pj_mutex_lock(client->mutex); 1230 1231 for (count=0; ; ++count) { 1232 unsigned flags; 1233 1234 if (bytes_read > 0) { 1235 /* Received data from peer! */ 1236 client_handle_peer_data(client, bytes_read); 1237 1238 } else if (bytes_read < 0) { 1239 char errmsg[PJ_ERR_MSG_SIZE]; 1240 pj_strerror(-bytes_read, errmsg, sizeof(errmsg)); 1241 PJ_LOG(4,(THIS_FILE, "TURN client %s: error reading data " 1242 "from allocated relay port: %s", 1243 client->obj_name, errmsg)); 625 1244 } 626 1245 627 PJ_TODO(SUPPORT_MOVE); 628 629 status = client_create(sd->tu, msg, src_addr, src_addr_len, &client); 630 if (status != PJ_SUCCESS) { 631 pj_stun_perror(THIS_FILE, "Error creating new TURN client", status); 632 return status; 633 } 634 635 } else { 636 client = sd->client; 637 } 638 639 return client_handle_stun_msg(client, pkt, pkt_len, msg, 640 src_addr, src_addr_len); 641 } 642 643 static pj_status_t sess_on_send_msg(pj_stun_session *sess, 644 const void *pkt, 645 pj_size_t pkt_size, 646 const pj_sockaddr_t *dst_addr, 647 unsigned addr_len) 648 { 649 struct session_data *sd; 650 651 sd = (struct session_data*) pj_stun_session_get_user_data(sess); 652 653 if (sd->tu->type == PJ_SOCK_DGRAM) { 654 return pj_stun_usage_sendto(sd->tu->usage, pkt, pkt_size, 0, 655 dst_addr, addr_len); 656 } else { 657 return PJ_ENOTSUP; 658 } 659 } 660 661 662 1246 bytes_read = sizeof(client->pkt); 1247 flags = (count >= MAX_LOOP) ? PJ_IOQUEUE_ALWAYS_ASYNC : 0; 1248 client->pkt_src_addr_len = sizeof(client->pkt_src_addr); 1249 status = pj_ioqueue_recvfrom(client->key, 1250 &client->pkt_read_key, 1251 client->pkt, &bytes_read, flags, 1252 &client->pkt_src_addr, 1253 &client->pkt_src_addr_len); 1254 if (status == PJ_EPENDING) 1255 break; 1256 } 1257 1258 /* Unlock client */ 1259 pj_mutex_unlock(client->mutex); 1260 } 1261 1262 1263 /* On Allocation timer timeout (i.e. we don't receive new Allocate request 1264 * to refresh the allocation in time) 1265 */ 1266 static void client_on_expired(pj_timer_heap_t *th, pj_timer_entry *e) 1267 { 1268 struct turn_client *client; 1269 1270 PJ_UNUSED_ARG(th); 1271 1272 client = (struct turn_client*) e->user_data; 1273 1274 PJ_LOG(4,(THIS_FILE, "TURN client %s: allocation timer timeout, " 1275 "destroying client", 1276 client->obj_name)); 1277 client_destroy(client, PJ_SUCCESS); 1278 } 1279
Note: See TracChangeset
for help on using the changeset viewer.