Changeset 14


Ignore:
Timestamp:
Nov 6, 2005 4:50:38 PM (18 years ago)
Author:
bennylp
Message:

Tested new ioqueue framework on Linux with select and epoll

Location:
pjproject/main/pjlib
Files:
9 edited

Legend:

Unmodified
Added
Removed
  • pjproject/main/pjlib/build/Makefile

    r2 r14  
    6161# Gather all flags. 
    6262# 
    63 export _CFLAGS  := -O2 $(CC_CFLAGS) $(OS_CFLAGS) $(HOST_CFLAGS) $(M_CFLAGS) \ 
     63export _CFLAGS  := -O2 -g $(CC_CFLAGS) $(OS_CFLAGS) $(HOST_CFLAGS) $(M_CFLAGS) \ 
    6464                   $(CFLAGS) $(CC_INC)../include 
    6565export _CXXFLAGS:= $(_CFLAGS) $(CC_CXXFLAGS) $(OS_CXXFLAGS) $(M_CXXFLAGS) \ 
     
    9999         
    100100export CC_OUT CC AR RANLIB HOST_MV HOST_RM HOST_RMDIR HOST_MKDIR OBJEXT LD LDOUT  
    101  
    102101############################################################################### 
    103102# Main entry 
     
    125124dep: depend 
    126125 
    127 pjlib: 
     126pjlib: ../include/pj/config_site.h 
    128127        $(MAKE) -f $(RULES_MAK) APP=PJLIB app=pjlib $(PJLIB_LIB) 
    129128 
     129../include/pj/config_site.h: 
     130        touch ../include/pj/config_site.h 
     131         
    130132pjlib-test:  
    131133        $(MAKE) -f $(RULES_MAK) APP=TEST app=pjlib-test $(TEST_EXE) 
  • pjproject/main/pjlib/build/os-linux.mak

    r2 r14  
    1515                        pool_policy_malloc.o sock_bsd.o sock_select.o 
    1616 
    17 export PJLIB_OBJS += ioqueue_select.o  
    18 #export PJLIB_OBJS += ioqueue_epoll.o 
     17#export PJLIB_OBJS += ioqueue_select.o  
     18export PJLIB_OBJS += ioqueue_epoll.o 
    1919 
    2020# 
  • pjproject/main/pjlib/src/pj/ioqueue_common_abs.c

    r13 r14  
    11/* $Id$ */ 
    22 
    3 #include <pj/ioqueue.h> 
    4 #include <pj/errno.h> 
    5 #include <pj/list.h> 
    6 #include <pj/sock.h> 
    7 #include <pj/lock.h> 
    8 #include <pj/assert.h> 
    9 #include <pj/string.h> 
    10  
     3/* 
     4 * ioqueue_common_abs.c 
     5 * 
     6 * This contains common functionalities to emulate proactor pattern with 
     7 * various event dispatching mechanisms (e.g. select, epoll). 
     8 * 
     9 * This file will be included by the appropriate ioqueue implementation. 
     10 * This file is NOT supposed to be compiled as stand-alone source. 
     11 */ 
    1112 
    1213static void ioqueue_init( pj_ioqueue_t *ioqueue ) 
     
    326327 
    327328        /* Call callback. */ 
    328         if (h->cb.on_accept_complete) 
     329        if (h->cb.on_accept_complete) { 
    329330            (*h->cb.on_accept_complete)(h,  
    330331                                        (pj_ioqueue_op_key_t*)accept_op, 
    331332                                        *accept_op->accept_fd, rc); 
     333        } 
    332334 
    333335    } 
     
    337339        struct read_operation *read_op; 
    338340        pj_ssize_t bytes_read; 
    339  
    340         pj_assert(!pj_list_empty(&h->read_list)); 
    341341 
    342342        /* Get one pending read operation from the list. */ 
     
    378378                //              &bytes_read, NULL); 
    379379#           elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0) 
    380                 bytes_read = read(h->fd, h->rd_buf, bytes_read); 
     380                bytes_read = read(h->fd, read_op->buf, bytes_read); 
    381381                rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); 
    382382#           elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0 
    383                 bytes_read = sys_read(h->fd, h->rd_buf, bytes_read); 
     383                bytes_read = sys_read(h->fd, read_op->buf, bytes_read); 
    384384                rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read; 
    385385#           else 
  • pjproject/main/pjlib/src/pj/ioqueue_common_abs.h

    r13 r14  
    1515 */ 
    1616#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100) 
    17 #   error "Error reporting must be enabled for this function to work!" 
     17#   error "Proper error reporting must be enabled for ioqueue to work!" 
    1818#endif 
    1919 
     
    106106                                     pj_sock_t fd,  
    107107                                     enum ioqueue_event_type event_type); 
     108 
  • pjproject/main/pjlib/src/pj/ioqueue_epoll.c

    r4 r14  
    11/* $Id$ 
    2  * 
    32 */ 
    43/* 
     
    3130#   define epoll_data           data.ptr 
    3231#   define epoll_data_type      void* 
    33 #   define ioctl_val_type       unsigned long* 
     32#   define ioctl_val_type       unsigned long 
    3433#   define getsockopt_val_ptr   int* 
    3534#   define os_getsockopt        getsockopt 
     
    127126#define THIS_FILE   "ioq_epoll" 
    128127 
    129 #define PJ_IOQUEUE_IS_READ_OP(op)   ((op & PJ_IOQUEUE_OP_READ) || \ 
    130                                      (op & PJ_IOQUEUE_OP_RECV) || \ 
    131                                      (op & PJ_IOQUEUE_OP_RECV_FROM)) 
    132 #define PJ_IOQUEUE_IS_WRITE_OP(op)  ((op & PJ_IOQUEUE_OP_WRITE) || \ 
    133                                      (op & PJ_IOQUEUE_OP_SEND) || \ 
    134                                      (op & PJ_IOQUEUE_OP_SEND_TO)) 
    135  
    136  
    137 #if PJ_HAS_TCP 
    138 #  define PJ_IOQUEUE_IS_ACCEPT_OP(op)   (op & PJ_IOQUEUE_OP_ACCEPT) 
    139 #  define PJ_IOQUEUE_IS_CONNECT_OP(op)  (op & PJ_IOQUEUE_OP_CONNECT) 
    140 #else 
    141 #  define PJ_IOQUEUE_IS_ACCEPT_OP(op)   0 
    142 #  define PJ_IOQUEUE_IS_CONNECT_OP(op)  0 
    143 #endif 
    144  
    145  
    146128//#define TRACE_(expr) PJ_LOG(3,expr) 
    147129#define TRACE_(expr) 
    148130 
     131/* 
     132 * Include common ioqueue abstraction. 
     133 */ 
     134#include "ioqueue_common_abs.h" 
    149135 
    150136/* 
     
    153139struct pj_ioqueue_key_t 
    154140{ 
    155     PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t) 
    156     pj_sock_t               fd; 
    157     pj_ioqueue_operation_e  op; 
    158     void                   *user_data; 
    159     pj_ioqueue_callback     cb; 
    160  
    161     void                   *rd_buf; 
    162     unsigned                rd_flags; 
    163     pj_size_t               rd_buflen; 
    164     void                   *wr_buf; 
    165     pj_size_t               wr_buflen; 
    166  
    167     pj_sockaddr_t          *rmt_addr; 
    168     int                    *rmt_addrlen; 
    169  
    170     pj_sockaddr_t          *local_addr; 
    171     int                    *local_addrlen; 
    172  
    173     pj_sock_t              *accept_fd; 
     141    DECLARE_COMMON_KEY 
    174142}; 
    175143 
     
    179147struct pj_ioqueue_t 
    180148{ 
    181     pj_lock_t          *lock; 
    182     pj_bool_t           auto_delete_lock; 
     149    DECLARE_COMMON_IOQUEUE 
     150 
    183151    unsigned            max, count; 
    184152    pj_ioqueue_key_t    hlist; 
     
    186154}; 
    187155 
     156/* Include implementation for common abstraction after we declare 
     157 * pj_ioqueue_key_t and pj_ioqueue_t. 
     158 */ 
     159#include "ioqueue_common_abs.c" 
     160 
    188161/* 
    189162 * pj_ioqueue_create() 
     
    193166PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,  
    194167                                       pj_size_t max_fd, 
    195                                        int max_threads, 
    196168                                       pj_ioqueue_t **p_ioqueue) 
    197169{ 
    198     pj_ioqueue_t *ioque; 
     170    pj_ioqueue_t *ioqueue; 
    199171    pj_status_t rc; 
    200  
    201     PJ_UNUSED_ARG(max_threads); 
    202  
    203     if (max_fd > PJ_IOQUEUE_MAX_HANDLES) { 
    204         pj_assert(!"max_fd too large"); 
    205         return PJ_EINVAL; 
    206     } 
    207  
    208     ioque = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); 
    209     ioque->max = max_fd; 
    210     ioque->count = 0; 
    211     pj_list_init(&ioque->hlist); 
    212  
    213     rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioque->lock); 
     172    pj_lock_t *lock; 
     173 
     174    /* Check that arguments are valid. */ 
     175    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&  
     176                     max_fd > 0, PJ_EINVAL); 
     177 
     178    /* Check that size of pj_ioqueue_op_key_t is sufficient */ 
     179    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= 
     180                     sizeof(union operation_key), PJ_EBUG); 
     181 
     182    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); 
     183 
     184    ioqueue_init(ioqueue); 
     185 
     186    ioqueue->max = max_fd; 
     187    ioqueue->count = 0; 
     188    pj_list_init(&ioqueue->hlist); 
     189 
     190    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock); 
    214191    if (rc != PJ_SUCCESS) 
    215192        return rc; 
    216193 
    217     ioque->auto_delete_lock = PJ_TRUE; 
    218     ioque->epfd = os_epoll_create(max_fd); 
    219     if (ioque->epfd < 0) { 
     194    rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE); 
     195    if (rc != PJ_SUCCESS) 
     196        return rc; 
     197 
     198    ioqueue->epfd = os_epoll_create(max_fd); 
     199    if (ioqueue->epfd < 0) { 
     200        ioqueue_destroy(ioqueue); 
    220201        return PJ_RETURN_OS_ERROR(pj_get_native_os_error()); 
    221202    } 
    222203     
    223     PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioque)); 
    224  
    225     *p_ioqueue = ioque; 
     204    PJ_LOG(4, ("pjlib", "epoll I/O Queue created (%p)", ioqueue)); 
     205 
     206    *p_ioqueue = ioqueue; 
    226207    return PJ_SUCCESS; 
    227208} 
     
    232213 * Destroy ioqueue. 
    233214 */ 
    234 PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioque) 
    235 { 
    236     PJ_ASSERT_RETURN(ioque, PJ_EINVAL); 
    237     PJ_ASSERT_RETURN(ioque->epfd > 0, PJ_EINVALIDOP); 
    238  
    239     pj_lock_acquire(ioque->lock); 
    240     os_close(ioque->epfd); 
    241     ioque->epfd = 0; 
    242     if (ioque->auto_delete_lock) 
    243         pj_lock_destroy(ioque->lock); 
    244      
    245     return PJ_SUCCESS; 
    246 } 
    247  
    248 /* 
    249  * pj_ioqueue_set_lock() 
    250  */ 
    251 PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque,  
    252                                          pj_lock_t *lock, 
    253                                          pj_bool_t auto_delete ) 
    254 { 
    255     PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL); 
    256  
    257     if (ioque->auto_delete_lock) { 
    258         pj_lock_destroy(ioque->lock); 
    259     } 
    260  
    261     ioque->lock = lock; 
    262     ioque->auto_delete_lock = auto_delete; 
    263  
    264     return PJ_SUCCESS; 
    265 } 
    266  
     215PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) 
     216{ 
     217    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 
     218    PJ_ASSERT_RETURN(ioqueue->epfd > 0, PJ_EINVALIDOP); 
     219 
     220    pj_lock_acquire(ioqueue->lock); 
     221    os_close(ioqueue->epfd); 
     222    ioqueue->epfd = 0; 
     223    return ioqueue_destroy(ioqueue); 
     224} 
    267225 
    268226/* 
     
    272230 */ 
    273231PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, 
    274                                               pj_ioqueue_t *ioque, 
     232                                              pj_ioqueue_t *ioqueue, 
    275233                                              pj_sock_t sock, 
    276234                                              void *user_data, 
     
    284242    pj_status_t rc = PJ_SUCCESS; 
    285243     
    286     PJ_ASSERT_RETURN(pool && ioque && sock != PJ_INVALID_SOCKET && 
     244    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET && 
    287245                     cb && p_key, PJ_EINVAL); 
    288246 
    289     pj_lock_acquire(ioque->lock); 
    290  
    291     if (ioque->count >= ioque->max) { 
     247    pj_lock_acquire(ioqueue->lock); 
     248 
     249    if (ioqueue->count >= ioqueue->max) { 
    292250        rc = PJ_ETOOMANY; 
    293251        TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: too many files")); 
     
    306264    /* Create key. */ 
    307265    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 
    308     key->fd = sock; 
    309     key->user_data = user_data; 
    310     pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); 
     266    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); 
     267    if (rc != PJ_SUCCESS) { 
     268        key = NULL; 
     269        goto on_return; 
     270    } 
    311271 
    312272    /* os_epoll_ctl. */ 
    313273    ev.events = EPOLLIN | EPOLLOUT | EPOLLERR; 
    314274    ev.epoll_data = (epoll_data_type)key; 
    315     status = os_epoll_ctl(ioque->epfd, EPOLL_CTL_ADD, sock, &ev); 
     275    status = os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_ADD, sock, &ev); 
    316276    if (status < 0) { 
    317277        rc = pj_get_os_error(); 
     278        key = NULL; 
    318279        TRACE_((THIS_FILE,  
    319280                "pj_ioqueue_register_sock error: os_epoll_ctl rc=%d",  
     
    323284     
    324285    /* Register */ 
    325     pj_list_insert_before(&ioque->hlist, key); 
    326     ++ioque->count; 
     286    pj_list_insert_before(&ioqueue->hlist, key); 
     287    ++ioqueue->count; 
    327288 
    328289on_return: 
    329290    *p_key = key; 
    330     pj_lock_release(ioque->lock); 
     291    pj_lock_release(ioqueue->lock); 
    331292     
    332293    return rc; 
     
    338299 * Unregister handle from ioqueue. 
    339300 */ 
    340 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque, 
    341                                            pj_ioqueue_key_t *key) 
    342 { 
     301PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) 
     302{ 
     303    pj_ioqueue_t *ioqueue; 
    343304    struct epoll_event ev; 
    344305    int status; 
    345306     
    346     PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL); 
    347  
    348     pj_lock_acquire(ioque->lock); 
    349  
    350     pj_assert(ioque->count > 0); 
    351     --ioque->count; 
     307    PJ_ASSERT_RETURN(key != NULL, PJ_EINVAL); 
     308 
     309    ioqueue = key->ioqueue; 
     310    pj_lock_acquire(ioqueue->lock); 
     311 
     312    pj_assert(ioqueue->count > 0); 
     313    --ioqueue->count; 
    352314    pj_list_erase(key); 
    353315 
    354316    ev.events = 0; 
    355317    ev.epoll_data = (epoll_data_type)key; 
    356     status = os_epoll_ctl( ioque->epfd, EPOLL_CTL_DEL, key->fd, &ev); 
     318    status = os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_DEL, key->fd, &ev); 
    357319    if (status != 0) { 
    358320        pj_status_t rc = pj_get_os_error(); 
    359         pj_lock_release(ioque->lock); 
     321        pj_lock_release(ioqueue->lock); 
    360322        return rc; 
    361323    } 
    362324 
    363     pj_lock_release(ioque->lock); 
     325    pj_lock_release(ioqueue->lock); 
     326 
     327    /* Destroy the key. */ 
     328    ioqueue_destroy_key(key); 
     329 
    364330    return PJ_SUCCESS; 
    365331} 
    366332 
    367 /* 
    368  * pj_ioqueue_get_user_data() 
    369  * 
    370  * Obtain value associated with a key. 
    371  */ 
    372 PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) 
    373 { 
    374     PJ_ASSERT_RETURN(key != NULL, NULL); 
    375     return key->user_data; 
    376 } 
    377  
     333/* ioqueue_remove_from_set() 
     334 * This function is called from ioqueue_dispatch_event() to instruct 
     335 * the ioqueue to remove the specified descriptor from ioqueue's descriptor 
     336 * set for the specified event. 
     337 */ 
     338static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, 
     339                                     pj_sock_t fd,  
     340                                     enum ioqueue_event_type event_type) 
     341{ 
     342} 
     343 
     344/* 
     345 * ioqueue_add_to_set() 
     346 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc 
     347 * to instruct the ioqueue to add the specified handle to ioqueue's descriptor 
     348 * set for the specified event. 
     349 */ 
     350static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, 
     351                                pj_sock_t fd, 
     352                                enum ioqueue_event_type event_type ) 
     353{ 
     354} 
    378355 
    379356/* 
     
    381358 * 
    382359 */ 
    383 PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout) 
     360PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) 
    384361{ 
    385362    int i, count, processed; 
    386     struct epoll_event events[16]; 
     363    struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; 
    387364    int msec; 
     365    struct queue { 
     366        pj_ioqueue_key_t        *key; 
     367        enum ioqueue_event_type  event_type; 
     368    } queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; 
    388369     
    389370    PJ_CHECK_STACK(); 
     
    391372    msec = timeout ? PJ_TIME_VAL_MSEC(*timeout) : 9000; 
    392373     
    393     count = os_epoll_wait( ioque->epfd, events, PJ_ARRAY_SIZE(events), msec); 
     374    count = os_epoll_wait( ioqueue->epfd, events, PJ_ARRAY_SIZE(events), msec); 
    394375    if (count <= 0) 
    395376        return count; 
    396377 
    397378    /* Lock ioqueue. */ 
    398     pj_lock_acquire(ioque->lock); 
    399  
    400     processed = 0; 
    401  
    402     for (i=0; i<count; ++i) { 
     379    pj_lock_acquire(ioqueue->lock); 
     380 
     381    for (processed=0, i=0; i<count; ++i) { 
    403382        pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type) 
    404383                                events[i].epoll_data; 
    405         pj_status_t rc; 
    406384 
    407385        /* 
    408          * Check for completion of read operations. 
     386         * Check readability. 
    409387         */ 
    410         if ((events[i].events & EPOLLIN) && (PJ_IOQUEUE_IS_READ_OP(h->op))) { 
    411             pj_ssize_t bytes_read = h->rd_buflen; 
    412  
    413             if ((h->op & PJ_IOQUEUE_OP_RECV_FROM)) { 
    414                 rc = pj_sock_recvfrom( h->fd, h->rd_buf, &bytes_read, 0, 
    415                                        h->rmt_addr, h->rmt_addrlen); 
    416             } else if ((h->op & PJ_IOQUEUE_OP_RECV)) { 
    417                 rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0); 
    418             } else { 
    419                 bytes_read = os_read( h->fd, h->rd_buf, bytes_read); 
    420                 rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); 
    421             } 
    422              
    423             if (rc != PJ_SUCCESS) { 
    424                 bytes_read = -rc; 
    425             } 
    426  
    427             h->op &= ~(PJ_IOQUEUE_OP_READ | PJ_IOQUEUE_OP_RECV |  
    428                        PJ_IOQUEUE_OP_RECV_FROM); 
    429  
    430             /* Call callback. */ 
    431             (*h->cb.on_read_complete)(h, bytes_read); 
    432  
     388        if ((events[i].events & EPOLLIN) &&  
     389            (key_has_pending_read(h) || key_has_pending_accept(h))) { 
     390            queue[processed].key = h; 
     391            queue[processed].event_type = READABLE_EVENT; 
    433392            ++processed; 
    434393        } 
     394 
    435395        /* 
    436          * Check for completion of accept() operation. 
     396         * Check for writeability. 
    437397         */ 
    438         else if ((events[i].events & EPOLLIN) && 
    439                  (h->op & PJ_IOQUEUE_OP_ACCEPT))  
    440         { 
    441             /* accept() must be the only operation specified on  
    442              * server socket  
    443              */ 
    444             pj_assert( h->op == PJ_IOQUEUE_OP_ACCEPT); 
    445  
    446             rc = pj_sock_accept( h->fd, h->accept_fd,  
    447                                  h->rmt_addr, h->rmt_addrlen); 
    448             if (rc==PJ_SUCCESS && h->local_addr) { 
    449                 rc = pj_sock_getsockname(*h->accept_fd, h->local_addr,  
    450                                           h->local_addrlen); 
    451             } 
    452  
    453             h->op &= ~(PJ_IOQUEUE_OP_ACCEPT); 
    454  
    455             /* Call callback. */ 
    456             (*h->cb.on_accept_complete)(h, *h->accept_fd, rc); 
    457              
     398        if ((events[i].events & EPOLLOUT) && key_has_pending_write(h)) { 
     399            queue[processed].key = h; 
     400            queue[processed].event_type = WRITEABLE_EVENT; 
    458401            ++processed; 
    459402        } 
    460403 
    461         /* 
    462          * Check for completion of write operations. 
    463          */ 
    464         if ((events[i].events & EPOLLOUT) && PJ_IOQUEUE_IS_WRITE_OP(h->op)) { 
    465             /* Completion of write(), send(), or sendto() operation. */ 
    466  
    467             /* Clear operation. */ 
    468             h->op &= ~(PJ_IOQUEUE_OP_WRITE | PJ_IOQUEUE_OP_SEND |  
    469                        PJ_IOQUEUE_OP_SEND_TO); 
    470  
    471             /* Call callback. */ 
    472             /* All data must have been sent? */ 
    473             (*h->cb.on_write_complete)(h, h->wr_buflen); 
    474  
    475             ++processed; 
    476         } 
    477404#if PJ_HAS_TCP 
    478405        /* 
    479406         * Check for completion of connect() operation. 
    480407         */ 
    481         else if ((events[i].events & EPOLLOUT) &&  
    482                  (h->op & PJ_IOQUEUE_OP_CONNECT))  
    483         { 
    484             /* Completion of connect() operation */ 
    485             pj_ssize_t bytes_transfered; 
    486  
    487             /* from connect(2):  
    488                 * On Linux, use getsockopt to read the SO_ERROR option at 
    489                 * level SOL_SOCKET to determine whether connect() completed 
    490                 * successfully (if SO_ERROR is zero). 
    491                 */ 
    492             int value; 
    493             socklen_t vallen = sizeof(value); 
    494             int gs_rc = os_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,  
    495                                       (getsockopt_val_ptr)&value, &vallen); 
    496             if (gs_rc != 0) { 
    497                 /* Argh!! What to do now???  
    498                  * Just indicate that the socket is connected. The 
    499                  * application will get error as soon as it tries to use 
    500                  * the socket to send/receive. 
    501                  */ 
    502                 bytes_transfered = 0; 
    503             } else { 
    504                 bytes_transfered = value; 
    505             } 
    506  
    507             /* Clear operation. */ 
    508             h->op &= (~PJ_IOQUEUE_OP_CONNECT); 
    509  
    510             /* Call callback. */ 
    511             (*h->cb.on_connect_complete)(h, bytes_transfered); 
    512  
     408        if ((events[i].events & EPOLLOUT) && (h->connecting)) { 
     409            queue[processed].key = h; 
     410            queue[processed].event_type = WRITEABLE_EVENT; 
    513411            ++processed; 
    514412        } 
     
    518416         * Check for error condition. 
    519417         */ 
    520         if (events[i].events & EPOLLERR) { 
    521             if (h->op & PJ_IOQUEUE_OP_CONNECT) { 
    522                 h->op &= ~PJ_IOQUEUE_OP_CONNECT; 
    523  
    524                 /* Call callback. */ 
    525                 (*h->cb.on_connect_complete)(h, -1); 
    526  
    527                 ++processed; 
    528             } 
    529         } 
    530     } 
    531      
    532     pj_lock_release(ioque->lock); 
     418        if (events[i].events & EPOLLERR && (h->connecting)) { 
     419            queue[processed].key = h; 
     420            queue[processed].event_type = EXCEPTION_EVENT; 
     421            ++processed; 
     422        } 
     423    } 
     424    pj_lock_release(ioqueue->lock); 
     425 
     426    /* Now process the events. */ 
     427    for (i=0; i<processed; ++i) { 
     428        switch (queue[i].event_type) { 
     429        case READABLE_EVENT: 
     430            ioqueue_dispatch_read_event(ioqueue, queue[i].key); 
     431            break; 
     432        case WRITEABLE_EVENT: 
     433            ioqueue_dispatch_write_event(ioqueue, queue[i].key); 
     434            break; 
     435        case EXCEPTION_EVENT: 
     436            ioqueue_dispatch_exception_event(ioqueue, queue[i].key); 
     437            break; 
     438        case NO_EVENT: 
     439            pj_assert(!"Invalid event!"); 
     440            break; 
     441        } 
     442    } 
    533443 
    534444    return processed; 
    535445} 
    536446 
    537 /* 
    538  * pj_ioqueue_read() 
    539  * 
    540  * Start asynchronous read from the descriptor. 
    541  */ 
    542 PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque, 
    543                                      pj_ioqueue_key_t *key, 
    544                                      void *buffer, 
    545                                      pj_size_t buflen) 
    546 { 
    547     PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); 
    548     PJ_CHECK_STACK(); 
    549  
    550     /* For consistency with other ioqueue implementation, we would reject  
    551      * if descriptor has already been submitted for reading before. 
    552      */ 
    553     PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && 
    554                       (key->op & PJ_IOQUEUE_OP_RECV) == 0 && 
    555                       (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), 
    556                      PJ_EBUSY); 
    557  
    558     pj_lock_acquire(ioque->lock); 
    559  
    560     key->op |= PJ_IOQUEUE_OP_READ; 
    561     key->rd_flags = 0; 
    562     key->rd_buf = buffer; 
    563     key->rd_buflen = buflen; 
    564  
    565     pj_lock_release(ioque->lock); 
    566     return PJ_EPENDING; 
    567 } 
    568  
    569  
    570 /* 
    571  * pj_ioqueue_recv() 
    572  * 
    573  * Start asynchronous recv() from the socket. 
    574  */ 
    575 PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_t *ioque, 
    576                                       pj_ioqueue_key_t *key, 
    577                                       void *buffer, 
    578                                       pj_size_t buflen, 
    579                                       unsigned flags ) 
    580 { 
    581     PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); 
    582     PJ_CHECK_STACK(); 
    583  
    584     /* For consistency with other ioqueue implementation, we would reject  
    585      * if descriptor has already been submitted for reading before. 
    586      */ 
    587     PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && 
    588                       (key->op & PJ_IOQUEUE_OP_RECV) == 0 && 
    589                       (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), 
    590                      PJ_EBUSY); 
    591  
    592     pj_lock_acquire(ioque->lock); 
    593  
    594     key->op |= PJ_IOQUEUE_OP_RECV; 
    595     key->rd_buf = buffer; 
    596     key->rd_buflen = buflen; 
    597     key->rd_flags = flags; 
    598  
    599     pj_lock_release(ioque->lock); 
    600     return PJ_EPENDING; 
    601 } 
    602  
    603 /* 
    604  * pj_ioqueue_recvfrom() 
    605  * 
    606  * Start asynchronous recvfrom() from the socket. 
    607  */ 
    608 PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque, 
    609                                          pj_ioqueue_key_t *key, 
    610                                          void *buffer, 
    611                                          pj_size_t buflen, 
    612                                          unsigned flags, 
    613                                          pj_sockaddr_t *addr, 
    614                                          int *addrlen) 
    615 { 
    616     PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); 
    617     PJ_CHECK_STACK(); 
    618  
    619     /* For consistency with other ioqueue implementation, we would reject  
    620      * if descriptor has already been submitted for reading before. 
    621      */ 
    622     PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && 
    623                       (key->op & PJ_IOQUEUE_OP_RECV) == 0 && 
    624                       (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), 
    625                      PJ_EBUSY); 
    626  
    627     pj_lock_acquire(ioque->lock); 
    628  
    629     key->op |= PJ_IOQUEUE_OP_RECV_FROM; 
    630     key->rd_buf = buffer; 
    631     key->rd_buflen = buflen; 
    632     key->rd_flags = flags; 
    633     key->rmt_addr = addr; 
    634     key->rmt_addrlen = addrlen; 
    635  
    636     pj_lock_release(ioque->lock); 
    637     return PJ_EPENDING; 
    638 } 
    639  
    640 /* 
    641  * pj_ioqueue_write() 
    642  * 
    643  * Start asynchronous write() to the descriptor. 
    644  */ 
    645 PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque, 
    646                                       pj_ioqueue_key_t *key, 
    647                                       const void *data, 
    648                                       pj_size_t datalen) 
    649 { 
    650     pj_status_t rc; 
    651     pj_ssize_t sent; 
    652  
    653     PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); 
    654     PJ_CHECK_STACK(); 
    655  
    656     /* For consistency with other ioqueue implementation, we would reject  
    657      * if descriptor has already been submitted for writing before. 
    658      */ 
    659     PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && 
    660                       (key->op & PJ_IOQUEUE_OP_SEND) == 0 && 
    661                       (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), 
    662                      PJ_EBUSY); 
    663  
    664     sent = datalen; 
    665     /* sent would be -1 after pj_sock_send() if it returns error. */ 
    666     rc = pj_sock_send(key->fd, data, &sent, 0); 
    667     if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { 
    668         return rc; 
    669     } 
    670  
    671     pj_lock_acquire(ioque->lock); 
    672  
    673     key->op |= PJ_IOQUEUE_OP_WRITE; 
    674     key->wr_buf = NULL; 
    675     key->wr_buflen = datalen; 
    676  
    677     pj_lock_release(ioque->lock); 
    678  
    679     return PJ_EPENDING; 
    680 } 
    681  
    682 /* 
    683  * pj_ioqueue_send() 
    684  * 
    685  * Start asynchronous send() to the descriptor. 
    686  */ 
    687 PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque, 
    688                                      pj_ioqueue_key_t *key, 
    689                                      const void *data, 
    690                                      pj_size_t datalen, 
    691                                      unsigned flags) 
    692 { 
    693     pj_status_t rc; 
    694     pj_ssize_t sent; 
    695  
    696     PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); 
    697     PJ_CHECK_STACK(); 
    698  
    699     /* For consistency with other ioqueue implementation, we would reject  
    700      * if descriptor has already been submitted for writing before. 
    701      */ 
    702     PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && 
    703                       (key->op & PJ_IOQUEUE_OP_SEND) == 0 && 
    704                       (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), 
    705                      PJ_EBUSY); 
    706  
    707     sent = datalen; 
    708     /* sent would be -1 after pj_sock_send() if it returns error. */ 
    709     rc = pj_sock_send(key->fd, data, &sent, flags); 
    710     if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { 
    711         return rc; 
    712     } 
    713  
    714     pj_lock_acquire(ioque->lock); 
    715  
    716     key->op |= PJ_IOQUEUE_OP_SEND; 
    717     key->wr_buf = NULL; 
    718     key->wr_buflen = datalen; 
    719  
    720     pj_lock_release(ioque->lock); 
    721  
    722     return PJ_EPENDING; 
    723 } 
    724  
    725  
    726 /* 
    727  * pj_ioqueue_sendto() 
    728  * 
    729  * Start asynchronous write() to the descriptor. 
    730  */ 
    731 PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque, 
    732                                        pj_ioqueue_key_t *key, 
    733                                        const void *data, 
    734                                        pj_size_t datalen, 
    735                                        unsigned flags, 
    736                                        const pj_sockaddr_t *addr, 
    737                                        int addrlen) 
    738 { 
    739     pj_status_t rc; 
    740     pj_ssize_t sent; 
    741  
    742     PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); 
    743     PJ_CHECK_STACK(); 
    744  
    745     /* For consistency with other ioqueue implementation, we would reject  
    746      * if descriptor has already been submitted for writing before. 
    747      */ 
    748     PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && 
    749                       (key->op & PJ_IOQUEUE_OP_SEND) == 0 && 
    750                       (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), 
    751                      PJ_EBUSY); 
    752  
    753     sent = datalen; 
    754     /* sent would be -1 after pj_sock_sendto() if it returns error. */ 
    755     rc = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen); 
    756     if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK))  { 
    757         return rc; 
    758     } 
    759  
    760     pj_lock_acquire(ioque->lock); 
    761  
    762     key->op |= PJ_IOQUEUE_OP_SEND_TO; 
    763     key->wr_buf = NULL; 
    764     key->wr_buflen = datalen; 
    765  
    766     pj_lock_release(ioque->lock); 
    767     return PJ_EPENDING; 
    768 } 
    769  
    770 #if PJ_HAS_TCP 
    771 /* 
    772  * Initiate overlapped accept() operation. 
    773  */ 
    774 PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue, 
    775                                pj_ioqueue_key_t *key, 
    776                                pj_sock_t *new_sock, 
    777                                pj_sockaddr_t *local, 
    778                                pj_sockaddr_t *remote, 
    779                                int *addrlen) 
    780 { 
    781     /* check parameters. All must be specified! */ 
    782     pj_assert(ioqueue && key && new_sock); 
    783  
    784     /* Server socket must have no other operation! */ 
    785     pj_assert(key->op == 0); 
    786      
    787     pj_lock_acquire(ioqueue->lock); 
    788  
    789     key->op = PJ_IOQUEUE_OP_ACCEPT; 
    790     key->accept_fd = new_sock; 
    791     key->rmt_addr = remote; 
    792     key->rmt_addrlen = addrlen; 
    793     key->local_addr = local; 
    794     key->local_addrlen = addrlen;   /* use same addr. as rmt_addrlen */ 
    795  
    796     pj_lock_release(ioqueue->lock); 
    797     return PJ_EPENDING; 
    798 } 
    799  
    800 /* 
    801  * Initiate overlapped connect() operation (well, it's non-blocking actually, 
    802  * since there's no overlapped version of connect()). 
    803  */ 
    804 PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue, 
    805                                         pj_ioqueue_key_t *key, 
    806                                         const pj_sockaddr_t *addr, 
    807                                         int addrlen ) 
    808 { 
    809     pj_status_t rc; 
    810      
    811     /* check parameters. All must be specified! */ 
    812     PJ_ASSERT_RETURN(ioqueue && key && addr && addrlen, PJ_EINVAL); 
    813  
    814     /* Connecting socket must have no other operation! */ 
    815     PJ_ASSERT_RETURN(key->op == 0, PJ_EBUSY); 
    816      
    817     rc = pj_sock_connect(key->fd, addr, addrlen); 
    818     if (rc == PJ_SUCCESS) { 
    819         /* Connected! */ 
    820         return PJ_SUCCESS; 
    821     } else { 
    822         if (rc == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) ||  
    823             rc == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK))  
    824         { 
    825             /* Pending! */ 
    826             pj_lock_acquire(ioqueue->lock); 
    827             key->op = PJ_IOQUEUE_OP_CONNECT; 
    828             pj_lock_release(ioqueue->lock); 
    829             return PJ_EPENDING; 
    830         } else { 
    831             /* Error! */ 
    832             return rc; 
    833         } 
    834     } 
    835 } 
    836 #endif  /* PJ_HAS_TCP */ 
    837  
  • pjproject/main/pjlib/src/pj/ioqueue_select.c

    r12 r14  
    206206    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 
    207207    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); 
    208     if (rc != PJ_SUCCESS) 
    209         return rc; 
     208    if (rc != PJ_SUCCESS) { 
     209        key = NULL; 
     210        goto on_return; 
     211    } 
    210212 
    211213    /* Register */ 
     
    381383    struct event 
    382384    { 
    383         pj_ioqueue_key_t    *key; 
    384         enum event_type      event_type; 
     385        pj_ioqueue_key_t        *key; 
     386        enum ioqueue_event_type  event_type; 
    385387    } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; 
    386388 
     
    453455            event[counter].key = h; 
    454456            event[counter].event_type = READABLE_EVENT; 
    455             ++counter;        } 
     457            ++counter; 
     458        } 
    456459 
    457460#if PJ_HAS_TCP 
     
    483486            break; 
    484487        case NO_EVENT: 
    485         default: 
    486488            pj_assert(!"Invalid event!"); 
    487489            break; 
  • pjproject/main/pjlib/src/pj/os_core_unix.c

    r5 r14  
    716716    if (type == PJ_MUTEX_SIMPLE) { 
    717717#if defined(PJ_LINUX) && PJ_LINUX!=0 
     718        extern int pthread_mutexattr_settype(pthread_mutexattr_t*,int); 
    718719        rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_FAST_NP); 
    719720#else 
     
    722723    } else { 
    723724#if defined(PJ_LINUX) && PJ_LINUX!=0 
     725        extern int pthread_mutexattr_settype(pthread_mutexattr_t*,int); 
    724726        rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE_NP); 
    725727#else 
  • pjproject/main/pjlib/src/pj/sock_bsd.c

    r12 r14  
    105105PJ_DEF(char*) pj_inet_ntoa(pj_in_addr inaddr) 
    106106{ 
    107     return inet_ntoa(*(struct in_addr*)&inaddr); 
     107    struct in_addr addr; 
     108    addr.s_addr = inaddr.s_addr; 
     109    return inet_ntoa(addr); 
    108110} 
    109111 
  • pjproject/main/pjlib/src/pjlib-test/ioq_perf.c

    r12 r14  
    7777                          bytes_read, errmsg)); 
    7878                PJ_LOG(3,(THIS_FILE,  
    79                           ".....additional info: total read=%u, total written=%u", 
     79                          ".....additional info: total read=%u, total sent=%u", 
    8080                          item->bytes_recv, item->bytes_sent)); 
    8181            } else { 
     
    108108            if (rc != last_error) { 
    109109                last_error = rc; 
    110                 app_perror("...error: read error", rc); 
     110                app_perror("...error: read error(1)", rc); 
    111111            } else { 
    112112                last_error_counter++; 
Note: See TracChangeset for help on using the changeset viewer.