Changeset 563 for pjproject/trunk/pjsip/src/pjsip/sip_transport_tcp.c
- Timestamp:
- Jun 28, 2006 4:46:49 PM (17 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjsip/src/pjsip/sip_transport_tcp.c
r554 r563 21 21 #include <pjsip/sip_errno.h> 22 22 #include <pj/compat/socket.h> 23 #include <pj/addr_resolv.h> 23 24 #include <pj/assert.h> 24 25 #include <pj/ioqueue.h> … … 34 35 #define MAX_ASYNC_CNT 16 35 36 #define POOL_LIS_INIT 4000 36 #define POOL_LIS_INC 400 037 #define POOL_LIS_INC 4001 37 38 #define POOL_TP_INIT 4000 38 #define POOL_TP_INC 400 039 #define POOL_TP_INC 4002 39 40 40 41 … … 43 44 44 45 46 /* 47 * This structure is "descendant" of pj_ioqueue_op_key_t, and it is used to 48 * track pending/asynchronous accept() operation. TCP transport may have 49 * more than one pending accept() operations, depending on the value of 50 * async_cnt. 51 */ 45 52 struct pending_accept 46 53 { … … 53 60 }; 54 61 55 struct pending_connect 56 { 57 pj_ioqueue_op_key_t op_key; 58 struct tcp_transport *transport; 59 }; 60 61 62 63 /* 64 * This is the TCP listener, which is a "descendant" of pjsip_tpfactory (the 65 * SIP transport factory). 66 */ 62 67 struct tcp_listener 63 68 { 64 69 pjsip_tpfactory factory; 65 char name[PJ_MAX_OBJ_NAME];66 pj_bool_t active;70 char obj_name[PJ_MAX_OBJ_NAME]; 71 pj_bool_t is_registered; 67 72 pjsip_endpoint *endpt; 68 73 pjsip_tpmgr *tpmgr; … … 74 79 75 80 76 struct pending_tdata 77 { 78 PJ_DECL_LIST_MEMBER(struct pending_tdata); 81 /* 82 * This structure is used to keep delayed transmit operation in a list. 83 * A delayed transmission occurs when application sends tx_data when 84 * the TCP connect/establishment is still in progress. These delayed 85 * transmission will be "flushed" once the socket is connected (either 86 * successfully or with errors). 87 */ 88 struct delayed_tdata 89 { 90 PJ_DECL_LIST_MEMBER(struct delayed_tdata); 79 91 pjsip_tx_data_op_key *tdata_op_key; 80 92 }; 81 93 82 94 95 /* 96 * This structure describes the TCP transport, and it's descendant of 97 * pjsip_transport. 98 */ 83 99 struct tcp_transport 84 100 { 85 101 pjsip_transport base; 102 pj_bool_t is_server; 86 103 struct tcp_listener *listener; 87 104 pj_bool_t is_registered; 88 105 pj_bool_t is_closing; 106 pj_status_t close_reason; 89 107 pj_sock_t sock; 90 108 pj_ioqueue_key_t *key; 91 109 pj_bool_t has_pending_connect; 92 struct pending_connect connect_op;93 110 94 111 … … 100 117 101 118 /* Pending transmission list. */ 102 struct pending_tdata tx_list;119 struct delayed_tdata delayed_list; 103 120 }; 104 121 105 122 106 /* 107 * This callback is called when #pj_ioqueue_accept completes. 108 */ 123 /**************************************************************************** 124 * PROTOTYPES 125 */ 126 127 /* This callback is called when pending accept() operation completes. */ 109 128 static void on_accept_complete( pj_ioqueue_key_t *key, 110 129 pj_ioqueue_op_key_t *op_key, … … 112 131 pj_status_t status); 113 132 114 static pj_status_t lis_destroy(struct tcp_listener *listener); 133 /* This callback is called by transport manager to destroy listener */ 134 static pj_status_t lis_destroy(pjsip_tpfactory *factory); 135 136 /* This callback is called by transport manager to create transport */ 115 137 static pj_status_t lis_create_transport(pjsip_tpfactory *factory, 116 138 pjsip_tpmgr *mgr, … … 120 142 pjsip_transport **transport); 121 143 122 123 static pj_status_t create_tcp_transport(struct tcp_listener *listener,124 pj_sock_t sock,125 126 127 144 /* Common function to create and initialize transport */ 145 static pj_status_t tcp_create(struct tcp_listener *listener, 146 pj_sock_t sock, pj_bool_t is_server, 147 const pj_sockaddr_in *local, 148 const pj_sockaddr_in *remote, 149 struct tcp_transport **p_tcp); 128 150 129 151 … … 139 161 140 162 163 static void sockaddr_to_host_port( pj_pool_t *pool, 164 pjsip_host_port *host_port, 165 const pj_sockaddr_in *addr ) 166 { 167 host_port->host.ptr = pj_pool_alloc(pool, 48); 168 host_port->host.slen = pj_ansi_sprintf( host_port->host.ptr, "%s", 169 pj_inet_ntoa(addr->sin_addr)); 170 host_port->port = pj_ntohs(addr->sin_port); 171 } 172 173 174 175 /**************************************************************************** 176 * The TCP listener/transport factory. 177 */ 178 179 /* 180 * This is the public API to create, initialize, register, and start the 181 * TCP listener. 182 */ 141 183 PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt, 142 184 const pj_sockaddr_in *local, 143 unsigned async_cnt) 185 unsigned async_cnt, 186 pjsip_tpfactory **p_factory) 144 187 { 145 188 pj_pool_t *pool; 146 189 struct tcp_listener *listener; 147 190 pj_ioqueue_callback listener_cb; 191 pj_sockaddr_in *listener_addr; 192 int addr_len; 148 193 unsigned i; 149 194 pj_status_t status; 150 195 151 196 /* Sanity check */ 152 PJ_ASSERT_RETURN(endpt && local &&async_cnt, PJ_EINVAL);197 PJ_ASSERT_RETURN(endpt && async_cnt, PJ_EINVAL); 153 198 154 199 … … 159 204 160 205 listener = pj_pool_zalloc(pool, sizeof(struct tcp_listener)); 161 pj_ansi_sprintf(listener->name, "tcp:%d", (int)pj_ntohs(local->sin_port));162 206 listener->factory.pool = pool; 163 207 listener->factory.type = PJSIP_TRANSPORT_TCP; 164 pj_ansi_strcpy(listener->factory.type_name, "tcp");208 listener->factory.type_name = "tcp"; 165 209 listener->factory.flag = 166 210 pjsip_transport_get_flag_from_type(PJSIP_TRANSPORT_TCP); 167 211 listener->sock = PJ_INVALID_SOCKET; 212 213 pj_ansi_strcpy(listener->obj_name, "tcp"); 168 214 169 215 status = pj_lock_create_recursive_mutex(pool, "tcplis", … … 178 224 goto on_error; 179 225 180 pj_memcpy(&listener->factory.local_addr, local, sizeof(pj_sockaddr_in)); 181 status = pj_sock_bind(listener->sock, local, sizeof(*local)); 226 listener_addr = (pj_sockaddr_in*)&listener->factory.local_addr; 227 if (local) { 228 pj_memcpy(listener_addr, local, sizeof(pj_sockaddr_in)); 229 } else { 230 pj_sockaddr_in_init(listener_addr, NULL, 0); 231 } 232 233 status = pj_sock_bind(listener->sock, listener_addr, 234 sizeof(pj_sockaddr_in)); 182 235 if (status != PJ_SUCCESS) 183 236 goto on_error; 237 238 /* Retrieve the bound address */ 239 addr_len = sizeof(pj_sockaddr_in); 240 status = pj_sock_getsockname(listener->sock, listener_addr, &addr_len); 241 if (status != PJ_SUCCESS) 242 goto on_error; 243 244 /* If the address returns 0.0.0.0, use the first interface address 245 * as the transport's address. 246 */ 247 if (listener_addr->sin_addr.s_addr == 0) { 248 const pj_str_t *hostname; 249 struct pj_hostent he; 250 251 hostname = pj_gethostname(); 252 status = pj_gethostbyname(hostname, &he); 253 if (status != PJ_SUCCESS) 254 goto on_error; 255 256 listener_addr->sin_addr = *(pj_in_addr*)he.h_addr; 257 } 258 259 pj_ansi_sprintf(listener->obj_name, "tcp:%d", 260 (int)pj_ntohs(listener_addr->sin_port)); 261 262 /* Save the address name */ 263 sockaddr_to_host_port(listener->factory.pool, 264 &listener->factory.addr_name, listener_addr); 265 266 /* Start listening to the address */ 267 status = pj_sock_listen(listener->sock, PJSIP_TCP_TRANSPORT_BACKLOG); 268 if (status != PJ_SUCCESS) 269 goto on_error; 270 184 271 185 272 /* Register socket to ioqeuue */ … … 192 279 goto on_error; 193 280 194 /* Start pending accept() operation */ 281 /* Register to transport manager */ 282 listener->endpt = endpt; 283 listener->tpmgr = pjsip_endpt_get_tpmgr(endpt); 284 listener->factory.create_transport = lis_create_transport; 285 listener->factory.destroy = lis_destroy; 286 listener->is_registered = PJ_TRUE; 287 status = pjsip_tpmgr_register_tpfactory(listener->tpmgr, 288 &listener->factory); 289 if (status != PJ_SUCCESS) { 290 listener->is_registered = PJ_FALSE; 291 goto on_error; 292 } 293 294 295 /* Start pending accept() operations */ 195 296 if (async_cnt > MAX_ASYNC_CNT) async_cnt = MAX_ASYNC_CNT; 196 297 listener->async_cnt = async_cnt; … … 201 302 listener->accept_op[i].listener = listener; 202 303 203 status = pj_ioqueue_accept(listener->key, 204 &listener->accept_op[i].op_key, 205 &listener->accept_op[i].new_sock, 206 &listener->accept_op[i].local_addr, 207 &listener->accept_op[i].remote_addr, 208 &listener->accept_op[i].addr_len); 209 if (status != PJ_SUCCESS && status != PJ_EPENDING) 210 goto on_error; 211 } 212 213 /* Register to transport manager */ 214 listener->endpt = endpt; 215 listener->tpmgr = pjsip_endpt_get_tpmgr(endpt); 216 listener->factory.create_transport = lis_create_transport; 217 status = pjsip_tpmgr_register_tpfactory(listener->tpmgr, 218 &listener->factory); 219 if (status != PJ_SUCCESS) 220 goto on_error; 221 222 /* Done! */ 223 listener->active = PJ_TRUE; 224 225 PJ_LOG(4,(listener->name, 226 "SIP TCP transport listening for incoming connections at %s:%d", 227 pj_inet_ntoa(local->sin_addr), (int)pj_ntohs(local->sin_port))); 304 on_accept_complete(listener->key, &listener->accept_op[i].op_key, 305 listener->sock, PJ_EPENDING); 306 } 307 308 PJ_LOG(4,(listener->obj_name, 309 "SIP TCP listener ready for incoming connections at %s:%d", 310 pj_inet_ntoa(listener_addr->sin_addr), 311 (int)pj_ntohs(listener_addr->sin_port))); 312 313 /* Return the pointer to user */ 314 if (p_factory) *p_factory = &listener->factory; 228 315 229 316 return PJ_SUCCESS; 230 317 231 318 on_error: 232 lis_destroy( listener);319 lis_destroy(&listener->factory); 233 320 return status; 234 321 } 235 322 236 323 237 static pj_status_t lis_destroy(struct tcp_listener *listener) 238 { 239 if (listener->active) { 324 /* This callback is called by transport manager to destroy listener */ 325 static pj_status_t lis_destroy(pjsip_tpfactory *factory) 326 { 327 struct tcp_listener *listener = (struct tcp_listener *)factory; 328 329 if (listener->is_registered) { 240 330 pjsip_tpmgr_unregister_tpfactory(listener->tpmgr, &listener->factory); 241 listener-> active= PJ_FALSE;331 listener->is_registered = PJ_FALSE; 242 332 } 243 333 … … 259 349 260 350 if (listener->factory.pool) { 261 PJ_LOG(4,(listener->name, "SIP TCP transport destroyed")); 262 pj_pool_release(listener->factory.pool); 351 pj_pool_t *pool = listener->factory.pool; 352 353 PJ_LOG(4,(listener->obj_name, "SIP TCP listener destroyed")); 354 263 355 listener->factory.pool = NULL; 356 pj_pool_release(pool); 264 357 } 265 358 … … 289 382 static pj_status_t tcp_shutdown(pjsip_transport *transport); 290 383 291 /* Called by transport manager to destroy */ 292 static pj_status_t tcp_destroy(pjsip_transport *transport); 384 /* Called by transport manager to destroy transport */ 385 static pj_status_t tcp_destroy_transport(pjsip_transport *transport); 386 387 /* Utility to destroy transport */ 388 static pj_status_t tcp_destroy(pjsip_transport *transport, 389 pj_status_t reason); 293 390 294 391 /* Callback from ioqueue on incoming packet */ … … 307 404 308 405 309 static void sockaddr_to_host_port( pj_pool_t *pool,310 pjsip_host_port *host_port,311 const pj_sockaddr_in *addr )312 {313 host_port->host.ptr = pj_pool_alloc(pool, 48);314 host_port->host.slen = pj_ansi_sprintf( host_port->host.ptr, "%s",315 pj_inet_ntoa(addr->sin_addr));316 host_port->port = pj_ntohs(addr->sin_port);317 }318 319 320 406 /* 321 * Utilities to create TCP transport. 322 */ 323 static pj_status_t create_tcp_transport(struct tcp_listener *listener, 324 pj_sock_t sock, 325 const pj_sockaddr_in *local, 326 const pj_sockaddr_in *remote, 327 struct tcp_transport **p_tcp) 407 * Common function to create TCP transport, called when pending accept() and 408 * pending connect() complete. 409 */ 410 static pj_status_t tcp_create( struct tcp_listener *listener, 411 pj_sock_t sock, pj_bool_t is_server, 412 const pj_sockaddr_in *local, 413 const pj_sockaddr_in *remote, 414 struct tcp_transport **p_tcp) 328 415 { 329 416 struct tcp_transport *tcp; … … 333 420 pj_status_t status; 334 421 335 pool = pjsip_endpt_create_pool(listener->endpt, "tcp", 422 423 PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_EINVAL); 424 425 426 pool = pjsip_endpt_create_pool(listener->endpt, "tcp", 336 427 POOL_TP_INIT, POOL_TP_INC); 428 PJ_ASSERT_RETURN(pool != NULL, PJ_ENOMEM); 337 429 430 338 431 /* 339 432 * Create and initialize basic transport structure. … … 341 434 tcp = pj_pool_zalloc(pool, sizeof(*tcp)); 342 435 tcp->sock = sock; 436 tcp->is_server = is_server; 343 437 tcp->listener = listener; 344 pj_list_init(&tcp->tx_list); 345 346 347 pj_ansi_snprintf(tcp->base.obj_name, PJ_MAX_OBJ_NAME, "tcp%p", tcp); 438 pj_list_init(&tcp->delayed_list); 348 439 tcp->base.pool = pool; 349 440 441 pj_ansi_snprintf(tcp->base.obj_name, PJ_MAX_OBJ_NAME, 442 (is_server ? "tcps%p" :"tcpc%p"), tcp); 443 350 444 status = pj_atomic_create(pool, 0, &tcp->base.ref_cnt); 351 if (status != PJ_SUCCESS) 445 if (status != PJ_SUCCESS) { 352 446 goto on_error; 447 } 353 448 354 449 status = pj_lock_create_recursive_mutex(pool, "tcp", &tcp->base.lock); 355 if (status != PJ_SUCCESS) 450 if (status != PJ_SUCCESS) { 356 451 goto on_error; 452 } 357 453 358 454 tcp->base.key.type = PJSIP_TRANSPORT_TCP; … … 375 471 tcp->base.send_msg = &tcp_send_msg; 376 472 tcp->base.do_shutdown = &tcp_shutdown; 377 tcp->base.destroy = &tcp_destroy ;473 tcp->base.destroy = &tcp_destroy_transport; 378 474 379 475 … … 387 483 status = pj_ioqueue_register_sock(pool, ioqueue, sock, 388 484 tcp, &tcp_callback, &tcp->key); 389 if (status != PJ_SUCCESS) 485 if (status != PJ_SUCCESS) { 390 486 goto on_error; 487 } 391 488 392 489 /* Register transport to transport manager */ 393 490 status = pjsip_transport_register(listener->tpmgr, &tcp->base); 394 if (status != PJ_SUCCESS) 491 if (status != PJ_SUCCESS) { 395 492 goto on_error; 493 } 396 494 397 495 tcp->is_registered = PJ_TRUE; … … 400 498 *p_tcp = tcp; 401 499 500 PJ_LOG(4,(tcp->base.obj_name, "TCP %s transport created", 501 (tcp->is_server ? "server" : "client"))); 502 503 return PJ_SUCCESS; 504 402 505 on_error: 403 tcp_destroy(&tcp->base );506 tcp_destroy(&tcp->base, status); 404 507 return status; 405 508 } 406 509 407 510 408 /* Flush all pending send operations*/409 static tcp_flush_pending_tx(struct tcp_transport *tcp)511 /* Flush all delayed transmision once the socket is connected. */ 512 static void tcp_flush_pending_tx(struct tcp_transport *tcp) 410 513 { 411 514 pj_lock_acquire(tcp->base.lock); 412 while (!pj_list_empty(&tcp-> tx_list)) {413 struct pending_tdata *pending_tx;515 while (!pj_list_empty(&tcp->delayed_list)) { 516 struct delayed_tdata *pending_tx; 414 517 pjsip_tx_data *tdata; 415 518 pj_ioqueue_op_key_t *op_key; … … 417 520 pj_status_t status; 418 521 419 pending_tx = tcp-> tx_list.next;522 pending_tx = tcp->delayed_list.next; 420 523 pj_list_erase(pending_tx); 421 524 … … 437 540 438 541 542 /* Called by transport manager to destroy transport */ 543 static pj_status_t tcp_destroy_transport(pjsip_transport *transport) 544 { 545 struct tcp_transport *tcp = (struct tcp_transport*)transport; 546 547 /* Transport would have been unregistered by now since this callback 548 * is called by transport manager. 549 */ 550 tcp->is_registered = PJ_FALSE; 551 552 return tcp_destroy(transport, tcp->close_reason); 553 } 554 439 555 440 556 /* Destroy TCP transport */ 441 static pj_status_t tcp_destroy(pjsip_transport *transport) 557 static pj_status_t tcp_destroy(pjsip_transport *transport, 558 pj_status_t reason) 442 559 { 443 560 struct tcp_transport *tcp = (struct tcp_transport*)transport; 444 561 445 /* Cancel all pending transmits */ 446 while (!pj_list_empty(&tcp->tx_list)) { 447 struct pending_tdata *pending_tx; 562 if (tcp->close_reason == 0) 563 tcp->close_reason = reason; 564 565 if (tcp->is_registered) { 566 tcp->is_registered = PJ_FALSE; 567 pjsip_transport_destroy(transport); 568 569 /* pjsip_transport_destroy will recursively call this function 570 * again. 571 */ 572 return PJ_SUCCESS; 573 } 574 575 /* Mark transport as closing */ 576 tcp->is_closing = PJ_TRUE; 577 578 /* Cancel all delayed transmits */ 579 while (!pj_list_empty(&tcp->delayed_list)) { 580 struct delayed_tdata *pending_tx; 448 581 pj_ioqueue_op_key_t *op_key; 449 582 450 pending_tx = tcp-> tx_list.next;583 pending_tx = tcp->delayed_list.next; 451 584 pj_list_erase(pending_tx); 452 585 453 586 op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key; 454 587 455 on_write_complete(tcp->key, op_key, 456 -PJ_RETURN_OS_ERROR(OSERR_ENOTCONN)); 457 } 458 459 if (tcp->is_registered) { 460 pjsip_transport_destroy(transport); 461 tcp->is_registered = PJ_FALSE; 588 on_write_complete(tcp->key, op_key, -reason); 462 589 } 463 590 … … 470 597 pj_ioqueue_unregister(tcp->key); 471 598 tcp->key = NULL; 599 tcp->sock = PJ_INVALID_SOCKET; 600 } 601 602 if (tcp->sock != PJ_INVALID_SOCKET) { 603 pj_sock_close(tcp->sock); 604 tcp->sock = PJ_INVALID_SOCKET; 472 605 } 473 606 … … 483 616 484 617 if (tcp->base.pool) { 485 PJ_LOG(4,(tcp->base.obj_name, "TCP transport destroyed")); 486 pj_pool_release(tcp->base.pool); 618 pj_pool_t *pool; 619 620 if (reason != PJ_SUCCESS) { 621 char errmsg[PJ_ERR_MSG_SIZE]; 622 623 pj_strerror(reason, errmsg, sizeof(errmsg)); 624 PJ_LOG(4,(tcp->base.obj_name, 625 "TCP transport destroyed with reason %d: %s", 626 reason, errmsg)); 627 628 } else { 629 630 PJ_LOG(4,(tcp->base.obj_name, 631 "TCP transport destroyed normally")); 632 633 } 634 635 pool = tcp->base.pool; 487 636 tcp->base.pool = NULL; 637 pj_pool_release(pool); 488 638 } 489 639 … … 494 644 /* 495 645 * This utility function creates receive data buffers and start 496 * asynchronous recv() operations from the socket. 646 * asynchronous recv() operations from the socket. It is called after 647 * accept() or connect() operation complete. 497 648 */ 498 649 static pj_status_t tcp_start_read(struct tcp_transport *tcp) … … 532 683 tcp->rdata.pkt_info.packet, &size, 533 684 PJ_IOQUEUE_ALWAYS_ASYNC); 534 if (status != PJ_SUCCESS ) {685 if (status != PJ_SUCCESS && status != PJ_EPENDING) { 535 686 tcp_perror(tcp->base.obj_name, "ioqueue recv() error", status); 536 687 return status; … … 594 745 595 746 /* Create the transport descriptor */ 596 status = create_tcp_transport(listener, sock, &local_addr,597 747 status = tcp_create(listener, sock, PJ_FALSE, &local_addr, 748 (pj_sockaddr_in*)rem_addr, &tcp); 598 749 if (status != PJ_SUCCESS) 599 750 return status; 600 751 752 601 753 /* Start asynchronous connect() operation */ 602 754 tcp->has_pending_connect = PJ_TRUE; 603 pj_ioqueue_op_key_init(&tcp->connect_op.op_key,604 sizeof(tcp->connect_op.op_key));605 tcp->connect_op.transport = tcp;606 755 status = pj_ioqueue_connect(tcp->key, rem_addr, sizeof(pj_sockaddr_in)); 607 if (status != PJ_SUCCESS) { 608 tcp_destroy(&tcp->base); 756 if (status == PJ_SUCCESS) { 757 tcp->has_pending_connect = PJ_FALSE; 758 } else if (status != PJ_EPENDING) { 759 tcp_destroy(&tcp->base, status); 609 760 return status; 610 761 } … … 630 781 } 631 782 783 if (tcp->has_pending_connect) { 784 PJ_LOG(4,(tcp->base.obj_name, 785 "TCP transport %.*s:%d is connecting to %.*s:%d...", 786 (int)tcp->base.local_name.host.slen, 787 tcp->base.local_name.host.ptr, 788 tcp->base.local_name.port, 789 (int)tcp->base.remote_name.host.slen, 790 tcp->base.remote_name.host.ptr, 791 tcp->base.remote_name.port)); 792 } 793 632 794 /* Done */ 633 795 *p_transport = &tcp->base; … … 654 816 accept_op = (struct pending_accept*) op_key; 655 817 818 /* 819 * Loop while there is immediate connection or when there is error. 820 */ 656 821 do { 657 if (status != PJ_SUCCESS) { 658 tcp_perror(listener->name, "Error in accept()", status); 659 822 if (status == PJ_EPENDING) { 823 /* 824 * This can only happen when this function is called during 825 * initialization to kick off asynchronous accept(). 826 */ 827 828 } else if (status != PJ_SUCCESS) { 829 830 /* 831 * Error in accept(). 832 */ 833 tcp_perror(listener->obj_name, "Error in accept()", status); 834 835 /* 836 * Prevent endless accept() error loop by limiting the 837 * number of consecutive errors. Once the number of errors 838 * is equal to maximum, we treat this as permanent error, and 839 * we stop the accept() operation. 840 */ 660 841 ++err_cnt; 661 if (err_cnt >= 5) {662 PJ_LOG(1, (listener-> name,842 if (err_cnt >= 10) { 843 PJ_LOG(1, (listener->obj_name, 663 844 "Too many errors, listener stopping")); 664 845 } 665 846 666 goto start_next_accept; 667 } 668 669 status = create_tcp_transport( listener, sock, 670 &accept_op->local_addr, 671 &accept_op->remote_addr, &tcp); 672 if (status == PJ_SUCCESS) { 673 status = tcp_start_read(tcp); 674 if (status != PJ_SUCCESS) { 675 PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled")); 676 tcp_destroy(&tcp->base); 847 } else { 848 849 if (sock == PJ_INVALID_SOCKET) { 850 sock = accept_op->new_sock; 851 PJ_LOG(4,(listener->obj_name, 852 "Warning: ioqueue reports -1 in on_accept_complete()" 853 " sock argument")); 854 } 855 856 PJ_LOG(4,(listener->obj_name, 857 "TCP listener %.*s:%d: got incoming TCP connection " 858 "from %s:%d, sock=%d", 859 (int)listener->factory.addr_name.host.slen, 860 listener->factory.addr_name.host.ptr, 861 listener->factory.addr_name.port, 862 pj_inet_ntoa(accept_op->remote_addr.sin_addr), 863 pj_ntohs(accept_op->remote_addr.sin_port), 864 sock)); 865 866 /* 867 * Incoming connections! 868 * Create TCP transport for the new socket. 869 */ 870 status = tcp_create( listener, sock, PJ_TRUE, 871 &accept_op->local_addr, 872 &accept_op->remote_addr, &tcp); 873 if (status == PJ_SUCCESS) { 874 status = tcp_start_read(tcp); 875 if (status != PJ_SUCCESS) { 876 PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled")); 877 tcp_destroy(&tcp->base, status); 878 } 677 879 } 678 880 } 679 881 680 start_next_accept: 882 /* 883 * Start the next asynchronous accept() operation. 884 */ 885 accept_op->addr_len = sizeof(pj_sockaddr_in); 886 accept_op->new_sock = PJ_INVALID_SOCKET; 681 887 682 888 status = pj_ioqueue_accept(listener->key, … … 687 893 &accept_op->addr_len); 688 894 895 /* 896 * Loop while we have immediate connection or when there is error. 897 */ 898 689 899 } while (status != PJ_EPENDING); 690 900 } 691 901 692 902 693 /* Callback from ioqueue when packet is sent */ 903 /* 904 * Callback from ioqueue when packet is sent. 905 */ 694 906 static void on_write_complete(pj_ioqueue_key_t *key, 695 907 pj_ioqueue_op_key_t *op_key, 696 908 pj_ssize_t bytes_sent) 697 909 { 698 struct tcp_transport *t p = pj_ioqueue_get_user_data(key);910 struct tcp_transport *tcp = pj_ioqueue_get_user_data(key); 699 911 pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key; 700 912 701 913 tdata_op_key->tdata = NULL; 702 914 915 /* Check for error/closure */ 916 if (bytes_sent <= 0) { 917 pj_status_t status; 918 919 PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d", 920 bytes_sent)); 921 922 status = (bytes_sent == 0) ? PJ_RETURN_OS_ERROR(OSERR_ENOTCONN) : 923 -bytes_sent; 924 if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; 925 pjsip_transport_shutdown(&tcp->base); 926 } 927 703 928 if (tdata_op_key->callback) { 704 tdata_op_key->callback(&tp->base, tdata_op_key->token, bytes_sent); 705 } 706 } 707 708 709 /* This callback is called by transport manager to send SIP message */ 929 /* 930 * Notify sip_transport.c that packet has been sent. 931 */ 932 tdata_op_key->callback(&tcp->base, tdata_op_key->token, bytes_sent); 933 } 934 } 935 936 937 /* 938 * This callback is called by transport manager to send SIP message 939 */ 710 940 static pj_status_t tcp_send_msg(pjsip_transport *transport, 711 941 pjsip_tx_data *tdata, … … 719 949 struct tcp_transport *tcp = (struct tcp_transport*)transport; 720 950 pj_ssize_t size; 721 pj_status_t status; 951 pj_bool_t delayed = PJ_FALSE; 952 pj_status_t status = PJ_SUCCESS; 722 953 723 954 /* Sanity check */ … … 738 969 739 970 /* If asynchronous connect() has not completed yet, just put the 740 * transmit data in the pending transmission list. 971 * transmit data in the pending transmission list since we can not 972 * use the socket yet. 741 973 */ 742 pj_lock_acquire(tcp->base.lock);743 744 974 if (tcp->has_pending_connect) { 745 struct pending_tdata *pending_tdata; 746 747 /* Pust to list */ 748 pending_tdata = pj_pool_alloc(tdata->pool, sizeof(*pending_tdata)); 749 pending_tdata->tdata_op_key = &tdata->op_key; 750 751 pj_list_push_back(&tcp->tx_list, pending_tdata); 752 status = PJ_EPENDING; 753 754 } else { 755 /* send to ioqueue! */ 975 976 /* 977 * Looks like connect() is still in progress. Check again (this time 978 * with holding the lock) to be sure. 979 */ 980 pj_lock_acquire(tcp->base.lock); 981 982 if (tcp->has_pending_connect) { 983 struct delayed_tdata *delayed_tdata; 984 985 /* 986 * connect() is still in progress. Put the transmit data to 987 * the delayed list. 988 */ 989 delayed_tdata = pj_pool_alloc(tdata->pool, 990 sizeof(*delayed_tdata)); 991 delayed_tdata->tdata_op_key = &tdata->op_key; 992 993 pj_list_push_back(&tcp->delayed_list, delayed_tdata); 994 status = PJ_EPENDING; 995 996 /* Prevent pj_ioqueue_send() to be called below */ 997 delayed = PJ_TRUE; 998 } 999 1000 pj_lock_release(tcp->base.lock); 1001 } 1002 1003 if (!delayed) { 1004 /* 1005 * Transport is ready to go. Send the packet to ioqueue to be 1006 * sent asynchronously. 1007 */ 756 1008 size = tdata->buf.cur - tdata->buf.start; 757 1009 status = pj_ioqueue_send(tcp->key, … … 759 1011 tdata->buf.start, &size, 0); 760 1012 761 if (status != PJ_EPENDING) 1013 if (status != PJ_EPENDING) { 1014 /* Not pending (could be immediate success or error) */ 762 1015 tdata->op_key.tdata = NULL; 763 } 764 765 pj_lock_release(tcp->base.lock); 1016 1017 /* Shutdown transport on closure/errors */ 1018 if (size <= 0) { 1019 1020 PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d", 1021 size)); 1022 1023 if (status == PJ_SUCCESS) 1024 status = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN); 1025 if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; 1026 pjsip_transport_shutdown(&tcp->base); 1027 } 1028 } 1029 } 766 1030 767 1031 return status; … … 769 1033 770 1034 771 /* This callback is called by transport manager to shutdown transport */ 1035 /* 1036 * This callback is called by transport manager to shutdown transport. 1037 * This normally is only used by UDP transport. 1038 */ 772 1039 static pj_status_t tcp_shutdown(pjsip_transport *transport) 773 1040 { … … 780 1047 781 1048 782 /* Callback from ioqueue on incoming packet */ 1049 /* 1050 * Callback from ioqueue that an incoming data is received from the socket. 1051 */ 783 1052 static void on_read_complete(pj_ioqueue_key_t *key, 784 1053 pj_ioqueue_op_key_t *op_key, … … 788 1057 pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key; 789 1058 pjsip_rx_data *rdata = rdata_op_key->rdata; 790 struct tcp_transport *tp = (struct tcp_transport*)rdata->tp_info.transport; 1059 struct tcp_transport *tcp = 1060 (struct tcp_transport*)rdata->tp_info.transport; 791 1061 int i; 792 1062 pj_status_t status; 793 1063 794 1064 /* Don't do anything if transport is closing. */ 795 if (t p->is_closing) {796 t p->is_closing++;1065 if (tcp->is_closing) { 1066 tcp->is_closing++; 797 1067 return; 798 1068 } … … 807 1077 pj_uint32_t flags; 808 1078 809 /* Report the packet to transport manager. */ 1079 /* Houston, we have packet! Report the packet to transport manager 1080 * to be parsed. 1081 */ 810 1082 if (bytes_read > 0) { 811 1083 pj_size_t size_eaten; … … 816 1088 pj_gettimeofday(&rdata->pkt_info.timestamp); 817 1089 1090 /* Report to transport manager. 1091 * The transport manager will tell us how many bytes of the packet 1092 * have been processed (as valid SIP message). 1093 */ 818 1094 size_eaten = 819 1095 pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, … … 834 1110 835 1111 /* Transport is closed */ 836 PJ_LOG(4,(tp->base.obj_name, "tcp connection closed")); 837 tcp_destroy(&tp->base); 1112 PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed")); 1113 1114 /* We can not destroy the transport since high level objects may 1115 * still keep reference to this transport. So we can only 1116 * instruct transport manager to gracefully start the shutdown 1117 * procedure for this transport. 1118 */ 1119 if (tcp->close_reason==PJ_SUCCESS) 1120 tcp->close_reason = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN); 1121 pjsip_transport_shutdown(&tcp->base); 1122 838 1123 return; 839 1124 840 } else if (bytes_read < 0) { 1125 //} else if (bytes_read < 0) { 1126 } else if (-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) && 1127 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) && 1128 -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) 1129 { 841 1130 842 1131 /* Report error to endpoint. */ 843 1132 PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt, 844 1133 rdata->tp_info.transport->obj_name, 845 -bytes_read, "tcp recv() error")); 846 847 /* Transport error, close transport */ 848 tcp_destroy(&tp->base); 1134 -bytes_read, "TCP recv() error")); 1135 1136 /* We can not destroy the transport since high level objects may 1137 * still keep reference to this transport. So we can only 1138 * instruct transport manager to gracefully start the shutdown 1139 * procedure for this transport. 1140 */ 1141 if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = -bytes_read; 1142 pjsip_transport_shutdown(&tcp->base); 1143 849 1144 return; 850 1145 } 851 1146 852 1147 if (i >= MAX_IMMEDIATE_PACKET) { 853 /* Force ioqueue_recv() to return PJ_EPENDING */ 1148 /* Receive quota reached. Force ioqueue_recv() to 1149 * return PJ_EPENDING 1150 */ 854 1151 flags = PJ_IOQUEUE_ALWAYS_ASYNC; 855 1152 } else { … … 868 1165 869 1166 if (status == PJ_SUCCESS) { 1167 870 1168 /* Continue loop. */ 871 1169 pj_assert(i < MAX_IMMEDIATE_PACKET); … … 880 1178 status, "tcp recv() error")); 881 1179 882 /* Transport error, close transport */ 883 tcp_destroy(&tp->base); 1180 /* We can not destroy the transport since high level objects may 1181 * still keep reference to this transport. So we can only 1182 * instruct transport manager to gracefully start the shutdown 1183 * procedure for this transport. 1184 */ 1185 if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; 1186 pjsip_transport_shutdown(&tcp->base); 1187 884 1188 return; 885 1189 } … … 888 1192 889 1193 890 /* Callback from ioqueue when connect completes */ 1194 /* 1195 * Callback from ioqueue when asynchronous connect() operation completes. 1196 */ 891 1197 static void on_connect_complete(pj_ioqueue_key_t *key, 892 1198 pj_status_t status) 893 1199 { 894 struct pending_connect *connect_op = (struct pending_connect *)key; 895 struct tcp_transport *tcp = connect_op->transport; 1200 struct tcp_transport *tcp; 896 1201 pj_sockaddr_in addr; 897 1202 int addrlen; 898 1203 1204 tcp = pj_ioqueue_get_user_data(key); 1205 1206 PJ_LOG(4,(tcp->base.obj_name, 1207 "TCP transport %.*s:%d is connected to %.*s:%d", 1208 (int)tcp->base.local_name.host.slen, 1209 tcp->base.local_name.host.ptr, 1210 tcp->base.local_name.port, 1211 (int)tcp->base.remote_name.host.slen, 1212 tcp->base.remote_name.host.ptr, 1213 tcp->base.remote_name.port)); 1214 899 1215 /* Mark that pending connect() operation has completed. */ 900 1216 tcp->has_pending_connect = PJ_FALSE; … … 902 1218 /* Check connect() status */ 903 1219 if (status != PJ_SUCCESS) { 1220 904 1221 tcp_perror(tcp->base.obj_name, "TCP connect() error", status); 905 tcp_destroy(&tcp->base); 1222 1223 /* We can not destroy the transport since high level objects may 1224 * still keep reference to this transport. So we can only 1225 * instruct transport manager to gracefully start the shutdown 1226 * procedure for this transport. 1227 */ 1228 if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; 1229 pjsip_transport_shutdown(&tcp->base); 906 1230 return; 907 1231 } … … 926 1250 status = tcp_start_read(tcp); 927 1251 if (status != PJ_SUCCESS) { 928 tcp_destroy(&tcp->base); 1252 /* We can not destroy the transport since high level objects may 1253 * still keep reference to this transport. So we can only 1254 * instruct transport manager to gracefully start the shutdown 1255 * procedure for this transport. 1256 */ 1257 if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; 1258 pjsip_transport_shutdown(&tcp->base); 929 1259 return; 930 1260 }
Note: See TracChangeset
for help on using the changeset viewer.