Changeset 554
- Timestamp:
- Jun 26, 2006 10:05:37 AM (18 years ago)
- Location:
- pjproject/trunk/pjsip
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjsip/build/pjsip_core.dsp
r550 r554 148 148 149 149 SOURCE=..\src\pjsip\sip_transport_tcp.c 150 # PROP Exclude_From_Build 1151 150 # End Source File 152 151 # Begin Source File -
pjproject/trunk/pjsip/src/pjsip/sip_transport_tcp.c
r550 r554 19 19 #include <pjsip/sip_transport_tcp.h> 20 20 #include <pjsip/sip_endpoint.h> 21 #include <pjsip/sip_errno.h> 22 #include <pj/compat/socket.h> 21 23 #include <pj/assert.h> 22 #include <pj/errno.h>23 24 #include <pj/ioqueue.h> 24 25 #include <pj/lock.h> 26 #include <pj/log.h> 27 #include <pj/os.h> 25 28 #include <pj/pool.h> 26 29 #include <pj/string.h> 27 30 28 31 32 #define THIS_FILE "sip_transport_tcp.c" 33 29 34 #define MAX_ASYNC_CNT 16 30 #define POOL_INIT 4000 31 #define POOL_INC 4000 35 #define POOL_LIS_INIT 4000 36 #define POOL_LIS_INC 4000 37 #define POOL_TP_INIT 4000 38 #define POOL_TP_INC 4000 32 39 33 40 34 41 struct tcp_listener; 42 struct tcp_transport; 43 35 44 36 45 struct pending_accept 37 46 { 38 pj_ioqueue_op_key_t op_key;39 struct tcp_listener *listener;40 pj_sock_t new_sock;41 int addr_len;42 pj_sockaddr_in local_addr;43 pj_sockaddr_in remote_addr;47 pj_ioqueue_op_key_t op_key; 48 struct tcp_listener *listener; 49 pj_sock_t new_sock; 50 int addr_len; 51 pj_sockaddr_in local_addr; 52 pj_sockaddr_in remote_addr; 44 53 }; 45 54 55 struct pending_connect 56 { 57 pj_ioqueue_op_key_t op_key; 58 struct tcp_transport *transport; 59 }; 60 61 46 62 struct tcp_listener 47 63 { 48 64 pjsip_tpfactory factory; 65 char name[PJ_MAX_OBJ_NAME]; 49 66 pj_bool_t active; 67 pjsip_endpoint *endpt; 50 68 pjsip_tpmgr *tpmgr; 51 69 pj_sock_t sock; … … 56 74 57 75 76 struct pending_tdata 77 { 78 PJ_DECL_LIST_MEMBER(struct pending_tdata); 79 pjsip_tx_data_op_key *tdata_op_key; 80 }; 81 82 58 83 struct tcp_transport 59 84 { 60 85 pjsip_transport base; 86 struct tcp_listener *listener; 87 pj_bool_t is_registered; 88 pj_bool_t is_closing; 61 89 pj_sock_t sock; 90 pj_ioqueue_key_t *key; 91 pj_bool_t has_pending_connect; 92 struct pending_connect connect_op; 93 94 95 /* TCP transport can only have one rdata! 96 * Otherwise chunks of incoming PDU may be received on different 97 * buffer. 98 */ 99 pjsip_rx_data rdata; 100 101 /* Pending transmission list. */ 102 struct pending_tdata tx_list; 62 103 }; 63 104 … … 71 112 pj_status_t status); 72 113 73 static pj_status_t destroy_listener(struct tcp_listener *listener); 74 75 static pj_status_t create_transport(pjsip_tpfactory *factory, 76 pjsip_tpmgr *mgr, 77 pjsip_endpoint *endpt, 78 const pj_sockaddr *rem_addr, 79 int addr_len, 80 pjsip_transport **transport); 114 static pj_status_t lis_destroy(struct tcp_listener *listener); 115 static pj_status_t lis_create_transport(pjsip_tpfactory *factory, 116 pjsip_tpmgr *mgr, 117 pjsip_endpoint *endpt, 118 const pj_sockaddr *rem_addr, 119 int addr_len, 120 pjsip_transport **transport); 121 122 123 static pj_status_t create_tcp_transport(struct tcp_listener *listener, 124 pj_sock_t sock, 125 const pj_sockaddr_in *local, 126 const pj_sockaddr_in *remote, 127 struct tcp_transport **p_tcp); 128 129 130 static void tcp_perror(const char *sender, const char *title, 131 pj_status_t status) 132 { 133 char errmsg[PJ_ERR_MSG_SIZE]; 134 135 pj_strerror(status, errmsg, sizeof(errmsg)); 136 137 PJ_LOG(1,(sender, "%s: %s [code=%d]", title, errmsg, status)); 138 } 139 81 140 82 141 PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt, … … 94 153 95 154 96 pool = pjsip_endpt_create_pool(endpt, "tcplis", POOL_INIT, POOL_INC); 155 pool = pjsip_endpt_create_pool(endpt, "tcplis", POOL_LIS_INIT, 156 POOL_LIS_INC); 97 157 PJ_ASSERT_RETURN(pool, PJ_ENOMEM); 98 158 99 159 100 160 listener = pj_pool_zalloc(pool, sizeof(struct tcp_listener)); 161 pj_ansi_sprintf(listener->name, "tcp:%d", (int)pj_ntohs(local->sin_port)); 101 162 listener->factory.pool = pool; 102 163 listener->factory.type = PJSIP_TRANSPORT_TCP; … … 146 207 &listener->accept_op[i].remote_addr, 147 208 &listener->accept_op[i].addr_len); 148 if (status != PJ_SUCCESS )209 if (status != PJ_SUCCESS && status != PJ_EPENDING) 149 210 goto on_error; 150 211 } 151 212 152 213 /* Register to transport manager */ 214 listener->endpt = endpt; 153 215 listener->tpmgr = pjsip_endpt_get_tpmgr(endpt); 216 listener->factory.create_transport = lis_create_transport; 154 217 status = pjsip_tpmgr_register_tpfactory(listener->tpmgr, 155 218 &listener->factory); … … 160 223 listener->active = PJ_TRUE; 161 224 225 PJ_LOG(4,(listener->name, 226 "SIP TCP transport listening for incoming connections at %s:%d", 227 pj_inet_ntoa(local->sin_addr), (int)pj_ntohs(local->sin_port))); 228 162 229 return PJ_SUCCESS; 163 230 164 231 on_error: 165 destroy_listener(listener);232 lis_destroy(listener); 166 233 return status; 167 234 } 168 235 169 236 170 171 172 static pj_status_t destroy_listener(struct tcp_listener *listener) 237 static pj_status_t lis_destroy(struct tcp_listener *listener) 173 238 { 174 239 if (listener->active) { … … 194 259 195 260 if (listener->factory.pool) { 261 PJ_LOG(4,(listener->name, "SIP TCP transport destroyed")); 196 262 pj_pool_release(listener->factory.pool); 197 263 listener->factory.pool = NULL; … … 202 268 203 269 270 /***************************************************************************/ 271 /* 272 * TCP Transport 273 */ 274 275 /* 276 * Prototypes. 277 */ 278 /* Called by transport manager to send message */ 279 static pj_status_t tcp_send_msg(pjsip_transport *transport, 280 pjsip_tx_data *tdata, 281 const pj_sockaddr_t *rem_addr, 282 int addr_len, 283 void *token, 284 void (*callback)(pjsip_transport *transport, 285 void *token, 286 pj_ssize_t sent_bytes)); 287 288 /* Called by transport manager to shutdown */ 289 static pj_status_t tcp_shutdown(pjsip_transport *transport); 290 291 /* Called by transport manager to destroy */ 292 static pj_status_t tcp_destroy(pjsip_transport *transport); 293 294 /* Callback from ioqueue on incoming packet */ 295 static void on_read_complete(pj_ioqueue_key_t *key, 296 pj_ioqueue_op_key_t *op_key, 297 pj_ssize_t bytes_read); 298 299 /* Callback from ioqueue when packet is sent */ 300 static void on_write_complete(pj_ioqueue_key_t *key, 301 pj_ioqueue_op_key_t *op_key, 302 pj_ssize_t bytes_sent); 303 304 /* Callback from ioqueue when connect completes */ 305 static void on_connect_complete(pj_ioqueue_key_t *key, 306 pj_status_t status); 307 308 309 static void sockaddr_to_host_port( pj_pool_t *pool, 310 pjsip_host_port *host_port, 311 const pj_sockaddr_in *addr ) 312 { 313 host_port->host.ptr = pj_pool_alloc(pool, 48); 314 host_port->host.slen = pj_ansi_sprintf( host_port->host.ptr, "%s", 315 pj_inet_ntoa(addr->sin_addr)); 316 host_port->port = pj_ntohs(addr->sin_port); 317 } 318 319 320 /* 321 * Utilities to create TCP transport. 322 */ 323 static pj_status_t create_tcp_transport(struct tcp_listener *listener, 324 pj_sock_t sock, 325 const pj_sockaddr_in *local, 326 const pj_sockaddr_in *remote, 327 struct tcp_transport **p_tcp) 328 { 329 struct tcp_transport *tcp; 330 pj_pool_t *pool; 331 pj_ioqueue_t *ioqueue; 332 pj_ioqueue_callback tcp_callback; 333 pj_status_t status; 334 335 pool = pjsip_endpt_create_pool(listener->endpt, "tcp", 336 POOL_TP_INIT, POOL_TP_INC); 337 338 /* 339 * Create and initialize basic transport structure. 340 */ 341 tcp = pj_pool_zalloc(pool, sizeof(*tcp)); 342 tcp->sock = sock; 343 tcp->listener = listener; 344 pj_list_init(&tcp->tx_list); 345 346 347 pj_ansi_snprintf(tcp->base.obj_name, PJ_MAX_OBJ_NAME, "tcp%p", tcp); 348 tcp->base.pool = pool; 349 350 status = pj_atomic_create(pool, 0, &tcp->base.ref_cnt); 351 if (status != PJ_SUCCESS) 352 goto on_error; 353 354 status = pj_lock_create_recursive_mutex(pool, "tcp", &tcp->base.lock); 355 if (status != PJ_SUCCESS) 356 goto on_error; 357 358 tcp->base.key.type = PJSIP_TRANSPORT_TCP; 359 pj_memcpy(&tcp->base.key.rem_addr, remote, sizeof(pj_sockaddr_in)); 360 tcp->base.type_name = "tcp"; 361 tcp->base.flag = pjsip_transport_get_flag_from_type(PJSIP_TRANSPORT_TCP); 362 363 tcp->base.info = pj_pool_alloc(pool, 64); 364 pj_ansi_snprintf(tcp->base.info, 64, "TCP to %s:%d", 365 pj_inet_ntoa(remote->sin_addr), 366 (int)pj_ntohs(remote->sin_port)); 367 368 tcp->base.addr_len = sizeof(pj_sockaddr_in); 369 pj_memcpy(&tcp->base.local_addr, local, sizeof(pj_sockaddr_in)); 370 sockaddr_to_host_port(pool, &tcp->base.local_name, local); 371 sockaddr_to_host_port(pool, &tcp->base.remote_name, remote); 372 373 tcp->base.endpt = listener->endpt; 374 tcp->base.tpmgr = listener->tpmgr; 375 tcp->base.send_msg = &tcp_send_msg; 376 tcp->base.do_shutdown = &tcp_shutdown; 377 tcp->base.destroy = &tcp_destroy; 378 379 380 /* Register socket to ioqueue */ 381 pj_memset(&tcp_callback, 0, sizeof(pj_ioqueue_callback)); 382 tcp_callback.on_read_complete = &on_read_complete; 383 tcp_callback.on_write_complete = &on_write_complete; 384 tcp_callback.on_connect_complete = &on_connect_complete; 385 386 ioqueue = pjsip_endpt_get_ioqueue(listener->endpt); 387 status = pj_ioqueue_register_sock(pool, ioqueue, sock, 388 tcp, &tcp_callback, &tcp->key); 389 if (status != PJ_SUCCESS) 390 goto on_error; 391 392 /* Register transport to transport manager */ 393 status = pjsip_transport_register(listener->tpmgr, &tcp->base); 394 if (status != PJ_SUCCESS) 395 goto on_error; 396 397 tcp->is_registered = PJ_TRUE; 398 399 /* Done setting up basic transport. */ 400 *p_tcp = tcp; 401 402 on_error: 403 tcp_destroy(&tcp->base); 404 return status; 405 } 406 407 408 /* Flush all pending send operations */ 409 static tcp_flush_pending_tx(struct tcp_transport *tcp) 410 { 411 pj_lock_acquire(tcp->base.lock); 412 while (!pj_list_empty(&tcp->tx_list)) { 413 struct pending_tdata *pending_tx; 414 pjsip_tx_data *tdata; 415 pj_ioqueue_op_key_t *op_key; 416 pj_ssize_t size; 417 pj_status_t status; 418 419 pending_tx = tcp->tx_list.next; 420 pj_list_erase(pending_tx); 421 422 tdata = pending_tx->tdata_op_key->tdata; 423 op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key; 424 425 /* send to ioqueue! */ 426 size = tdata->buf.cur - tdata->buf.start; 427 status = pj_ioqueue_send(tcp->key, op_key, 428 tdata->buf.start, &size, 0); 429 430 if (status != PJ_EPENDING) { 431 on_write_complete(tcp->key, op_key, size); 432 } 433 434 } 435 pj_lock_release(tcp->base.lock); 436 } 437 438 439 440 /* Destroy TCP transport */ 441 static pj_status_t tcp_destroy(pjsip_transport *transport) 442 { 443 struct tcp_transport *tcp = (struct tcp_transport*)transport; 444 445 /* Cancel all pending transmits */ 446 while (!pj_list_empty(&tcp->tx_list)) { 447 struct pending_tdata *pending_tx; 448 pj_ioqueue_op_key_t *op_key; 449 450 pending_tx = tcp->tx_list.next; 451 pj_list_erase(pending_tx); 452 453 op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key; 454 455 on_write_complete(tcp->key, op_key, 456 -PJ_RETURN_OS_ERROR(OSERR_ENOTCONN)); 457 } 458 459 if (tcp->is_registered) { 460 pjsip_transport_destroy(transport); 461 tcp->is_registered = PJ_FALSE; 462 } 463 464 if (tcp->rdata.tp_info.pool) { 465 pj_pool_release(tcp->rdata.tp_info.pool); 466 tcp->rdata.tp_info.pool = NULL; 467 } 468 469 if (tcp->key) { 470 pj_ioqueue_unregister(tcp->key); 471 tcp->key = NULL; 472 } 473 474 if (tcp->base.lock) { 475 pj_lock_destroy(tcp->base.lock); 476 tcp->base.lock = NULL; 477 } 478 479 if (tcp->base.ref_cnt) { 480 pj_atomic_destroy(tcp->base.ref_cnt); 481 tcp->base.ref_cnt = NULL; 482 } 483 484 if (tcp->base.pool) { 485 PJ_LOG(4,(tcp->base.obj_name, "TCP transport destroyed")); 486 pj_pool_release(tcp->base.pool); 487 tcp->base.pool = NULL; 488 } 489 490 return PJ_SUCCESS; 491 } 492 493 494 /* 495 * This utility function creates receive data buffers and start 496 * asynchronous recv() operations from the socket. 497 */ 498 static pj_status_t tcp_start_read(struct tcp_transport *tcp) 499 { 500 pj_pool_t *pool; 501 pj_ssize_t size; 502 pj_sockaddr_in *rem_addr; 503 pj_status_t status; 504 505 /* Init rdata */ 506 pool = pjsip_endpt_create_pool(tcp->listener->endpt, 507 "rtd%p", 508 PJSIP_POOL_RDATA_LEN, 509 PJSIP_POOL_RDATA_INC); 510 if (!pool) { 511 tcp_perror(tcp->base.obj_name, "Unable to create pool", PJ_ENOMEM); 512 return PJ_ENOMEM; 513 } 514 515 tcp->rdata.tp_info.pool = pool; 516 517 tcp->rdata.tp_info.transport = &tcp->base; 518 tcp->rdata.tp_info.tp_data = tcp; 519 tcp->rdata.tp_info.op_key.rdata = &tcp->rdata; 520 pj_ioqueue_op_key_init(&tcp->rdata.tp_info.op_key.op_key, 521 sizeof(pj_ioqueue_op_key_t)); 522 523 tcp->rdata.pkt_info.src_addr = tcp->base.key.rem_addr; 524 tcp->rdata.pkt_info.src_addr_len = sizeof(pj_sockaddr_in); 525 rem_addr = (pj_sockaddr_in*) &tcp->base.key.rem_addr; 526 pj_ansi_strcpy(tcp->rdata.pkt_info.src_name, 527 pj_inet_ntoa(rem_addr->sin_addr)); 528 tcp->rdata.pkt_info.src_port = pj_ntohs(rem_addr->sin_port); 529 530 size = sizeof(tcp->rdata.pkt_info.packet); 531 status = pj_ioqueue_recv(tcp->key, &tcp->rdata.tp_info.op_key.op_key, 532 tcp->rdata.pkt_info.packet, &size, 533 PJ_IOQUEUE_ALWAYS_ASYNC); 534 if (status != PJ_SUCCESS) { 535 tcp_perror(tcp->base.obj_name, "ioqueue recv() error", status); 536 return status; 537 } 538 539 return PJ_SUCCESS; 540 } 541 542 543 /* This callback is called by transport manager for the TCP factory 544 * to create outgoing transport to the specified destination. 545 */ 546 static pj_status_t lis_create_transport(pjsip_tpfactory *factory, 547 pjsip_tpmgr *mgr, 548 pjsip_endpoint *endpt, 549 const pj_sockaddr *rem_addr, 550 int addr_len, 551 pjsip_transport **p_transport) 552 { 553 struct tcp_listener *listener; 554 struct tcp_transport *tcp; 555 pj_sock_t sock; 556 pj_sockaddr_in local_addr; 557 pj_status_t status; 558 559 /* Sanity checks */ 560 PJ_ASSERT_RETURN(factory && mgr && endpt && rem_addr && 561 addr_len && p_transport, PJ_EINVAL); 562 563 /* Check that address is a sockaddr_in */ 564 PJ_ASSERT_RETURN(rem_addr->sa_family == PJ_AF_INET && 565 addr_len == sizeof(pj_sockaddr_in), PJ_EINVAL); 566 567 568 listener = (struct tcp_listener*)factory; 569 570 571 /* Create socket */ 572 status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, &sock); 573 if (status != PJ_SUCCESS) 574 return status; 575 576 /* Bind to any port */ 577 status = pj_sock_bind_in(sock, 0, 0); 578 if (status != PJ_SUCCESS) { 579 pj_sock_close(sock); 580 return status; 581 } 582 583 /* Get the local port */ 584 addr_len = sizeof(pj_sockaddr_in); 585 status = pj_sock_getsockname(sock, &local_addr, &addr_len); 586 if (status != PJ_SUCCESS) { 587 pj_sock_close(sock); 588 return status; 589 } 590 591 /* Initially set the address from the listener's address */ 592 local_addr.sin_addr.s_addr = 593 ((pj_sockaddr_in*)&listener->factory.local_addr)->sin_addr.s_addr; 594 595 /* Create the transport descriptor */ 596 status = create_tcp_transport(listener, sock, &local_addr, 597 (pj_sockaddr_in*)rem_addr, &tcp); 598 if (status != PJ_SUCCESS) 599 return status; 600 601 /* Start asynchronous connect() operation */ 602 tcp->has_pending_connect = PJ_TRUE; 603 pj_ioqueue_op_key_init(&tcp->connect_op.op_key, 604 sizeof(tcp->connect_op.op_key)); 605 tcp->connect_op.transport = tcp; 606 status = pj_ioqueue_connect(tcp->key, rem_addr, sizeof(pj_sockaddr_in)); 607 if (status != PJ_SUCCESS) { 608 tcp_destroy(&tcp->base); 609 return status; 610 } 611 612 /* Update (again) local address, just in case local address currently 613 * set is different now that asynchronous connect() is started. 614 */ 615 addr_len = sizeof(pj_sockaddr_in); 616 if (pj_sock_getsockname(tcp->sock, &local_addr, &addr_len)==PJ_SUCCESS) { 617 pj_sockaddr_in *tp_addr = (pj_sockaddr_in*)&tcp->base.local_addr; 618 619 /* Some systems (like old Win32 perhaps) may not set local address 620 * properly before socket is fully connected. 621 */ 622 if (tp_addr->sin_addr.s_addr != local_addr.sin_addr.s_addr && 623 local_addr.sin_addr.s_addr != 0) 624 { 625 tp_addr->sin_addr.s_addr = local_addr.sin_addr.s_addr; 626 tp_addr->sin_port = local_addr.sin_port; 627 sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name, 628 &local_addr); 629 } 630 } 631 632 /* Done */ 633 *p_transport = &tcp->base; 634 635 return PJ_SUCCESS; 636 } 637 638 639 /* 640 * This callback is called by ioqueue when pending accept() operation has 641 * completed. 642 */ 204 643 static void on_accept_complete( pj_ioqueue_key_t *key, 205 644 pj_ioqueue_op_key_t *op_key, … … 207 646 pj_status_t status) 208 647 { 209 } 210 211 212 static pj_status_t create_transport(pjsip_tpfactory *factory, 213 pjsip_tpmgr *mgr, 214 pjsip_endpoint *endpt, 215 const pj_sockaddr *rem_addr, 216 int addr_len, 217 pjsip_transport **transport) 218 { 219 } 220 648 struct tcp_listener *listener; 649 struct tcp_transport *tcp; 650 struct pending_accept *accept_op; 651 int err_cnt = 0; 652 653 listener = pj_ioqueue_get_user_data(key); 654 accept_op = (struct pending_accept*) op_key; 655 656 do { 657 if (status != PJ_SUCCESS) { 658 tcp_perror(listener->name, "Error in accept()", status); 659 660 ++err_cnt; 661 if (err_cnt >= 5) { 662 PJ_LOG(1, (listener->name, 663 "Too many errors, listener stopping")); 664 } 665 666 goto start_next_accept; 667 } 668 669 status = create_tcp_transport( listener, sock, 670 &accept_op->local_addr, 671 &accept_op->remote_addr, &tcp); 672 if (status == PJ_SUCCESS) { 673 status = tcp_start_read(tcp); 674 if (status != PJ_SUCCESS) { 675 PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled")); 676 tcp_destroy(&tcp->base); 677 } 678 } 679 680 start_next_accept: 681 682 status = pj_ioqueue_accept(listener->key, 683 &accept_op->op_key, 684 &accept_op->new_sock, 685 &accept_op->local_addr, 686 &accept_op->remote_addr, 687 &accept_op->addr_len); 688 689 } while (status != PJ_EPENDING); 690 } 691 692 693 /* Callback from ioqueue when packet is sent */ 694 static void on_write_complete(pj_ioqueue_key_t *key, 695 pj_ioqueue_op_key_t *op_key, 696 pj_ssize_t bytes_sent) 697 { 698 struct tcp_transport *tp = pj_ioqueue_get_user_data(key); 699 pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key; 700 701 tdata_op_key->tdata = NULL; 702 703 if (tdata_op_key->callback) { 704 tdata_op_key->callback(&tp->base, tdata_op_key->token, bytes_sent); 705 } 706 } 707 708 709 /* This callback is called by transport manager to send SIP message */ 710 static pj_status_t tcp_send_msg(pjsip_transport *transport, 711 pjsip_tx_data *tdata, 712 const pj_sockaddr_t *rem_addr, 713 int addr_len, 714 void *token, 715 void (*callback)(pjsip_transport *transport, 716 void *token, 717 pj_ssize_t sent_bytes)) 718 { 719 struct tcp_transport *tcp = (struct tcp_transport*)transport; 720 pj_ssize_t size; 721 pj_status_t status; 722 723 /* Sanity check */ 724 PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL); 725 726 /* Check that there's no pending operation associated with the tdata */ 727 PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX); 728 729 /* Check the address is supported */ 730 PJ_ASSERT_RETURN(rem_addr && addr_len==sizeof(pj_sockaddr_in), PJ_EINVAL); 731 732 733 734 /* Init op key. */ 735 tdata->op_key.tdata = tdata; 736 tdata->op_key.token = token; 737 tdata->op_key.callback = callback; 738 739 /* If asynchronous connect() has not completed yet, just put the 740 * transmit data in the pending transmission list. 741 */ 742 pj_lock_acquire(tcp->base.lock); 743 744 if (tcp->has_pending_connect) { 745 struct pending_tdata *pending_tdata; 746 747 /* Pust to list */ 748 pending_tdata = pj_pool_alloc(tdata->pool, sizeof(*pending_tdata)); 749 pending_tdata->tdata_op_key = &tdata->op_key; 750 751 pj_list_push_back(&tcp->tx_list, pending_tdata); 752 status = PJ_EPENDING; 753 754 } else { 755 /* send to ioqueue! */ 756 size = tdata->buf.cur - tdata->buf.start; 757 status = pj_ioqueue_send(tcp->key, 758 (pj_ioqueue_op_key_t*)&tdata->op_key, 759 tdata->buf.start, &size, 0); 760 761 if (status != PJ_EPENDING) 762 tdata->op_key.tdata = NULL; 763 } 764 765 pj_lock_release(tcp->base.lock); 766 767 return status; 768 } 769 770 771 /* This callback is called by transport manager to shutdown transport */ 772 static pj_status_t tcp_shutdown(pjsip_transport *transport) 773 { 774 775 PJ_UNUSED_ARG(transport); 776 777 /* Nothing to do for TCP */ 778 return PJ_SUCCESS; 779 } 780 781 782 /* Callback from ioqueue on incoming packet */ 783 static void on_read_complete(pj_ioqueue_key_t *key, 784 pj_ioqueue_op_key_t *op_key, 785 pj_ssize_t bytes_read) 786 { 787 enum { MAX_IMMEDIATE_PACKET = 10 }; 788 pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key; 789 pjsip_rx_data *rdata = rdata_op_key->rdata; 790 struct tcp_transport *tp = (struct tcp_transport*)rdata->tp_info.transport; 791 int i; 792 pj_status_t status; 793 794 /* Don't do anything if transport is closing. */ 795 if (tp->is_closing) { 796 tp->is_closing++; 797 return; 798 } 799 800 /* 801 * The idea of the loop is to process immediate data received by 802 * pj_ioqueue_recv(), as long as i < MAX_IMMEDIATE_PACKET. When 803 * i is >= MAX_IMMEDIATE_PACKET, we force the recv() operation to 804 * complete asynchronously, to allow other sockets to get their data. 805 */ 806 for (i=0;; ++i) { 807 pj_uint32_t flags; 808 809 /* Report the packet to transport manager. */ 810 if (bytes_read > 0) { 811 pj_size_t size_eaten; 812 813 /* Init pkt_info part. */ 814 rdata->pkt_info.len += bytes_read; 815 rdata->pkt_info.zero = 0; 816 pj_gettimeofday(&rdata->pkt_info.timestamp); 817 818 size_eaten = 819 pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, 820 rdata); 821 822 pj_assert(size_eaten <= (pj_size_t)rdata->pkt_info.len); 823 824 /* Move unprocessed data to the front of the buffer */ 825 if (size_eaten>0 && size_eaten<(pj_size_t)rdata->pkt_info.len) { 826 pj_memmove(rdata->pkt_info.packet, 827 rdata->pkt_info.packet + size_eaten, 828 rdata->pkt_info.len - size_eaten); 829 } 830 831 rdata->pkt_info.len -= size_eaten; 832 833 } else if (bytes_read == 0) { 834 835 /* Transport is closed */ 836 PJ_LOG(4,(tp->base.obj_name, "tcp connection closed")); 837 tcp_destroy(&tp->base); 838 return; 839 840 } else if (bytes_read < 0) { 841 842 /* Report error to endpoint. */ 843 PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt, 844 rdata->tp_info.transport->obj_name, 845 -bytes_read, "tcp recv() error")); 846 847 /* Transport error, close transport */ 848 tcp_destroy(&tp->base); 849 return; 850 } 851 852 if (i >= MAX_IMMEDIATE_PACKET) { 853 /* Force ioqueue_recv() to return PJ_EPENDING */ 854 flags = PJ_IOQUEUE_ALWAYS_ASYNC; 855 } else { 856 flags = 0; 857 } 858 859 /* Reset pool. */ 860 pj_pool_reset(rdata->tp_info.pool); 861 862 /* Read next packet. */ 863 bytes_read = sizeof(rdata->pkt_info.packet) - rdata->pkt_info.len; 864 rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr); 865 status = pj_ioqueue_recv(key, op_key, 866 rdata->pkt_info.packet+rdata->pkt_info.len, 867 &bytes_read, flags); 868 869 if (status == PJ_SUCCESS) { 870 /* Continue loop. */ 871 pj_assert(i < MAX_IMMEDIATE_PACKET); 872 873 } else if (status == PJ_EPENDING) { 874 break; 875 876 } else { 877 /* Report error to endpoint */ 878 PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt, 879 rdata->tp_info.transport->obj_name, 880 status, "tcp recv() error")); 881 882 /* Transport error, close transport */ 883 tcp_destroy(&tp->base); 884 return; 885 } 886 } 887 } 888 889 890 /* Callback from ioqueue when connect completes */ 891 static void on_connect_complete(pj_ioqueue_key_t *key, 892 pj_status_t status) 893 { 894 struct pending_connect *connect_op = (struct pending_connect *)key; 895 struct tcp_transport *tcp = connect_op->transport; 896 pj_sockaddr_in addr; 897 int addrlen; 898 899 /* Mark that pending connect() operation has completed. */ 900 tcp->has_pending_connect = PJ_FALSE; 901 902 /* Check connect() status */ 903 if (status != PJ_SUCCESS) { 904 tcp_perror(tcp->base.obj_name, "TCP connect() error", status); 905 tcp_destroy(&tcp->base); 906 return; 907 } 908 909 /* Update (again) local address, just in case local address currently 910 * set is different now that the socket is connected (could happen 911 * on some systems, like old Win32 probably?). 912 */ 913 addrlen = sizeof(pj_sockaddr_in); 914 if (pj_sock_getsockname(tcp->sock, &addr, &addrlen)==PJ_SUCCESS) { 915 pj_sockaddr_in *tp_addr = (pj_sockaddr_in*)&tcp->base.local_addr; 916 917 if (tp_addr->sin_addr.s_addr != addr.sin_addr.s_addr) { 918 tp_addr->sin_addr.s_addr = addr.sin_addr.s_addr; 919 tp_addr->sin_port = addr.sin_port; 920 sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name, 921 tp_addr); 922 } 923 } 924 925 /* Start pending read */ 926 status = tcp_start_read(tcp); 927 if (status != PJ_SUCCESS) { 928 tcp_destroy(&tcp->base); 929 return; 930 } 931 932 /* Flush all pending send operations */ 933 tcp_flush_pending_tx(tcp); 934 } 935
Note: See TracChangeset
for help on using the changeset viewer.