Ignore:
Timestamp:
Jul 29, 2008 8:15:15 PM (16 years ago)
Author:
bennylp
Message:

Initial work for ticket #579: added option to make the active socket sends all the (TCP) data before calling completion callback

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjlib/src/pj/activesock.c

    r2177 r2185  
    5252}; 
    5353 
     54struct send_data 
     55{ 
     56    pj_uint8_t          *data; 
     57    pj_ssize_t           len; 
     58    pj_ssize_t           sent; 
     59    unsigned             flags; 
     60}; 
     61 
    5462struct pj_activesock_t 
    5563{ 
    5664    pj_ioqueue_key_t    *key; 
    5765    pj_bool_t            stream_oriented; 
     66    pj_bool_t            whole_data; 
    5867    pj_ioqueue_t        *ioqueue; 
    5968    void                *user_data; 
     
    6170    unsigned             max_loop; 
    6271    pj_activesock_cb     cb; 
     72 
     73    struct send_data     send_data; 
    6374 
    6475    struct read_op      *read_op; 
     
    90101    cfg->async_cnt = 1; 
    91102    cfg->concurrency = -1; 
     103    cfg->whole_data = PJ_TRUE; 
    92104} 
    93105 
     
    116128    asock->stream_oriented = (sock_type == pj_SOCK_STREAM()); 
    117129    asock->async_count = (opt? opt->async_cnt : 1); 
     130    asock->whole_data = (opt? opt->whole_data : 1); 
    118131    asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP; 
    119132    asock->user_data = user_data; 
     
    135148    } 
    136149     
    137     if (opt && opt->concurrency >= 0) { 
     150    if (asock->whole_data) { 
     151        /* Must disable concurrency otherwise there is a race condition */ 
     152        pj_ioqueue_set_concurrency(asock->key, 0); 
     153    } else if (opt && opt->concurrency >= 0) { 
    138154        pj_ioqueue_set_concurrency(asock->key, opt->concurrency); 
    139155    } 
     
    444460 
    445461 
     462static pj_status_t send_remaining(pj_activesock_t *asock,  
     463                                  pj_ioqueue_op_key_t *send_key) 
     464{ 
     465    struct send_data *sd = (struct send_data*)send_key->activesock_data; 
     466    pj_status_t status; 
     467 
     468    do { 
     469        pj_ssize_t size; 
     470 
     471        size = sd->len - sd->sent; 
     472        status = pj_ioqueue_send(asock->key, send_key,  
     473                                 sd->data+sd->sent, &size, sd->flags); 
     474        if (status != PJ_SUCCESS) { 
     475            /* Pending or error */ 
     476            break; 
     477        } 
     478 
     479        sd->sent += size; 
     480        if (sd->sent == sd->len) { 
     481            /* The whole data has been sent. */ 
     482            return PJ_SUCCESS; 
     483        } 
     484 
     485    } while (sd->sent < sd->len); 
     486 
     487    return status; 
     488} 
     489 
     490 
    446491PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock, 
    447492                                        pj_ioqueue_op_key_t *send_key, 
     
    452497    PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL); 
    453498 
    454     return pj_ioqueue_send(asock->key, send_key, data, size, flags); 
     499    send_key->activesock_data = NULL; 
     500 
     501    if (asock->whole_data) { 
     502        pj_ssize_t whole; 
     503        pj_status_t status; 
     504 
     505        whole = *size; 
     506 
     507        status = pj_ioqueue_send(asock->key, send_key, data, size, flags); 
     508        if (status != PJ_SUCCESS) { 
     509            /* Pending or error */ 
     510            return status; 
     511        } 
     512 
     513        if (*size == whole) { 
     514            /* The whole data has been sent. */ 
     515            return PJ_SUCCESS; 
     516        } 
     517 
     518        /* Data was partially sent */ 
     519        asock->send_data.data = (pj_uint8_t*)data; 
     520        asock->send_data.len = whole; 
     521        asock->send_data.sent = *size; 
     522        asock->send_data.flags = flags; 
     523        send_key->activesock_data = &asock->send_data; 
     524 
     525        /* Try again */ 
     526        status = send_remaining(asock, send_key); 
     527        if (status == PJ_SUCCESS) { 
     528            *size = whole; 
     529        } 
     530        return status; 
     531 
     532    } else { 
     533        return pj_ioqueue_send(asock->key, send_key, data, size, flags); 
     534    } 
    455535} 
    456536 
     
    479559 
    480560    asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); 
     561 
     562    if (bytes_sent > 0 && op_key->activesock_data) { 
     563        /* whole_data is requested. Make sure we send all the data */ 
     564        struct send_data *sd = (struct send_data*)op_key->activesock_data; 
     565 
     566        sd->sent += bytes_sent; 
     567        if (sd->sent == sd->len) { 
     568            /* all has been sent */ 
     569            bytes_sent = sd->sent; 
     570            op_key->activesock_data = NULL; 
     571        } else { 
     572            /* send remaining data */ 
     573            pj_status_t status; 
     574 
     575            status = send_remaining(asock, op_key); 
     576            if (status == PJ_EPENDING) 
     577                return; 
     578            else if (status == PJ_SUCCESS) 
     579                bytes_sent = sd->sent; 
     580            else 
     581                bytes_sent = -status; 
     582 
     583            op_key->activesock_data = NULL; 
     584        } 
     585    }  
    481586 
    482587    if (asock->cb.on_data_sent) { 
Note: See TracChangeset for help on using the changeset viewer.