Changeset 2188


Ignore:
Timestamp:
Aug 4, 2008 10:52:51 AM (11 years ago)
Author:
bennylp
Message:

Changed SIP transport to use active socket to fix ticket #579: "Data loss with TCP sockets (thanks Helmut Wolf for the report)". Also added SIP more TCP transport tests to reproduce the bug

Location:
pjproject/trunk/pjsip/src
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjsip/src/pjsip/sip_transport_tcp.c

    r2180 r2188  
    2222#include <pj/compat/socket.h> 
    2323#include <pj/addr_resolv.h> 
     24#include <pj/activesock.h> 
    2425#include <pj/assert.h> 
    25 #include <pj/ioqueue.h> 
    2626#include <pj/lock.h> 
    2727#include <pj/log.h> 
     
    4747 
    4848/* 
    49  * This structure is "descendant" of pj_ioqueue_op_key_t, and it is used to 
    50  * track pending/asynchronous accept() operation. TCP transport may have 
    51  * more than one pending accept() operations, depending on the value of 
    52  * async_cnt. 
    53  */ 
    54 struct pending_accept 
    55 { 
    56     pj_ioqueue_op_key_t      op_key; 
    57     struct tcp_listener     *listener; 
    58     unsigned                 index; 
    59     pj_pool_t               *pool; 
    60     pj_sock_t                new_sock; 
    61     int                      addr_len; 
    62     pj_sockaddr_in           local_addr; 
    63     pj_sockaddr_in           remote_addr; 
    64 }; 
    65  
    66  
    67 /* 
    6849 * This is the TCP listener, which is a "descendant" of pjsip_tpfactory (the 
    6950 * SIP transport factory). 
     
    7556    pjsip_endpoint          *endpt; 
    7657    pjsip_tpmgr             *tpmgr; 
    77     pj_sock_t                sock; 
    78     pj_ioqueue_key_t        *key; 
    79     unsigned                 async_cnt; 
    80     struct pending_accept   *accept_op[MAX_ASYNC_CNT]; 
     58    pj_activesock_t         *asock; 
    8159}; 
    8260 
     
    11593    pj_status_t              close_reason; 
    11694    pj_sock_t                sock; 
    117     pj_ioqueue_key_t        *key; 
     95    pj_activesock_t         *asock; 
    11896    pj_bool_t                has_pending_connect; 
    11997 
     
    140118 
    141119/* This callback is called when pending accept() operation completes. */ 
    142 static void on_accept_complete( pj_ioqueue_key_t *key,  
    143                                 pj_ioqueue_op_key_t *op_key,  
    144                                 pj_sock_t sock,  
    145                                 pj_status_t status); 
    146  
    147 /* Handle accept() completion */ 
    148 static pj_status_t handle_accept(pj_ioqueue_key_t *key,  
    149                                  pj_ioqueue_op_key_t *op_key,  
    150                                  pj_sock_t sock,  
    151                                  pj_status_t status); 
     120static pj_bool_t on_accept_complete(pj_activesock_t *asock, 
     121                                    pj_sock_t newsock, 
     122                                    const pj_sockaddr_t *src_addr, 
     123                                    int src_addr_len); 
    152124 
    153125/* This callback is called by transport manager to destroy listener */ 
     
    186158                                   const pj_sockaddr_in *addr ) 
    187159{ 
    188     enum { M = 48 }; 
    189     host_port->host.ptr = (char*) pj_pool_alloc(pool, M); 
    190     host_port->host.slen = pj_ansi_snprintf( host_port->host.ptr, M, "%s",  
    191                                             pj_inet_ntoa(addr->sin_addr)); 
    192     host_port->port = pj_ntohs(addr->sin_port); 
     160    host_port->host.ptr = (char*) pj_pool_alloc(pool, PJ_INET6_ADDRSTRLEN+4); 
     161    pj_sockaddr_print(addr, host_port->host.ptr, PJ_INET6_ADDRSTRLEN+4, 2); 
     162    host_port->host.slen = pj_ansi_strlen(host_port->host.ptr); 
     163    host_port->port = pj_sockaddr_get_port(addr); 
    193164} 
    194165 
     
    210181{ 
    211182    pj_pool_t *pool; 
     183    pj_sock_t sock = PJ_INVALID_SOCKET; 
    212184    struct tcp_listener *listener; 
    213     pj_ioqueue_callback listener_cb; 
     185    pj_activesock_cfg asock_cfg; 
     186    pj_activesock_cb listener_cb; 
    214187    pj_sockaddr_in *listener_addr; 
    215188    int addr_len; 
    216     unsigned i; 
    217189    pj_status_t status; 
    218190 
     
    245217    listener->factory.flag =  
    246218        pjsip_transport_get_flag_from_type(PJSIP_TRANSPORT_TCP); 
    247     listener->sock = PJ_INVALID_SOCKET; 
    248219 
    249220    pj_ansi_strcpy(listener->factory.obj_name, "tcplis"); 
     
    256227 
    257228    /* Create and bind socket */ 
    258     status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0,  
    259                             &listener->sock); 
     229    status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0, &sock); 
    260230    if (status != PJ_SUCCESS) 
    261231        goto on_error; 
     
    268238    } 
    269239 
    270     status = pj_sock_bind(listener->sock, listener_addr,  
    271                           sizeof(pj_sockaddr_in)); 
     240    status = pj_sock_bind(sock, listener_addr, sizeof(pj_sockaddr_in)); 
    272241    if (status != PJ_SUCCESS) 
    273242        goto on_error; 
     
    275244    /* Retrieve the bound address */ 
    276245    addr_len = sizeof(pj_sockaddr_in); 
    277     status = pj_sock_getsockname(listener->sock, listener_addr, &addr_len); 
     246    status = pj_sock_getsockname(sock, listener_addr, &addr_len); 
    278247    if (status != PJ_SUCCESS) 
    279248        goto on_error; 
     
    321290 
    322291    /* Start listening to the address */ 
    323     status = pj_sock_listen(listener->sock, PJSIP_TCP_TRANSPORT_BACKLOG); 
     292    status = pj_sock_listen(sock, PJSIP_TCP_TRANSPORT_BACKLOG); 
    324293    if (status != PJ_SUCCESS) 
    325294        goto on_error; 
    326295 
    327296 
    328     /* Register socket to ioqeuue */ 
     297    /* Create active socket */ 
     298    if (async_cnt > MAX_ASYNC_CNT) async_cnt = MAX_ASYNC_CNT; 
     299    pj_activesock_cfg_default(&asock_cfg); 
     300    asock_cfg.async_cnt = async_cnt; 
     301 
    329302    pj_bzero(&listener_cb, sizeof(listener_cb)); 
    330303    listener_cb.on_accept_complete = &on_accept_complete; 
    331     status = pj_ioqueue_register_sock(pool, pjsip_endpt_get_ioqueue(endpt), 
    332                                       listener->sock, listener, 
    333                                       &listener_cb, &listener->key); 
    334     if (status != PJ_SUCCESS) 
    335         goto on_error; 
     304    status = pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg, 
     305                                  pjsip_endpt_get_ioqueue(endpt),  
     306                                  &listener_cb, listener, 
     307                                  &listener->asock); 
    336308 
    337309    /* Register to transport manager */ 
     
    348320    } 
    349321 
    350  
    351322    /* Start pending accept() operations */ 
    352     if (async_cnt > MAX_ASYNC_CNT) async_cnt = MAX_ASYNC_CNT; 
    353     listener->async_cnt = async_cnt; 
    354  
    355     for (i=0; i<async_cnt; ++i) { 
    356         pj_pool_t *pool; 
    357  
    358         pool = pjsip_endpt_create_pool(endpt, "tcps%p", POOL_TP_INIT,  
    359                                        POOL_TP_INIT); 
    360         if (!pool) { 
    361             status = PJ_ENOMEM; 
    362             goto on_error; 
    363         } 
    364  
    365         listener->accept_op[i] = PJ_POOL_ZALLOC_T(pool,  
    366                                                   struct pending_accept); 
    367         pj_ioqueue_op_key_init(&listener->accept_op[i]->op_key,  
    368                                 sizeof(listener->accept_op[i]->op_key)); 
    369         listener->accept_op[i]->pool = pool; 
    370         listener->accept_op[i]->listener = listener; 
    371         listener->accept_op[i]->index = i; 
    372  
    373         status = handle_accept(listener->key, &listener->accept_op[i]->op_key, 
    374                                listener->sock, PJ_EPENDING); 
    375         if (status != PJ_SUCCESS) 
    376             goto on_error; 
    377     } 
     323    status = pj_activesock_start_accept(listener->asock, pool); 
     324    if (status != PJ_SUCCESS) 
     325        goto on_error; 
    378326 
    379327    PJ_LOG(4,(listener->factory.obj_name,  
     
    389337 
    390338on_error: 
     339    if (listener->asock==NULL && sock!=PJ_INVALID_SOCKET) 
     340        pj_sock_close(sock); 
    391341    lis_destroy(&listener->factory); 
    392342    return status; 
     
    411361{ 
    412362    struct tcp_listener *listener = (struct tcp_listener *)factory; 
    413     unsigned i; 
    414363 
    415364    if (listener->is_registered) { 
     
    418367    } 
    419368 
    420     if (listener->key) { 
    421         pj_ioqueue_unregister(listener->key); 
    422         listener->key = NULL; 
    423         listener->sock = PJ_INVALID_SOCKET; 
    424     } 
    425  
    426     if (listener->sock != PJ_INVALID_SOCKET) { 
    427         pj_sock_close(listener->sock); 
    428         listener->sock = PJ_INVALID_SOCKET; 
     369    if (listener->asock) { 
     370        pj_activesock_close(listener->asock); 
     371        listener->asock = NULL; 
    429372    } 
    430373 
     
    432375        pj_lock_destroy(listener->factory.lock); 
    433376        listener->factory.lock = NULL; 
    434     } 
    435  
    436     for (i=0; i<PJ_ARRAY_SIZE(listener->accept_op); ++i) { 
    437         if (listener->accept_op[i] && listener->accept_op[i]->pool) { 
    438             pj_pool_t *pool = listener->accept_op[i]->pool; 
    439             listener->accept_op[i]->pool = NULL; 
    440             pj_pool_release(pool); 
    441         } 
    442377    } 
    443378 
     
    481416                               pj_status_t reason); 
    482417 
    483 /* Callback from ioqueue on incoming packet */ 
    484 static void on_read_complete(pj_ioqueue_key_t *key,  
    485                              pj_ioqueue_op_key_t *op_key,  
    486                              pj_ssize_t bytes_read); 
    487  
    488 /* Callback from ioqueue when packet is sent */ 
    489 static void on_write_complete(pj_ioqueue_key_t *key,  
    490                               pj_ioqueue_op_key_t *op_key,  
    491                               pj_ssize_t bytes_sent); 
    492  
    493 /* Callback from ioqueue when connect completes */ 
    494 static void on_connect_complete(pj_ioqueue_key_t *key,  
    495                                 pj_status_t status); 
     418/* Callback on incoming data */ 
     419static pj_bool_t on_data_read(pj_activesock_t *asock, 
     420                              void *data, 
     421                              pj_size_t size, 
     422                              pj_status_t status, 
     423                              pj_size_t *remainder); 
     424 
     425/* Callback when packet is sent */ 
     426static pj_bool_t on_data_sent(pj_activesock_t *asock, 
     427                              pj_ioqueue_op_key_t *send_key, 
     428                              pj_ssize_t sent); 
     429 
     430/* Callback when connect completes */ 
     431static pj_bool_t on_connect_complete(pj_activesock_t *asock, 
     432                                     pj_status_t status); 
    496433 
    497434/* TCP keep-alive timer callback */ 
     
    511448    struct tcp_transport *tcp; 
    512449    pj_ioqueue_t *ioqueue; 
    513     pj_ioqueue_callback tcp_callback; 
     450    pj_activesock_cfg asock_cfg; 
     451    pj_activesock_cb tcp_callback; 
    514452    const pj_str_t ka_pkt = PJSIP_TCP_KEEP_ALIVE_DATA; 
    515453    pj_status_t status; 
     
    529467     */ 
    530468    tcp = PJ_POOL_ZALLOC_T(pool, struct tcp_transport); 
     469    tcp->is_server = is_server; 
    531470    tcp->sock = sock; 
    532     tcp->is_server = is_server; 
    533471    /*tcp->listener = listener;*/ 
    534472    pj_list_init(&tcp->delayed_list); 
     
    570508 
    571509 
    572     /* Register socket to ioqueue */ 
    573     pj_bzero(&tcp_callback, sizeof(pj_ioqueue_callback)); 
    574     tcp_callback.on_read_complete = &on_read_complete; 
    575     tcp_callback.on_write_complete = &on_write_complete; 
     510    /* Create active socket */ 
     511    pj_activesock_cfg_default(&asock_cfg); 
     512    asock_cfg.async_cnt = 1; 
     513 
     514    pj_bzero(&tcp_callback, sizeof(tcp_callback)); 
     515    tcp_callback.on_data_read = &on_data_read; 
     516    tcp_callback.on_data_sent = &on_data_sent; 
    576517    tcp_callback.on_connect_complete = &on_connect_complete; 
    577518 
    578519    ioqueue = pjsip_endpt_get_ioqueue(listener->endpt); 
    579     status = pj_ioqueue_register_sock(pool, ioqueue, sock,  
    580                                       tcp, &tcp_callback, &tcp->key); 
     520    status = pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg, 
     521                                  ioqueue, &tcp_callback, tcp, &tcp->asock); 
    581522    if (status != PJ_SUCCESS) { 
    582523        goto on_error; 
     
    628569        op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key; 
    629570 
    630         /* send to ioqueue! */ 
     571        /* send! */ 
    631572        size = tdata->buf.cur - tdata->buf.start; 
    632         status = pj_ioqueue_send(tcp->key, op_key, 
    633                                  tdata->buf.start, &size, 0); 
    634  
     573        status = pj_activesock_send(tcp->asock, op_key, tdata->buf.start,  
     574                                    &size, 0); 
    635575        if (status != PJ_EPENDING) { 
    636             on_write_complete(tcp->key, op_key, size); 
     576            on_data_sent(tcp->asock, op_key, size); 
    637577        } 
    638578 
     
    694634        op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key; 
    695635 
    696         on_write_complete(tcp->key, op_key, -reason); 
     636        on_data_sent(tcp->asock, op_key, -reason); 
    697637    } 
    698638 
     
    702642    } 
    703643 
    704     if (tcp->key) { 
    705         pj_ioqueue_unregister(tcp->key); 
    706         tcp->key = NULL; 
     644    if (tcp->asock) { 
     645        pj_activesock_close(tcp->asock); 
     646        tcp->asock = NULL; 
    707647        tcp->sock = PJ_INVALID_SOCKET; 
    708     } 
    709  
    710     if (tcp->sock != PJ_INVALID_SOCKET) { 
     648    } else if (tcp->sock != PJ_INVALID_SOCKET) { 
    711649        pj_sock_close(tcp->sock); 
    712650        tcp->sock = PJ_INVALID_SOCKET; 
     
    760698    pj_ssize_t size; 
    761699    pj_sockaddr_in *rem_addr; 
     700    void *readbuf[1]; 
    762701    pj_status_t status; 
    763702 
     
    788727 
    789728    size = sizeof(tcp->rdata.pkt_info.packet); 
    790     status = pj_ioqueue_recv(tcp->key, &tcp->rdata.tp_info.op_key.op_key, 
    791                              tcp->rdata.pkt_info.packet, &size, 
    792                              PJ_IOQUEUE_ALWAYS_ASYNC); 
     729    readbuf[0] = tcp->rdata.pkt_info.packet; 
     730    status = pj_activesock_start_read2(tcp->asock, tcp->base.pool, size, 
     731                                       readbuf, 0); 
    793732    if (status != PJ_SUCCESS && status != PJ_EPENDING) { 
    794         PJ_LOG(4, (tcp->base.obj_name, "ioqueue recv() error, status=%d",  
     733        PJ_LOG(4, (tcp->base.obj_name,  
     734                   "pj_activesock_start_read() error, status=%d",  
    795735                   status)); 
    796736        return status; 
     
    862802    /* Start asynchronous connect() operation */ 
    863803    tcp->has_pending_connect = PJ_TRUE; 
    864     status = pj_ioqueue_connect(tcp->key, rem_addr, sizeof(pj_sockaddr_in)); 
     804    status = pj_activesock_start_connect(tcp->asock, tcp->base.pool, rem_addr, 
     805                                         sizeof(pj_sockaddr_in)); 
    865806    if (status == PJ_SUCCESS) { 
    866807        tcp->has_pending_connect = PJ_FALSE; 
     
    874815     */ 
    875816    addr_len = sizeof(pj_sockaddr_in); 
    876     if (pj_sock_getsockname(tcp->sock, &local_addr, &addr_len)==PJ_SUCCESS) { 
     817    if (pj_sock_getsockname(sock, &local_addr, &addr_len)==PJ_SUCCESS) { 
    877818        pj_sockaddr_in *tp_addr = (pj_sockaddr_in*)&tcp->base.local_addr; 
    878819 
     
    909850 
    910851/* 
    911  * This callback is called by ioqueue when pending accept() operation has 
    912  * completed. 
    913  */ 
    914 static void on_accept_complete( pj_ioqueue_key_t *key,  
    915                                 pj_ioqueue_op_key_t *op_key,  
    916                                 pj_sock_t sock,  
    917                                 pj_status_t status) 
    918 { 
    919     handle_accept(key, op_key, sock, status); 
    920 } 
    921  
    922  
    923 /* Handle accept() completion */ 
    924 static pj_status_t handle_accept(pj_ioqueue_key_t *key,  
    925                                  pj_ioqueue_op_key_t *op_key,  
    926                                  pj_sock_t sock,  
    927                                  pj_status_t status) 
     852 * This callback is called by active socket when pending accept() operation 
     853 * has completed. 
     854 */ 
     855static pj_bool_t on_accept_complete(pj_activesock_t *asock, 
     856                                    pj_sock_t sock, 
     857                                    const pj_sockaddr_t *src_addr, 
     858                                    int src_addr_len) 
    928859{ 
    929860    struct tcp_listener *listener; 
    930861    struct tcp_transport *tcp; 
    931     struct pending_accept *accept_op; 
    932     int err_cnt = 0; 
    933  
    934     listener = (struct tcp_listener*) pj_ioqueue_get_user_data(key); 
    935     accept_op = (struct pending_accept*) op_key; 
    936  
    937     /* 
    938      * Loop while there is immediate connection or when there is error. 
     862    char addr[PJ_INET6_ADDRSTRLEN+10]; 
     863    pj_status_t status; 
     864 
     865    PJ_UNUSED_ARG(src_addr_len); 
     866 
     867    listener = (struct tcp_listener*) pj_activesock_get_user_data(asock); 
     868 
     869    PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_TRUE); 
     870 
     871    PJ_LOG(4,(listener->factory.obj_name,  
     872              "TCP listener %.*s:%d: got incoming TCP connection " 
     873              "from %s, sock=%d", 
     874              (int)listener->factory.addr_name.host.slen, 
     875              listener->factory.addr_name.host.ptr, 
     876              listener->factory.addr_name.port, 
     877              pj_sockaddr_print(src_addr, addr, sizeof(addr), 3), 
     878              sock)); 
     879 
     880    /*  
     881     * Incoming connection! 
     882     * Create TCP transport for the new socket. 
    939883     */ 
    940     do { 
    941         if (status == PJ_EPENDING) { 
    942             /* 
    943              * This can only happen when this function is called during 
    944              * initialization to kick off asynchronous accept(). 
    945              */ 
    946  
    947         } else if (status != PJ_SUCCESS) { 
    948  
    949             /* 
    950              * Error in accept(). 
    951              */ 
    952             tcp_perror(listener->factory.obj_name, "Error in accept()",  
    953                        status); 
    954  
    955             /* 
    956              * Prevent endless accept() error loop by limiting the 
    957              * number of consecutive errors. Once the number of errors 
    958              * is equal to maximum, we treat this as permanent error, and 
    959              * we stop the accept() operation. 
    960              */ 
    961             ++err_cnt; 
    962             if (err_cnt >= 20) { 
    963                 PJ_LOG(1, (listener->factory.obj_name,  
    964                            "Too many errors, LISTENER IS STOPPING!")); 
    965                 return status; 
     884    status = tcp_create( listener, NULL, sock, PJ_TRUE, 
     885                         (const pj_sockaddr_in*)&listener->factory.local_addr, 
     886                         (const pj_sockaddr_in*)src_addr, &tcp); 
     887    if (status == PJ_SUCCESS) { 
     888        status = tcp_start_read(tcp); 
     889        if (status != PJ_SUCCESS) { 
     890            PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled")); 
     891            tcp_destroy(&tcp->base, status); 
     892        } else { 
     893            /* Start keep-alive timer */ 
     894            if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) { 
     895                pj_time_val delay = {PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0}; 
     896                pjsip_endpt_schedule_timer(listener->endpt,  
     897                                           &tcp->ka_timer,  
     898                                           &delay); 
     899                tcp->ka_timer.id = PJ_TRUE; 
     900                pj_gettimeofday(&tcp->last_activity); 
    966901            } 
    967  
    968         } else { 
    969             pj_pool_t *pool; 
    970             struct pending_accept *new_op; 
    971  
    972             if (sock == PJ_INVALID_SOCKET) { 
    973                 sock = accept_op->new_sock; 
    974             } 
    975  
    976             if (sock == PJ_INVALID_SOCKET) { 
    977                 pj_assert(!"Should not happen. status should be error"); 
    978                 goto next_accept; 
    979             } 
    980  
    981             PJ_LOG(4,(listener->factory.obj_name,  
    982                       "TCP listener %.*s:%d: got incoming TCP connection " 
    983                       "from %s:%d, sock=%d", 
    984                       (int)listener->factory.addr_name.host.slen, 
    985                       listener->factory.addr_name.host.ptr, 
    986                       listener->factory.addr_name.port, 
    987                       pj_inet_ntoa(accept_op->remote_addr.sin_addr), 
    988                       pj_ntohs(accept_op->remote_addr.sin_port), 
    989                       sock)); 
    990  
    991             /* Create new accept_opt */ 
    992             pool = pjsip_endpt_create_pool(listener->endpt, "tcps%p",  
    993                                            POOL_TP_INIT, POOL_TP_INC); 
    994             new_op = PJ_POOL_ZALLOC_T(pool, struct pending_accept); 
    995             new_op->pool = pool; 
    996             new_op->listener = listener; 
    997             new_op->index = accept_op->index; 
    998             pj_ioqueue_op_key_init(&new_op->op_key, sizeof(new_op->op_key)); 
    999             listener->accept_op[accept_op->index] = new_op; 
    1000  
    1001             /*  
    1002              * Incoming connections! 
    1003              * Create TCP transport for the new socket. 
    1004              */ 
    1005             status = tcp_create( listener, accept_op->pool, sock, PJ_TRUE, 
    1006                                  &accept_op->local_addr,  
    1007                                  &accept_op->remote_addr, &tcp); 
    1008             if (status == PJ_SUCCESS) { 
    1009                 status = tcp_start_read(tcp); 
    1010                 if (status != PJ_SUCCESS) { 
    1011                     PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled")); 
    1012                     tcp_destroy(&tcp->base, status); 
    1013                 } else { 
    1014                     /* Start keep-alive timer */ 
    1015                     if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) { 
    1016                         pj_time_val delay = {PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0}; 
    1017                         pjsip_endpt_schedule_timer(listener->endpt,  
    1018                                                    &tcp->ka_timer,  
    1019                                                    &delay); 
    1020                         tcp->ka_timer.id = PJ_TRUE; 
    1021                         pj_gettimeofday(&tcp->last_activity); 
    1022                     } 
    1023                 } 
    1024             } 
    1025  
    1026             accept_op = new_op; 
    1027902        } 
    1028  
    1029 next_accept: 
    1030         /* 
    1031          * Start the next asynchronous accept() operation. 
    1032          */ 
    1033         accept_op->addr_len = sizeof(pj_sockaddr_in); 
    1034         accept_op->new_sock = PJ_INVALID_SOCKET; 
    1035  
    1036         status = pj_ioqueue_accept(listener->key,  
    1037                                    &accept_op->op_key, 
    1038                                    &accept_op->new_sock, 
    1039                                    &accept_op->local_addr, 
    1040                                    &accept_op->remote_addr, 
    1041                                    &accept_op->addr_len); 
    1042  
    1043         /* 
    1044          * Loop while we have immediate connection or when there is error. 
    1045          */ 
    1046  
    1047     } while (status != PJ_EPENDING); 
    1048  
    1049     return PJ_SUCCESS; 
     903    } 
     904 
     905    return PJ_TRUE; 
    1050906} 
    1051907 
     
    1054910 * Callback from ioqueue when packet is sent. 
    1055911 */ 
    1056 static void on_write_complete(pj_ioqueue_key_t *key,  
    1057                               pj_ioqueue_op_key_t *op_key,  
    1058                               pj_ssize_t bytes_sent) 
     912static pj_bool_t on_data_sent(pj_activesock_t *asock, 
     913                              pj_ioqueue_op_key_t *op_key, 
     914                              pj_ssize_t bytes_sent) 
    1059915{ 
    1060916    struct tcp_transport *tcp = (struct tcp_transport*)  
    1061                                 pj_ioqueue_get_user_data(key); 
     917                                pj_activesock_get_user_data(asock); 
    1062918    pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key; 
    1063919 
     
    1079935        /* Mark last activity time */ 
    1080936        pj_gettimeofday(&tcp->last_activity); 
     937 
    1081938    } 
    1082939 
     
    1092949        if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; 
    1093950        pjsip_transport_shutdown(&tcp->base); 
    1094     } 
    1095  
     951 
     952        return PJ_FALSE; 
     953    } 
     954 
     955    return PJ_TRUE; 
    1096956} 
    1097957 
     
    11671027         */ 
    11681028        size = tdata->buf.cur - tdata->buf.start; 
    1169         status = pj_ioqueue_send(tcp->key,  
    1170                                  (pj_ioqueue_op_key_t*)&tdata->op_key, 
    1171                                  tdata->buf.start, &size, 0); 
     1029        status = pj_activesock_send(tcp->asock,  
     1030                                    (pj_ioqueue_op_key_t*)&tdata->op_key, 
     1031                                    tdata->buf.start, &size, 0); 
    11721032 
    11731033        if (status != PJ_EPENDING) { 
     
    12131073 * Callback from ioqueue that an incoming data is received from the socket. 
    12141074 */ 
    1215 static void on_read_complete(pj_ioqueue_key_t *key,  
    1216                              pj_ioqueue_op_key_t *op_key,  
    1217                              pj_ssize_t bytes_read) 
     1075static pj_bool_t on_data_read(pj_activesock_t *asock, 
     1076                              void *data, 
     1077                              pj_size_t size, 
     1078                              pj_status_t status, 
     1079                              pj_size_t *remainder) 
    12181080{ 
    12191081    enum { MAX_IMMEDIATE_PACKET = 10 }; 
    1220     pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key; 
    1221     pjsip_rx_data *rdata = rdata_op_key->rdata; 
    1222     struct tcp_transport *tcp =  
    1223         (struct tcp_transport*)rdata->tp_info.transport; 
    1224     int i; 
    1225     pj_status_t status; 
     1082    struct tcp_transport *tcp; 
     1083    pjsip_rx_data *rdata; 
     1084 
     1085    PJ_UNUSED_ARG(data); 
     1086 
     1087    tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock); 
     1088    rdata = &tcp->rdata; 
    12261089 
    12271090    /* Don't do anything if transport is closing. */ 
    12281091    if (tcp->is_closing) { 
    12291092        tcp->is_closing++; 
    1230         return; 
    1231     } 
    1232  
    1233     /* 
    1234      * The idea of the loop is to process immediate data received by 
    1235      * pj_ioqueue_recv(), as long as i < MAX_IMMEDIATE_PACKET. When 
    1236      * i is >= MAX_IMMEDIATE_PACKET, we force the recv() operation to 
    1237      * complete asynchronously, to allow other sockets to get their data. 
     1093        return PJ_FALSE; 
     1094    } 
     1095 
     1096    /* Houston, we have packet! Report the packet to transport manager 
     1097     * to be parsed. 
    12381098     */ 
    1239     for (i=0;; ++i) { 
    1240         pj_uint32_t flags; 
    1241  
    1242         /* Houston, we have packet! Report the packet to transport manager 
    1243          * to be parsed. 
     1099    if (status == PJ_SUCCESS) { 
     1100        pj_size_t size_eaten; 
     1101 
     1102        /* Mark this as an activity */ 
     1103        pj_gettimeofday(&tcp->last_activity); 
     1104 
     1105        pj_assert((void*)rdata->pkt_info.packet == data); 
     1106 
     1107        /* Init pkt_info part. */ 
     1108        rdata->pkt_info.len = size; 
     1109        rdata->pkt_info.zero = 0; 
     1110        pj_gettimeofday(&rdata->pkt_info.timestamp); 
     1111 
     1112        /* Report to transport manager. 
     1113         * The transport manager will tell us how many bytes of the packet 
     1114         * have been processed (as valid SIP message). 
    12441115         */ 
    1245         if (bytes_read > 0) { 
    1246             pj_size_t size_eaten; 
    1247  
    1248             /* Mark this as an activity */ 
    1249             pj_gettimeofday(&tcp->last_activity); 
    1250  
    1251             /* Init pkt_info part. */ 
    1252             rdata->pkt_info.len += bytes_read; 
    1253             rdata->pkt_info.zero = 0; 
    1254             pj_gettimeofday(&rdata->pkt_info.timestamp); 
    1255  
    1256             /* Report to transport manager. 
    1257              * The transport manager will tell us how many bytes of the packet 
    1258              * have been processed (as valid SIP message). 
    1259              */ 
    1260             size_eaten =  
    1261                 pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr,  
    1262                                            rdata); 
    1263  
    1264             pj_assert(size_eaten <= (pj_size_t)rdata->pkt_info.len); 
    1265  
    1266             /* Move unprocessed data to the front of the buffer */ 
    1267             if (size_eaten>0 && size_eaten<(pj_size_t)rdata->pkt_info.len) { 
    1268                 pj_memmove(rdata->pkt_info.packet, 
    1269                            rdata->pkt_info.packet + size_eaten, 
    1270                            rdata->pkt_info.len - size_eaten); 
    1271             } 
    1272              
    1273             rdata->pkt_info.len -= size_eaten; 
    1274  
    1275         } else if (bytes_read == 0) { 
    1276  
    1277             /* Transport is closed */ 
    1278             PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed")); 
    1279              
    1280             /* We can not destroy the transport since high level objects may 
    1281              * still keep reference to this transport. So we can only  
    1282              * instruct transport manager to gracefully start the shutdown 
    1283              * procedure for this transport. 
    1284              */ 
    1285             if (tcp->close_reason==PJ_SUCCESS)  
    1286                 tcp->close_reason = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN); 
    1287             pjsip_transport_shutdown(&tcp->base); 
    1288  
    1289             return; 
    1290  
    1291         //} else if (bytes_read < 0)  { 
    1292         } else if (-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) && 
    1293                    -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&  
    1294                    -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET))  
    1295         { 
    1296  
    1297             /* Socket error. */ 
    1298             PJ_LOG(4,(tcp->base.obj_name, "TCP connection reset")); 
    1299  
    1300             /* We can not destroy the transport since high level objects may 
    1301              * still keep reference to this transport. So we can only  
    1302              * instruct transport manager to gracefully start the shutdown 
    1303              * procedure for this transport. 
    1304              */ 
    1305             if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = -bytes_read; 
    1306             pjsip_transport_shutdown(&tcp->base); 
    1307  
    1308             return; 
     1116        size_eaten =  
     1117            pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr,  
     1118                                       rdata); 
     1119 
     1120        pj_assert(size_eaten <= (pj_size_t)rdata->pkt_info.len); 
     1121 
     1122        /* Move unprocessed data to the front of the buffer */ 
     1123        *remainder = size - size_eaten; 
     1124        if (*remainder > 0 && *remainder != size) { 
     1125            pj_memmove(rdata->pkt_info.packet, 
     1126                       rdata->pkt_info.packet + size_eaten, 
     1127                       *remainder); 
    13091128        } 
    13101129 
    1311         if (i >= MAX_IMMEDIATE_PACKET) { 
    1312             /* Receive quota reached. Force ioqueue_recv() to  
    1313              * return PJ_EPENDING  
    1314              */ 
    1315             flags = PJ_IOQUEUE_ALWAYS_ASYNC; 
    1316         } else { 
    1317             flags = 0; 
    1318         } 
    1319  
    1320         /* Reset pool. */ 
    1321         pj_pool_reset(rdata->tp_info.pool); 
    1322  
    1323         /* Read next packet. */ 
    1324         bytes_read = sizeof(rdata->pkt_info.packet) - rdata->pkt_info.len; 
    1325         rdata->pkt_info.src_addr_len = sizeof(pj_sockaddr_in); 
    1326         status = pj_ioqueue_recv(key, op_key,  
    1327                                  rdata->pkt_info.packet+rdata->pkt_info.len, 
    1328                                  &bytes_read, flags); 
    1329  
    1330         if (status == PJ_SUCCESS) { 
    1331  
    1332             /* Continue loop. */ 
    1333             pj_assert(i < MAX_IMMEDIATE_PACKET); 
    1334  
    1335         } else if (status == PJ_EPENDING) { 
    1336             break; 
    1337  
    1338         } else { 
    1339             /* Socket error */ 
    1340             PJ_LOG(4,(tcp->base.obj_name, "TCP connection reset")); 
    1341  
    1342             /* We can not destroy the transport since high level objects may 
    1343              * still keep reference to this transport. So we can only  
    1344              * instruct transport manager to gracefully start the shutdown 
    1345              * procedure for this transport. 
    1346              */ 
    1347             if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; 
    1348             pjsip_transport_shutdown(&tcp->base); 
    1349  
    1350             return; 
    1351         } 
    1352     } 
     1130    } else { 
     1131 
     1132        /* Transport is closed */ 
     1133        PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed")); 
     1134         
     1135        /* We can not destroy the transport since high level objects may 
     1136         * still keep reference to this transport. So we can only  
     1137         * instruct transport manager to gracefully start the shutdown 
     1138         * procedure for this transport. 
     1139         */ 
     1140        if (tcp->close_reason==PJ_SUCCESS)  
     1141            tcp->close_reason = status; 
     1142        pjsip_transport_shutdown(&tcp->base); 
     1143 
     1144        return PJ_FALSE; 
     1145 
     1146    } 
     1147 
     1148    /* Reset pool. */ 
     1149    pj_pool_reset(rdata->tp_info.pool); 
     1150 
     1151    return PJ_TRUE; 
    13531152} 
    13541153 
     
    13571156 * Callback from ioqueue when asynchronous connect() operation completes. 
    13581157 */ 
    1359 static void on_connect_complete(pj_ioqueue_key_t *key,  
    1360                                 pj_status_t status) 
     1158static pj_bool_t on_connect_complete(pj_activesock_t *asock, 
     1159                                     pj_status_t status) 
    13611160{ 
    13621161    struct tcp_transport *tcp; 
     
    13641163    int addrlen; 
    13651164 
    1366     tcp = (struct tcp_transport*) pj_ioqueue_get_user_data(key); 
     1165    tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock); 
    13671166 
    13681167    /* Mark that pending connect() operation has completed. */ 
     
    13841183            op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key; 
    13851184 
    1386             on_write_complete(tcp->key, op_key, -status); 
     1185            on_data_sent(tcp->asock, op_key, -status); 
    13871186        } 
    13881187 
     
    13941193        if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; 
    13951194        pjsip_transport_shutdown(&tcp->base); 
    1396         return; 
     1195        return PJ_FALSE; 
    13971196    } 
    13981197 
     
    14331232        if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; 
    14341233        pjsip_transport_shutdown(&tcp->base); 
    1435         return; 
     1234        return PJ_FALSE; 
    14361235    } 
    14371236 
     
    14471246        pj_gettimeofday(&tcp->last_activity); 
    14481247    } 
     1248 
     1249    return PJ_TRUE; 
    14491250} 
    14501251 
     
    14831284    /* Send the data */ 
    14841285    size = tcp->ka_pkt.slen; 
    1485     status = pj_ioqueue_send(tcp->key, &tcp->ka_op_key.key, 
    1486                              tcp->ka_pkt.ptr, &size, 0); 
     1286    status = pj_activesock_send(tcp->asock, &tcp->ka_op_key.key, 
     1287                                tcp->ka_pkt.ptr, &size, 0); 
    14871288 
    14881289    if (status != PJ_SUCCESS && status != PJ_EPENDING) { 
  • pjproject/trunk/pjsip/src/test-pjsip/transport_tcp_test.c

    r2039 r2188  
    8484    /* Check again that reference counter is 1. */ 
    8585    if (pj_atomic_get(tcp->ref_cnt) != 1) 
    86         return -70; 
     86        return -40; 
     87 
     88    /* Load test */ 
     89    if (transport_load_test(url) != 0) 
     90        return -60; 
    8791 
    8892    /* Basic transport's send/receive loopback test. */ 
  • pjproject/trunk/pjsip/src/test-pjsip/transport_test.c

    r2039 r2188  
    642642    return 0; 
    643643} 
     644 
     645/////////////////////////////////////////////////////////////////////////////// 
     646/* 
     647 * Transport load testing 
     648 */ 
     649static pj_bool_t load_on_rx_request(pjsip_rx_data *rdata); 
     650 
     651static struct mod_load_test 
     652{ 
     653    pjsip_module    mod; 
     654    pj_uint32_t     next_seq; 
     655    pj_bool_t       err; 
     656} mod_load =  
     657{ 
     658    { 
     659    NULL, NULL,                         /* prev and next        */ 
     660    { "mod-load-test", 13},             /* Name.                */ 
     661    -1,                                 /* Id                   */ 
     662    PJSIP_MOD_PRIORITY_TSX_LAYER-1,     /* Priority             */ 
     663    NULL,                               /* load()               */ 
     664    NULL,                               /* start()              */ 
     665    NULL,                               /* stop()               */ 
     666    NULL,                               /* unload()             */ 
     667    &load_on_rx_request,                /* on_rx_request()      */ 
     668    NULL,                               /* on_rx_response()     */ 
     669    NULL,                               /* tsx_handler()        */ 
     670    } 
     671}; 
     672 
     673 
     674static pj_bool_t load_on_rx_request(pjsip_rx_data *rdata) 
     675{ 
     676    if (rdata->msg_info.cseq->cseq != mod_load.next_seq) { 
     677        PJ_LOG(1,("THIS_FILE", "    err: expecting cseq %u, got %u",  
     678                  mod_load.next_seq, rdata->msg_info.cseq->cseq)); 
     679        mod_load.err = PJ_TRUE; 
     680        mod_load.next_seq = rdata->msg_info.cseq->cseq + 1; 
     681    } else  
     682        mod_load.next_seq++; 
     683    return PJ_TRUE; 
     684} 
     685 
     686int transport_load_test(char *target_url) 
     687{ 
     688    enum { COUNT = 2000 }; 
     689    unsigned i; 
     690    pj_status_t status; 
     691 
     692    /* exhaust packets */ 
     693    do { 
     694        pj_time_val delay = {1, 0}; 
     695        i = 0; 
     696        pjsip_endpt_handle_events2(endpt, &delay, &i); 
     697    } while (i != 0); 
     698 
     699    PJ_LOG(3,(THIS_FILE, "  transport load test...")); 
     700 
     701    if (mod_load.mod.id == -1) { 
     702        status = pjsip_endpt_register_module( endpt, &mod_load.mod); 
     703        if (status != PJ_SUCCESS) { 
     704            app_perror("error registering module", status); 
     705            return -1; 
     706        } 
     707    } 
     708    mod_load.err = PJ_FALSE; 
     709    mod_load.next_seq = 0; 
     710 
     711    for (i=0; i<COUNT && !mod_load.err; ++i) { 
     712        pj_str_t target, from, call_id; 
     713        pjsip_tx_data *tdata; 
     714 
     715        target = pj_str(target_url); 
     716        from = pj_str("<sip:user@host>"); 
     717        call_id = pj_str("thecallid"); 
     718        status = pjsip_endpt_create_request(endpt, &pjsip_invite_method,  
     719                                            &target, &from,  
     720                                            &target, &from, &call_id,  
     721                                            i, NULL, &tdata ); 
     722        if (status != PJ_SUCCESS) { 
     723            app_perror("error creating request", status); 
     724            goto on_return; 
     725        } 
     726 
     727        status = pjsip_endpt_send_request_stateless(endpt, tdata, NULL, NULL); 
     728        if (status != PJ_SUCCESS) { 
     729            app_perror("error sending request", status); 
     730            goto on_return; 
     731        } 
     732    } 
     733 
     734    do { 
     735        pj_time_val delay = {1, 0}; 
     736        i = 0; 
     737        pjsip_endpt_handle_events2(endpt, &delay, &i); 
     738    } while (i != 0); 
     739 
     740    if (mod_load.next_seq != COUNT) { 
     741        PJ_LOG(1,("THIS_FILE", "    err: expecting %u msg, got only %u",  
     742                  COUNT, mod_load.next_seq)); 
     743        status = -2; 
     744        goto on_return; 
     745    } 
     746 
     747on_return: 
     748    if (mod_load.mod.id != -1) { 
     749        pjsip_endpt_unregister_module( endpt, &mod_load.mod); 
     750        mod_load.mod.id = -1; 
     751    } 
     752    if (status != PJ_SUCCESS || mod_load.err) { 
     753        return -2; 
     754    } 
     755    PJ_LOG(3,(THIS_FILE, "   success")); 
     756    return 0; 
     757} 
     758 
     759 
Note: See TracChangeset for help on using the changeset viewer.