Ignore:
Timestamp:
Nov 6, 2005 1:32:11 PM (18 years ago)
Author:
bennylp
Message:

Put common ioqueue functionalities in separate file to be used by both select() and epoll

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/main/pjlib/src/pj/ioqueue_select.c

    r11 r12  
    2323 
    2424/* 
     25 * Include declaration from common abstraction. 
     26 */ 
     27#include "ioqueue_common_abs.h" 
     28 
     29/* 
    2530 * ISSUES with ioqueue_select() 
    2631 * 
     
    5964 
    6065 
    61  
    62  
    6366/* 
    6467 * During debugging build, VALIDATE_FD_SET is set. 
     
    7174#endif 
    7275 
    73 struct generic_operation 
    74 { 
    75     PJ_DECL_LIST_MEMBER(struct generic_operation); 
    76     pj_ioqueue_operation_e  op; 
     76/* 
     77 * This describes each key. 
     78 */ 
     79struct pj_ioqueue_key_t 
     80{ 
     81    DECLARE_COMMON_KEY 
    7782}; 
    7883 
    79 struct read_operation 
    80 { 
    81     PJ_DECL_LIST_MEMBER(struct read_operation); 
    82     pj_ioqueue_operation_e  op; 
    83  
    84     void                   *buf; 
    85     pj_size_t               size; 
    86     unsigned                flags; 
    87     pj_sockaddr_t          *rmt_addr; 
    88     int                    *rmt_addrlen; 
    89 }; 
    90  
    91 struct write_operation 
    92 { 
    93     PJ_DECL_LIST_MEMBER(struct write_operation); 
    94     pj_ioqueue_operation_e  op; 
    95  
    96     char                   *buf; 
    97     pj_size_t               size; 
    98     pj_ssize_t              written; 
    99     unsigned                flags; 
    100     pj_sockaddr_in          rmt_addr; 
    101     int                     rmt_addrlen; 
    102 }; 
    103  
    104 #if PJ_HAS_TCP 
    105 struct accept_operation 
    106 { 
    107     PJ_DECL_LIST_MEMBER(struct accept_operation); 
    108     pj_ioqueue_operation_e  op; 
    109  
    110     pj_sock_t              *accept_fd; 
    111     pj_sockaddr_t          *local_addr; 
    112     pj_sockaddr_t          *rmt_addr; 
    113     int                    *addrlen; 
    114 }; 
    115 #endif 
    116  
    117 union operation_key 
    118 { 
    119     struct generic_operation generic; 
    120     struct read_operation    read; 
    121     struct write_operation   write; 
    122 #if PJ_HAS_TCP 
    123     struct accept_operation  accept; 
    124 #endif 
    125 }; 
    126  
    127 /* 
    128  * This describes each key. 
    129  */ 
    130 struct pj_ioqueue_key_t 
    131 { 
    132     PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); 
    133     pj_ioqueue_t           *ioqueue; 
    134     pj_sock_t               fd; 
    135     void                   *user_data; 
    136     pj_ioqueue_callback     cb; 
    137     int                     connecting; 
    138     struct read_operation   read_list; 
    139     struct write_operation  write_list; 
    140 #if PJ_HAS_TCP 
    141     struct accept_operation accept_list; 
    142 #endif 
    143 }; 
    144  
    14584/* 
    14685 * This describes the I/O queue itself. 
     
    14887struct pj_ioqueue_t 
    14988{ 
    150     pj_lock_t          *lock; 
    151     pj_bool_t           auto_delete_lock; 
     89    DECLARE_COMMON_IOQUEUE 
     90 
    15291    unsigned            max, count; 
    15392    pj_ioqueue_key_t    key_list; 
     
    15998}; 
    16099 
     100/* Include implementation for common abstraction after we declare 
     101 * pj_ioqueue_key_t and pj_ioqueue_t. 
     102 */ 
     103#include "ioqueue_common_abs.c" 
     104 
    161105/* 
    162106 * pj_ioqueue_create() 
     
    169113{ 
    170114    pj_ioqueue_t *ioqueue; 
     115    pj_lock_t *lock; 
    171116    pj_status_t rc; 
    172117 
     
    181126 
    182127    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); 
     128 
     129    ioqueue_init(ioqueue); 
     130 
    183131    ioqueue->max = max_fd; 
    184132    ioqueue->count = 0; 
     
    190138    pj_list_init(&ioqueue->key_list); 
    191139 
    192     rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioqueue->lock); 
     140    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock); 
    193141    if (rc != PJ_SUCCESS) 
    194142        return rc; 
    195143 
    196     ioqueue->auto_delete_lock = PJ_TRUE; 
     144    rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE); 
     145    if (rc != PJ_SUCCESS) 
     146        return rc; 
    197147 
    198148    PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue)); 
     
    209159PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) 
    210160{ 
    211     pj_status_t rc = PJ_SUCCESS; 
    212  
    213161    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 
    214162 
    215163    pj_lock_acquire(ioqueue->lock); 
    216  
    217     if (ioqueue->auto_delete_lock) 
    218         rc = pj_lock_destroy(ioqueue->lock); 
    219  
    220     return rc; 
     164    return ioqueue_destroy(ioqueue); 
    221165} 
    222166 
     
    261205    /* Create key. */ 
    262206    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 
    263     key->ioqueue = ioqueue; 
    264     key->fd = sock; 
    265     key->user_data = user_data; 
    266     pj_list_init(&key->read_list); 
    267     pj_list_init(&key->write_list); 
    268 #if PJ_HAS_TCP 
    269     pj_list_init(&key->accept_list); 
    270 #endif 
    271  
    272     /* Save callback. */ 
    273     pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); 
     207    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); 
     208    if (rc != PJ_SUCCESS) 
     209        return rc; 
    274210 
    275211    /* Register */ 
     
    309245#endif 
    310246 
    311     pj_lock_release(ioqueue->lock); 
    312     return PJ_SUCCESS; 
    313 } 
    314  
    315 /* 
    316  * pj_ioqueue_get_user_data() 
    317  * 
    318  * Obtain value associated with a key. 
    319  */ 
    320 PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) 
    321 { 
    322     PJ_ASSERT_RETURN(key != NULL, NULL); 
    323     return key->user_data; 
    324 } 
    325  
    326  
    327 /* 
    328  * pj_ioqueue_set_user_data() 
    329  */ 
    330 PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, 
    331                                               void *user_data, 
    332                                               void **old_data) 
    333 { 
    334     PJ_ASSERT_RETURN(key, PJ_EINVAL); 
    335  
    336     if (old_data) 
    337         *old_data = key->user_data; 
    338     key->user_data = user_data; 
     247    /* ioqueue_destroy may try to acquire key's mutex. 
     248     * Since normally the order of locking is to lock key's mutex first 
     249     * then ioqueue's mutex, ioqueue_destroy may deadlock unless we 
     250     * release ioqueue's mutex first. 
     251     */ 
     252    pj_lock_release(ioqueue->lock); 
     253 
     254    /* Destroy the key. */ 
     255    ioqueue_destroy_key(key); 
    339256 
    340257    return PJ_SUCCESS; 
     
    393310 
    394311 
     312/* ioqueue_remove_from_set() 
     313 * This function is called from ioqueue_dispatch_event() to instruct 
     314 * the ioqueue to remove the specified descriptor from ioqueue's descriptor 
     315 * set for the specified event. 
     316 */ 
     317static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, 
     318                                     pj_sock_t fd,  
     319                                     enum ioqueue_event_type event_type) 
     320{ 
     321    pj_lock_acquire(ioqueue->lock); 
     322 
     323    if (event_type == READABLE_EVENT) 
     324        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset); 
     325    else if (event_type == WRITEABLE_EVENT) 
     326        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset); 
     327    else if (event_type == EXCEPTION_EVENT) 
     328        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset); 
     329    else 
     330        pj_assert(0); 
     331 
     332    pj_lock_release(ioqueue->lock); 
     333} 
     334 
     335/* 
     336 * ioqueue_add_to_set() 
     337 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc 
     338 * to instruct the ioqueue to add the specified handle to ioqueue's descriptor 
     339 * set for the specified event. 
     340 */ 
     341static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, 
     342                                pj_sock_t fd, 
     343                                enum ioqueue_event_type event_type ) 
     344{ 
     345    pj_lock_acquire(ioqueue->lock); 
     346 
     347    if (event_type == READABLE_EVENT) 
     348        PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset); 
     349    else if (event_type == WRITEABLE_EVENT) 
     350        PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset); 
     351    else if (event_type == EXCEPTION_EVENT) 
     352        PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset); 
     353    else 
     354        pj_assert(0); 
     355 
     356    pj_lock_release(ioqueue->lock); 
     357} 
     358 
    395359/* 
    396360 * pj_ioqueue_poll() 
     
    413377{ 
    414378    pj_fd_set_t rfdset, wfdset, xfdset; 
    415     int count; 
     379    int count, counter; 
    416380    pj_ioqueue_key_t *h; 
     381    struct event 
     382    { 
     383        pj_ioqueue_key_t    *key; 
     384        enum event_type      event_type; 
     385    } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; 
    417386 
    418387    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 
     
    454423    if (count <= 0) 
    455424        return count; 
    456  
    457     /* Lock ioqueue again before scanning for signalled sockets.  
    458      * We must strictly use recursive mutex since application may invoke 
    459      * the ioqueue again inside the callback. 
     425    else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) 
     426        count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL; 
     427 
     428    /* Scan descriptor sets for event and add the events in the event 
     429     * array to be processed later in this function. We do this so that 
     430     * events can be processed in parallel without holding ioqueue lock. 
    460431     */ 
    461432    pj_lock_acquire(ioqueue->lock); 
     433 
     434    counter = 0; 
    462435 
    463436    /* Scan for writable sockets first to handle piggy-back data 
     
    465438     */ 
    466439    h = ioqueue->key_list.next; 
    467 do_writable_scan: 
    468     for ( ; h!=&ioqueue->key_list; h = h->next) { 
    469         if ( (!pj_list_empty(&h->write_list) || h->connecting) 
     440    for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) { 
     441        if ( (key_has_pending_write(h) || key_has_pending_connect(h)) 
    470442             && PJ_FD_ISSET(h->fd, &wfdset)) 
    471443        { 
    472             break; 
     444            event[counter].key = h; 
     445            event[counter].event_type = WRITEABLE_EVENT; 
     446            ++counter; 
    473447        } 
    474     } 
    475     if (h != &ioqueue->key_list) { 
    476         pj_assert(!pj_list_empty(&h->write_list) || h->connecting); 
    477  
    478 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 
    479         if (h->connecting) { 
    480             /* Completion of connect() operation */ 
    481             pj_ssize_t bytes_transfered; 
    482  
    483 #if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) 
    484             /* from connect(2):  
    485              * On Linux, use getsockopt to read the SO_ERROR option at 
    486              * level SOL_SOCKET to determine whether connect() completed 
    487              * successfully (if SO_ERROR is zero). 
    488              */ 
    489             int value; 
    490             socklen_t vallen = sizeof(value); 
    491             int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,  
    492                                    &value, &vallen); 
    493             if (gs_rc != 0) { 
    494                 /* Argh!! What to do now???  
    495                  * Just indicate that the socket is connected. The 
    496                  * application will get error as soon as it tries to use 
    497                  * the socket to send/receive. 
    498                  */ 
    499                 bytes_transfered = 0; 
    500             } else { 
    501                 bytes_transfered = value; 
    502             } 
    503 #elif defined(PJ_WIN32) && PJ_WIN32!=0 
    504             bytes_transfered = 0; /* success */ 
    505 #else 
    506             /* Excellent information in D.J. Bernstein page: 
    507              * http://cr.yp.to/docs/connect.html 
    508              * 
    509              * Seems like the most portable way of detecting connect() 
    510              * failure is to call getpeername(). If socket is connected, 
    511              * getpeername() will return 0. If the socket is not connected, 
    512              * it will return ENOTCONN, and read(fd, &ch, 1) will produce 
    513              * the right errno through error slippage. This is a combination 
    514              * of suggestions from Douglas C. Schmidt and Ken Keys. 
    515              */ 
    516             int gp_rc; 
    517             struct sockaddr_in addr; 
    518             socklen_t addrlen = sizeof(addr); 
    519  
    520             gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen); 
    521             bytes_transfered = gp_rc; 
    522 #endif 
    523  
    524             /* Clear operation. */ 
    525             h->connecting = 0; 
    526             PJ_FD_CLR(h->fd, &ioqueue->wfdset); 
    527             PJ_FD_CLR(h->fd, &ioqueue->xfdset); 
    528  
    529             /* Call callback. */ 
    530             if (h->cb.on_connect_complete) 
    531                 (*h->cb.on_connect_complete)(h, bytes_transfered); 
    532  
    533             /* Re-scan writable sockets. */ 
    534             goto do_writable_scan; 
    535  
    536         } else  
    537 #endif /* PJ_HAS_TCP */ 
    538         { 
    539             /* Socket is writable. */ 
    540             struct write_operation *write_op; 
    541             pj_ssize_t sent; 
    542             pj_status_t send_rc; 
    543  
    544             /* Get the first in the queue. */ 
    545             write_op = h->write_list.next; 
    546  
    547             /* Send the data. */ 
    548             sent = write_op->size - write_op->written; 
    549             if (write_op->op == PJ_IOQUEUE_OP_SEND) { 
    550                 send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written, 
    551                                        &sent, write_op->flags); 
    552             } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) { 
    553                 send_rc = pj_sock_sendto(h->fd,  
    554                                          write_op->buf+write_op->written, 
    555                                          &sent, write_op->flags, 
    556                                          &write_op->rmt_addr,  
    557                                          write_op->rmt_addrlen); 
    558             } else { 
    559                 pj_assert(!"Invalid operation type!"); 
    560                 send_rc = PJ_EBUG; 
    561             } 
    562  
    563             if (send_rc == PJ_SUCCESS) { 
    564                 write_op->written += sent; 
    565             } else { 
    566                 pj_assert(send_rc > 0); 
    567                 write_op->written = -send_rc; 
    568             } 
    569  
    570             /* In any case we don't need to process this descriptor again. */ 
    571             PJ_FD_CLR(h->fd, &wfdset); 
    572  
    573             /* Are we finished with this buffer? */ 
    574             if (send_rc!=PJ_SUCCESS ||  
    575                 write_op->written == (pj_ssize_t)write_op->size)  
    576             { 
    577                 pj_list_erase(write_op); 
    578  
    579                 /* Clear operation if there's no more data to send. */ 
    580                 if (pj_list_empty(&h->write_list)) 
    581                     PJ_FD_CLR(h->fd, &ioqueue->wfdset); 
    582  
    583                 /* Call callback. */ 
    584                 if (h->cb.on_write_complete) { 
    585                     (*h->cb.on_write_complete)(h,  
    586                                                (pj_ioqueue_op_key_t*)write_op, 
    587                                                write_op->written); 
    588                 } 
    589             } 
    590              
    591             /* Re-scan writable sockets. */ 
    592             goto do_writable_scan; 
    593         } 
    594     } 
    595  
    596     /* Scan for readable socket. */ 
    597     h = ioqueue->key_list.next; 
    598 do_readable_scan: 
    599     for ( ; h!=&ioqueue->key_list; h = h->next) { 
    600         if ((!pj_list_empty(&h->read_list)  
     448 
     449        /* Scan for readable socket. */ 
     450        if ((key_has_pending_read(h) || key_has_pending_accept(h)) 
     451            && PJ_FD_ISSET(h->fd, &rfdset)) 
     452        { 
     453            event[counter].key = h; 
     454            event[counter].event_type = READABLE_EVENT; 
     455            ++counter;        } 
     456 
    601457#if PJ_HAS_TCP 
    602              || !pj_list_empty(&h->accept_list) 
    603 #endif 
    604             ) && PJ_FD_ISSET(h->fd, &rfdset)) 
    605         { 
    606             break; 
     458        if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) { 
     459            event[counter].key = h; 
     460            event[counter].event_type = EXCEPTION_EVENT; 
     461            ++counter; 
    607462        } 
    608     } 
    609     if (h != &ioqueue->key_list) { 
    610         pj_status_t rc; 
    611  
    612 #if PJ_HAS_TCP 
    613         pj_assert(!pj_list_empty(&h->read_list) ||  
    614                   !pj_list_empty(&h->accept_list)); 
    615 #else 
    616         pj_assert(!pj_list_empty(&h->read_list)); 
    617 #endif 
    618          
    619 #       if PJ_HAS_TCP 
    620         if (!pj_list_empty(&h->accept_list)) { 
    621  
    622             struct accept_operation *accept_op; 
    623              
    624             /* Get one accept operation from the list. */ 
    625             accept_op = h->accept_list.next; 
    626             pj_list_erase(accept_op); 
    627  
    628             rc=pj_sock_accept(h->fd, accept_op->accept_fd,  
    629                               accept_op->rmt_addr, accept_op->addrlen); 
    630             if (rc==PJ_SUCCESS && accept_op->local_addr) { 
    631                 rc = pj_sock_getsockname(*accept_op->accept_fd,  
    632                                          accept_op->local_addr, 
    633                                          accept_op->addrlen); 
    634             } 
    635  
    636             /* Clear bit in fdset if there is no more pending accept */ 
    637             if (pj_list_empty(&h->accept_list)) 
    638                 PJ_FD_CLR(h->fd, &ioqueue->rfdset); 
    639  
    640             /* Call callback. */ 
    641             if (h->cb.on_accept_complete) 
    642                 (*h->cb.on_accept_complete)(h, (pj_ioqueue_op_key_t*)accept_op, 
    643                                             *accept_op->accept_fd, rc); 
    644  
    645             /* Re-scan readable sockets. */ 
    646             goto do_readable_scan; 
     463#endif 
     464    } 
     465 
     466    pj_lock_release(ioqueue->lock); 
     467 
     468    count = counter; 
     469 
     470    /* Now process all events. The dispatch functions will take care 
     471     * of locking in each of the key 
     472     */ 
     473    for (counter=0; counter<count; ++counter) { 
     474        switch (event[counter].event_type) { 
     475        case READABLE_EVENT: 
     476            ioqueue_dispatch_read_event(ioqueue, event[counter].key); 
     477            break; 
     478        case WRITEABLE_EVENT: 
     479            ioqueue_dispatch_write_event(ioqueue, event[counter].key); 
     480            break; 
     481        case EXCEPTION_EVENT: 
     482            ioqueue_dispatch_exception_event(ioqueue, event[counter].key); 
     483            break; 
     484        case NO_EVENT: 
     485        default: 
     486            pj_assert(!"Invalid event!"); 
     487            break; 
    647488        } 
    648         else { 
    649 #       endif 
    650             struct read_operation *read_op; 
    651             pj_ssize_t bytes_read; 
    652  
    653             pj_assert(!pj_list_empty(&h->read_list)); 
    654  
    655             /* Get one pending read operation from the list. */ 
    656             read_op = h->read_list.next; 
    657             pj_list_erase(read_op); 
    658  
    659             bytes_read = read_op->size; 
    660  
    661             if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) { 
    662                 rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0, 
    663                                       read_op->rmt_addr,  
    664                                       read_op->rmt_addrlen); 
    665             } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) { 
    666                 rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); 
    667             } else { 
    668                 pj_assert(read_op->op == PJ_IOQUEUE_OP_READ); 
    669                 /* 
    670                  * User has specified pj_ioqueue_read(). 
    671                  * On Win32, we should do ReadFile(). But because we got 
    672                  * here because of select() anyway, user must have put a 
    673                  * socket descriptor on h->fd, which in this case we can 
    674                  * just call pj_sock_recv() instead of ReadFile(). 
    675                  * On Unix, user may put a file in h->fd, so we'll have 
    676                  * to call read() here. 
    677                  * This may not compile on systems which doesn't have  
    678                  * read(). That's why we only specify PJ_LINUX here so 
    679                  * that error is easier to catch. 
    680                  */ 
    681 #               if defined(PJ_WIN32) && PJ_WIN32 != 0 
    682                 rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); 
    683                 //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size, 
    684                 //              &bytes_read, NULL); 
    685 #               elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0) 
    686                 bytes_read = read(h->fd, h->rd_buf, bytes_read); 
    687                 rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); 
    688 #               elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0 
    689                 bytes_read = sys_read(h->fd, h->rd_buf, bytes_read); 
    690                 rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read; 
    691 #               else 
    692 #               error "Implement read() for this platform!" 
    693 #               endif 
    694             } 
    695              
    696             if (rc != PJ_SUCCESS) { 
    697 #               if defined(PJ_WIN32) && PJ_WIN32 != 0 
    698                 /* On Win32, for UDP, WSAECONNRESET on the receive side  
    699                  * indicates that previous sending has triggered ICMP Port  
    700                  * Unreachable message. 
    701                  * But we wouldn't know at this point which one of previous  
    702                  * key that has triggered the error, since UDP socket can 
    703                  * be shared! 
    704                  * So we'll just ignore it! 
    705                  */ 
    706  
    707                 if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) { 
    708                     //PJ_LOG(4,(THIS_FILE,  
    709                     //          "Ignored ICMP port unreach. on key=%p", h)); 
    710                 } 
    711 #               endif 
    712  
    713                 /* In any case we would report this to caller. */ 
    714                 bytes_read = -rc; 
    715             } 
    716  
    717             /* Clear fdset if there is no pending read. */ 
    718             if (pj_list_empty(&h->read_list)) 
    719                 PJ_FD_CLR(h->fd, &ioqueue->rfdset); 
    720  
    721             /* In any case clear from temporary set. */ 
    722             PJ_FD_CLR(h->fd, &rfdset); 
    723  
    724             /* Call callback. */ 
    725             if (h->cb.on_read_complete) 
    726                 (*h->cb.on_read_complete)(h, (pj_ioqueue_op_key_t*)read_op, 
    727                                           bytes_read); 
    728  
    729             /* Re-scan readable sockets. */ 
    730             goto do_readable_scan; 
    731  
    732         } 
    733     } 
    734  
    735 #if PJ_HAS_TCP 
    736     /* Scan for exception socket for TCP connection error. */ 
    737     h = ioqueue->key_list.next; 
    738 do_except_scan: 
    739     for ( ; h!=&ioqueue->key_list; h = h->next) { 
    740         if (h->connecting && PJ_FD_ISSET(h->fd, &xfdset)) 
    741             break; 
    742     } 
    743     if (h != &ioqueue->key_list) { 
    744  
    745         pj_assert(h->connecting); 
    746  
    747         /* Clear operation. */ 
    748         h->connecting = 0; 
    749         PJ_FD_CLR(h->fd, &ioqueue->wfdset); 
    750         PJ_FD_CLR(h->fd, &ioqueue->xfdset); 
    751         PJ_FD_CLR(h->fd, &wfdset); 
    752         PJ_FD_CLR(h->fd, &xfdset); 
    753  
    754         /* Call callback. */ 
    755         if (h->cb.on_connect_complete) 
    756             (*h->cb.on_connect_complete)(h, -1); 
    757  
    758         /* Re-scan exception list. */ 
    759         goto do_except_scan; 
    760     } 
    761 #endif  /* PJ_HAS_TCP */ 
    762  
    763     /* Shouldn't happen. */ 
    764     /* For strange reason on WinXP select() can return 1 while there is no 
    765      * pj_fd_set_t signaled. */ 
    766     /* pj_assert(0); */ 
    767  
    768     //count = 0; 
    769  
    770     pj_lock_release(ioqueue->lock); 
     489    } 
     490 
    771491    return count; 
    772492} 
    773493 
    774 /* 
    775  * pj_ioqueue_recv() 
    776  * 
    777  * Start asynchronous recv() from the socket. 
    778  */ 
    779 PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_key_t *key, 
    780                                       pj_ioqueue_op_key_t *op_key, 
    781                                       void *buffer, 
    782                                       pj_ssize_t *length, 
    783                                       unsigned flags ) 
    784 { 
    785     pj_status_t status; 
    786     pj_ssize_t size; 
    787     struct read_operation *read_op; 
    788     pj_ioqueue_t *ioqueue; 
    789  
    790     PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); 
    791     PJ_CHECK_STACK(); 
    792  
    793     /* Try to see if there's data immediately available.  
    794      */ 
    795     size = *length; 
    796     status = pj_sock_recv(key->fd, buffer, &size, flags); 
    797     if (status == PJ_SUCCESS) { 
    798         /* Yes! Data is available! */ 
    799         *length = size; 
    800         return PJ_SUCCESS; 
    801     } else { 
    802         /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report 
    803          * the error to caller. 
    804          */ 
    805         if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) 
    806             return status; 
    807     } 
    808  
    809     /* 
    810      * No data is immediately available. 
    811      * Must schedule asynchronous operation to the ioqueue. 
    812      */ 
    813     ioqueue = key->ioqueue; 
    814     pj_lock_acquire(ioqueue->lock); 
    815  
    816     read_op = (struct read_operation*)op_key; 
    817  
    818     read_op->op = PJ_IOQUEUE_OP_RECV; 
    819     read_op->buf = buffer; 
    820     read_op->size = *length; 
    821     read_op->flags = flags; 
    822  
    823     pj_list_insert_before(&key->read_list, read_op); 
    824     PJ_FD_SET(key->fd, &ioqueue->rfdset); 
    825  
    826     pj_lock_release(ioqueue->lock); 
    827     return PJ_EPENDING; 
    828 } 
    829  
    830 /* 
    831  * pj_ioqueue_recvfrom() 
    832  * 
    833  * Start asynchronous recvfrom() from the socket. 
    834  */ 
    835 PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, 
    836                                          pj_ioqueue_op_key_t *op_key, 
    837                                          void *buffer, 
    838                                          pj_ssize_t *length, 
    839                                          unsigned flags, 
    840                                          pj_sockaddr_t *addr, 
    841                                          int *addrlen) 
    842 { 
    843     pj_status_t status; 
    844     pj_ssize_t size; 
    845     struct read_operation *read_op; 
    846     pj_ioqueue_t *ioqueue; 
    847  
    848     PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); 
    849     PJ_CHECK_STACK(); 
    850  
    851     /* Try to see if there's data immediately available.  
    852      */ 
    853     size = *length; 
    854     status = pj_sock_recvfrom(key->fd, buffer, &size, flags, 
    855                               addr, addrlen); 
    856     if (status == PJ_SUCCESS) { 
    857         /* Yes! Data is available! */ 
    858         *length = size; 
    859         return PJ_SUCCESS; 
    860     } else { 
    861         /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report 
    862          * the error to caller. 
    863          */ 
    864         if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) 
    865             return status; 
    866     } 
    867  
    868     /* 
    869      * No data is immediately available. 
    870      * Must schedule asynchronous operation to the ioqueue. 
    871      */ 
    872     ioqueue = key->ioqueue; 
    873     pj_lock_acquire(ioqueue->lock); 
    874  
    875     read_op = (struct read_operation*)op_key; 
    876  
    877     read_op->op = PJ_IOQUEUE_OP_RECV_FROM; 
    878     read_op->buf = buffer; 
    879     read_op->size = *length; 
    880     read_op->flags = flags; 
    881     read_op->rmt_addr = addr; 
    882     read_op->rmt_addrlen = addrlen; 
    883  
    884     pj_list_insert_before(&key->read_list, read_op); 
    885     PJ_FD_SET(key->fd, &ioqueue->rfdset); 
    886  
    887     pj_lock_release(ioqueue->lock); 
    888     return PJ_EPENDING; 
    889 } 
    890  
    891 /* 
    892  * pj_ioqueue_send() 
    893  * 
    894  * Start asynchronous send() to the descriptor. 
    895  */ 
    896 PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, 
    897                                      pj_ioqueue_op_key_t *op_key, 
    898                                      const void *data, 
    899                                      pj_ssize_t *length, 
    900                                      unsigned flags) 
    901 { 
    902     pj_ioqueue_t *ioqueue; 
    903     struct write_operation *write_op; 
    904     pj_status_t status; 
    905     pj_ssize_t sent; 
    906  
    907     PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); 
    908     PJ_CHECK_STACK(); 
    909  
    910     /* Fast track: 
    911      *   Try to send data immediately, only if there's no pending write! 
    912      * Note: 
    913      *  We are speculating that the list is empty here without properly 
    914      *  acquiring ioqueue's mutex first. This is intentional, to maximize 
    915      *  performance via parallelism. 
    916      * 
    917      *  This should be safe, because: 
    918      *      - by convention, we require caller to make sure that the 
    919      *        key is not unregistered while other threads are invoking 
    920      *        an operation on the same key. 
    921      *      - pj_list_empty() is safe to be invoked by multiple threads, 
    922      *        even when other threads are modifying the list. 
    923      */ 
    924     if (pj_list_empty(&key->write_list)) { 
    925         /* 
    926          * See if data can be sent immediately. 
    927          */ 
    928         sent = *length; 
    929         status = pj_sock_send(key->fd, data, &sent, flags); 
    930         if (status == PJ_SUCCESS) { 
    931             /* Success! */ 
    932             *length = sent; 
    933             return PJ_SUCCESS; 
    934         } else { 
    935             /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report 
    936              * the error to caller. 
    937              */ 
    938             if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { 
    939                 return status; 
    940             } 
    941         } 
    942     } 
    943  
    944     /* 
    945      * Schedule asynchronous send. 
    946      */ 
    947     ioqueue = key->ioqueue; 
    948     pj_lock_acquire(ioqueue->lock); 
    949  
    950     write_op = (struct write_operation*)op_key; 
    951     write_op->op = PJ_IOQUEUE_OP_SEND; 
    952     write_op->buf = NULL; 
    953     write_op->size = *length; 
    954     write_op->written = 0; 
    955     write_op->flags = flags; 
    956      
    957     pj_list_insert_before(&key->write_list, write_op); 
    958     PJ_FD_SET(key->fd, &ioqueue->wfdset); 
    959  
    960     pj_lock_release(ioqueue->lock); 
    961  
    962     return PJ_EPENDING; 
    963 } 
    964  
    965  
    966 /* 
    967  * pj_ioqueue_sendto() 
    968  * 
    969  * Start asynchronous write() to the descriptor. 
    970  */ 
    971 PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, 
    972                                        pj_ioqueue_op_key_t *op_key, 
    973                                        const void *data, 
    974                                        pj_ssize_t *length, 
    975                                        unsigned flags, 
    976                                        const pj_sockaddr_t *addr, 
    977                                        int addrlen) 
    978 { 
    979     pj_ioqueue_t *ioqueue; 
    980     struct write_operation *write_op; 
    981     pj_status_t status; 
    982     pj_ssize_t sent; 
    983  
    984     PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); 
    985     PJ_CHECK_STACK(); 
    986  
    987     /* Fast track: 
    988      *   Try to send data immediately, only if there's no pending write! 
    989      * Note: 
    990      *  We are speculating that the list is empty here without properly 
    991      *  acquiring ioqueue's mutex first. This is intentional, to maximize 
    992      *  performance via parallelism. 
    993      * 
    994      *  This should be safe, because: 
    995      *      - by convention, we require caller to make sure that the 
    996      *        key is not unregistered while other threads are invoking 
    997      *        an operation on the same key. 
    998      *      - pj_list_empty() is safe to be invoked by multiple threads, 
    999      *        even when other threads are modifying the list. 
    1000      */ 
    1001     if (pj_list_empty(&key->write_list)) { 
    1002         /* 
    1003          * See if data can be sent immediately. 
    1004          */ 
    1005         sent = *length; 
    1006         status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen); 
    1007         if (status == PJ_SUCCESS) { 
    1008             /* Success! */ 
    1009             *length = sent; 
    1010             return PJ_SUCCESS; 
    1011         } else { 
    1012             /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report 
    1013              * the error to caller. 
    1014              */ 
    1015             if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { 
    1016                 return status; 
    1017             } 
    1018         } 
    1019     } 
    1020  
    1021     /* 
    1022      * Check that address storage can hold the address parameter. 
    1023      */ 
    1024     PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG); 
    1025  
    1026     /* 
    1027      * Schedule asynchronous send. 
    1028      */ 
    1029     ioqueue = key->ioqueue; 
    1030     pj_lock_acquire(ioqueue->lock); 
    1031  
    1032     write_op = (struct write_operation*)op_key; 
    1033     write_op->op = PJ_IOQUEUE_OP_SEND_TO; 
    1034     write_op->buf = NULL; 
    1035     write_op->size = *length; 
    1036     write_op->written = 0; 
    1037     write_op->flags = flags; 
    1038     pj_memcpy(&write_op->rmt_addr, addr, addrlen); 
    1039     write_op->rmt_addrlen = addrlen; 
    1040      
    1041     pj_list_insert_before(&key->write_list, write_op); 
    1042     PJ_FD_SET(key->fd, &ioqueue->wfdset); 
    1043  
    1044     pj_lock_release(ioqueue->lock); 
    1045  
    1046     return PJ_EPENDING; 
    1047 } 
    1048  
    1049 #if PJ_HAS_TCP 
    1050 /* 
    1051  * Initiate overlapped accept() operation. 
    1052  */ 
    1053 PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, 
    1054                                        pj_ioqueue_op_key_t *op_key, 
    1055                                        pj_sock_t *new_sock, 
    1056                                        pj_sockaddr_t *local, 
    1057                                        pj_sockaddr_t *remote, 
    1058                                        int *addrlen) 
    1059 { 
    1060     pj_ioqueue_t *ioqueue; 
    1061     struct accept_operation *accept_op; 
    1062     pj_status_t status; 
    1063  
    1064     /* check parameters. All must be specified! */ 
    1065     PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); 
    1066  
    1067     /* Fast track: 
    1068      *  See if there's new connection available immediately. 
    1069      */ 
    1070     if (pj_list_empty(&key->accept_list)) { 
    1071         status = pj_sock_accept(key->fd, new_sock, remote, addrlen); 
    1072         if (status == PJ_SUCCESS) { 
    1073             /* Yes! New connection is available! */ 
    1074             if (local && addrlen) { 
    1075                 status = pj_sock_getsockname(*new_sock, local, addrlen); 
    1076                 if (status != PJ_SUCCESS) { 
    1077                     pj_sock_close(*new_sock); 
    1078                     *new_sock = PJ_INVALID_SOCKET; 
    1079                     return status; 
    1080                 } 
    1081             } 
    1082             return PJ_SUCCESS; 
    1083         } else { 
    1084             /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report 
    1085              * the error to caller. 
    1086              */ 
    1087             if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { 
    1088                 return status; 
    1089             } 
    1090         } 
    1091     } 
    1092  
    1093     /* 
    1094      * No connection is available immediately. 
    1095      * Schedule accept() operation to be completed when there is incoming 
    1096      * connection available. 
    1097      */ 
    1098     ioqueue = key->ioqueue; 
    1099     accept_op = (struct accept_operation*)op_key; 
    1100  
    1101     pj_lock_acquire(ioqueue->lock); 
    1102  
    1103     accept_op->op = PJ_IOQUEUE_OP_ACCEPT; 
    1104     accept_op->accept_fd = new_sock; 
    1105     accept_op->rmt_addr = remote; 
    1106     accept_op->addrlen= addrlen; 
    1107     accept_op->local_addr = local; 
    1108  
    1109     pj_list_insert_before(&key->accept_list, accept_op); 
    1110     PJ_FD_SET(key->fd, &ioqueue->rfdset); 
    1111  
    1112     pj_lock_release(ioqueue->lock); 
    1113  
    1114     return PJ_EPENDING; 
    1115 } 
    1116  
    1117 /* 
    1118  * Initiate overlapped connect() operation (well, it's non-blocking actually, 
    1119  * since there's no overlapped version of connect()). 
    1120  */ 
    1121 PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, 
    1122                                         const pj_sockaddr_t *addr, 
    1123                                         int addrlen ) 
    1124 { 
    1125     pj_ioqueue_t *ioqueue; 
    1126     pj_status_t status; 
    1127      
    1128     /* check parameters. All must be specified! */ 
    1129     PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); 
    1130  
    1131     /* Check if socket has not been marked for connecting */ 
    1132     if (key->connecting != 0) 
    1133         return PJ_EPENDING; 
    1134      
    1135     status = pj_sock_connect(key->fd, addr, addrlen); 
    1136     if (status == PJ_SUCCESS) { 
    1137         /* Connected! */ 
    1138         return PJ_SUCCESS; 
    1139     } else { 
    1140         if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) { 
    1141             /* Pending! */ 
    1142             ioqueue = key->ioqueue; 
    1143             pj_lock_acquire(ioqueue->lock); 
    1144             key->connecting = PJ_TRUE; 
    1145             PJ_FD_SET(key->fd, &ioqueue->wfdset); 
    1146             PJ_FD_SET(key->fd, &ioqueue->xfdset); 
    1147             pj_lock_release(ioqueue->lock); 
    1148             return PJ_EPENDING; 
    1149         } else { 
    1150             /* Error! */ 
    1151             return status; 
    1152         } 
    1153     } 
    1154 } 
    1155 #endif  /* PJ_HAS_TCP */ 
    1156  
Note: See TracChangeset for help on using the changeset viewer.