Ignore:
Timestamp:
Jun 28, 2006 4:46:49 PM (18 years ago)
Author:
bennylp
Message:

Major improvements in PJSIP to support TCP. The changes fall into these categories: (1) the TCP transport implementation itself (*.[hc]), (2) bug-fix in SIP transaction when using reliable transports, (3) support for TCP transport in PJSUA-LIB/PJSUA, and (4) changes in PJSIP-TEST to support TCP testing.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjsip/src/pjsip/sip_transport_tcp.c

    r554 r563  
    2121#include <pjsip/sip_errno.h> 
    2222#include <pj/compat/socket.h> 
     23#include <pj/addr_resolv.h> 
    2324#include <pj/assert.h> 
    2425#include <pj/ioqueue.h> 
     
    3435#define MAX_ASYNC_CNT   16 
    3536#define POOL_LIS_INIT   4000 
    36 #define POOL_LIS_INC    4000 
     37#define POOL_LIS_INC    4001 
    3738#define POOL_TP_INIT    4000 
    38 #define POOL_TP_INC     4000 
     39#define POOL_TP_INC     4002 
    3940 
    4041 
     
    4344 
    4445 
     46/* 
     47 * This structure is "descendant" of pj_ioqueue_op_key_t, and it is used to 
     48 * track pending/asynchronous accept() operation. TCP transport may have 
     49 * more than one pending accept() operations, depending on the value of 
     50 * async_cnt. 
     51 */ 
    4552struct pending_accept 
    4653{ 
     
    5360}; 
    5461 
    55 struct pending_connect 
    56 { 
    57     pj_ioqueue_op_key_t      op_key; 
    58     struct tcp_transport    *transport; 
    59 }; 
    60  
    61  
     62 
     63/* 
     64 * This is the TCP listener, which is a "descendant" of pjsip_tpfactory (the 
     65 * SIP transport factory). 
     66 */ 
    6267struct tcp_listener 
    6368{ 
    6469    pjsip_tpfactory          factory; 
    65     char                     name[PJ_MAX_OBJ_NAME]; 
    66     pj_bool_t                active; 
     70    char                     obj_name[PJ_MAX_OBJ_NAME]; 
     71    pj_bool_t                is_registered; 
    6772    pjsip_endpoint          *endpt; 
    6873    pjsip_tpmgr             *tpmgr; 
     
    7479 
    7580 
    76 struct pending_tdata 
    77 { 
    78     PJ_DECL_LIST_MEMBER(struct pending_tdata); 
     81/* 
     82 * This structure is used to keep delayed transmit operation in a list. 
     83 * A delayed transmission occurs when application sends tx_data when 
     84 * the TCP connect/establishment is still in progress. These delayed 
     85 * transmission will be "flushed" once the socket is connected (either 
     86 * successfully or with errors). 
     87 */ 
     88struct delayed_tdata 
     89{ 
     90    PJ_DECL_LIST_MEMBER(struct delayed_tdata); 
    7991    pjsip_tx_data_op_key    *tdata_op_key; 
    8092}; 
    8193 
    8294 
     95/* 
     96 * This structure describes the TCP transport, and it's descendant of 
     97 * pjsip_transport. 
     98 */ 
    8399struct tcp_transport 
    84100{ 
    85101    pjsip_transport          base; 
     102    pj_bool_t                is_server; 
    86103    struct tcp_listener     *listener; 
    87104    pj_bool_t                is_registered; 
    88105    pj_bool_t                is_closing; 
     106    pj_status_t              close_reason; 
    89107    pj_sock_t                sock; 
    90108    pj_ioqueue_key_t        *key; 
    91109    pj_bool_t                has_pending_connect; 
    92     struct pending_connect   connect_op; 
    93110 
    94111 
     
    100117 
    101118    /* Pending transmission list. */ 
    102     struct pending_tdata     tx_list; 
     119    struct delayed_tdata     delayed_list; 
    103120}; 
    104121 
    105122 
    106 /* 
    107  * This callback is called when #pj_ioqueue_accept completes. 
    108  */ 
     123/**************************************************************************** 
     124 * PROTOTYPES 
     125 */ 
     126 
     127/* This callback is called when pending accept() operation completes. */ 
    109128static void on_accept_complete( pj_ioqueue_key_t *key,  
    110129                                pj_ioqueue_op_key_t *op_key,  
     
    112131                                pj_status_t status); 
    113132 
    114 static pj_status_t lis_destroy(struct tcp_listener *listener); 
     133/* This callback is called by transport manager to destroy listener */ 
     134static pj_status_t lis_destroy(pjsip_tpfactory *factory); 
     135 
     136/* This callback is called by transport manager to create transport */ 
    115137static pj_status_t lis_create_transport(pjsip_tpfactory *factory, 
    116138                                        pjsip_tpmgr *mgr, 
     
    120142                                        pjsip_transport **transport); 
    121143 
    122  
    123 static pj_status_t create_tcp_transport(struct tcp_listener *listener, 
    124                                         pj_sock_t sock, 
    125                                         const pj_sockaddr_in *local, 
    126                                         const pj_sockaddr_in *remote, 
    127                                         struct tcp_transport **p_tcp); 
     144/* Common function to create and initialize transport */ 
     145static pj_status_t tcp_create(struct tcp_listener *listener, 
     146                              pj_sock_t sock, pj_bool_t is_server, 
     147                              const pj_sockaddr_in *local, 
     148                              const pj_sockaddr_in *remote, 
     149                              struct tcp_transport **p_tcp); 
    128150 
    129151 
     
    139161 
    140162 
     163static void sockaddr_to_host_port( pj_pool_t *pool, 
     164                                   pjsip_host_port *host_port, 
     165                                   const pj_sockaddr_in *addr ) 
     166{ 
     167    host_port->host.ptr = pj_pool_alloc(pool, 48); 
     168    host_port->host.slen = pj_ansi_sprintf( host_port->host.ptr, "%s",  
     169                                            pj_inet_ntoa(addr->sin_addr)); 
     170    host_port->port = pj_ntohs(addr->sin_port); 
     171} 
     172 
     173 
     174 
     175/**************************************************************************** 
     176 * The TCP listener/transport factory. 
     177 */ 
     178 
     179/* 
     180 * This is the public API to create, initialize, register, and start the 
     181 * TCP listener. 
     182 */ 
    141183PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt, 
    142184                                               const pj_sockaddr_in *local, 
    143                                                unsigned async_cnt) 
     185                                               unsigned async_cnt, 
     186                                               pjsip_tpfactory **p_factory) 
    144187{ 
    145188    pj_pool_t *pool; 
    146189    struct tcp_listener *listener; 
    147190    pj_ioqueue_callback listener_cb; 
     191    pj_sockaddr_in *listener_addr; 
     192    int addr_len; 
    148193    unsigned i; 
    149194    pj_status_t status; 
    150195 
    151196    /* Sanity check */ 
    152     PJ_ASSERT_RETURN(endpt && local && async_cnt, PJ_EINVAL); 
     197    PJ_ASSERT_RETURN(endpt && async_cnt, PJ_EINVAL); 
    153198 
    154199 
     
    159204 
    160205    listener = pj_pool_zalloc(pool, sizeof(struct tcp_listener)); 
    161     pj_ansi_sprintf(listener->name, "tcp:%d", (int)pj_ntohs(local->sin_port)); 
    162206    listener->factory.pool = pool; 
    163207    listener->factory.type = PJSIP_TRANSPORT_TCP; 
    164     pj_ansi_strcpy(listener->factory.type_name, "tcp"); 
     208    listener->factory.type_name = "tcp"; 
    165209    listener->factory.flag =  
    166210        pjsip_transport_get_flag_from_type(PJSIP_TRANSPORT_TCP); 
    167211    listener->sock = PJ_INVALID_SOCKET; 
     212 
     213    pj_ansi_strcpy(listener->obj_name, "tcp"); 
    168214 
    169215    status = pj_lock_create_recursive_mutex(pool, "tcplis",  
     
    178224        goto on_error; 
    179225 
    180     pj_memcpy(&listener->factory.local_addr, local, sizeof(pj_sockaddr_in)); 
    181     status = pj_sock_bind(listener->sock, local, sizeof(*local)); 
     226    listener_addr = (pj_sockaddr_in*)&listener->factory.local_addr; 
     227    if (local) { 
     228        pj_memcpy(listener_addr, local, sizeof(pj_sockaddr_in)); 
     229    } else { 
     230        pj_sockaddr_in_init(listener_addr, NULL, 0); 
     231    } 
     232 
     233    status = pj_sock_bind(listener->sock, listener_addr,  
     234                          sizeof(pj_sockaddr_in)); 
    182235    if (status != PJ_SUCCESS) 
    183236        goto on_error; 
     237 
     238    /* Retrieve the bound address */ 
     239    addr_len = sizeof(pj_sockaddr_in); 
     240    status = pj_sock_getsockname(listener->sock, listener_addr, &addr_len); 
     241    if (status != PJ_SUCCESS) 
     242        goto on_error; 
     243 
     244    /* If the address returns 0.0.0.0, use the first interface address 
     245     * as the transport's address. 
     246     */ 
     247    if (listener_addr->sin_addr.s_addr == 0) { 
     248        const pj_str_t *hostname; 
     249        struct pj_hostent he; 
     250 
     251        hostname = pj_gethostname(); 
     252        status = pj_gethostbyname(hostname, &he); 
     253        if (status != PJ_SUCCESS) 
     254            goto on_error; 
     255 
     256        listener_addr->sin_addr = *(pj_in_addr*)he.h_addr; 
     257    } 
     258 
     259    pj_ansi_sprintf(listener->obj_name, "tcp:%d",  
     260                     (int)pj_ntohs(listener_addr->sin_port)); 
     261 
     262    /* Save the address name */ 
     263    sockaddr_to_host_port(listener->factory.pool,  
     264                          &listener->factory.addr_name, listener_addr); 
     265 
     266    /* Start listening to the address */ 
     267    status = pj_sock_listen(listener->sock, PJSIP_TCP_TRANSPORT_BACKLOG); 
     268    if (status != PJ_SUCCESS) 
     269        goto on_error; 
     270 
    184271 
    185272    /* Register socket to ioqeuue */ 
     
    192279        goto on_error; 
    193280 
    194     /* Start pending accept() operation */ 
     281    /* Register to transport manager */ 
     282    listener->endpt = endpt; 
     283    listener->tpmgr = pjsip_endpt_get_tpmgr(endpt); 
     284    listener->factory.create_transport = lis_create_transport; 
     285    listener->factory.destroy = lis_destroy; 
     286    listener->is_registered = PJ_TRUE; 
     287    status = pjsip_tpmgr_register_tpfactory(listener->tpmgr, 
     288                                            &listener->factory); 
     289    if (status != PJ_SUCCESS) { 
     290        listener->is_registered = PJ_FALSE; 
     291        goto on_error; 
     292    } 
     293 
     294 
     295    /* Start pending accept() operations */ 
    195296    if (async_cnt > MAX_ASYNC_CNT) async_cnt = MAX_ASYNC_CNT; 
    196297    listener->async_cnt = async_cnt; 
     
    201302        listener->accept_op[i].listener = listener; 
    202303 
    203         status = pj_ioqueue_accept(listener->key,  
    204                                    &listener->accept_op[i].op_key, 
    205                                    &listener->accept_op[i].new_sock, 
    206                                    &listener->accept_op[i].local_addr, 
    207                                    &listener->accept_op[i].remote_addr, 
    208                                    &listener->accept_op[i].addr_len); 
    209         if (status != PJ_SUCCESS && status != PJ_EPENDING) 
    210             goto on_error; 
    211     } 
    212  
    213     /* Register to transport manager */ 
    214     listener->endpt = endpt; 
    215     listener->tpmgr = pjsip_endpt_get_tpmgr(endpt); 
    216     listener->factory.create_transport = lis_create_transport; 
    217     status = pjsip_tpmgr_register_tpfactory(listener->tpmgr, 
    218                                             &listener->factory); 
    219     if (status != PJ_SUCCESS) 
    220         goto on_error; 
    221  
    222     /* Done! */ 
    223     listener->active = PJ_TRUE; 
    224  
    225     PJ_LOG(4,(listener->name,  
    226              "SIP TCP transport listening for incoming connections at %s:%d", 
    227              pj_inet_ntoa(local->sin_addr), (int)pj_ntohs(local->sin_port))); 
     304        on_accept_complete(listener->key, &listener->accept_op[i].op_key, 
     305                           listener->sock, PJ_EPENDING); 
     306    } 
     307 
     308    PJ_LOG(4,(listener->obj_name,  
     309             "SIP TCP listener ready for incoming connections at %s:%d", 
     310             pj_inet_ntoa(listener_addr->sin_addr),  
     311             (int)pj_ntohs(listener_addr->sin_port))); 
     312 
     313    /* Return the pointer to user */ 
     314    if (p_factory) *p_factory = &listener->factory; 
    228315 
    229316    return PJ_SUCCESS; 
    230317 
    231318on_error: 
    232     lis_destroy(listener); 
     319    lis_destroy(&listener->factory); 
    233320    return status; 
    234321} 
    235322 
    236323 
    237 static pj_status_t lis_destroy(struct tcp_listener *listener) 
    238 { 
    239     if (listener->active) { 
     324/* This callback is called by transport manager to destroy listener */ 
     325static pj_status_t lis_destroy(pjsip_tpfactory *factory) 
     326{ 
     327    struct tcp_listener *listener = (struct tcp_listener *)factory; 
     328 
     329    if (listener->is_registered) { 
    240330        pjsip_tpmgr_unregister_tpfactory(listener->tpmgr, &listener->factory); 
    241         listener->active = PJ_FALSE; 
     331        listener->is_registered = PJ_FALSE; 
    242332    } 
    243333 
     
    259349 
    260350    if (listener->factory.pool) { 
    261         PJ_LOG(4,(listener->name,  "SIP TCP transport destroyed")); 
    262         pj_pool_release(listener->factory.pool); 
     351        pj_pool_t *pool = listener->factory.pool; 
     352 
     353        PJ_LOG(4,(listener->obj_name,  "SIP TCP listener destroyed")); 
     354 
    263355        listener->factory.pool = NULL; 
     356        pj_pool_release(pool); 
    264357    } 
    265358 
     
    289382static pj_status_t tcp_shutdown(pjsip_transport *transport); 
    290383 
    291 /* Called by transport manager to destroy */ 
    292 static pj_status_t tcp_destroy(pjsip_transport *transport); 
     384/* Called by transport manager to destroy transport */ 
     385static pj_status_t tcp_destroy_transport(pjsip_transport *transport); 
     386 
     387/* Utility to destroy transport */ 
     388static pj_status_t tcp_destroy(pjsip_transport *transport, 
     389                               pj_status_t reason); 
    293390 
    294391/* Callback from ioqueue on incoming packet */ 
     
    307404 
    308405 
    309 static void sockaddr_to_host_port( pj_pool_t *pool, 
    310                                    pjsip_host_port *host_port, 
    311                                    const pj_sockaddr_in *addr ) 
    312 { 
    313     host_port->host.ptr = pj_pool_alloc(pool, 48); 
    314     host_port->host.slen = pj_ansi_sprintf( host_port->host.ptr, "%s",  
    315                                             pj_inet_ntoa(addr->sin_addr)); 
    316     host_port->port = pj_ntohs(addr->sin_port); 
    317 } 
    318  
    319  
    320406/* 
    321  * Utilities to create TCP transport. 
    322  */ 
    323 static pj_status_t create_tcp_transport(struct tcp_listener *listener, 
    324                                         pj_sock_t sock, 
    325                                         const pj_sockaddr_in *local, 
    326                                         const pj_sockaddr_in *remote, 
    327                                         struct tcp_transport **p_tcp) 
     407 * Common function to create TCP transport, called when pending accept() and 
     408 * pending connect() complete. 
     409 */ 
     410static pj_status_t tcp_create( struct tcp_listener *listener, 
     411                               pj_sock_t sock, pj_bool_t is_server, 
     412                               const pj_sockaddr_in *local, 
     413                               const pj_sockaddr_in *remote, 
     414                               struct tcp_transport **p_tcp) 
    328415{ 
    329416    struct tcp_transport *tcp; 
     
    333420    pj_status_t status; 
    334421     
    335     pool = pjsip_endpt_create_pool(listener->endpt, "tcp",  
     422 
     423    PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_EINVAL); 
     424 
     425 
     426    pool = pjsip_endpt_create_pool(listener->endpt, "tcp", 
    336427                                   POOL_TP_INIT, POOL_TP_INC); 
     428    PJ_ASSERT_RETURN(pool != NULL, PJ_ENOMEM); 
    337429     
     430 
    338431    /* 
    339432     * Create and initialize basic transport structure. 
     
    341434    tcp = pj_pool_zalloc(pool, sizeof(*tcp)); 
    342435    tcp->sock = sock; 
     436    tcp->is_server = is_server; 
    343437    tcp->listener = listener; 
    344     pj_list_init(&tcp->tx_list); 
    345  
    346  
    347     pj_ansi_snprintf(tcp->base.obj_name, PJ_MAX_OBJ_NAME, "tcp%p", tcp); 
     438    pj_list_init(&tcp->delayed_list); 
    348439    tcp->base.pool = pool; 
    349440 
     441    pj_ansi_snprintf(tcp->base.obj_name, PJ_MAX_OBJ_NAME,  
     442                     (is_server ? "tcps%p" :"tcpc%p"), tcp); 
     443 
    350444    status = pj_atomic_create(pool, 0, &tcp->base.ref_cnt); 
    351     if (status != PJ_SUCCESS) 
     445    if (status != PJ_SUCCESS) { 
    352446        goto on_error; 
     447    } 
    353448 
    354449    status = pj_lock_create_recursive_mutex(pool, "tcp", &tcp->base.lock); 
    355     if (status != PJ_SUCCESS) 
     450    if (status != PJ_SUCCESS) { 
    356451        goto on_error; 
     452    } 
    357453 
    358454    tcp->base.key.type = PJSIP_TRANSPORT_TCP; 
     
    375471    tcp->base.send_msg = &tcp_send_msg; 
    376472    tcp->base.do_shutdown = &tcp_shutdown; 
    377     tcp->base.destroy = &tcp_destroy; 
     473    tcp->base.destroy = &tcp_destroy_transport; 
    378474 
    379475 
     
    387483    status = pj_ioqueue_register_sock(pool, ioqueue, sock,  
    388484                                      tcp, &tcp_callback, &tcp->key); 
    389     if (status != PJ_SUCCESS) 
     485    if (status != PJ_SUCCESS) { 
    390486        goto on_error; 
     487    } 
    391488 
    392489    /* Register transport to transport manager */ 
    393490    status = pjsip_transport_register(listener->tpmgr, &tcp->base); 
    394     if (status != PJ_SUCCESS) 
     491    if (status != PJ_SUCCESS) { 
    395492        goto on_error; 
     493    } 
    396494 
    397495    tcp->is_registered = PJ_TRUE; 
     
    400498    *p_tcp = tcp; 
    401499 
     500    PJ_LOG(4,(tcp->base.obj_name, "TCP %s transport created", 
     501              (tcp->is_server ? "server" : "client"))); 
     502 
     503    return PJ_SUCCESS; 
     504 
    402505on_error: 
    403     tcp_destroy(&tcp->base); 
     506    tcp_destroy(&tcp->base, status); 
    404507    return status; 
    405508} 
    406509 
    407510 
    408 /* Flush all pending send operations */ 
    409 static tcp_flush_pending_tx(struct tcp_transport *tcp) 
     511/* Flush all delayed transmision once the socket is connected. */ 
     512static void tcp_flush_pending_tx(struct tcp_transport *tcp) 
    410513{ 
    411514    pj_lock_acquire(tcp->base.lock); 
    412     while (!pj_list_empty(&tcp->tx_list)) { 
    413         struct pending_tdata *pending_tx; 
     515    while (!pj_list_empty(&tcp->delayed_list)) { 
     516        struct delayed_tdata *pending_tx; 
    414517        pjsip_tx_data *tdata; 
    415518        pj_ioqueue_op_key_t *op_key; 
     
    417520        pj_status_t status; 
    418521 
    419         pending_tx = tcp->tx_list.next; 
     522        pending_tx = tcp->delayed_list.next; 
    420523        pj_list_erase(pending_tx); 
    421524 
     
    437540 
    438541 
     542/* Called by transport manager to destroy transport */ 
     543static pj_status_t tcp_destroy_transport(pjsip_transport *transport) 
     544{ 
     545    struct tcp_transport *tcp = (struct tcp_transport*)transport; 
     546 
     547    /* Transport would have been unregistered by now since this callback 
     548     * is called by transport manager. 
     549     */ 
     550    tcp->is_registered = PJ_FALSE; 
     551 
     552    return tcp_destroy(transport, tcp->close_reason); 
     553} 
     554 
    439555 
    440556/* Destroy TCP transport */ 
    441 static pj_status_t tcp_destroy(pjsip_transport *transport) 
     557static pj_status_t tcp_destroy(pjsip_transport *transport,  
     558                               pj_status_t reason) 
    442559{ 
    443560    struct tcp_transport *tcp = (struct tcp_transport*)transport; 
    444561 
    445     /* Cancel all pending transmits */ 
    446     while (!pj_list_empty(&tcp->tx_list)) { 
    447         struct pending_tdata *pending_tx; 
     562    if (tcp->close_reason == 0) 
     563        tcp->close_reason = reason; 
     564 
     565    if (tcp->is_registered) { 
     566        tcp->is_registered = PJ_FALSE; 
     567        pjsip_transport_destroy(transport); 
     568 
     569        /* pjsip_transport_destroy will recursively call this function 
     570         * again. 
     571         */ 
     572        return PJ_SUCCESS; 
     573    } 
     574 
     575    /* Mark transport as closing */ 
     576    tcp->is_closing = PJ_TRUE; 
     577 
     578    /* Cancel all delayed transmits */ 
     579    while (!pj_list_empty(&tcp->delayed_list)) { 
     580        struct delayed_tdata *pending_tx; 
    448581        pj_ioqueue_op_key_t *op_key; 
    449582 
    450         pending_tx = tcp->tx_list.next; 
     583        pending_tx = tcp->delayed_list.next; 
    451584        pj_list_erase(pending_tx); 
    452585 
    453586        op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key; 
    454587 
    455         on_write_complete(tcp->key, op_key,  
    456                           -PJ_RETURN_OS_ERROR(OSERR_ENOTCONN)); 
    457     } 
    458  
    459     if (tcp->is_registered) { 
    460         pjsip_transport_destroy(transport); 
    461         tcp->is_registered = PJ_FALSE; 
     588        on_write_complete(tcp->key, op_key, -reason); 
    462589    } 
    463590 
     
    470597        pj_ioqueue_unregister(tcp->key); 
    471598        tcp->key = NULL; 
     599        tcp->sock = PJ_INVALID_SOCKET; 
     600    } 
     601 
     602    if (tcp->sock != PJ_INVALID_SOCKET) { 
     603        pj_sock_close(tcp->sock); 
     604        tcp->sock = PJ_INVALID_SOCKET; 
    472605    } 
    473606 
     
    483616 
    484617    if (tcp->base.pool) { 
    485         PJ_LOG(4,(tcp->base.obj_name, "TCP transport destroyed")); 
    486         pj_pool_release(tcp->base.pool); 
     618        pj_pool_t *pool; 
     619 
     620        if (reason != PJ_SUCCESS) { 
     621            char errmsg[PJ_ERR_MSG_SIZE]; 
     622 
     623            pj_strerror(reason, errmsg, sizeof(errmsg)); 
     624            PJ_LOG(4,(tcp->base.obj_name,  
     625                      "TCP transport destroyed with reason %d: %s",  
     626                      reason, errmsg)); 
     627 
     628        } else { 
     629 
     630            PJ_LOG(4,(tcp->base.obj_name,  
     631                      "TCP transport destroyed normally")); 
     632 
     633        } 
     634 
     635        pool = tcp->base.pool; 
    487636        tcp->base.pool = NULL; 
     637        pj_pool_release(pool); 
    488638    } 
    489639 
     
    494644/* 
    495645 * This utility function creates receive data buffers and start 
    496  * asynchronous recv() operations from the socket. 
     646 * asynchronous recv() operations from the socket. It is called after 
     647 * accept() or connect() operation complete. 
    497648 */ 
    498649static pj_status_t tcp_start_read(struct tcp_transport *tcp) 
     
    532683                             tcp->rdata.pkt_info.packet, &size, 
    533684                             PJ_IOQUEUE_ALWAYS_ASYNC); 
    534     if (status != PJ_SUCCESS) { 
     685    if (status != PJ_SUCCESS && status != PJ_EPENDING) { 
    535686        tcp_perror(tcp->base.obj_name, "ioqueue recv() error", status); 
    536687        return status; 
     
    594745 
    595746    /* Create the transport descriptor */ 
    596     status = create_tcp_transport(listener, sock, &local_addr,  
    597                                   (pj_sockaddr_in*)rem_addr, &tcp); 
     747    status = tcp_create(listener, sock, PJ_FALSE, &local_addr,  
     748                        (pj_sockaddr_in*)rem_addr, &tcp); 
    598749    if (status != PJ_SUCCESS) 
    599750        return status; 
    600          
     751 
     752 
    601753    /* Start asynchronous connect() operation */ 
    602754    tcp->has_pending_connect = PJ_TRUE; 
    603     pj_ioqueue_op_key_init(&tcp->connect_op.op_key,  
    604                            sizeof(tcp->connect_op.op_key)); 
    605     tcp->connect_op.transport = tcp; 
    606755    status = pj_ioqueue_connect(tcp->key, rem_addr, sizeof(pj_sockaddr_in)); 
    607     if (status != PJ_SUCCESS) { 
    608         tcp_destroy(&tcp->base); 
     756    if (status == PJ_SUCCESS) { 
     757        tcp->has_pending_connect = PJ_FALSE; 
     758    } else if (status != PJ_EPENDING) { 
     759        tcp_destroy(&tcp->base, status); 
    609760        return status; 
    610761    } 
     
    630781    } 
    631782 
     783    if (tcp->has_pending_connect) { 
     784        PJ_LOG(4,(tcp->base.obj_name,  
     785                  "TCP transport %.*s:%d is connecting to %.*s:%d...", 
     786                  (int)tcp->base.local_name.host.slen, 
     787                  tcp->base.local_name.host.ptr, 
     788                  tcp->base.local_name.port, 
     789                  (int)tcp->base.remote_name.host.slen, 
     790                  tcp->base.remote_name.host.ptr, 
     791                  tcp->base.remote_name.port)); 
     792    } 
     793 
    632794    /* Done */ 
    633795    *p_transport = &tcp->base; 
     
    654816    accept_op = (struct pending_accept*) op_key; 
    655817 
     818    /* 
     819     * Loop while there is immediate connection or when there is error. 
     820     */ 
    656821    do { 
    657         if (status != PJ_SUCCESS) { 
    658             tcp_perror(listener->name, "Error in accept()", status); 
    659  
     822        if (status == PJ_EPENDING) { 
     823            /* 
     824             * This can only happen when this function is called during 
     825             * initialization to kick off asynchronous accept(). 
     826             */ 
     827 
     828        } else if (status != PJ_SUCCESS) { 
     829 
     830            /* 
     831             * Error in accept(). 
     832             */ 
     833            tcp_perror(listener->obj_name, "Error in accept()", status); 
     834 
     835            /* 
     836             * Prevent endless accept() error loop by limiting the 
     837             * number of consecutive errors. Once the number of errors 
     838             * is equal to maximum, we treat this as permanent error, and 
     839             * we stop the accept() operation. 
     840             */ 
    660841            ++err_cnt; 
    661             if (err_cnt >= 5) { 
    662                 PJ_LOG(1, (listener->name,  
     842            if (err_cnt >= 10) { 
     843                PJ_LOG(1, (listener->obj_name,  
    663844                           "Too many errors, listener stopping")); 
    664845            } 
    665846 
    666             goto start_next_accept; 
    667         } 
    668  
    669         status = create_tcp_transport( listener, sock,  
    670                                        &accept_op->local_addr,  
    671                                        &accept_op->remote_addr, &tcp); 
    672         if (status == PJ_SUCCESS) { 
    673             status = tcp_start_read(tcp); 
    674             if (status != PJ_SUCCESS) { 
    675                 PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled")); 
    676                 tcp_destroy(&tcp->base); 
     847        } else { 
     848 
     849            if (sock == PJ_INVALID_SOCKET) { 
     850                sock = accept_op->new_sock; 
     851                PJ_LOG(4,(listener->obj_name,  
     852                          "Warning: ioqueue reports -1 in on_accept_complete()" 
     853                          " sock argument")); 
     854            } 
     855 
     856            PJ_LOG(4,(listener->obj_name,  
     857                      "TCP listener %.*s:%d: got incoming TCP connection " 
     858                      "from %s:%d, sock=%d", 
     859                      (int)listener->factory.addr_name.host.slen, 
     860                      listener->factory.addr_name.host.ptr, 
     861                      listener->factory.addr_name.port, 
     862                      pj_inet_ntoa(accept_op->remote_addr.sin_addr), 
     863                      pj_ntohs(accept_op->remote_addr.sin_port), 
     864                      sock)); 
     865 
     866            /*  
     867             * Incoming connections! 
     868             * Create TCP transport for the new socket. 
     869             */ 
     870            status = tcp_create( listener, sock, PJ_TRUE, 
     871                                 &accept_op->local_addr,  
     872                                 &accept_op->remote_addr, &tcp); 
     873            if (status == PJ_SUCCESS) { 
     874                status = tcp_start_read(tcp); 
     875                if (status != PJ_SUCCESS) { 
     876                    PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled")); 
     877                    tcp_destroy(&tcp->base, status); 
     878                } 
    677879            } 
    678880        } 
    679881 
    680 start_next_accept: 
     882        /* 
     883         * Start the next asynchronous accept() operation. 
     884         */ 
     885        accept_op->addr_len = sizeof(pj_sockaddr_in); 
     886        accept_op->new_sock = PJ_INVALID_SOCKET; 
    681887 
    682888        status = pj_ioqueue_accept(listener->key,  
     
    687893                                   &accept_op->addr_len); 
    688894 
     895        /* 
     896         * Loop while we have immediate connection or when there is error. 
     897         */ 
     898 
    689899    } while (status != PJ_EPENDING); 
    690900} 
    691901 
    692902 
    693 /* Callback from ioqueue when packet is sent */ 
     903/*  
     904 * Callback from ioqueue when packet is sent. 
     905 */ 
    694906static void on_write_complete(pj_ioqueue_key_t *key,  
    695907                              pj_ioqueue_op_key_t *op_key,  
    696908                              pj_ssize_t bytes_sent) 
    697909{ 
    698     struct tcp_transport *tp = pj_ioqueue_get_user_data(key); 
     910    struct tcp_transport *tcp = pj_ioqueue_get_user_data(key); 
    699911    pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key; 
    700912 
    701913    tdata_op_key->tdata = NULL; 
    702914 
     915    /* Check for error/closure */ 
     916    if (bytes_sent <= 0) { 
     917        pj_status_t status; 
     918 
     919        PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d",  
     920                  bytes_sent)); 
     921 
     922        status = (bytes_sent == 0) ? PJ_RETURN_OS_ERROR(OSERR_ENOTCONN) : 
     923                                     -bytes_sent; 
     924        if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; 
     925        pjsip_transport_shutdown(&tcp->base); 
     926    } 
     927 
    703928    if (tdata_op_key->callback) { 
    704         tdata_op_key->callback(&tp->base, tdata_op_key->token, bytes_sent); 
    705     } 
    706 } 
    707  
    708  
    709 /* This callback is called by transport manager to send SIP message */ 
     929        /* 
     930         * Notify sip_transport.c that packet has been sent. 
     931         */ 
     932        tdata_op_key->callback(&tcp->base, tdata_op_key->token, bytes_sent); 
     933    } 
     934} 
     935 
     936 
     937/*  
     938 * This callback is called by transport manager to send SIP message  
     939 */ 
    710940static pj_status_t tcp_send_msg(pjsip_transport *transport,  
    711941                                pjsip_tx_data *tdata, 
     
    719949    struct tcp_transport *tcp = (struct tcp_transport*)transport; 
    720950    pj_ssize_t size; 
    721     pj_status_t status; 
     951    pj_bool_t delayed = PJ_FALSE; 
     952    pj_status_t status = PJ_SUCCESS; 
    722953 
    723954    /* Sanity check */ 
     
    738969 
    739970    /* If asynchronous connect() has not completed yet, just put the 
    740      * transmit data in the pending transmission list. 
     971     * transmit data in the pending transmission list since we can not 
     972     * use the socket yet. 
    741973     */ 
    742     pj_lock_acquire(tcp->base.lock); 
    743  
    744974    if (tcp->has_pending_connect) { 
    745         struct pending_tdata *pending_tdata; 
    746  
    747         /* Pust to list */ 
    748         pending_tdata = pj_pool_alloc(tdata->pool, sizeof(*pending_tdata)); 
    749         pending_tdata->tdata_op_key = &tdata->op_key; 
    750  
    751         pj_list_push_back(&tcp->tx_list, pending_tdata); 
    752         status = PJ_EPENDING; 
    753  
    754     } else { 
    755         /* send to ioqueue! */ 
     975 
     976        /* 
     977         * Looks like connect() is still in progress. Check again (this time 
     978         * with holding the lock) to be sure. 
     979         */ 
     980        pj_lock_acquire(tcp->base.lock); 
     981 
     982        if (tcp->has_pending_connect) { 
     983            struct delayed_tdata *delayed_tdata; 
     984 
     985            /* 
     986             * connect() is still in progress. Put the transmit data to 
     987             * the delayed list. 
     988             */ 
     989            delayed_tdata = pj_pool_alloc(tdata->pool,  
     990                                          sizeof(*delayed_tdata)); 
     991            delayed_tdata->tdata_op_key = &tdata->op_key; 
     992 
     993            pj_list_push_back(&tcp->delayed_list, delayed_tdata); 
     994            status = PJ_EPENDING; 
     995 
     996            /* Prevent pj_ioqueue_send() to be called below */ 
     997            delayed = PJ_TRUE; 
     998        } 
     999 
     1000        pj_lock_release(tcp->base.lock); 
     1001    }  
     1002     
     1003    if (!delayed) { 
     1004        /* 
     1005         * Transport is ready to go. Send the packet to ioqueue to be 
     1006         * sent asynchronously. 
     1007         */ 
    7561008        size = tdata->buf.cur - tdata->buf.start; 
    7571009        status = pj_ioqueue_send(tcp->key,  
     
    7591011                                 tdata->buf.start, &size, 0); 
    7601012 
    761         if (status != PJ_EPENDING) 
     1013        if (status != PJ_EPENDING) { 
     1014            /* Not pending (could be immediate success or error) */ 
    7621015            tdata->op_key.tdata = NULL; 
    763     } 
    764  
    765     pj_lock_release(tcp->base.lock); 
     1016 
     1017            /* Shutdown transport on closure/errors */ 
     1018            if (size <= 0) { 
     1019 
     1020                PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d",  
     1021                          size)); 
     1022 
     1023                if (status == PJ_SUCCESS)  
     1024                    status = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN); 
     1025                if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; 
     1026                pjsip_transport_shutdown(&tcp->base); 
     1027            } 
     1028        } 
     1029    } 
    7661030 
    7671031    return status; 
     
    7691033 
    7701034 
    771 /* This callback is called by transport manager to shutdown transport */ 
     1035/*  
     1036 * This callback is called by transport manager to shutdown transport. 
     1037 * This normally is only used by UDP transport. 
     1038 */ 
    7721039static pj_status_t tcp_shutdown(pjsip_transport *transport) 
    7731040{ 
     
    7801047 
    7811048 
    782 /* Callback from ioqueue on incoming packet */ 
     1049/*  
     1050 * Callback from ioqueue that an incoming data is received from the socket. 
     1051 */ 
    7831052static void on_read_complete(pj_ioqueue_key_t *key,  
    7841053                             pj_ioqueue_op_key_t *op_key,  
     
    7881057    pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key; 
    7891058    pjsip_rx_data *rdata = rdata_op_key->rdata; 
    790     struct tcp_transport *tp = (struct tcp_transport*)rdata->tp_info.transport; 
     1059    struct tcp_transport *tcp =  
     1060        (struct tcp_transport*)rdata->tp_info.transport; 
    7911061    int i; 
    7921062    pj_status_t status; 
    7931063 
    7941064    /* Don't do anything if transport is closing. */ 
    795     if (tp->is_closing) { 
    796         tp->is_closing++; 
     1065    if (tcp->is_closing) { 
     1066        tcp->is_closing++; 
    7971067        return; 
    7981068    } 
     
    8071077        pj_uint32_t flags; 
    8081078 
    809         /* Report the packet to transport manager. */ 
     1079        /* Houston, we have packet! Report the packet to transport manager 
     1080         * to be parsed. 
     1081         */ 
    8101082        if (bytes_read > 0) { 
    8111083            pj_size_t size_eaten; 
     
    8161088            pj_gettimeofday(&rdata->pkt_info.timestamp); 
    8171089 
     1090            /* Report to transport manager. 
     1091             * The transport manager will tell us how many bytes of the packet 
     1092             * have been processed (as valid SIP message). 
     1093             */ 
    8181094            size_eaten =  
    8191095                pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr,  
     
    8341110 
    8351111            /* Transport is closed */ 
    836             PJ_LOG(4,(tp->base.obj_name, "tcp connection closed")); 
    837             tcp_destroy(&tp->base); 
     1112            PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed")); 
     1113             
     1114            /* We can not destroy the transport since high level objects may 
     1115             * still keep reference to this transport. So we can only  
     1116             * instruct transport manager to gracefully start the shutdown 
     1117             * procedure for this transport. 
     1118             */ 
     1119            if (tcp->close_reason==PJ_SUCCESS)  
     1120                tcp->close_reason = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN); 
     1121            pjsip_transport_shutdown(&tcp->base); 
     1122 
    8381123            return; 
    8391124 
    840         } else if (bytes_read < 0)  { 
     1125        //} else if (bytes_read < 0)  { 
     1126        } else if (-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) && 
     1127                   -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&  
     1128                   -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET))  
     1129        { 
    8411130 
    8421131            /* Report error to endpoint. */ 
    8431132            PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt, 
    8441133                                   rdata->tp_info.transport->obj_name, 
    845                                    -bytes_read, "tcp recv() error")); 
    846  
    847             /* Transport error, close transport */ 
    848             tcp_destroy(&tp->base); 
     1134                                   -bytes_read, "TCP recv() error")); 
     1135 
     1136            /* We can not destroy the transport since high level objects may 
     1137             * still keep reference to this transport. So we can only  
     1138             * instruct transport manager to gracefully start the shutdown 
     1139             * procedure for this transport. 
     1140             */ 
     1141            if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = -bytes_read; 
     1142            pjsip_transport_shutdown(&tcp->base); 
     1143 
    8491144            return; 
    8501145        } 
    8511146 
    8521147        if (i >= MAX_IMMEDIATE_PACKET) { 
    853             /* Force ioqueue_recv() to return PJ_EPENDING */ 
     1148            /* Receive quota reached. Force ioqueue_recv() to  
     1149             * return PJ_EPENDING  
     1150             */ 
    8541151            flags = PJ_IOQUEUE_ALWAYS_ASYNC; 
    8551152        } else { 
     
    8681165 
    8691166        if (status == PJ_SUCCESS) { 
     1167 
    8701168            /* Continue loop. */ 
    8711169            pj_assert(i < MAX_IMMEDIATE_PACKET); 
     
    8801178                                   status, "tcp recv() error")); 
    8811179 
    882             /* Transport error, close transport */ 
    883             tcp_destroy(&tp->base); 
     1180            /* We can not destroy the transport since high level objects may 
     1181             * still keep reference to this transport. So we can only  
     1182             * instruct transport manager to gracefully start the shutdown 
     1183             * procedure for this transport. 
     1184             */ 
     1185            if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; 
     1186            pjsip_transport_shutdown(&tcp->base); 
     1187 
    8841188            return; 
    8851189        } 
     
    8881192 
    8891193 
    890 /* Callback from ioqueue when connect completes */ 
     1194/*  
     1195 * Callback from ioqueue when asynchronous connect() operation completes. 
     1196 */ 
    8911197static void on_connect_complete(pj_ioqueue_key_t *key,  
    8921198                                pj_status_t status) 
    8931199{ 
    894     struct pending_connect *connect_op = (struct pending_connect *)key; 
    895     struct tcp_transport *tcp = connect_op->transport; 
     1200    struct tcp_transport *tcp; 
    8961201    pj_sockaddr_in addr; 
    8971202    int addrlen; 
    8981203 
     1204    tcp = pj_ioqueue_get_user_data(key); 
     1205 
     1206    PJ_LOG(4,(tcp->base.obj_name,  
     1207              "TCP transport %.*s:%d is connected to %.*s:%d", 
     1208              (int)tcp->base.local_name.host.slen, 
     1209              tcp->base.local_name.host.ptr, 
     1210              tcp->base.local_name.port, 
     1211              (int)tcp->base.remote_name.host.slen, 
     1212              tcp->base.remote_name.host.ptr, 
     1213              tcp->base.remote_name.port)); 
     1214 
    8991215    /* Mark that pending connect() operation has completed. */ 
    9001216    tcp->has_pending_connect = PJ_FALSE; 
     
    9021218    /* Check connect() status */ 
    9031219    if (status != PJ_SUCCESS) { 
     1220 
    9041221        tcp_perror(tcp->base.obj_name, "TCP connect() error", status); 
    905         tcp_destroy(&tcp->base); 
     1222 
     1223        /* We can not destroy the transport since high level objects may 
     1224         * still keep reference to this transport. So we can only  
     1225         * instruct transport manager to gracefully start the shutdown 
     1226         * procedure for this transport. 
     1227         */ 
     1228        if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; 
     1229        pjsip_transport_shutdown(&tcp->base); 
    9061230        return; 
    9071231    } 
     
    9261250    status = tcp_start_read(tcp); 
    9271251    if (status != PJ_SUCCESS) { 
    928         tcp_destroy(&tcp->base); 
     1252        /* We can not destroy the transport since high level objects may 
     1253         * still keep reference to this transport. So we can only  
     1254         * instruct transport manager to gracefully start the shutdown 
     1255         * procedure for this transport. 
     1256         */ 
     1257        if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; 
     1258        pjsip_transport_shutdown(&tcp->base); 
    9291259        return; 
    9301260    } 
Note: See TracChangeset for help on using the changeset viewer.