Changeset 554


Ignore:
Timestamp:
Jun 26, 2006 10:05:37 AM (18 years ago)
Author:
bennylp
Message:

Finished initial SIP TCP transport support

Location:
pjproject/trunk/pjsip
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjsip/build/pjsip_core.dsp

    r550 r554  
    148148 
    149149SOURCE=..\src\pjsip\sip_transport_tcp.c 
    150 # PROP Exclude_From_Build 1 
    151150# End Source File 
    152151# Begin Source File 
  • pjproject/trunk/pjsip/src/pjsip/sip_transport_tcp.c

    r550 r554  
    1919#include <pjsip/sip_transport_tcp.h> 
    2020#include <pjsip/sip_endpoint.h> 
     21#include <pjsip/sip_errno.h> 
     22#include <pj/compat/socket.h> 
    2123#include <pj/assert.h> 
    22 #include <pj/errno.h> 
    2324#include <pj/ioqueue.h> 
    2425#include <pj/lock.h> 
     26#include <pj/log.h> 
     27#include <pj/os.h> 
    2528#include <pj/pool.h> 
    2629#include <pj/string.h> 
    2730 
    2831 
     32#define THIS_FILE       "sip_transport_tcp.c" 
     33 
    2934#define MAX_ASYNC_CNT   16 
    30 #define POOL_INIT       4000 
    31 #define POOL_INC        4000 
     35#define POOL_LIS_INIT   4000 
     36#define POOL_LIS_INC    4000 
     37#define POOL_TP_INIT    4000 
     38#define POOL_TP_INC     4000 
    3239 
    3340 
    3441struct tcp_listener; 
     42struct tcp_transport; 
     43 
    3544 
    3645struct pending_accept 
    3746{ 
    38     pj_ioqueue_op_key_t  op_key; 
    39     struct tcp_listener *listener; 
    40     pj_sock_t            new_sock; 
    41     int                  addr_len; 
    42     pj_sockaddr_in       local_addr; 
    43     pj_sockaddr_in       remote_addr; 
     47    pj_ioqueue_op_key_t      op_key; 
     48    struct tcp_listener     *listener; 
     49    pj_sock_t                new_sock; 
     50    int                      addr_len; 
     51    pj_sockaddr_in           local_addr; 
     52    pj_sockaddr_in           remote_addr; 
    4453}; 
    4554 
     55struct pending_connect 
     56{ 
     57    pj_ioqueue_op_key_t      op_key; 
     58    struct tcp_transport    *transport; 
     59}; 
     60 
     61 
    4662struct tcp_listener 
    4763{ 
    4864    pjsip_tpfactory          factory; 
     65    char                     name[PJ_MAX_OBJ_NAME]; 
    4966    pj_bool_t                active; 
     67    pjsip_endpoint          *endpt; 
    5068    pjsip_tpmgr             *tpmgr; 
    5169    pj_sock_t                sock; 
     
    5674 
    5775 
     76struct pending_tdata 
     77{ 
     78    PJ_DECL_LIST_MEMBER(struct pending_tdata); 
     79    pjsip_tx_data_op_key    *tdata_op_key; 
     80}; 
     81 
     82 
    5883struct tcp_transport 
    5984{ 
    6085    pjsip_transport          base; 
     86    struct tcp_listener     *listener; 
     87    pj_bool_t                is_registered; 
     88    pj_bool_t                is_closing; 
    6189    pj_sock_t                sock; 
     90    pj_ioqueue_key_t        *key; 
     91    pj_bool_t                has_pending_connect; 
     92    struct pending_connect   connect_op; 
     93 
     94 
     95    /* TCP transport can only have  one rdata! 
     96     * Otherwise chunks of incoming PDU may be received on different 
     97     * buffer. 
     98     */ 
     99    pjsip_rx_data            rdata; 
     100 
     101    /* Pending transmission list. */ 
     102    struct pending_tdata     tx_list; 
    62103}; 
    63104 
     
    71112                                pj_status_t status); 
    72113 
    73 static pj_status_t destroy_listener(struct tcp_listener *listener); 
    74  
    75 static pj_status_t create_transport(pjsip_tpfactory *factory, 
    76                                     pjsip_tpmgr *mgr, 
    77                                     pjsip_endpoint *endpt, 
    78                                     const pj_sockaddr *rem_addr, 
    79                                     int addr_len, 
    80                                     pjsip_transport **transport); 
     114static pj_status_t lis_destroy(struct tcp_listener *listener); 
     115static pj_status_t lis_create_transport(pjsip_tpfactory *factory, 
     116                                        pjsip_tpmgr *mgr, 
     117                                        pjsip_endpoint *endpt, 
     118                                        const pj_sockaddr *rem_addr, 
     119                                        int addr_len, 
     120                                        pjsip_transport **transport); 
     121 
     122 
     123static pj_status_t create_tcp_transport(struct tcp_listener *listener, 
     124                                        pj_sock_t sock, 
     125                                        const pj_sockaddr_in *local, 
     126                                        const pj_sockaddr_in *remote, 
     127                                        struct tcp_transport **p_tcp); 
     128 
     129 
     130static void tcp_perror(const char *sender, const char *title, 
     131                       pj_status_t status) 
     132{ 
     133    char errmsg[PJ_ERR_MSG_SIZE]; 
     134 
     135    pj_strerror(status, errmsg, sizeof(errmsg)); 
     136 
     137    PJ_LOG(1,(sender, "%s: %s [code=%d]", title, errmsg, status)); 
     138} 
     139 
    81140 
    82141PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt, 
     
    94153 
    95154 
    96     pool = pjsip_endpt_create_pool(endpt, "tcplis", POOL_INIT, POOL_INC); 
     155    pool = pjsip_endpt_create_pool(endpt, "tcplis", POOL_LIS_INIT,  
     156                                   POOL_LIS_INC); 
    97157    PJ_ASSERT_RETURN(pool, PJ_ENOMEM); 
    98158 
    99159 
    100160    listener = pj_pool_zalloc(pool, sizeof(struct tcp_listener)); 
     161    pj_ansi_sprintf(listener->name, "tcp:%d", (int)pj_ntohs(local->sin_port)); 
    101162    listener->factory.pool = pool; 
    102163    listener->factory.type = PJSIP_TRANSPORT_TCP; 
     
    146207                                   &listener->accept_op[i].remote_addr, 
    147208                                   &listener->accept_op[i].addr_len); 
    148         if (status != PJ_SUCCESS) 
     209        if (status != PJ_SUCCESS && status != PJ_EPENDING) 
    149210            goto on_error; 
    150211    } 
    151212 
    152213    /* Register to transport manager */ 
     214    listener->endpt = endpt; 
    153215    listener->tpmgr = pjsip_endpt_get_tpmgr(endpt); 
     216    listener->factory.create_transport = lis_create_transport; 
    154217    status = pjsip_tpmgr_register_tpfactory(listener->tpmgr, 
    155218                                            &listener->factory); 
     
    160223    listener->active = PJ_TRUE; 
    161224 
     225    PJ_LOG(4,(listener->name,  
     226             "SIP TCP transport listening for incoming connections at %s:%d", 
     227             pj_inet_ntoa(local->sin_addr), (int)pj_ntohs(local->sin_port))); 
     228 
    162229    return PJ_SUCCESS; 
    163230 
    164231on_error: 
    165     destroy_listener(listener); 
     232    lis_destroy(listener); 
    166233    return status; 
    167234} 
    168235 
    169236 
    170  
    171  
    172 static pj_status_t destroy_listener(struct tcp_listener *listener) 
     237static pj_status_t lis_destroy(struct tcp_listener *listener) 
    173238{ 
    174239    if (listener->active) { 
     
    194259 
    195260    if (listener->factory.pool) { 
     261        PJ_LOG(4,(listener->name,  "SIP TCP transport destroyed")); 
    196262        pj_pool_release(listener->factory.pool); 
    197263        listener->factory.pool = NULL; 
     
    202268 
    203269 
     270/***************************************************************************/ 
     271/* 
     272 * TCP Transport 
     273 */ 
     274 
     275/* 
     276 * Prototypes. 
     277 */ 
     278/* Called by transport manager to send message */ 
     279static pj_status_t tcp_send_msg(pjsip_transport *transport,  
     280                                pjsip_tx_data *tdata, 
     281                                const pj_sockaddr_t *rem_addr, 
     282                                int addr_len, 
     283                                void *token, 
     284                                void (*callback)(pjsip_transport *transport, 
     285                                                 void *token,  
     286                                                 pj_ssize_t sent_bytes)); 
     287 
     288/* Called by transport manager to shutdown */ 
     289static pj_status_t tcp_shutdown(pjsip_transport *transport); 
     290 
     291/* Called by transport manager to destroy */ 
     292static pj_status_t tcp_destroy(pjsip_transport *transport); 
     293 
     294/* Callback from ioqueue on incoming packet */ 
     295static void on_read_complete(pj_ioqueue_key_t *key,  
     296                             pj_ioqueue_op_key_t *op_key,  
     297                             pj_ssize_t bytes_read); 
     298 
     299/* Callback from ioqueue when packet is sent */ 
     300static void on_write_complete(pj_ioqueue_key_t *key,  
     301                              pj_ioqueue_op_key_t *op_key,  
     302                              pj_ssize_t bytes_sent); 
     303 
     304/* Callback from ioqueue when connect completes */ 
     305static void on_connect_complete(pj_ioqueue_key_t *key,  
     306                                pj_status_t status); 
     307 
     308 
     309static void sockaddr_to_host_port( pj_pool_t *pool, 
     310                                   pjsip_host_port *host_port, 
     311                                   const pj_sockaddr_in *addr ) 
     312{ 
     313    host_port->host.ptr = pj_pool_alloc(pool, 48); 
     314    host_port->host.slen = pj_ansi_sprintf( host_port->host.ptr, "%s",  
     315                                            pj_inet_ntoa(addr->sin_addr)); 
     316    host_port->port = pj_ntohs(addr->sin_port); 
     317} 
     318 
     319 
     320/* 
     321 * Utilities to create TCP transport. 
     322 */ 
     323static pj_status_t create_tcp_transport(struct tcp_listener *listener, 
     324                                        pj_sock_t sock, 
     325                                        const pj_sockaddr_in *local, 
     326                                        const pj_sockaddr_in *remote, 
     327                                        struct tcp_transport **p_tcp) 
     328{ 
     329    struct tcp_transport *tcp; 
     330    pj_pool_t *pool; 
     331    pj_ioqueue_t *ioqueue; 
     332    pj_ioqueue_callback tcp_callback; 
     333    pj_status_t status; 
     334     
     335    pool = pjsip_endpt_create_pool(listener->endpt, "tcp",  
     336                                   POOL_TP_INIT, POOL_TP_INC); 
     337     
     338    /* 
     339     * Create and initialize basic transport structure. 
     340     */ 
     341    tcp = pj_pool_zalloc(pool, sizeof(*tcp)); 
     342    tcp->sock = sock; 
     343    tcp->listener = listener; 
     344    pj_list_init(&tcp->tx_list); 
     345 
     346 
     347    pj_ansi_snprintf(tcp->base.obj_name, PJ_MAX_OBJ_NAME, "tcp%p", tcp); 
     348    tcp->base.pool = pool; 
     349 
     350    status = pj_atomic_create(pool, 0, &tcp->base.ref_cnt); 
     351    if (status != PJ_SUCCESS) 
     352        goto on_error; 
     353 
     354    status = pj_lock_create_recursive_mutex(pool, "tcp", &tcp->base.lock); 
     355    if (status != PJ_SUCCESS) 
     356        goto on_error; 
     357 
     358    tcp->base.key.type = PJSIP_TRANSPORT_TCP; 
     359    pj_memcpy(&tcp->base.key.rem_addr, remote, sizeof(pj_sockaddr_in)); 
     360    tcp->base.type_name = "tcp"; 
     361    tcp->base.flag = pjsip_transport_get_flag_from_type(PJSIP_TRANSPORT_TCP); 
     362 
     363    tcp->base.info = pj_pool_alloc(pool, 64); 
     364    pj_ansi_snprintf(tcp->base.info, 64, "TCP to %s:%d", 
     365                     pj_inet_ntoa(remote->sin_addr),  
     366                     (int)pj_ntohs(remote->sin_port)); 
     367 
     368    tcp->base.addr_len = sizeof(pj_sockaddr_in); 
     369    pj_memcpy(&tcp->base.local_addr, local, sizeof(pj_sockaddr_in)); 
     370    sockaddr_to_host_port(pool, &tcp->base.local_name, local); 
     371    sockaddr_to_host_port(pool, &tcp->base.remote_name, remote); 
     372 
     373    tcp->base.endpt = listener->endpt; 
     374    tcp->base.tpmgr = listener->tpmgr; 
     375    tcp->base.send_msg = &tcp_send_msg; 
     376    tcp->base.do_shutdown = &tcp_shutdown; 
     377    tcp->base.destroy = &tcp_destroy; 
     378 
     379 
     380    /* Register socket to ioqueue */ 
     381    pj_memset(&tcp_callback, 0, sizeof(pj_ioqueue_callback)); 
     382    tcp_callback.on_read_complete = &on_read_complete; 
     383    tcp_callback.on_write_complete = &on_write_complete; 
     384    tcp_callback.on_connect_complete = &on_connect_complete; 
     385 
     386    ioqueue = pjsip_endpt_get_ioqueue(listener->endpt); 
     387    status = pj_ioqueue_register_sock(pool, ioqueue, sock,  
     388                                      tcp, &tcp_callback, &tcp->key); 
     389    if (status != PJ_SUCCESS) 
     390        goto on_error; 
     391 
     392    /* Register transport to transport manager */ 
     393    status = pjsip_transport_register(listener->tpmgr, &tcp->base); 
     394    if (status != PJ_SUCCESS) 
     395        goto on_error; 
     396 
     397    tcp->is_registered = PJ_TRUE; 
     398 
     399    /* Done setting up basic transport. */ 
     400    *p_tcp = tcp; 
     401 
     402on_error: 
     403    tcp_destroy(&tcp->base); 
     404    return status; 
     405} 
     406 
     407 
     408/* Flush all pending send operations */ 
     409static tcp_flush_pending_tx(struct tcp_transport *tcp) 
     410{ 
     411    pj_lock_acquire(tcp->base.lock); 
     412    while (!pj_list_empty(&tcp->tx_list)) { 
     413        struct pending_tdata *pending_tx; 
     414        pjsip_tx_data *tdata; 
     415        pj_ioqueue_op_key_t *op_key; 
     416        pj_ssize_t size; 
     417        pj_status_t status; 
     418 
     419        pending_tx = tcp->tx_list.next; 
     420        pj_list_erase(pending_tx); 
     421 
     422        tdata = pending_tx->tdata_op_key->tdata; 
     423        op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key; 
     424 
     425        /* send to ioqueue! */ 
     426        size = tdata->buf.cur - tdata->buf.start; 
     427        status = pj_ioqueue_send(tcp->key, op_key, 
     428                                 tdata->buf.start, &size, 0); 
     429 
     430        if (status != PJ_EPENDING) { 
     431            on_write_complete(tcp->key, op_key, size); 
     432        } 
     433 
     434    } 
     435    pj_lock_release(tcp->base.lock); 
     436} 
     437 
     438 
     439 
     440/* Destroy TCP transport */ 
     441static pj_status_t tcp_destroy(pjsip_transport *transport) 
     442{ 
     443    struct tcp_transport *tcp = (struct tcp_transport*)transport; 
     444 
     445    /* Cancel all pending transmits */ 
     446    while (!pj_list_empty(&tcp->tx_list)) { 
     447        struct pending_tdata *pending_tx; 
     448        pj_ioqueue_op_key_t *op_key; 
     449 
     450        pending_tx = tcp->tx_list.next; 
     451        pj_list_erase(pending_tx); 
     452 
     453        op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key; 
     454 
     455        on_write_complete(tcp->key, op_key,  
     456                          -PJ_RETURN_OS_ERROR(OSERR_ENOTCONN)); 
     457    } 
     458 
     459    if (tcp->is_registered) { 
     460        pjsip_transport_destroy(transport); 
     461        tcp->is_registered = PJ_FALSE; 
     462    } 
     463 
     464    if (tcp->rdata.tp_info.pool) { 
     465        pj_pool_release(tcp->rdata.tp_info.pool); 
     466        tcp->rdata.tp_info.pool = NULL; 
     467    } 
     468 
     469    if (tcp->key) { 
     470        pj_ioqueue_unregister(tcp->key); 
     471        tcp->key = NULL; 
     472    } 
     473 
     474    if (tcp->base.lock) { 
     475        pj_lock_destroy(tcp->base.lock); 
     476        tcp->base.lock = NULL; 
     477    } 
     478 
     479    if (tcp->base.ref_cnt) { 
     480        pj_atomic_destroy(tcp->base.ref_cnt); 
     481        tcp->base.ref_cnt = NULL; 
     482    } 
     483 
     484    if (tcp->base.pool) { 
     485        PJ_LOG(4,(tcp->base.obj_name, "TCP transport destroyed")); 
     486        pj_pool_release(tcp->base.pool); 
     487        tcp->base.pool = NULL; 
     488    } 
     489 
     490    return PJ_SUCCESS; 
     491} 
     492 
     493 
     494/* 
     495 * This utility function creates receive data buffers and start 
     496 * asynchronous recv() operations from the socket. 
     497 */ 
     498static pj_status_t tcp_start_read(struct tcp_transport *tcp) 
     499{ 
     500    pj_pool_t *pool; 
     501    pj_ssize_t size; 
     502    pj_sockaddr_in *rem_addr; 
     503    pj_status_t status; 
     504 
     505    /* Init rdata */ 
     506    pool = pjsip_endpt_create_pool(tcp->listener->endpt, 
     507                                   "rtd%p", 
     508                                   PJSIP_POOL_RDATA_LEN, 
     509                                   PJSIP_POOL_RDATA_INC); 
     510    if (!pool) { 
     511        tcp_perror(tcp->base.obj_name, "Unable to create pool", PJ_ENOMEM); 
     512        return PJ_ENOMEM; 
     513    } 
     514 
     515    tcp->rdata.tp_info.pool = pool; 
     516 
     517    tcp->rdata.tp_info.transport = &tcp->base; 
     518    tcp->rdata.tp_info.tp_data = tcp; 
     519    tcp->rdata.tp_info.op_key.rdata = &tcp->rdata; 
     520    pj_ioqueue_op_key_init(&tcp->rdata.tp_info.op_key.op_key,  
     521                           sizeof(pj_ioqueue_op_key_t)); 
     522 
     523    tcp->rdata.pkt_info.src_addr = tcp->base.key.rem_addr; 
     524    tcp->rdata.pkt_info.src_addr_len = sizeof(pj_sockaddr_in); 
     525    rem_addr = (pj_sockaddr_in*) &tcp->base.key.rem_addr; 
     526    pj_ansi_strcpy(tcp->rdata.pkt_info.src_name, 
     527                   pj_inet_ntoa(rem_addr->sin_addr)); 
     528    tcp->rdata.pkt_info.src_port = pj_ntohs(rem_addr->sin_port); 
     529 
     530    size = sizeof(tcp->rdata.pkt_info.packet); 
     531    status = pj_ioqueue_recv(tcp->key, &tcp->rdata.tp_info.op_key.op_key, 
     532                             tcp->rdata.pkt_info.packet, &size, 
     533                             PJ_IOQUEUE_ALWAYS_ASYNC); 
     534    if (status != PJ_SUCCESS) { 
     535        tcp_perror(tcp->base.obj_name, "ioqueue recv() error", status); 
     536        return status; 
     537    } 
     538 
     539    return PJ_SUCCESS; 
     540} 
     541 
     542 
     543/* This callback is called by transport manager for the TCP factory 
     544 * to create outgoing transport to the specified destination. 
     545 */ 
     546static pj_status_t lis_create_transport(pjsip_tpfactory *factory, 
     547                                        pjsip_tpmgr *mgr, 
     548                                        pjsip_endpoint *endpt, 
     549                                        const pj_sockaddr *rem_addr, 
     550                                        int addr_len, 
     551                                        pjsip_transport **p_transport) 
     552{ 
     553    struct tcp_listener *listener; 
     554    struct tcp_transport *tcp; 
     555    pj_sock_t sock; 
     556    pj_sockaddr_in local_addr; 
     557    pj_status_t status; 
     558 
     559    /* Sanity checks */ 
     560    PJ_ASSERT_RETURN(factory && mgr && endpt && rem_addr && 
     561                     addr_len && p_transport, PJ_EINVAL); 
     562 
     563    /* Check that address is a sockaddr_in */ 
     564    PJ_ASSERT_RETURN(rem_addr->sa_family == PJ_AF_INET && 
     565                     addr_len == sizeof(pj_sockaddr_in), PJ_EINVAL); 
     566 
     567 
     568    listener = (struct tcp_listener*)factory; 
     569 
     570     
     571    /* Create socket */ 
     572    status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, &sock); 
     573    if (status != PJ_SUCCESS) 
     574        return status; 
     575 
     576    /* Bind to any port */ 
     577    status = pj_sock_bind_in(sock, 0, 0); 
     578    if (status != PJ_SUCCESS) { 
     579        pj_sock_close(sock); 
     580        return status; 
     581    } 
     582 
     583    /* Get the local port */ 
     584    addr_len = sizeof(pj_sockaddr_in); 
     585    status = pj_sock_getsockname(sock, &local_addr, &addr_len); 
     586    if (status != PJ_SUCCESS) { 
     587        pj_sock_close(sock); 
     588        return status; 
     589    } 
     590 
     591    /* Initially set the address from the listener's address */ 
     592    local_addr.sin_addr.s_addr =  
     593        ((pj_sockaddr_in*)&listener->factory.local_addr)->sin_addr.s_addr; 
     594 
     595    /* Create the transport descriptor */ 
     596    status = create_tcp_transport(listener, sock, &local_addr,  
     597                                  (pj_sockaddr_in*)rem_addr, &tcp); 
     598    if (status != PJ_SUCCESS) 
     599        return status; 
     600         
     601    /* Start asynchronous connect() operation */ 
     602    tcp->has_pending_connect = PJ_TRUE; 
     603    pj_ioqueue_op_key_init(&tcp->connect_op.op_key,  
     604                           sizeof(tcp->connect_op.op_key)); 
     605    tcp->connect_op.transport = tcp; 
     606    status = pj_ioqueue_connect(tcp->key, rem_addr, sizeof(pj_sockaddr_in)); 
     607    if (status != PJ_SUCCESS) { 
     608        tcp_destroy(&tcp->base); 
     609        return status; 
     610    } 
     611 
     612    /* Update (again) local address, just in case local address currently 
     613     * set is different now that asynchronous connect() is started. 
     614     */ 
     615    addr_len = sizeof(pj_sockaddr_in); 
     616    if (pj_sock_getsockname(tcp->sock, &local_addr, &addr_len)==PJ_SUCCESS) { 
     617        pj_sockaddr_in *tp_addr = (pj_sockaddr_in*)&tcp->base.local_addr; 
     618 
     619        /* Some systems (like old Win32 perhaps) may not set local address 
     620         * properly before socket is fully connected. 
     621         */ 
     622        if (tp_addr->sin_addr.s_addr != local_addr.sin_addr.s_addr && 
     623            local_addr.sin_addr.s_addr != 0)  
     624        { 
     625            tp_addr->sin_addr.s_addr = local_addr.sin_addr.s_addr; 
     626            tp_addr->sin_port = local_addr.sin_port; 
     627            sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name, 
     628                                  &local_addr); 
     629        } 
     630    } 
     631 
     632    /* Done */ 
     633    *p_transport = &tcp->base; 
     634 
     635    return PJ_SUCCESS; 
     636} 
     637 
     638 
     639/* 
     640 * This callback is called by ioqueue when pending accept() operation has 
     641 * completed. 
     642 */ 
    204643static void on_accept_complete( pj_ioqueue_key_t *key,  
    205644                                pj_ioqueue_op_key_t *op_key,  
     
    207646                                pj_status_t status) 
    208647{ 
    209 } 
    210  
    211  
    212 static pj_status_t create_transport(pjsip_tpfactory *factory, 
    213                                     pjsip_tpmgr *mgr, 
    214                                     pjsip_endpoint *endpt, 
    215                                     const pj_sockaddr *rem_addr, 
    216                                     int addr_len, 
    217                                     pjsip_transport **transport) 
    218 { 
    219 } 
    220  
     648    struct tcp_listener *listener; 
     649    struct tcp_transport *tcp; 
     650    struct pending_accept *accept_op; 
     651    int err_cnt = 0; 
     652 
     653    listener = pj_ioqueue_get_user_data(key); 
     654    accept_op = (struct pending_accept*) op_key; 
     655 
     656    do { 
     657        if (status != PJ_SUCCESS) { 
     658            tcp_perror(listener->name, "Error in accept()", status); 
     659 
     660            ++err_cnt; 
     661            if (err_cnt >= 5) { 
     662                PJ_LOG(1, (listener->name,  
     663                           "Too many errors, listener stopping")); 
     664            } 
     665 
     666            goto start_next_accept; 
     667        } 
     668 
     669        status = create_tcp_transport( listener, sock,  
     670                                       &accept_op->local_addr,  
     671                                       &accept_op->remote_addr, &tcp); 
     672        if (status == PJ_SUCCESS) { 
     673            status = tcp_start_read(tcp); 
     674            if (status != PJ_SUCCESS) { 
     675                PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled")); 
     676                tcp_destroy(&tcp->base); 
     677            } 
     678        } 
     679 
     680start_next_accept: 
     681 
     682        status = pj_ioqueue_accept(listener->key,  
     683                                   &accept_op->op_key, 
     684                                   &accept_op->new_sock, 
     685                                   &accept_op->local_addr, 
     686                                   &accept_op->remote_addr, 
     687                                   &accept_op->addr_len); 
     688 
     689    } while (status != PJ_EPENDING); 
     690} 
     691 
     692 
     693/* Callback from ioqueue when packet is sent */ 
     694static void on_write_complete(pj_ioqueue_key_t *key,  
     695                              pj_ioqueue_op_key_t *op_key,  
     696                              pj_ssize_t bytes_sent) 
     697{ 
     698    struct tcp_transport *tp = pj_ioqueue_get_user_data(key); 
     699    pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key; 
     700 
     701    tdata_op_key->tdata = NULL; 
     702 
     703    if (tdata_op_key->callback) { 
     704        tdata_op_key->callback(&tp->base, tdata_op_key->token, bytes_sent); 
     705    } 
     706} 
     707 
     708 
     709/* This callback is called by transport manager to send SIP message */ 
     710static pj_status_t tcp_send_msg(pjsip_transport *transport,  
     711                                pjsip_tx_data *tdata, 
     712                                const pj_sockaddr_t *rem_addr, 
     713                                int addr_len, 
     714                                void *token, 
     715                                void (*callback)(pjsip_transport *transport, 
     716                                                 void *token,  
     717                                                 pj_ssize_t sent_bytes)) 
     718{ 
     719    struct tcp_transport *tcp = (struct tcp_transport*)transport; 
     720    pj_ssize_t size; 
     721    pj_status_t status; 
     722 
     723    /* Sanity check */ 
     724    PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL); 
     725 
     726    /* Check that there's no pending operation associated with the tdata */ 
     727    PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX); 
     728     
     729    /* Check the address is supported */ 
     730    PJ_ASSERT_RETURN(rem_addr && addr_len==sizeof(pj_sockaddr_in), PJ_EINVAL); 
     731 
     732 
     733 
     734    /* Init op key. */ 
     735    tdata->op_key.tdata = tdata; 
     736    tdata->op_key.token = token; 
     737    tdata->op_key.callback = callback; 
     738 
     739    /* If asynchronous connect() has not completed yet, just put the 
     740     * transmit data in the pending transmission list. 
     741     */ 
     742    pj_lock_acquire(tcp->base.lock); 
     743 
     744    if (tcp->has_pending_connect) { 
     745        struct pending_tdata *pending_tdata; 
     746 
     747        /* Pust to list */ 
     748        pending_tdata = pj_pool_alloc(tdata->pool, sizeof(*pending_tdata)); 
     749        pending_tdata->tdata_op_key = &tdata->op_key; 
     750 
     751        pj_list_push_back(&tcp->tx_list, pending_tdata); 
     752        status = PJ_EPENDING; 
     753 
     754    } else { 
     755        /* send to ioqueue! */ 
     756        size = tdata->buf.cur - tdata->buf.start; 
     757        status = pj_ioqueue_send(tcp->key,  
     758                                 (pj_ioqueue_op_key_t*)&tdata->op_key, 
     759                                 tdata->buf.start, &size, 0); 
     760 
     761        if (status != PJ_EPENDING) 
     762            tdata->op_key.tdata = NULL; 
     763    } 
     764 
     765    pj_lock_release(tcp->base.lock); 
     766 
     767    return status; 
     768} 
     769 
     770 
     771/* This callback is called by transport manager to shutdown transport */ 
     772static pj_status_t tcp_shutdown(pjsip_transport *transport) 
     773{ 
     774 
     775    PJ_UNUSED_ARG(transport); 
     776 
     777    /* Nothing to do for TCP */ 
     778    return PJ_SUCCESS; 
     779} 
     780 
     781 
     782/* Callback from ioqueue on incoming packet */ 
     783static void on_read_complete(pj_ioqueue_key_t *key,  
     784                             pj_ioqueue_op_key_t *op_key,  
     785                             pj_ssize_t bytes_read) 
     786{ 
     787    enum { MAX_IMMEDIATE_PACKET = 10 }; 
     788    pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key; 
     789    pjsip_rx_data *rdata = rdata_op_key->rdata; 
     790    struct tcp_transport *tp = (struct tcp_transport*)rdata->tp_info.transport; 
     791    int i; 
     792    pj_status_t status; 
     793 
     794    /* Don't do anything if transport is closing. */ 
     795    if (tp->is_closing) { 
     796        tp->is_closing++; 
     797        return; 
     798    } 
     799 
     800    /* 
     801     * The idea of the loop is to process immediate data received by 
     802     * pj_ioqueue_recv(), as long as i < MAX_IMMEDIATE_PACKET. When 
     803     * i is >= MAX_IMMEDIATE_PACKET, we force the recv() operation to 
     804     * complete asynchronously, to allow other sockets to get their data. 
     805     */ 
     806    for (i=0;; ++i) { 
     807        pj_uint32_t flags; 
     808 
     809        /* Report the packet to transport manager. */ 
     810        if (bytes_read > 0) { 
     811            pj_size_t size_eaten; 
     812 
     813            /* Init pkt_info part. */ 
     814            rdata->pkt_info.len += bytes_read; 
     815            rdata->pkt_info.zero = 0; 
     816            pj_gettimeofday(&rdata->pkt_info.timestamp); 
     817 
     818            size_eaten =  
     819                pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr,  
     820                                           rdata); 
     821 
     822            pj_assert(size_eaten <= (pj_size_t)rdata->pkt_info.len); 
     823 
     824            /* Move unprocessed data to the front of the buffer */ 
     825            if (size_eaten>0 && size_eaten<(pj_size_t)rdata->pkt_info.len) { 
     826                pj_memmove(rdata->pkt_info.packet, 
     827                           rdata->pkt_info.packet + size_eaten, 
     828                           rdata->pkt_info.len - size_eaten); 
     829            } 
     830             
     831            rdata->pkt_info.len -= size_eaten; 
     832 
     833        } else if (bytes_read == 0) { 
     834 
     835            /* Transport is closed */ 
     836            PJ_LOG(4,(tp->base.obj_name, "tcp connection closed")); 
     837            tcp_destroy(&tp->base); 
     838            return; 
     839 
     840        } else if (bytes_read < 0)  { 
     841 
     842            /* Report error to endpoint. */ 
     843            PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt, 
     844                                   rdata->tp_info.transport->obj_name, 
     845                                   -bytes_read, "tcp recv() error")); 
     846 
     847            /* Transport error, close transport */ 
     848            tcp_destroy(&tp->base); 
     849            return; 
     850        } 
     851 
     852        if (i >= MAX_IMMEDIATE_PACKET) { 
     853            /* Force ioqueue_recv() to return PJ_EPENDING */ 
     854            flags = PJ_IOQUEUE_ALWAYS_ASYNC; 
     855        } else { 
     856            flags = 0; 
     857        } 
     858 
     859        /* Reset pool. */ 
     860        pj_pool_reset(rdata->tp_info.pool); 
     861 
     862        /* Read next packet. */ 
     863        bytes_read = sizeof(rdata->pkt_info.packet) - rdata->pkt_info.len; 
     864        rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr); 
     865        status = pj_ioqueue_recv(key, op_key,  
     866                                 rdata->pkt_info.packet+rdata->pkt_info.len, 
     867                                 &bytes_read, flags); 
     868 
     869        if (status == PJ_SUCCESS) { 
     870            /* Continue loop. */ 
     871            pj_assert(i < MAX_IMMEDIATE_PACKET); 
     872 
     873        } else if (status == PJ_EPENDING) { 
     874            break; 
     875 
     876        } else { 
     877            /* Report error to endpoint */ 
     878            PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt, 
     879                                   rdata->tp_info.transport->obj_name, 
     880                                   status, "tcp recv() error")); 
     881 
     882            /* Transport error, close transport */ 
     883            tcp_destroy(&tp->base); 
     884            return; 
     885        } 
     886    } 
     887} 
     888 
     889 
     890/* Callback from ioqueue when connect completes */ 
     891static void on_connect_complete(pj_ioqueue_key_t *key,  
     892                                pj_status_t status) 
     893{ 
     894    struct pending_connect *connect_op = (struct pending_connect *)key; 
     895    struct tcp_transport *tcp = connect_op->transport; 
     896    pj_sockaddr_in addr; 
     897    int addrlen; 
     898 
     899    /* Mark that pending connect() operation has completed. */ 
     900    tcp->has_pending_connect = PJ_FALSE; 
     901 
     902    /* Check connect() status */ 
     903    if (status != PJ_SUCCESS) { 
     904        tcp_perror(tcp->base.obj_name, "TCP connect() error", status); 
     905        tcp_destroy(&tcp->base); 
     906        return; 
     907    } 
     908 
     909    /* Update (again) local address, just in case local address currently 
     910     * set is different now that the socket is connected (could happen 
     911     * on some systems, like old Win32 probably?). 
     912     */ 
     913    addrlen = sizeof(pj_sockaddr_in); 
     914    if (pj_sock_getsockname(tcp->sock, &addr, &addrlen)==PJ_SUCCESS) { 
     915        pj_sockaddr_in *tp_addr = (pj_sockaddr_in*)&tcp->base.local_addr; 
     916 
     917        if (tp_addr->sin_addr.s_addr != addr.sin_addr.s_addr) { 
     918            tp_addr->sin_addr.s_addr = addr.sin_addr.s_addr; 
     919            tp_addr->sin_port = addr.sin_port; 
     920            sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name, 
     921                                  tp_addr); 
     922        } 
     923    } 
     924 
     925    /* Start pending read */ 
     926    status = tcp_start_read(tcp); 
     927    if (status != PJ_SUCCESS) { 
     928        tcp_destroy(&tcp->base); 
     929        return; 
     930    } 
     931 
     932    /* Flush all pending send operations */ 
     933    tcp_flush_pending_tx(tcp); 
     934} 
     935 
Note: See TracChangeset for help on using the changeset viewer.