Changeset 2185


Ignore:
Timestamp:
Jul 29, 2008 8:15:15 PM (11 years ago)
Author:
bennylp
Message:

Initial work for ticket #579: added option to make the active socket sends all the (TCP) data before calling completion callback

Location:
pjproject/trunk/pjlib
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjlib/include/pj/activesock.h

    r2177 r2185  
    202202    int concurrency; 
    203203 
     204    /** 
     205     * If this option is specified, the active socket will make sure that 
     206     * asynchronous send operation with stream oriented socket will only 
     207     * call the callback after all data has been sent. This means that the 
     208     * active socket will automatically resend the remaining data until 
     209     * all data has been sent. 
     210     * 
     211     * Please note that when this option is specified, it is possible that 
     212     * error is reported after partial data has been sent. Also setting 
     213     * this will disable the ioqueue concurrency for the socket. 
     214     * 
     215     * Default value is 1. 
     216     */ 
     217    pj_bool_t whole_data; 
     218 
    204219} pj_activesock_cfg; 
    205220 
  • pjproject/trunk/pjlib/include/pj/ioqueue.h

    r2039 r2185  
    209209{  
    210210    void *internal__[32];           /**< Internal I/O Queue data.   */ 
     211    void *activesock_data;          /**< Active socket data.        */ 
    211212    void *user_data;                /**< Application data.          */ 
    212213} pj_ioqueue_op_key_t; 
  • pjproject/trunk/pjlib/src/pj/activesock.c

    r2177 r2185  
    5252}; 
    5353 
     54struct send_data 
     55{ 
     56    pj_uint8_t          *data; 
     57    pj_ssize_t           len; 
     58    pj_ssize_t           sent; 
     59    unsigned             flags; 
     60}; 
     61 
    5462struct pj_activesock_t 
    5563{ 
    5664    pj_ioqueue_key_t    *key; 
    5765    pj_bool_t            stream_oriented; 
     66    pj_bool_t            whole_data; 
    5867    pj_ioqueue_t        *ioqueue; 
    5968    void                *user_data; 
     
    6170    unsigned             max_loop; 
    6271    pj_activesock_cb     cb; 
     72 
     73    struct send_data     send_data; 
    6374 
    6475    struct read_op      *read_op; 
     
    90101    cfg->async_cnt = 1; 
    91102    cfg->concurrency = -1; 
     103    cfg->whole_data = PJ_TRUE; 
    92104} 
    93105 
     
    116128    asock->stream_oriented = (sock_type == pj_SOCK_STREAM()); 
    117129    asock->async_count = (opt? opt->async_cnt : 1); 
     130    asock->whole_data = (opt? opt->whole_data : 1); 
    118131    asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP; 
    119132    asock->user_data = user_data; 
     
    135148    } 
    136149     
    137     if (opt && opt->concurrency >= 0) { 
     150    if (asock->whole_data) { 
     151        /* Must disable concurrency otherwise there is a race condition */ 
     152        pj_ioqueue_set_concurrency(asock->key, 0); 
     153    } else if (opt && opt->concurrency >= 0) { 
    138154        pj_ioqueue_set_concurrency(asock->key, opt->concurrency); 
    139155    } 
     
    444460 
    445461 
     462static pj_status_t send_remaining(pj_activesock_t *asock,  
     463                                  pj_ioqueue_op_key_t *send_key) 
     464{ 
     465    struct send_data *sd = (struct send_data*)send_key->activesock_data; 
     466    pj_status_t status; 
     467 
     468    do { 
     469        pj_ssize_t size; 
     470 
     471        size = sd->len - sd->sent; 
     472        status = pj_ioqueue_send(asock->key, send_key,  
     473                                 sd->data+sd->sent, &size, sd->flags); 
     474        if (status != PJ_SUCCESS) { 
     475            /* Pending or error */ 
     476            break; 
     477        } 
     478 
     479        sd->sent += size; 
     480        if (sd->sent == sd->len) { 
     481            /* The whole data has been sent. */ 
     482            return PJ_SUCCESS; 
     483        } 
     484 
     485    } while (sd->sent < sd->len); 
     486 
     487    return status; 
     488} 
     489 
     490 
    446491PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock, 
    447492                                        pj_ioqueue_op_key_t *send_key, 
     
    452497    PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL); 
    453498 
    454     return pj_ioqueue_send(asock->key, send_key, data, size, flags); 
     499    send_key->activesock_data = NULL; 
     500 
     501    if (asock->whole_data) { 
     502        pj_ssize_t whole; 
     503        pj_status_t status; 
     504 
     505        whole = *size; 
     506 
     507        status = pj_ioqueue_send(asock->key, send_key, data, size, flags); 
     508        if (status != PJ_SUCCESS) { 
     509            /* Pending or error */ 
     510            return status; 
     511        } 
     512 
     513        if (*size == whole) { 
     514            /* The whole data has been sent. */ 
     515            return PJ_SUCCESS; 
     516        } 
     517 
     518        /* Data was partially sent */ 
     519        asock->send_data.data = (pj_uint8_t*)data; 
     520        asock->send_data.len = whole; 
     521        asock->send_data.sent = *size; 
     522        asock->send_data.flags = flags; 
     523        send_key->activesock_data = &asock->send_data; 
     524 
     525        /* Try again */ 
     526        status = send_remaining(asock, send_key); 
     527        if (status == PJ_SUCCESS) { 
     528            *size = whole; 
     529        } 
     530        return status; 
     531 
     532    } else { 
     533        return pj_ioqueue_send(asock->key, send_key, data, size, flags); 
     534    } 
    455535} 
    456536 
     
    479559 
    480560    asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); 
     561 
     562    if (bytes_sent > 0 && op_key->activesock_data) { 
     563        /* whole_data is requested. Make sure we send all the data */ 
     564        struct send_data *sd = (struct send_data*)op_key->activesock_data; 
     565 
     566        sd->sent += bytes_sent; 
     567        if (sd->sent == sd->len) { 
     568            /* all has been sent */ 
     569            bytes_sent = sd->sent; 
     570            op_key->activesock_data = NULL; 
     571        } else { 
     572            /* send remaining data */ 
     573            pj_status_t status; 
     574 
     575            status = send_remaining(asock, op_key); 
     576            if (status == PJ_EPENDING) 
     577                return; 
     578            else if (status == PJ_SUCCESS) 
     579                bytes_sent = sd->sent; 
     580            else 
     581                bytes_sent = -status; 
     582 
     583            op_key->activesock_data = NULL; 
     584        } 
     585    }  
    481586 
    482587    if (asock->cb.on_data_sent) { 
  • pjproject/trunk/pjlib/src/pjlib-test/activesock.c

    r2039 r2185  
    247247 
    248248 
     249 
     250#define SIGNATURE   0xdeadbeef 
     251struct tcp_pkt 
     252{ 
     253    pj_uint32_t signature; 
     254    pj_uint32_t seq; 
     255    char        fill[513]; 
     256}; 
     257 
     258struct tcp_state 
     259{ 
     260    pj_bool_t   err; 
     261    pj_bool_t   sent; 
     262    pj_uint32_t next_recv_seq; 
     263    pj_uint8_t  pkt[600]; 
     264}; 
     265 
     266struct send_key 
     267{ 
     268    pj_ioqueue_op_key_t op_key; 
     269}; 
     270 
     271 
     272static pj_bool_t tcp_on_data_read(pj_activesock_t *asock, 
     273                                  void *data, 
     274                                  pj_size_t size, 
     275                                  pj_status_t status, 
     276                                  pj_size_t *remainder) 
     277{ 
     278    struct tcp_state *st = (struct tcp_state*) pj_activesock_get_user_data(asock); 
     279    char *next = (char*) data; 
     280 
     281    if (status != PJ_SUCCESS && status != PJ_EPENDING) { 
     282        PJ_LOG(1,("", "   err: status=%d", status)); 
     283        st->err = PJ_TRUE; 
     284        return PJ_FALSE; 
     285    } 
     286 
     287    while (size >= sizeof(struct tcp_pkt)) { 
     288        struct tcp_pkt *tcp_pkt = (struct tcp_pkt*) next; 
     289 
     290        if (tcp_pkt->signature != SIGNATURE) { 
     291            PJ_LOG(1,("", "   err: invalid signature at seq=%d",  
     292                          st->next_recv_seq)); 
     293            st->err = PJ_TRUE; 
     294            return PJ_FALSE; 
     295        } 
     296        if (tcp_pkt->seq != st->next_recv_seq) { 
     297            PJ_LOG(1,("", "   err: wrong sequence")); 
     298            st->err = PJ_TRUE; 
     299            return PJ_FALSE; 
     300        } 
     301 
     302        st->next_recv_seq++; 
     303        next += sizeof(struct tcp_pkt); 
     304        size -= sizeof(struct tcp_pkt); 
     305    } 
     306 
     307    if (size) { 
     308        pj_memmove(data, next, size); 
     309        *remainder = size; 
     310    } 
     311 
     312    return PJ_TRUE; 
     313} 
     314 
     315static pj_bool_t tcp_on_data_sent(pj_activesock_t *asock, 
     316                                  pj_ioqueue_op_key_t *op_key, 
     317                                  pj_ssize_t sent) 
     318{ 
     319    struct tcp_state *st=(struct tcp_state*)pj_activesock_get_user_data(asock); 
     320 
     321    st->sent = 1; 
     322 
     323    if (sent < 1) { 
     324        st->err = PJ_TRUE; 
     325        return PJ_FALSE; 
     326    } 
     327 
     328    return PJ_TRUE; 
     329} 
     330 
     331static int tcp_perf_test(void) 
     332{ 
     333    enum { COUNT=100000 }; 
     334    pj_pool_t *pool = NULL; 
     335    pj_ioqueue_t *ioqueue = NULL; 
     336    pj_sock_t sock1=PJ_INVALID_SOCKET, sock2=PJ_INVALID_SOCKET; 
     337    pj_activesock_t *asock1 = NULL, *asock2 = NULL; 
     338    pj_activesock_cb cb; 
     339    struct tcp_state *state1, *state2; 
     340    unsigned i; 
     341    pj_status_t status; 
     342 
     343    pool = pj_pool_create(mem, "tcpperf", 256, 256, NULL); 
     344 
     345    status = app_socketpair(pj_AF_INET(), pj_SOCK_STREAM(), 0, &sock1,  
     346                            &sock2); 
     347    if (status != PJ_SUCCESS) { 
     348        status = -100; 
     349        goto on_return; 
     350    } 
     351 
     352    status = pj_ioqueue_create(pool, 4, &ioqueue); 
     353    if (status != PJ_SUCCESS) { 
     354        status = -110; 
     355        goto on_return; 
     356    } 
     357 
     358    pj_bzero(&cb, sizeof(cb)); 
     359    cb.on_data_read = &tcp_on_data_read; 
     360    cb.on_data_sent = &tcp_on_data_sent; 
     361 
     362    state1 = PJ_POOL_ZALLOC_T(pool, struct tcp_state); 
     363    status = pj_activesock_create(pool, sock1, pj_SOCK_STREAM(), NULL, ioqueue, 
     364                                  &cb, state1, &asock1); 
     365    if (status != PJ_SUCCESS) { 
     366        status = -120; 
     367        goto on_return; 
     368    } 
     369 
     370    state2 = PJ_POOL_ZALLOC_T(pool, struct tcp_state); 
     371    status = pj_activesock_create(pool, sock2, pj_SOCK_STREAM(), NULL, ioqueue, 
     372                                  &cb, state2, &asock2); 
     373    if (status != PJ_SUCCESS) { 
     374        status = -130; 
     375        goto on_return; 
     376    } 
     377 
     378    status = pj_activesock_start_read(asock1, pool, 1000, 0); 
     379    if (status != PJ_SUCCESS) { 
     380        status = -140; 
     381        goto on_return; 
     382    } 
     383 
     384    /* Send packet as quickly as possible */ 
     385    for (i=0; i<COUNT && !state1->err && !state2->err; ++i) { 
     386        struct tcp_pkt *pkt; 
     387        struct send_key send_key[2], *op_key; 
     388        pj_ssize_t len; 
     389 
     390        pkt = (struct tcp_pkt*)state2->pkt; 
     391        pkt->signature = SIGNATURE; 
     392        pkt->seq = i; 
     393        pj_memset(pkt->fill, 'a', sizeof(pkt->fill)); 
     394 
     395        op_key = &send_key[i%2]; 
     396        pj_ioqueue_op_key_init(&op_key->op_key, sizeof(*op_key)); 
     397 
     398        state2->sent = PJ_FALSE; 
     399        len = sizeof(*pkt); 
     400        status = pj_activesock_send(asock2, &op_key->op_key, pkt, &len, 0); 
     401        if (status == PJ_EPENDING) { 
     402            do { 
     403                pj_ioqueue_poll(ioqueue, NULL); 
     404            } while (!state2->sent); 
     405        } else if (status != PJ_SUCCESS) { 
     406                PJ_LOG(1,("", "   err: send status=%d", status)); 
     407            status = -180; 
     408            break; 
     409        } else if (status == PJ_SUCCESS) { 
     410            if (len != sizeof(*pkt)) { 
     411                PJ_LOG(1,("", "   err: shouldn't report partial sent")); 
     412                status = -190; 
     413                break; 
     414            } 
     415        } 
     416    } 
     417 
     418    /* Wait until everything has been sent/received */ 
     419    if (state1->next_recv_seq < COUNT) { 
     420        pj_time_val delay = {0, 100}; 
     421        while (pj_ioqueue_poll(ioqueue, &delay) > 0) 
     422            ; 
     423    } 
     424 
     425    if (status == PJ_EPENDING) 
     426        status = PJ_SUCCESS; 
     427 
     428    if (status != 0) 
     429        goto on_return; 
     430 
     431    if (state1->err) { 
     432        status = -183; 
     433        goto on_return; 
     434    } 
     435    if (state2->err) { 
     436        status = -186; 
     437        goto on_return; 
     438    } 
     439    if (state1->next_recv_seq != COUNT) { 
     440        PJ_LOG(3,("", "   err: only %u packets received, expecting %u",  
     441                      state1->next_recv_seq, COUNT)); 
     442        status = -195; 
     443        goto on_return; 
     444    } 
     445 
     446on_return: 
     447    if (asock2) 
     448        pj_activesock_close(asock2); 
     449    if (asock1) 
     450        pj_activesock_close(asock1); 
     451    if (ioqueue) 
     452        pj_ioqueue_destroy(ioqueue); 
     453    if (pool) 
     454        pj_pool_release(pool); 
     455 
     456    return status; 
     457} 
     458 
     459 
     460 
    249461int activesock_test(void) 
    250462{ 
    251463    int ret; 
     464 
     465    ret = (int)&udp_ping_pong_test; 
    252466 
    253467    PJ_LOG(3,("", "..udp ping/pong test")); 
    254468    ret = udp_ping_pong_test(); 
     469    if (ret != 0) 
     470        return ret; 
     471 
     472    PJ_LOG(3,("", "..tcp perf test")); 
     473    ret = tcp_perf_test(); 
    255474    if (ret != 0) 
    256475        return ret; 
Note: See TracChangeset for help on using the changeset viewer.