Changeset 1988 for pjproject/trunk/pjnath/src/pjnath/turn_sock.c
- Timestamp:
- Jun 6, 2008 2:47:10 PM (16 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjnath/src/pjnath/turn_sock.c
r1914 r1988 18 18 */ 19 19 #include <pjnath/turn_sock.h> 20 #include <pj/activesock.h> 20 21 #include <pj/assert.h> 21 22 #include <pj/errno.h> … … 51 52 int af; 52 53 pj_turn_tp_type conn_type; 53 pj_sock_t sock; 54 pj_ioqueue_key_t *key; 55 pj_ioqueue_op_key_t read_key; 54 pj_activesock_t *active_sock; 56 55 pj_ioqueue_op_key_t send_key; 57 pj_uint8_t pkt[PJ_TURN_MAX_PKT_LEN];58 56 }; 59 57 … … 72 70 unsigned ch_num); 73 71 static void turn_on_rx_data(pj_turn_session *sess, 74 const pj_uint8_t*pkt,72 void *pkt, 75 73 unsigned pkt_len, 76 74 const pj_sockaddr_t *peer_addr, … … 79 77 pj_turn_state_t old_state, 80 78 pj_turn_state_t new_state); 81 static void on_read_complete(pj_ioqueue_key_t *key, 82 pj_ioqueue_op_key_t *op_key, 83 pj_ssize_t bytes_read); 84 static void on_connect_complete(pj_ioqueue_key_t *key, 85 pj_status_t status); 79 80 static pj_bool_t on_data_read(pj_activesock_t *asock, 81 void *data, 82 pj_size_t size, 83 pj_status_t status, 84 pj_size_t *remainder); 85 static pj_bool_t on_connect_complete(pj_activesock_t *asock, 86 pj_status_t status); 87 86 88 87 89 … … 159 161 sess_cb.on_state = &turn_on_state; 160 162 status = pj_turn_session_create(cfg, pool->obj_name, af, conn_type, 161 &sess_cb, turn_sock, 0, &turn_sock->sess);163 &sess_cb, 0, turn_sock, &turn_sock->sess); 162 164 if (status != PJ_SUCCESS) { 163 165 destroy(turn_sock); … … 188 190 } 189 191 190 if (turn_sock->key) { 191 pj_ioqueue_unregister(turn_sock->key); 192 turn_sock->key = NULL; 193 turn_sock->sock = 0; 194 } else if (turn_sock->sock) { 195 pj_sock_close(turn_sock->sock); 196 turn_sock->sock = 0; 192 if (turn_sock->active_sock) { 193 pj_activesock_close(turn_sock->active_sock); 194 turn_sock->active_sock = NULL; 197 195 } 198 196 … … 272 270 { 273 271 show_err(turn_sock, title, status); 274 pj_turn_session_destroy(turn_sock->sess); 272 if (turn_sock->sess) 273 pj_turn_session_destroy(turn_sock->sess); 275 274 } 276 275 … … 281 280 void *user_data) 282 281 { 282 PJ_ASSERT_RETURN(turn_sock, PJ_EINVAL); 283 283 turn_sock->user_data = user_data; 284 284 return PJ_SUCCESS; … … 290 290 PJ_DEF(void*) pj_turn_sock_get_user_data(pj_turn_sock *turn_sock) 291 291 { 292 PJ_ASSERT_RETURN(turn_sock, NULL); 292 293 return turn_sock->user_data; 293 294 } … … 297 298 */ 298 299 PJ_DEF(pj_status_t) pj_turn_sock_get_info(pj_turn_sock *turn_sock, 299 pj_turn_session_info *info)300 pj_turn_session_info *info) 300 301 { 301 302 PJ_ASSERT_RETURN(turn_sock && info, PJ_EINVAL); … … 310 311 } 311 312 313 /** 314 * Lock the TURN socket. Application may need to call this function to 315 * synchronize access to other objects to avoid deadlock. 316 */ 317 PJ_DEF(pj_status_t) pj_turn_sock_lock(pj_turn_sock *turn_sock) 318 { 319 return pj_lock_acquire(turn_sock->lock); 320 } 321 322 /** 323 * Unlock the TURN socket. 324 */ 325 PJ_DEF(pj_status_t) pj_turn_sock_unlock(pj_turn_sock *turn_sock) 326 { 327 return pj_lock_release(turn_sock->lock); 328 } 329 330 /* 331 * Set STUN message logging for this TURN session. 332 */ 333 PJ_DEF(void) pj_turn_sock_set_log( pj_turn_sock *turn_sock, 334 unsigned flags) 335 { 336 pj_turn_session_set_log(turn_sock->sess, flags); 337 } 338 312 339 /* 313 340 * Initialize. 314 341 */ 315 PJ_DEF(pj_status_t) pj_turn_sock_ init(pj_turn_sock *turn_sock,316 const pj_str_t *domain,317 int default_port,318 pj_dns_resolver *resolver,319 const pj_stun_auth_cred *cred,320 const pj_turn_alloc_param *param)342 PJ_DEF(pj_status_t) pj_turn_sock_alloc(pj_turn_sock *turn_sock, 343 const pj_str_t *domain, 344 int default_port, 345 pj_dns_resolver *resolver, 346 const pj_stun_auth_cred *cred, 347 const pj_turn_alloc_param *param) 321 348 { 322 349 pj_status_t status; … … 393 420 * Notification when outgoing TCP socket has been connected. 394 421 */ 395 static void on_connect_complete(pj_ioqueue_key_t *key,396 422 static pj_bool_t on_connect_complete(pj_activesock_t *asock, 423 pj_status_t status) 397 424 { 398 425 pj_turn_sock *turn_sock; 399 426 400 turn_sock = (pj_turn_sock*) pj_ ioqueue_get_user_data(key);427 turn_sock = (pj_turn_sock*) pj_activesock_get_user_data(asock); 401 428 402 429 if (status != PJ_SUCCESS) { 403 430 sess_fail(turn_sock, "TCP connect() error", status); 404 return ;431 return PJ_FALSE; 405 432 } 406 433 … … 410 437 411 438 /* Kick start pending read operation */ 412 pj_ioqueue_op_key_init(&turn_sock->read_key, sizeof(turn_sock->read_key));413 on_read_complete(turn_sock->key, &turn_sock->read_key, INIT);439 status = pj_activesock_start_read(asock, turn_sock->pool, 440 PJ_TURN_MAX_PKT_LEN, 0); 414 441 415 442 /* Init send_key */ … … 420 447 if (status != PJ_SUCCESS) { 421 448 sess_fail(turn_sock, "Error sending ALLOCATE", status); 422 return; 423 } 449 return PJ_FALSE; 450 } 451 452 return PJ_TRUE; 424 453 } 425 454 … … 427 456 * Notification from ioqueue when incoming UDP packet is received. 428 457 */ 429 static void on_read_complete(pj_ioqueue_key_t *key, 430 pj_ioqueue_op_key_t *op_key, 431 pj_ssize_t bytes_read) 432 { 433 enum { MAX_RETRY = 10 }; 458 static pj_bool_t on_data_read(pj_activesock_t *asock, 459 void *data, 460 pj_size_t size, 461 pj_status_t status, 462 pj_size_t *remainder) 463 { 434 464 pj_turn_sock *turn_sock; 435 int retry = 0; 436 pj_status_t status; 437 438 turn_sock = (pj_turn_sock*) pj_ioqueue_get_user_data(key); 465 pj_bool_t ret = PJ_TRUE; 466 467 turn_sock = (pj_turn_sock*) pj_activesock_get_user_data(asock); 439 468 pj_lock_acquire(turn_sock->lock); 440 469 441 do { 442 if (bytes_read == INIT) { 443 /* Special instruction to initialize pending read() */ 444 } else if (bytes_read > 0 && turn_sock->sess) { 445 /* Report incoming packet to TURN session */ 446 pj_turn_session_on_rx_pkt(turn_sock->sess, turn_sock->pkt, 447 bytes_read, 448 turn_sock->conn_type == PJ_TURN_TP_UDP); 449 } else if (bytes_read <= 0 && turn_sock->conn_type != PJ_TURN_TP_UDP) { 450 sess_fail(turn_sock, "TCP connection closed", -bytes_read); 451 goto on_return; 452 } 453 454 /* Read next packet */ 455 bytes_read = sizeof(turn_sock->pkt); 456 status = pj_ioqueue_recv(turn_sock->key, op_key, 457 turn_sock->pkt, &bytes_read, 0); 458 459 if (status != PJ_EPENDING && status != PJ_SUCCESS) { 460 char errmsg[PJ_ERR_MSG_SIZE]; 461 462 pj_strerror(status, errmsg, sizeof(errmsg)); 463 sess_fail(turn_sock, "Socket recv() error", status); 464 goto on_return; 465 } 466 467 } while (status != PJ_EPENDING && status != PJ_ECANCELLED && 468 ++retry < MAX_RETRY); 470 if (status == PJ_SUCCESS && turn_sock->sess) { 471 /* Report incoming packet to TURN session */ 472 PJ_TODO(REPORT_PARSED_LEN); 473 pj_turn_session_on_rx_pkt(turn_sock->sess, data, size); 474 } else if (status != PJ_SUCCESS && 475 turn_sock->conn_type != PJ_TURN_TP_UDP) 476 { 477 sess_fail(turn_sock, "TCP connection closed", status); 478 ret = PJ_FALSE; 479 goto on_return; 480 } 469 481 470 482 on_return: 471 483 pj_lock_release(turn_sock->lock); 484 485 return ret; 472 486 } 473 487 … … 483 497 { 484 498 pj_turn_sock *turn_sock = (pj_turn_sock*) 485 pj_turn_session_get_user_data(sess);499 pj_turn_session_get_user_data(sess); 486 500 pj_ssize_t len = pkt_len; 487 501 pj_status_t status; … … 496 510 PJ_UNUSED_ARG(dst_addr_len); 497 511 498 status = pj_ ioqueue_send(turn_sock->key, &turn_sock->send_key,499 512 status = pj_activesock_send(turn_sock->active_sock, &turn_sock->send_key, 513 pkt, &len, 0); 500 514 if (status != PJ_SUCCESS && status != PJ_EPENDING) { 501 515 show_err(turn_sock, "socket send()", status); … … 525 539 */ 526 540 static void turn_on_rx_data(pj_turn_session *sess, 527 const pj_uint8_t*pkt,541 void *pkt, 528 542 unsigned pkt_len, 529 543 const pj_sockaddr_t *peer_addr, … … 560 574 } 561 575 562 if (new_state == PJ_TURN_STATE_RESOLVED) { 576 /* Notify app first */ 577 if (turn_sock->cb.on_state) { 578 (*turn_sock->cb.on_state)(turn_sock, old_state, new_state); 579 } 580 581 /* Make sure user hasn't destroyed us in the callback */ 582 if (turn_sock->sess && new_state == PJ_TURN_STATE_RESOLVED) { 583 pj_turn_session_info info; 584 pj_turn_session_get_info(turn_sock->sess, &info); 585 new_state = info.state; 586 } 587 588 if (turn_sock->sess && new_state == PJ_TURN_STATE_RESOLVED) { 563 589 /* 564 590 * Once server has been resolved, initiate outgoing TCP … … 568 594 char addrtxt[PJ_INET6_ADDRSTRLEN+8]; 569 595 int sock_type; 570 pj_ioqueue_callback ioq_cb; 596 pj_sock_t sock; 597 pj_activesock_cb asock_cb; 571 598 572 599 /* Close existing connection, if any. This happens when … … 574 601 * connection or ALLOCATE request failed. 575 602 */ 576 if (turn_sock->key) { 577 pj_ioqueue_unregister(turn_sock->key); 578 turn_sock->key = NULL; 579 turn_sock->sock = 0; 580 } else if (turn_sock->sock) { 581 pj_sock_close(turn_sock->sock); 582 turn_sock->sock = 0; 603 if (turn_sock->active_sock) { 604 pj_activesock_close(turn_sock->active_sock); 605 turn_sock->active_sock = NULL; 583 606 } 584 607 … … 592 615 593 616 /* Init socket */ 594 status = pj_sock_socket(turn_sock->af, sock_type, 0, 595 &turn_sock->sock); 617 status = pj_sock_socket(turn_sock->af, sock_type, 0, &sock); 596 618 if (status != PJ_SUCCESS) { 597 619 pj_turn_sock_destroy(turn_sock); … … 599 621 } 600 622 601 /* Register to ioqeuue */ 602 pj_bzero(&ioq_cb, sizeof(ioq_cb)); 603 ioq_cb.on_read_complete = &on_read_complete; 604 ioq_cb.on_connect_complete = &on_connect_complete; 605 status = pj_ioqueue_register_sock(turn_sock->pool, turn_sock->cfg.ioqueue, 606 turn_sock->sock, turn_sock, 607 &ioq_cb, &turn_sock->key); 623 /* Create active socket */ 624 pj_bzero(&asock_cb, sizeof(asock_cb)); 625 asock_cb.on_data_read = &on_data_read; 626 asock_cb.on_connect_complete = &on_connect_complete; 627 status = pj_activesock_create(turn_sock->pool, sock, 628 sock_type, NULL, 629 turn_sock->cfg.ioqueue, &asock_cb, 630 turn_sock, 631 &turn_sock->active_sock); 608 632 if (status != PJ_SUCCESS) { 609 633 pj_turn_sock_destroy(turn_sock); … … 617 641 618 642 /* Initiate non-blocking connect */ 619 status = pj_ioqueue_connect(turn_sock->key, &info.server, 620 pj_sockaddr_get_len(&info.server)); 643 status=pj_activesock_start_connect(turn_sock->active_sock, 644 turn_sock->pool, 645 &info.server, 646 pj_sockaddr_get_len(&info.server)); 621 647 if (status == PJ_SUCCESS) { 622 on_connect_complete(turn_sock-> key, PJ_SUCCESS);648 on_connect_complete(turn_sock->active_sock, PJ_SUCCESS); 623 649 } else if (status != PJ_EPENDING) { 624 650 pj_turn_sock_destroy(turn_sock); … … 631 657 } 632 658 633 if (turn_sock->cb.on_state) {634 (*turn_sock->cb.on_state)(turn_sock, old_state, new_state);635 }636 637 659 if (new_state >= PJ_TURN_STATE_DESTROYING && turn_sock->sess) { 638 660 pj_time_val delay = {0, 0};
Note: See TracChangeset
for help on using the changeset viewer.