Changeset 365 for pjproject


Ignore:
Timestamp:
Mar 30, 2006 4:32:18 PM (19 years ago)
Author:
bennylp
Message:

Fixed race condition bug in ioqueue unregistration for select and Win32 IOCP backend

Location:
pjproject/trunk/pjlib
Files:
1 added
1 deleted
11 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjlib/build/Makefile

    r338 r365  
    2424        array.o config.o ctype.o errno.o except.o fifobuf.o guid.o \ 
    2525        hash.o list.o lock.o log.o os_time_common.o \ 
    26         pool.o pool_caching.o rand.o \ 
     26        pool.o pool_caching.o pool_dbg.o rand.o \ 
    2727        rbtree.o string.o timer.o \ 
    2828        types.o symbols.o 
     
    3535export TEST_OBJS += atomic.o echo_clt.o errno.o exception.o \ 
    3636                    fifobuf.o file.o \ 
    37                     ioq_perf.o ioq_udp.o ioq_tcp.o \ 
     37                    ioq_perf.o ioq_udp.o ioq_unreg.o ioq_tcp.o \ 
    3838                    list.o mutex.o os.o pool.o pool_perf.o rand.o rbtree.o \ 
    3939                    select.o sleep.o sock.o sock_perf.o \ 
  • pjproject/trunk/pjlib/build/pjlib.dsp

    r363 r365  
    234234 
    235235SOURCE=..\src\pj\ioqueue_select.c 
     236 
     237!IF  "$(CFG)" == "pjlib - Win32 Release" 
     238 
     239!ELSEIF  "$(CFG)" == "pjlib - Win32 Debug" 
     240 
     241!ENDIF  
     242 
    236243# End Source File 
    237244# Begin Source File 
     
    286293# Begin Source File 
    287294 
    288 SOURCE=..\src\pj\pool_dbg_win32.c 
     295SOURCE=..\src\pj\pool_dbg.c 
    289296# End Source File 
    290297# Begin Source File 
     
    523530 
    524531SOURCE=..\include\pj\pool.h 
     532# End Source File 
     533# Begin Source File 
     534 
     535SOURCE=..\include\pj\pool_alt.h 
    525536# End Source File 
    526537# Begin Source File 
  • pjproject/trunk/pjlib/build/pjlib_test.dsp

    r349 r365  
    124124# Begin Source File 
    125125 
     126SOURCE="..\src\pjlib-test\ioq_unreg.c" 
     127# End Source File 
     128# Begin Source File 
     129 
    126130SOURCE="..\src\pjlib-test\list.c" 
    127131# End Source File 
  • pjproject/trunk/pjlib/src/pj/ioqueue_common_abs.c

    r363 r365  
    2828 */ 
    2929 
    30 static long ioqueue_tls_id = -1; 
    31  
    32 typedef struct key_lock_data { 
    33     struct key_lock_data *prev; 
    34     pj_ioqueue_key_t     *key; 
    35     int                   is_alive; 
    36 } key_lock_data; 
    37  
    38  
    3930static void ioqueue_init( pj_ioqueue_t *ioqueue ) 
    4031{ 
    4132    ioqueue->lock = NULL; 
    4233    ioqueue->auto_delete_lock = 0; 
    43  
    44     if (ioqueue_tls_id == -1) { 
    45         pj_status_t status; 
    46         status = pj_thread_local_alloc(&ioqueue_tls_id); 
    47         pj_thread_local_set(ioqueue_tls_id, NULL); 
    48     } 
    4934} 
    5035 
     
    9479#if PJ_HAS_TCP 
    9580    pj_list_init(&key->accept_list); 
     81    key->connecting = 0; 
    9682#endif 
    9783 
    9884    /* Save callback. */ 
    9985    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); 
     86 
     87#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     88    /* Set initial reference count to 1 */ 
     89    pj_assert(key->ref_count == 0); 
     90    ++key->ref_count; 
     91 
     92    key->closing = 0; 
     93#endif 
    10094 
    10195    /* Get socket type. When socket type is datagram, some optimization 
     
    108102        key->fd_type = PJ_SOCK_STREAM; 
    109103 
    110     key->inside_callback = 0; 
    111     key->destroy_requested = 0; 
    112  
    113104    /* Create mutex for the key. */ 
    114     rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); 
     105#if !PJ_IOQUEUE_HAS_SAFE_UNREG 
     106    rc = pj_mutex_create_simple(pool, NULL, &key->mutex); 
     107#endif 
    115108     
    116109    return rc; 
    117 } 
    118  
    119 /* Lock the key and also keep the lock data in thread local storage. 
    120  * The lock data is used to detect if the key is deleted by application 
    121  * when we call its callback. 
    122  */ 
    123 static void lock_key(pj_ioqueue_key_t *key, key_lock_data *lck) 
    124 { 
    125     struct key_lock_data *prev_data; 
    126  
    127     pj_mutex_lock(key->mutex); 
    128     prev_data = (struct key_lock_data *)  
    129                     pj_thread_local_get(ioqueue_tls_id); 
    130     lck->prev = prev_data; 
    131     lck->key = key; 
    132     lck->is_alive = 1; 
    133     pj_thread_local_set(ioqueue_tls_id, lck); 
    134 } 
    135  
    136 /* Unlock the key only if it is still valid. */ 
    137 static void unlock_key(pj_ioqueue_key_t *key, key_lock_data *lck) 
    138 { 
    139     pj_assert( (void*)pj_thread_local_get(ioqueue_tls_id) == lck); 
    140     pj_assert( lck->key == key ); 
    141     pj_thread_local_set(ioqueue_tls_id, lck->prev); 
    142     if (lck->is_alive) 
    143         pj_mutex_unlock(key->mutex); 
    144 } 
    145  
    146 /* Destroy key */ 
    147 static void ioqueue_destroy_key( pj_ioqueue_key_t *key ) 
    148 { 
    149     key_lock_data *lck; 
    150  
    151     /* Make sure that no other threads are doing something with 
    152      * the key. 
    153      */ 
    154     pj_mutex_lock(key->mutex); 
    155      
    156     /* Check if this function is called within a callback context. 
    157      * If so, then we need to inform the callback that the key has 
    158      * been destroyed so that it doesn't attempt to unlock the 
    159      * key's mutex. 
    160      */ 
    161     lck = pj_thread_local_get(ioqueue_tls_id); 
    162     while (lck) { 
    163         if (lck->key == key) { 
    164             lck->is_alive = 0; 
    165         } 
    166         lck = lck->prev; 
    167     } 
    168  
    169     pj_mutex_destroy(key->mutex); 
    170110} 
    171111 
     
    222162 
    223163 
     164#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     165#   define IS_CLOSING(key)  (key->closing) 
     166#else 
     167#   define IS_CLOSING(key)  (0) 
     168#endif 
     169 
     170 
    224171/* 
    225172 * ioqueue_dispatch_event() 
     
    230177void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) 
    231178{ 
    232     key_lock_data lck_data; 
    233  
    234179    /* Lock the key. */ 
    235     lock_key(h, &lck_data); 
     180    pj_mutex_lock(h->mutex); 
     181 
     182    if (h->closing) { 
     183        pj_mutex_unlock(h->mutex); 
     184        return; 
     185    } 
    236186 
    237187#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 
     
    246196        ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); 
    247197 
    248         /* Unlock; from this point we don't need to hold key's mutex. */ 
    249         //pj_mutex_unlock(h->mutex); 
    250198 
    251199#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) 
     
    294242#endif 
    295243 
     244        /* Unlock; from this point we don't need to hold key's mutex. */ 
     245        pj_mutex_unlock(h->mutex); 
     246 
    296247        /* Call callback. */ 
    297         if (h->cb.on_connect_complete) 
     248        if (h->cb.on_connect_complete && !IS_CLOSING(h)) 
    298249            (*h->cb.on_connect_complete)(h, bytes_transfered); 
    299250 
     
    320271                ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); 
    321272 
    322             //pj_mutex_unlock(h->mutex); 
    323273        } 
    324274 
     
    366316                    ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); 
    367317 
    368                 /* No need to hold mutex anymore */ 
    369                 //pj_mutex_unlock(h->mutex); 
    370318            } 
    371319 
     320            /* No need to hold mutex anymore */ 
     321            pj_mutex_unlock(h->mutex); 
     322 
    372323            /* Call callback. */ 
    373             if (h->cb.on_write_complete) { 
     324            if (h->cb.on_write_complete && !IS_CLOSING(h)) { 
    374325                (*h->cb.on_write_complete)(h,  
    375326                                           (pj_ioqueue_op_key_t*)write_op, 
     
    378329 
    379330        } else { 
    380             //pj_mutex_unlock(h->mutex); 
     331            pj_mutex_unlock(h->mutex); 
    381332        } 
    382333 
     
    388339         * able to process the event. 
    389340         */ 
    390         //pj_mutex_unlock(h->mutex); 
    391     } 
    392  
    393     /* Finally unlock key */ 
    394     unlock_key(h, &lck_data); 
     341        pj_mutex_unlock(h->mutex); 
     342    } 
    395343} 
    396344 
    397345void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) 
    398346{ 
    399     key_lock_data lck_data; 
    400347    pj_status_t rc; 
    401348 
    402349    /* Lock the key. */ 
    403     lock_key(h, &lck_data); 
     350    pj_mutex_lock(h->mutex); 
     351 
     352    if (h->closing) { 
     353        pj_mutex_unlock(h->mutex); 
     354        return; 
     355    } 
    404356 
    405357#   if PJ_HAS_TCP 
     
    416368        if (pj_list_empty(&h->accept_list)) 
    417369            ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); 
    418  
    419         /* Unlock; from this point we don't need to hold key's mutex. */ 
    420         //pj_mutex_unlock(h->mutex); 
    421370 
    422371        rc=pj_sock_accept(h->fd, accept_op->accept_fd,  
     
    428377        } 
    429378 
     379        /* Unlock; from this point we don't need to hold key's mutex. */ 
     380        pj_mutex_unlock(h->mutex); 
     381 
    430382        /* Call callback. */ 
    431         if (h->cb.on_accept_complete) { 
     383        if (h->cb.on_accept_complete && !IS_CLOSING(h)) { 
    432384            (*h->cb.on_accept_complete)(h,  
    433385                                        (pj_ioqueue_op_key_t*)accept_op, 
     
    449401        if (pj_list_empty(&h->read_list)) 
    450402            ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); 
    451  
    452         /* Unlock; from this point we don't need to hold key's mutex. */ 
    453         //Crash as of revision 353 (since we added pjmedia socket to 
    454         //main ioqueue). 
    455         //pj_mutex_unlock(h->mutex); 
    456403 
    457404        bytes_read = read_op->size; 
     
    517464        } 
    518465 
     466        /* Unlock; from this point we don't need to hold key's mutex. */ 
     467        pj_mutex_unlock(h->mutex); 
     468 
    519469        /* Call callback. */ 
    520         if (h->cb.on_read_complete) { 
     470        if (h->cb.on_read_complete && !IS_CLOSING(h)) { 
    521471            (*h->cb.on_read_complete)(h,  
    522472                                      (pj_ioqueue_op_key_t*)read_op, 
     
    530480         * able to process the event. 
    531481         */ 
    532         //pj_mutex_unlock(h->mutex); 
    533     } 
    534  
    535     /* Unlock handle. */ 
    536     unlock_key(h, &lck_data); 
     482        pj_mutex_unlock(h->mutex); 
     483    } 
    537484} 
    538485 
     
    541488                                       pj_ioqueue_key_t *h ) 
    542489{ 
    543     key_lock_data lck_data; 
    544  
    545     lock_key(h, &lck_data); 
     490    pj_mutex_lock(h->mutex); 
    546491 
    547492    if (!h->connecting) { 
     
    550495         * it has been processed by other thread. 
    551496         */ 
    552         //pj_mutex_unlock(h->mutex); 
    553         unlock_key(h, &lck_data); 
     497        pj_mutex_unlock(h->mutex); 
    554498        return; 
     499    } 
     500 
     501    if (h->closing) { 
     502        pj_mutex_unlock(h->mutex); 
     503        return; 
    555504    } 
    556505 
     
    558507    h->connecting = 0; 
    559508 
    560     //pj_mutex_unlock(h->mutex); 
    561  
    562509    ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); 
    563510    ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); 
    564511 
     512    pj_mutex_unlock(h->mutex); 
     513 
    565514    /* Call callback. */ 
    566     if (h->cb.on_connect_complete) 
     515    if (h->cb.on_connect_complete && !IS_CLOSING(h)) 
    567516        (*h->cb.on_connect_complete)(h, -1); 
    568  
    569     unlock_key(h, &lck_data); 
    570517} 
    571518 
     
    588535    read_op = (struct read_operation*)op_key; 
    589536    read_op->op = 0; 
     537 
     538    /* Check if key is closing. */ 
     539    if (key->closing) 
     540        return PJ_ECANCELLED; 
    590541 
    591542    /* Try to see if there's data immediately available.  
     
    646597    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); 
    647598    PJ_CHECK_STACK(); 
     599 
     600    /* Check if key is closing. */ 
     601    if (key->closing) 
     602        return PJ_ECANCELLED; 
    648603 
    649604    read_op = (struct read_operation*)op_key; 
     
    710665    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); 
    711666    PJ_CHECK_STACK(); 
     667 
     668    /* Check if key is closing. */ 
     669    if (key->closing) 
     670        return PJ_ECANCELLED; 
    712671 
    713672    write_op = (struct write_operation*)op_key; 
     
    789748    PJ_CHECK_STACK(); 
    790749 
     750    /* Check if key is closing. */ 
     751    if (key->closing) 
     752        return PJ_ECANCELLED; 
     753 
    791754    write_op = (struct write_operation*)op_key; 
    792755    write_op->op = 0; 
     
    869832    /* check parameters. All must be specified! */ 
    870833    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); 
     834 
     835    /* Check if key is closing. */ 
     836    if (key->closing) 
     837        return PJ_ECANCELLED; 
    871838 
    872839    accept_op = (struct accept_operation*)op_key; 
     
    930897    /* check parameters. All must be specified! */ 
    931898    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); 
     899 
     900    /* Check if key is closing. */ 
     901    if (key->closing) 
     902        return PJ_ECANCELLED; 
    932903 
    933904    /* Check if socket has not been marked for connecting */ 
     
    987958{ 
    988959    struct generic_operation *op_rec; 
    989     key_lock_data lck_data; 
    990960 
    991961    /* 
     
    993963     * really make sure that it's still there; then call the callback. 
    994964     */ 
    995     lock_key(key, &lck_data); 
     965    pj_mutex_lock(key->mutex); 
    996966 
    997967    /* Find the operation in the pending read list. */ 
     
    1001971            pj_list_erase(op_rec); 
    1002972            op_rec->op = 0; 
    1003             //pj_mutex_unlock(key->mutex); 
     973            pj_mutex_unlock(key->mutex); 
    1004974 
    1005975            (*key->cb.on_read_complete)(key, op_key, bytes_status); 
    1006  
    1007             unlock_key(key, &lck_data); 
    1008976            return PJ_SUCCESS; 
    1009977        } 
     
    1017985            pj_list_erase(op_rec); 
    1018986            op_rec->op = 0; 
    1019             //pj_mutex_unlock(key->mutex); 
     987            pj_mutex_unlock(key->mutex); 
    1020988 
    1021989            (*key->cb.on_write_complete)(key, op_key, bytes_status); 
    1022  
    1023             unlock_key(key, &lck_data); 
    1024990            return PJ_SUCCESS; 
    1025991        } 
     
    1033999            pj_list_erase(op_rec); 
    10341000            op_rec->op = 0; 
    1035             //pj_mutex_unlock(key->mutex); 
     1001            pj_mutex_unlock(key->mutex); 
    10361002 
    10371003            (*key->cb.on_accept_complete)(key, op_key,  
    10381004                                          PJ_INVALID_SOCKET, 
    10391005                                          bytes_status); 
    1040  
    1041             unlock_key(key, &lck_data); 
    10421006            return PJ_SUCCESS; 
    10431007        } 
     
    10451009    } 
    10461010 
    1047     unlock_key(key, &lck_data); 
     1011    pj_mutex_unlock(key->mutex); 
    10481012     
    10491013    return PJ_EINVALIDOP; 
  • pjproject/trunk/pjlib/src/pj/ioqueue_common_abs.h

    r363 r365  
    8888}; 
    8989 
     90#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     91#   define UNREG_FIELDS                 \ 
     92        unsigned            ref_count;  \ 
     93        pj_bool_t           closing;    \ 
     94        pj_time_val         free_time;  \ 
     95         
     96#else 
     97#   define UNREG_FIELDS 
     98#endif 
     99 
    90100#define DECLARE_COMMON_KEY                          \ 
    91101    PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);   \ 
     
    101111    struct read_operation   read_list;              \ 
    102112    struct write_operation  write_list;             \ 
    103     struct accept_operation accept_list; 
     113    struct accept_operation accept_list;            \ 
     114    UNREG_FIELDS 
    104115 
    105116 
  • pjproject/trunk/pjlib/src/pj/ioqueue_select.c

    r126 r365  
    110110 
    111111    unsigned            max, count; 
    112     pj_ioqueue_key_t    key_list; 
     112    pj_ioqueue_key_t    active_list; 
    113113    pj_fd_set_t         rfdset; 
    114114    pj_fd_set_t         wfdset; 
     
    116116    pj_fd_set_t         xfdset; 
    117117#endif 
     118 
     119#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     120    pj_mutex_t         *ref_cnt_mutex; 
     121    pj_ioqueue_key_t    closing_list; 
     122    pj_ioqueue_key_t    free_list; 
     123#endif 
    118124}; 
    119125 
     
    142148    pj_ioqueue_t *ioqueue; 
    143149    pj_lock_t *lock; 
     150    unsigned i; 
    144151    pj_status_t rc; 
    145152 
     
    153160                     sizeof(union operation_key), PJ_EBUG); 
    154161 
     162    /* Create and init common ioqueue stuffs */ 
    155163    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); 
    156  
    157164    ioqueue_init(ioqueue); 
    158165 
     
    164171    PJ_FD_ZERO(&ioqueue->xfdset); 
    165172#endif 
    166     pj_list_init(&ioqueue->key_list); 
    167  
     173    pj_list_init(&ioqueue->active_list); 
     174 
     175#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     176    /* When safe unregistration is used (the default), we pre-create 
     177     * all keys and put them in the free list. 
     178     */ 
     179 
     180    /* Mutex to protect key's reference counter  
     181     * We don't want to use key's mutex or ioqueue's mutex because 
     182     * that would create deadlock situation in some cases. 
     183     */ 
     184    rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex); 
     185    if (rc != PJ_SUCCESS) 
     186        return rc; 
     187 
     188 
     189    /* Init key list */ 
     190    pj_list_init(&ioqueue->free_list); 
     191    pj_list_init(&ioqueue->closing_list); 
     192 
     193 
     194    /* Pre-create all keys according to max_fd */ 
     195    for (i=0; i<max_fd; ++i) { 
     196        pj_ioqueue_key_t *key; 
     197 
     198        key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t)); 
     199        key->ref_count = 0; 
     200        rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); 
     201        if (rc != PJ_SUCCESS) { 
     202            key = ioqueue->free_list.next; 
     203            while (key != &ioqueue->free_list) { 
     204                pj_mutex_destroy(key->mutex); 
     205                key = key->next; 
     206            } 
     207            pj_mutex_destroy(ioqueue->ref_cnt_mutex); 
     208            return rc; 
     209        } 
     210 
     211        pj_list_push_back(&ioqueue->free_list, key); 
     212    } 
     213#endif 
     214 
     215    /* Create and init ioqueue mutex */ 
    168216    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock); 
    169217    if (rc != PJ_SUCCESS) 
     
    187235PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) 
    188236{ 
     237    pj_ioqueue_key_t *key; 
     238 
    189239    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 
    190240 
    191241    pj_lock_acquire(ioqueue->lock); 
     242 
     243#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     244    /* Destroy reference counters */ 
     245    key = ioqueue->active_list.next; 
     246    while (key != &ioqueue->active_list) { 
     247        pj_mutex_destroy(key->mutex); 
     248        key = key->next; 
     249    } 
     250 
     251    key = ioqueue->closing_list.next; 
     252    while (key != &ioqueue->closing_list) { 
     253        pj_mutex_destroy(key->mutex); 
     254        key = key->next; 
     255    } 
     256 
     257    key = ioqueue->free_list.next; 
     258    while (key != &ioqueue->free_list) { 
     259        pj_mutex_destroy(key->mutex); 
     260        key = key->next; 
     261    } 
     262 
     263    pj_mutex_destroy(ioqueue->ref_cnt_mutex); 
     264#endif 
     265 
    192266    return ioqueue_destroy(ioqueue); 
    193267} 
     
    197271 * pj_ioqueue_register_sock() 
    198272 * 
    199  * Register a handle to ioqueue. 
     273 * Register socket handle to ioqueue. 
    200274 */ 
    201275PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, 
     
    217291    if (ioqueue->count >= ioqueue->max) { 
    218292        rc = PJ_ETOOMANY; 
     293        goto on_return; 
     294    } 
     295 
     296    /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get 
     297     * the key from the free list. Otherwise allocate a new one.  
     298     */ 
     299#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     300    pj_assert(!pj_list_empty(&ioqueue->free_list)); 
     301    if (pj_list_empty(&ioqueue->free_list)) { 
     302        rc = PJ_ETOOMANY; 
     303        goto on_return; 
     304    } 
     305 
     306    key = ioqueue->free_list.next; 
     307    pj_list_erase(key); 
     308#else 
     309    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 
     310#endif 
     311 
     312    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); 
     313    if (rc != PJ_SUCCESS) { 
     314        key = NULL; 
    219315        goto on_return; 
    220316    } 
     
    232328    } 
    233329 
    234     /* Create key. */ 
    235     key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 
    236     rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); 
    237     if (rc != PJ_SUCCESS) { 
    238         key = NULL; 
    239         goto on_return; 
    240     } 
    241  
    242     /* Register */ 
    243     pj_list_insert_before(&ioqueue->key_list, key); 
     330 
     331    /* Put in active list. */ 
     332    pj_list_insert_before(&ioqueue->active_list, key); 
    244333    ++ioqueue->count; 
    245334 
     
    252341} 
    253342 
     343#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     344/* Increment key's reference counter */ 
     345static void increment_counter(pj_ioqueue_key_t *key) 
     346{ 
     347    pj_mutex_lock(key->ioqueue->ref_cnt_mutex); 
     348    ++key->ref_count; 
     349    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex); 
     350} 
     351 
     352/* Decrement the key's reference counter, and when the counter reach zero, 
     353 * destroy the key. 
     354 * 
     355 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK. 
     356 */ 
     357static void decrement_counter(pj_ioqueue_key_t *key) 
     358{ 
     359    pj_mutex_lock(key->ioqueue->ref_cnt_mutex); 
     360    --key->ref_count; 
     361    if (key->ref_count == 0) { 
     362 
     363        pj_assert(key->closing == 1); 
     364        pj_gettimeofday(&key->free_time); 
     365        key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY; 
     366        pj_time_val_normalize(&key->free_time); 
     367 
     368        pj_lock_acquire(key->ioqueue->lock); 
     369        pj_list_erase(key); 
     370        pj_list_push_back(&key->ioqueue->closing_list, key); 
     371        pj_lock_release(key->ioqueue->lock); 
     372    } 
     373    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex); 
     374} 
     375#endif 
     376 
     377 
    254378/* 
    255379 * pj_ioqueue_unregister() 
     
    265389    ioqueue = key->ioqueue; 
    266390 
     391    /* Lock the key to make sure no callback is simultaneously modifying 
     392     * the key. We need to lock the key before ioqueue here to prevent 
     393     * deadlock. 
     394     */ 
     395    pj_mutex_lock(key->mutex); 
     396 
     397    /* Also lock ioqueue */ 
    267398    pj_lock_acquire(ioqueue->lock); 
    268399 
     
    276407#endif 
    277408 
    278     /* ioqueue_destroy may try to acquire key's mutex. 
    279      * Since normally the order of locking is to lock key's mutex first 
    280      * then ioqueue's mutex, ioqueue_destroy may deadlock unless we 
    281      * release ioqueue's mutex first. 
     409    /* Close socket. */ 
     410    pj_sock_close(key->fd); 
     411 
     412    /* Clear callback */ 
     413    key->cb.on_accept_complete = NULL; 
     414    key->cb.on_connect_complete = NULL; 
     415    key->cb.on_read_complete = NULL; 
     416    key->cb.on_write_complete = NULL; 
     417 
     418    /* Must release ioqueue lock first before decrementing counter, to 
     419     * prevent deadlock. 
    282420     */ 
    283421    pj_lock_release(ioqueue->lock); 
    284422 
    285     /* Destroy the key. */ 
    286     ioqueue_destroy_key(key); 
     423#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     424    /* Mark key is closing. */ 
     425    key->closing = 1; 
     426 
     427    /* Decrement counter. */ 
     428    decrement_counter(key); 
     429 
     430    /* Done. */ 
     431    pj_mutex_unlock(key->mutex); 
     432#else 
     433    pj_mutex_destroy(key->mutex); 
     434#endif 
    287435 
    288436    return PJ_SUCCESS; 
     
    309457    pj_assert(0); 
    310458 
    311     key = ioqueue->key_list.next; 
    312     while (key != &ioqueue->key_list) { 
     459    key = ioqueue->active_list.next; 
     460    while (key != &ioqueue->active_list) { 
    313461        if (!pj_list_empty(&key->read_list) 
    314462#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 
     
    396544} 
    397545 
     546#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     547/* Scan closing keys to be put to free list again */ 
     548static void scan_closing_keys(pj_ioqueue_t *ioqueue) 
     549{ 
     550    pj_time_val now; 
     551    pj_ioqueue_key_t *h; 
     552 
     553    pj_gettimeofday(&now); 
     554    h = ioqueue->closing_list.next; 
     555    while (h != &ioqueue->closing_list) { 
     556        pj_ioqueue_key_t *next = h->next; 
     557 
     558        pj_assert(h->closing != 0); 
     559 
     560        if (PJ_TIME_VAL_GTE(now, h->free_time)) { 
     561            pj_list_erase(h); 
     562            pj_list_push_back(&ioqueue->free_list, h); 
     563        } 
     564        h = next; 
     565    } 
     566} 
     567#endif 
     568 
     569 
    398570/* 
    399571 * pj_ioqueue_poll() 
     
    436608        PJ_FD_COUNT(&ioqueue->xfdset)==0) 
    437609    { 
    438         pj_lock_release(ioqueue->lock); 
     610#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     611        scan_closing_keys(ioqueue); 
     612#endif 
     613        pj_lock_release(ioqueue->lock); 
    439614        if (timeout) 
    440615            pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout)); 
     
    476651     * coming with accept(). 
    477652     */ 
    478     h = ioqueue->key_list.next; 
    479     for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) { 
     653    h = ioqueue->active_list.next; 
     654    for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) { 
     655 
    480656        if ( (key_has_pending_write(h) || key_has_pending_connect(h)) 
    481              && PJ_FD_ISSET(h->fd, &wfdset)) 
     657             && PJ_FD_ISSET(h->fd, &wfdset) && !h->closing) 
    482658        { 
     659#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     660            increment_counter(h); 
     661#endif 
    483662            event[counter].key = h; 
    484663            event[counter].event_type = WRITEABLE_EVENT; 
     
    488667        /* Scan for readable socket. */ 
    489668        if ((key_has_pending_read(h) || key_has_pending_accept(h)) 
    490             && PJ_FD_ISSET(h->fd, &rfdset)) 
     669            && PJ_FD_ISSET(h->fd, &rfdset) && !h->closing) 
    491670        { 
     671#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     672            increment_counter(h); 
     673#endif 
    492674            event[counter].key = h; 
    493675            event[counter].event_type = READABLE_EVENT; 
     
    496678 
    497679#if PJ_HAS_TCP 
    498         if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) { 
     680        if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) && 
     681            !h->closing)  
     682        { 
     683#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     684            increment_counter(h); 
     685#endif 
    499686            event[counter].key = h; 
    500687            event[counter].event_type = EXCEPTION_EVENT; 
     
    526713            break; 
    527714        } 
    528     } 
     715 
     716#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     717        decrement_counter(event[counter].key); 
     718#endif 
     719    } 
     720 
    529721 
    530722    return count; 
  • pjproject/trunk/pjlib/src/pj/ioqueue_winnt.c

    r349 r365  
    107107struct pj_ioqueue_key_t 
    108108{ 
     109    PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); 
     110 
    109111    pj_ioqueue_t       *ioqueue; 
    110112    HANDLE              hnd; 
    111113    void               *user_data; 
    112114    enum handle_type    hnd_type; 
     115    pj_ioqueue_callback cb; 
     116 
    113117#if PJ_HAS_TCP 
    114118    int                 connecting; 
    115119#endif 
    116     pj_ioqueue_callback cb; 
    117     pj_bool_t           has_quit_signal; 
     120 
     121#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     122    pj_atomic_t        *ref_count; 
     123    pj_bool_t           closing; 
     124    pj_time_val         free_time; 
     125#endif 
     126 
    118127}; 
    119128 
     
    126135    pj_lock_t        *lock; 
    127136    pj_bool_t         auto_delete_lock; 
     137 
     138#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     139    pj_ioqueue_key_t  active_list; 
     140    pj_ioqueue_key_t  free_list; 
     141    pj_ioqueue_key_t  closing_list; 
     142#endif 
     143 
     144    /* These are to keep track of connecting sockets */ 
     145#if PJ_HAS_TCP 
    128146    unsigned          event_count; 
    129147    HANDLE            event_pool[MAXIMUM_WAIT_OBJECTS+1]; 
    130 #if PJ_HAS_TCP 
    131148    unsigned          connecting_count; 
    132149    HANDLE            connecting_handles[MAXIMUM_WAIT_OBJECTS+1]; 
     
    280297{ 
    281298    pj_ioqueue_t *ioqueue; 
     299    unsigned i; 
    282300    pj_status_t rc; 
    283301 
     
    291309                     sizeof(union operation_key), PJ_EBUG); 
    292310 
     311    /* Create IOCP */ 
    293312    ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue)); 
    294313    ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); 
     
    296315        return PJ_RETURN_OS_ERROR(GetLastError()); 
    297316 
     317    /* Create IOCP mutex */ 
    298318    rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock); 
    299319    if (rc != PJ_SUCCESS) { 
     
    304324    ioqueue->auto_delete_lock = PJ_TRUE; 
    305325 
     326#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     327    /* 
     328     * Create and initialize key pools. 
     329     */ 
     330    pj_list_init(&ioqueue->active_list); 
     331    pj_list_init(&ioqueue->free_list); 
     332    pj_list_init(&ioqueue->closing_list); 
     333 
     334    /* Preallocate keys according to max_fd setting, and put them 
     335     * in free_list. 
     336     */ 
     337    for (i=0; i<max_fd; ++i) { 
     338        pj_ioqueue_key_t *key; 
     339 
     340        key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t)); 
     341 
     342        rc = pj_atomic_create(pool, 0, &key->ref_count); 
     343        if (rc != PJ_SUCCESS) { 
     344            key = ioqueue->free_list.next; 
     345            while (key != &ioqueue->free_list) { 
     346                pj_atomic_destroy(key->ref_count); 
     347                key = key->next; 
     348            } 
     349            CloseHandle(ioqueue->iocp); 
     350            return rc; 
     351        } 
     352 
     353        pj_list_push_back(&ioqueue->free_list, key); 
     354 
     355    } 
     356#endif 
     357 
    306358    *p_ioqueue = ioqueue; 
    307359 
     
    316368{ 
    317369    unsigned i; 
     370    pj_ioqueue_key_t *key; 
    318371 
    319372    PJ_CHECK_STACK(); 
    320373    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 
    321374 
     375    pj_lock_acquire(ioqueue->lock); 
     376 
     377#if PJ_HAS_TCP 
    322378    /* Destroy events in the pool */ 
    323379    for (i=0; i<ioqueue->event_count; ++i) { 
     
    325381    } 
    326382    ioqueue->event_count = 0; 
     383#endif 
    327384 
    328385    if (CloseHandle(ioqueue->iocp) != TRUE) 
    329386        return PJ_RETURN_OS_ERROR(GetLastError()); 
     387 
     388#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     389    /* Destroy reference counters */ 
     390    key = ioqueue->active_list.next; 
     391    while (key != &ioqueue->active_list) { 
     392        pj_atomic_destroy(key->ref_count); 
     393        key = key->next; 
     394    } 
     395 
     396    key = ioqueue->closing_list.next; 
     397    while (key != &ioqueue->closing_list) { 
     398        pj_atomic_destroy(key->ref_count); 
     399        key = key->next; 
     400    } 
     401 
     402    key = ioqueue->free_list.next; 
     403    while (key != &ioqueue->free_list) { 
     404        pj_atomic_destroy(key->ref_count); 
     405        key = key->next; 
     406    } 
     407#endif 
    330408 
    331409    if (ioqueue->auto_delete_lock) 
     
    371449    PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL); 
    372450 
     451    pj_lock_acquire(ioqueue->lock); 
     452 
     453#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     454    /* If safe unregistration is used, then get the key record from 
     455     * the free list. 
     456     */ 
     457    if (pj_list_empty(&ioqueue->free_list)) { 
     458        pj_lock_release(ioqueue->lock); 
     459        return PJ_ETOOMANY; 
     460    } 
     461 
     462    rec = ioqueue->free_list.next; 
     463    pj_list_erase(rec); 
     464 
     465    /* Set initial reference count to 1 */ 
     466    pj_assert(pj_atomic_get(rec->ref_count) == 0); 
     467    pj_atomic_inc(rec->ref_count); 
     468 
     469    rec->closing = 0; 
     470 
     471#else 
     472    rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 
     473#endif 
     474 
    373475    /* Build the key for this socket. */ 
    374     rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 
    375476    rec->ioqueue = ioqueue; 
    376477    rec->hnd = (HANDLE)sock; 
     
    379480    pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback)); 
    380481 
     482#if PJ_HAS_TCP 
     483    rec->connecting = 0; 
     484#endif 
     485 
    381486    /* Set socket to nonblocking. */ 
    382487    value = 1; 
    383488    rc = ioctlsocket(sock, FIONBIO, &value); 
    384489    if (rc != 0) { 
     490        pj_lock_release(ioqueue->lock); 
    385491        return PJ_RETURN_OS_ERROR(WSAGetLastError()); 
    386492    } 
     
    389495    hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0); 
    390496    if (!hioq) { 
     497        pj_lock_release(ioqueue->lock); 
    391498        return PJ_RETURN_OS_ERROR(GetLastError()); 
    392499    } 
    393500 
    394501    *key = rec; 
     502 
     503#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     504    pj_list_push_back(&ioqueue->active_list, rec); 
     505#endif 
     506 
     507    pj_lock_release(ioqueue->lock); 
     508 
    395509    return PJ_SUCCESS; 
    396510} 
     
    423537 
    424538 
    425  
    426 /* 
    427  * Internal function to poll the I/O Completion Port, execute callback,  
     539#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     540/* Decrement the key's reference counter, and when the counter reach zero, 
     541 * destroy the key. 
     542 */ 
     543static void decrement_counter(pj_ioqueue_key_t *key) 
     544{ 
     545    if (pj_atomic_dec_and_get(key->ref_count) == 0) { 
     546 
     547        pj_lock_acquire(key->ioqueue->lock); 
     548 
     549        pj_assert(key->closing == 1); 
     550        pj_gettimeofday(&key->free_time); 
     551        key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY; 
     552        pj_time_val_normalize(&key->free_time); 
     553 
     554        pj_list_erase(key); 
     555        pj_list_push_back(&key->ioqueue->closing_list, key); 
     556 
     557        pj_lock_release(key->ioqueue->lock); 
     558    } 
     559} 
     560#endif 
     561 
     562/* 
     563 * Poll the I/O Completion Port, execute callback,  
    428564 * and return the key and bytes transfered of the last operation. 
    429565 */ 
     
    458594            *p_key = key; 
    459595 
    460         /* If size_status is POST_QUIT_LEN, mark the key as quitting */ 
    461         if (size_status == POST_QUIT_LEN) { 
    462             key->has_quit_signal = 1; 
     596#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     597        /* We shouldn't call callbacks if key is quitting. */ 
     598        if (key->closing) 
    463599            return PJ_TRUE; 
    464         } 
    465  
    466         /* We shouldn't call callbacks if key is quitting.  
    467          * But this should have been taken care by unregister function 
    468          * (the unregister function should have cleared out the callbacks) 
     600 
     601        /* Increment reference counter to prevent this key from being 
     602         * deleted 
    469603         */ 
     604        pj_atomic_inc(key->ref_count); 
     605#endif 
    470606 
    471607        /* Carry out the callback */ 
     
    505641            break; 
    506642        } 
     643 
     644#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     645        decrement_counter(key); 
     646#endif 
     647 
    507648        return PJ_TRUE; 
    508649    } 
     
    517658PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) 
    518659{ 
    519     pj_ssize_t polled_len; 
    520     pj_ioqueue_key_t *polled_key; 
    521     generic_overlapped ov; 
    522     BOOL rc; 
    523  
    524660    PJ_ASSERT_RETURN(key, PJ_EINVAL); 
    525661 
     
    543679    } 
    544680#endif 
    545  
    546  
    547     /* Unregistering handle from IOCP is pretty tricky. 
    548      * 
    549      * Even after the socket has been closed, GetQueuedCompletionStatus 
    550      * may still return events for the handle. This will likely to 
    551      * cause crash in pjlib, because the key associated with the handle 
    552      * most likely will have been destroyed. 
    553      * 
    554      * The solution is to poll the IOCP until we're sure that there are 
    555      * no further events for the handle. 
    556      */ 
    557  
    558     /* Clear up callbacks for the key.  
    559      * We don't want the callback to be called for this key. 
    560      */ 
     681     
     682    /* Close handle (the only way to disassociate handle from IOCP).  
     683     * We also need to close handle to make sure that no further events 
     684     * will come to the handle. 
     685     */ 
     686    CloseHandle(key->hnd); 
     687 
     688    /* Reset callbacks */ 
     689    key->cb.on_accept_complete = NULL; 
     690    key->cb.on_connect_complete = NULL; 
    561691    key->cb.on_read_complete = NULL; 
    562692    key->cb.on_write_complete = NULL; 
    563     key->cb.on_accept_complete = NULL; 
    564     key->cb.on_connect_complete = NULL; 
    565  
    566     /* Init overlapped struct */ 
    567     pj_memset(&ov, 0, sizeof(ov)); 
    568     ov.operation = PJ_IOQUEUE_OP_READ; 
    569  
    570     /* Post queued completion status with a special length. */ 
    571     rc = PostQueuedCompletionStatus( key->ioqueue->iocp, (DWORD)POST_QUIT_LEN, 
    572                                      (DWORD)key, &ov.overlapped); 
    573  
    574     /* Poll IOCP until has_quit_signal is set in the key. 
    575      * The has_quit_signal flag is set in poll_iocp() when POST_QUIT_LEN 
    576      * is detected. We need to have this flag because POST_QUIT_LEN may be 
    577      * detected by other threads. 
    578      */ 
    579     do { 
    580         polled_len = 0; 
    581         polled_key = NULL; 
    582  
    583         rc = poll_iocp(key->ioqueue->iocp, 0, &polled_len, &polled_key); 
    584  
    585     } while (rc && !key->has_quit_signal); 
    586  
    587  
    588     /* Close handle if this is a file. */ 
    589     if (key->hnd_type == HND_IS_FILE) { 
    590         CloseHandle(key->hnd); 
    591     } 
     693 
     694#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     695    /* Mark key as closing. */ 
     696    key->closing = 1; 
     697 
     698    /* Decrement reference counter. */ 
     699    decrement_counter(key); 
     700 
     701    /* Even after handle is closed, I suspect that IOCP may still try to 
     702     * do something with the handle, causing memory corruption when pool 
     703     * debugging is enabled. 
     704     * 
     705     * Forcing context switch seems to have fixed that, but this is quite 
     706     * an ugly solution.. 
     707     */ 
     708    pj_thread_sleep(0); 
     709#endif 
    592710 
    593711    return PJ_SUCCESS; 
     
    603721    DWORD dwMsec; 
    604722    int connect_count = 0; 
    605     pj_bool_t has_event; 
     723    int event_count = 0; 
    606724 
    607725    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); 
    608  
    609     /* Check the connecting array. */ 
    610 #if PJ_HAS_TCP 
    611     connect_count = check_connecting(ioqueue); 
    612 #endif 
    613726 
    614727    /* Calculate miliseconds timeout for GetQueuedCompletionStatus */ 
     
    616729 
    617730    /* Poll for completion status. */ 
    618     has_event = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL); 
     731    event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL); 
     732 
     733#if PJ_HAS_TCP 
     734    /* Check the connecting array, only when there's no activity. */ 
     735    if (event_count == 0) { 
     736        connect_count = check_connecting(ioqueue); 
     737        if (connect_count > 0) 
     738            event_count += connect_count; 
     739    } 
     740#endif 
     741 
     742#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     743    /* Check the closing keys only when there's no activity and when there are 
     744     * pending closing keys. 
     745     */ 
     746    if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) { 
     747        pj_time_val now; 
     748        pj_ioqueue_key_t *key; 
     749 
     750        pj_gettimeofday(&now); 
     751         
     752        /* Move closing keys to free list when they've finished the closing 
     753         * idle time. 
     754         */ 
     755        pj_lock_acquire(ioqueue->lock); 
     756        key = ioqueue->closing_list.next; 
     757        while (key != &ioqueue->closing_list) { 
     758            pj_ioqueue_key_t *next = key->next; 
     759 
     760            pj_assert(key->closing != 0); 
     761 
     762            if (PJ_TIME_VAL_GTE(now, key->free_time)) { 
     763                pj_list_erase(key); 
     764                pj_list_push_back(&ioqueue->free_list, key); 
     765            } 
     766            key = next; 
     767        } 
     768        pj_lock_release(ioqueue->lock); 
     769    } 
     770#endif 
    619771 
    620772    /* Return number of events. */ 
    621     return connect_count + has_event; 
     773    return event_count; 
    622774} 
    623775 
     
    645797    PJ_CHECK_STACK(); 
    646798    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); 
     799 
     800#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     801    /* Check key is not closing */ 
     802    if (key->closing) 
     803        return PJ_ECANCELLED; 
     804#endif 
    647805 
    648806    op_key_rec = (union operation_key*)op_key->internal__; 
     
    716874    PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL); 
    717875 
     876#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     877    /* Check key is not closing */ 
     878    if (key->closing) 
     879        return PJ_ECANCELLED; 
     880#endif 
     881 
    718882    op_key_rec = (union operation_key*)op_key->internal__; 
    719883    op_key_rec->overlapped.wsabuf.buf = buffer; 
     
    800964    PJ_CHECK_STACK(); 
    801965    PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL); 
    802      
     966 
     967#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     968    /* Check key is not closing */ 
     969    if (key->closing) 
     970        return PJ_ECANCELLED; 
     971#endif 
     972 
    803973    op_key_rec = (union operation_key*)op_key->internal__; 
    804974 
     
    8731043    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); 
    8741044 
     1045#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     1046    /* Check key is not closing */ 
     1047    if (key->closing) 
     1048        return PJ_ECANCELLED; 
     1049#endif 
     1050 
    8751051    /* 
    8761052     * See if there is a new connection immediately available. 
     
    9621138    PJ_CHECK_STACK(); 
    9631139    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); 
     1140 
     1141#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     1142    /* Check key is not closing */ 
     1143    if (key->closing) 
     1144        return PJ_ECANCELLED; 
     1145#endif 
    9641146 
    9651147    /* Initiate connect() */ 
  • pjproject/trunk/pjlib/src/pjlib-test/ioq_perf.c

    r363 r365  
    399399        pj_ioqueue_unregister(items[i].server_key); 
    400400        pj_ioqueue_unregister(items[i].client_key); 
    401         pj_sock_close(items[i].server_fd); 
    402         pj_sock_close(items[i].client_fd); 
    403401    } 
    404402 
  • pjproject/trunk/pjlib/src/pjlib-test/ioq_udp.c

    r349 r365  
    458458    /* Now unregister and close socket. */ 
    459459    pj_ioqueue_unregister(key); 
    460     pj_sock_close(rsock); 
    461460 
    462461    /* Poll ioqueue. */ 
     
    539538        if (rc != PJ_SUCCESS) { 
    540539            app_perror("...error in pj_ioqueue_unregister", rc); 
    541         } 
    542         rc = pj_sock_close(sock[i]); 
    543         if (rc != PJ_SUCCESS) { 
    544             app_perror("...error in pj_sock_close", rc); 
    545540        } 
    546541    } 
  • pjproject/trunk/pjlib/src/pjlib-test/test.c

    r126 r365  
    146146#endif 
    147147 
     148#if INCLUDE_IOQUEUE_UNREG_TEST 
     149    DO_TEST( udp_ioqueue_unreg_test() ); 
     150#endif 
     151 
    148152#if INCLUDE_FILE_TEST 
    149153    DO_TEST( file_test() ); 
  • pjproject/trunk/pjlib/src/pjlib-test/test.h

    r65 r365  
    4949#define INCLUDE_TCP_IOQUEUE_TEST    GROUP_NETWORK 
    5050#define INCLUDE_IOQUEUE_PERF_TEST   GROUP_NETWORK 
     51#define INCLUDE_IOQUEUE_UNREG_TEST  GROUP_NETWORK 
    5152#define INCLUDE_FILE_TEST           GROUP_FILE 
    5253 
     
    8384extern int select_test(void); 
    8485extern int udp_ioqueue_test(void); 
     86extern int udp_ioqueue_unreg_test(void); 
    8587extern int tcp_ioqueue_test(void); 
    8688extern int ioqueue_perf_test(void); 
Note: See TracChangeset for help on using the changeset viewer.