Ignore:
Timestamp:
Jul 29, 2008 8:15:15 PM (16 years ago)
Author:
bennylp
Message:

Initial work for ticket #579: added option to make the active socket sends all the (TCP) data before calling completion callback

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjlib/src/pjlib-test/activesock.c

    r2039 r2185  
    247247 
    248248 
     249 
     250#define SIGNATURE   0xdeadbeef 
     251struct tcp_pkt 
     252{ 
     253    pj_uint32_t signature; 
     254    pj_uint32_t seq; 
     255    char        fill[513]; 
     256}; 
     257 
     258struct tcp_state 
     259{ 
     260    pj_bool_t   err; 
     261    pj_bool_t   sent; 
     262    pj_uint32_t next_recv_seq; 
     263    pj_uint8_t  pkt[600]; 
     264}; 
     265 
     266struct send_key 
     267{ 
     268    pj_ioqueue_op_key_t op_key; 
     269}; 
     270 
     271 
     272static pj_bool_t tcp_on_data_read(pj_activesock_t *asock, 
     273                                  void *data, 
     274                                  pj_size_t size, 
     275                                  pj_status_t status, 
     276                                  pj_size_t *remainder) 
     277{ 
     278    struct tcp_state *st = (struct tcp_state*) pj_activesock_get_user_data(asock); 
     279    char *next = (char*) data; 
     280 
     281    if (status != PJ_SUCCESS && status != PJ_EPENDING) { 
     282        PJ_LOG(1,("", "   err: status=%d", status)); 
     283        st->err = PJ_TRUE; 
     284        return PJ_FALSE; 
     285    } 
     286 
     287    while (size >= sizeof(struct tcp_pkt)) { 
     288        struct tcp_pkt *tcp_pkt = (struct tcp_pkt*) next; 
     289 
     290        if (tcp_pkt->signature != SIGNATURE) { 
     291            PJ_LOG(1,("", "   err: invalid signature at seq=%d",  
     292                          st->next_recv_seq)); 
     293            st->err = PJ_TRUE; 
     294            return PJ_FALSE; 
     295        } 
     296        if (tcp_pkt->seq != st->next_recv_seq) { 
     297            PJ_LOG(1,("", "   err: wrong sequence")); 
     298            st->err = PJ_TRUE; 
     299            return PJ_FALSE; 
     300        } 
     301 
     302        st->next_recv_seq++; 
     303        next += sizeof(struct tcp_pkt); 
     304        size -= sizeof(struct tcp_pkt); 
     305    } 
     306 
     307    if (size) { 
     308        pj_memmove(data, next, size); 
     309        *remainder = size; 
     310    } 
     311 
     312    return PJ_TRUE; 
     313} 
     314 
     315static pj_bool_t tcp_on_data_sent(pj_activesock_t *asock, 
     316                                  pj_ioqueue_op_key_t *op_key, 
     317                                  pj_ssize_t sent) 
     318{ 
     319    struct tcp_state *st=(struct tcp_state*)pj_activesock_get_user_data(asock); 
     320 
     321    st->sent = 1; 
     322 
     323    if (sent < 1) { 
     324        st->err = PJ_TRUE; 
     325        return PJ_FALSE; 
     326    } 
     327 
     328    return PJ_TRUE; 
     329} 
     330 
     331static int tcp_perf_test(void) 
     332{ 
     333    enum { COUNT=100000 }; 
     334    pj_pool_t *pool = NULL; 
     335    pj_ioqueue_t *ioqueue = NULL; 
     336    pj_sock_t sock1=PJ_INVALID_SOCKET, sock2=PJ_INVALID_SOCKET; 
     337    pj_activesock_t *asock1 = NULL, *asock2 = NULL; 
     338    pj_activesock_cb cb; 
     339    struct tcp_state *state1, *state2; 
     340    unsigned i; 
     341    pj_status_t status; 
     342 
     343    pool = pj_pool_create(mem, "tcpperf", 256, 256, NULL); 
     344 
     345    status = app_socketpair(pj_AF_INET(), pj_SOCK_STREAM(), 0, &sock1,  
     346                            &sock2); 
     347    if (status != PJ_SUCCESS) { 
     348        status = -100; 
     349        goto on_return; 
     350    } 
     351 
     352    status = pj_ioqueue_create(pool, 4, &ioqueue); 
     353    if (status != PJ_SUCCESS) { 
     354        status = -110; 
     355        goto on_return; 
     356    } 
     357 
     358    pj_bzero(&cb, sizeof(cb)); 
     359    cb.on_data_read = &tcp_on_data_read; 
     360    cb.on_data_sent = &tcp_on_data_sent; 
     361 
     362    state1 = PJ_POOL_ZALLOC_T(pool, struct tcp_state); 
     363    status = pj_activesock_create(pool, sock1, pj_SOCK_STREAM(), NULL, ioqueue, 
     364                                  &cb, state1, &asock1); 
     365    if (status != PJ_SUCCESS) { 
     366        status = -120; 
     367        goto on_return; 
     368    } 
     369 
     370    state2 = PJ_POOL_ZALLOC_T(pool, struct tcp_state); 
     371    status = pj_activesock_create(pool, sock2, pj_SOCK_STREAM(), NULL, ioqueue, 
     372                                  &cb, state2, &asock2); 
     373    if (status != PJ_SUCCESS) { 
     374        status = -130; 
     375        goto on_return; 
     376    } 
     377 
     378    status = pj_activesock_start_read(asock1, pool, 1000, 0); 
     379    if (status != PJ_SUCCESS) { 
     380        status = -140; 
     381        goto on_return; 
     382    } 
     383 
     384    /* Send packet as quickly as possible */ 
     385    for (i=0; i<COUNT && !state1->err && !state2->err; ++i) { 
     386        struct tcp_pkt *pkt; 
     387        struct send_key send_key[2], *op_key; 
     388        pj_ssize_t len; 
     389 
     390        pkt = (struct tcp_pkt*)state2->pkt; 
     391        pkt->signature = SIGNATURE; 
     392        pkt->seq = i; 
     393        pj_memset(pkt->fill, 'a', sizeof(pkt->fill)); 
     394 
     395        op_key = &send_key[i%2]; 
     396        pj_ioqueue_op_key_init(&op_key->op_key, sizeof(*op_key)); 
     397 
     398        state2->sent = PJ_FALSE; 
     399        len = sizeof(*pkt); 
     400        status = pj_activesock_send(asock2, &op_key->op_key, pkt, &len, 0); 
     401        if (status == PJ_EPENDING) { 
     402            do { 
     403                pj_ioqueue_poll(ioqueue, NULL); 
     404            } while (!state2->sent); 
     405        } else if (status != PJ_SUCCESS) { 
     406                PJ_LOG(1,("", "   err: send status=%d", status)); 
     407            status = -180; 
     408            break; 
     409        } else if (status == PJ_SUCCESS) { 
     410            if (len != sizeof(*pkt)) { 
     411                PJ_LOG(1,("", "   err: shouldn't report partial sent")); 
     412                status = -190; 
     413                break; 
     414            } 
     415        } 
     416    } 
     417 
     418    /* Wait until everything has been sent/received */ 
     419    if (state1->next_recv_seq < COUNT) { 
     420        pj_time_val delay = {0, 100}; 
     421        while (pj_ioqueue_poll(ioqueue, &delay) > 0) 
     422            ; 
     423    } 
     424 
     425    if (status == PJ_EPENDING) 
     426        status = PJ_SUCCESS; 
     427 
     428    if (status != 0) 
     429        goto on_return; 
     430 
     431    if (state1->err) { 
     432        status = -183; 
     433        goto on_return; 
     434    } 
     435    if (state2->err) { 
     436        status = -186; 
     437        goto on_return; 
     438    } 
     439    if (state1->next_recv_seq != COUNT) { 
     440        PJ_LOG(3,("", "   err: only %u packets received, expecting %u",  
     441                      state1->next_recv_seq, COUNT)); 
     442        status = -195; 
     443        goto on_return; 
     444    } 
     445 
     446on_return: 
     447    if (asock2) 
     448        pj_activesock_close(asock2); 
     449    if (asock1) 
     450        pj_activesock_close(asock1); 
     451    if (ioqueue) 
     452        pj_ioqueue_destroy(ioqueue); 
     453    if (pool) 
     454        pj_pool_release(pool); 
     455 
     456    return status; 
     457} 
     458 
     459 
     460 
    249461int activesock_test(void) 
    250462{ 
    251463    int ret; 
     464 
     465    ret = (int)&udp_ping_pong_test; 
    252466 
    253467    PJ_LOG(3,("", "..udp ping/pong test")); 
    254468    ret = udp_ping_pong_test(); 
     469    if (ret != 0) 
     470        return ret; 
     471 
     472    PJ_LOG(3,("", "..tcp perf test")); 
     473    ret = tcp_perf_test(); 
    255474    if (ret != 0) 
    256475        return ret; 
Note: See TracChangeset for help on using the changeset viewer.