Ignore:
Timestamp:
Mar 30, 2006 4:32:18 PM (18 years ago)
Author:
bennylp
Message:

Fixed race condition bug in ioqueue unregistration for select and Win32 IOCP backend

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjlib/src/pj/ioqueue_common_abs.c

    r363 r365  
    2828 */ 
    2929 
    30 static long ioqueue_tls_id = -1; 
    31  
    32 typedef struct key_lock_data { 
    33     struct key_lock_data *prev; 
    34     pj_ioqueue_key_t     *key; 
    35     int                   is_alive; 
    36 } key_lock_data; 
    37  
    38  
    3930static void ioqueue_init( pj_ioqueue_t *ioqueue ) 
    4031{ 
    4132    ioqueue->lock = NULL; 
    4233    ioqueue->auto_delete_lock = 0; 
    43  
    44     if (ioqueue_tls_id == -1) { 
    45         pj_status_t status; 
    46         status = pj_thread_local_alloc(&ioqueue_tls_id); 
    47         pj_thread_local_set(ioqueue_tls_id, NULL); 
    48     } 
    4934} 
    5035 
     
    9479#if PJ_HAS_TCP 
    9580    pj_list_init(&key->accept_list); 
     81    key->connecting = 0; 
    9682#endif 
    9783 
    9884    /* Save callback. */ 
    9985    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); 
     86 
     87#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     88    /* Set initial reference count to 1 */ 
     89    pj_assert(key->ref_count == 0); 
     90    ++key->ref_count; 
     91 
     92    key->closing = 0; 
     93#endif 
    10094 
    10195    /* Get socket type. When socket type is datagram, some optimization 
     
    108102        key->fd_type = PJ_SOCK_STREAM; 
    109103 
    110     key->inside_callback = 0; 
    111     key->destroy_requested = 0; 
    112  
    113104    /* Create mutex for the key. */ 
    114     rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); 
     105#if !PJ_IOQUEUE_HAS_SAFE_UNREG 
     106    rc = pj_mutex_create_simple(pool, NULL, &key->mutex); 
     107#endif 
    115108     
    116109    return rc; 
    117 } 
    118  
    119 /* Lock the key and also keep the lock data in thread local storage. 
    120  * The lock data is used to detect if the key is deleted by application 
    121  * when we call its callback. 
    122  */ 
    123 static void lock_key(pj_ioqueue_key_t *key, key_lock_data *lck) 
    124 { 
    125     struct key_lock_data *prev_data; 
    126  
    127     pj_mutex_lock(key->mutex); 
    128     prev_data = (struct key_lock_data *)  
    129                     pj_thread_local_get(ioqueue_tls_id); 
    130     lck->prev = prev_data; 
    131     lck->key = key; 
    132     lck->is_alive = 1; 
    133     pj_thread_local_set(ioqueue_tls_id, lck); 
    134 } 
    135  
    136 /* Unlock the key only if it is still valid. */ 
    137 static void unlock_key(pj_ioqueue_key_t *key, key_lock_data *lck) 
    138 { 
    139     pj_assert( (void*)pj_thread_local_get(ioqueue_tls_id) == lck); 
    140     pj_assert( lck->key == key ); 
    141     pj_thread_local_set(ioqueue_tls_id, lck->prev); 
    142     if (lck->is_alive) 
    143         pj_mutex_unlock(key->mutex); 
    144 } 
    145  
    146 /* Destroy key */ 
    147 static void ioqueue_destroy_key( pj_ioqueue_key_t *key ) 
    148 { 
    149     key_lock_data *lck; 
    150  
    151     /* Make sure that no other threads are doing something with 
    152      * the key. 
    153      */ 
    154     pj_mutex_lock(key->mutex); 
    155      
    156     /* Check if this function is called within a callback context. 
    157      * If so, then we need to inform the callback that the key has 
    158      * been destroyed so that it doesn't attempt to unlock the 
    159      * key's mutex. 
    160      */ 
    161     lck = pj_thread_local_get(ioqueue_tls_id); 
    162     while (lck) { 
    163         if (lck->key == key) { 
    164             lck->is_alive = 0; 
    165         } 
    166         lck = lck->prev; 
    167     } 
    168  
    169     pj_mutex_destroy(key->mutex); 
    170110} 
    171111 
     
    222162 
    223163 
     164#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     165#   define IS_CLOSING(key)  (key->closing) 
     166#else 
     167#   define IS_CLOSING(key)  (0) 
     168#endif 
     169 
     170 
    224171/* 
    225172 * ioqueue_dispatch_event() 
     
    230177void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) 
    231178{ 
    232     key_lock_data lck_data; 
    233  
    234179    /* Lock the key. */ 
    235     lock_key(h, &lck_data); 
     180    pj_mutex_lock(h->mutex); 
     181 
     182    if (h->closing) { 
     183        pj_mutex_unlock(h->mutex); 
     184        return; 
     185    } 
    236186 
    237187#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 
     
    246196        ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); 
    247197 
    248         /* Unlock; from this point we don't need to hold key's mutex. */ 
    249         //pj_mutex_unlock(h->mutex); 
    250198 
    251199#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) 
     
    294242#endif 
    295243 
     244        /* Unlock; from this point we don't need to hold key's mutex. */ 
     245        pj_mutex_unlock(h->mutex); 
     246 
    296247        /* Call callback. */ 
    297         if (h->cb.on_connect_complete) 
     248        if (h->cb.on_connect_complete && !IS_CLOSING(h)) 
    298249            (*h->cb.on_connect_complete)(h, bytes_transfered); 
    299250 
     
    320271                ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); 
    321272 
    322             //pj_mutex_unlock(h->mutex); 
    323273        } 
    324274 
     
    366316                    ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); 
    367317 
    368                 /* No need to hold mutex anymore */ 
    369                 //pj_mutex_unlock(h->mutex); 
    370318            } 
    371319 
     320            /* No need to hold mutex anymore */ 
     321            pj_mutex_unlock(h->mutex); 
     322 
    372323            /* Call callback. */ 
    373             if (h->cb.on_write_complete) { 
     324            if (h->cb.on_write_complete && !IS_CLOSING(h)) { 
    374325                (*h->cb.on_write_complete)(h,  
    375326                                           (pj_ioqueue_op_key_t*)write_op, 
     
    378329 
    379330        } else { 
    380             //pj_mutex_unlock(h->mutex); 
     331            pj_mutex_unlock(h->mutex); 
    381332        } 
    382333 
     
    388339         * able to process the event. 
    389340         */ 
    390         //pj_mutex_unlock(h->mutex); 
    391     } 
    392  
    393     /* Finally unlock key */ 
    394     unlock_key(h, &lck_data); 
     341        pj_mutex_unlock(h->mutex); 
     342    } 
    395343} 
    396344 
    397345void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) 
    398346{ 
    399     key_lock_data lck_data; 
    400347    pj_status_t rc; 
    401348 
    402349    /* Lock the key. */ 
    403     lock_key(h, &lck_data); 
     350    pj_mutex_lock(h->mutex); 
     351 
     352    if (h->closing) { 
     353        pj_mutex_unlock(h->mutex); 
     354        return; 
     355    } 
    404356 
    405357#   if PJ_HAS_TCP 
     
    416368        if (pj_list_empty(&h->accept_list)) 
    417369            ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); 
    418  
    419         /* Unlock; from this point we don't need to hold key's mutex. */ 
    420         //pj_mutex_unlock(h->mutex); 
    421370 
    422371        rc=pj_sock_accept(h->fd, accept_op->accept_fd,  
     
    428377        } 
    429378 
     379        /* Unlock; from this point we don't need to hold key's mutex. */ 
     380        pj_mutex_unlock(h->mutex); 
     381 
    430382        /* Call callback. */ 
    431         if (h->cb.on_accept_complete) { 
     383        if (h->cb.on_accept_complete && !IS_CLOSING(h)) { 
    432384            (*h->cb.on_accept_complete)(h,  
    433385                                        (pj_ioqueue_op_key_t*)accept_op, 
     
    449401        if (pj_list_empty(&h->read_list)) 
    450402            ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); 
    451  
    452         /* Unlock; from this point we don't need to hold key's mutex. */ 
    453         //Crash as of revision 353 (since we added pjmedia socket to 
    454         //main ioqueue). 
    455         //pj_mutex_unlock(h->mutex); 
    456403 
    457404        bytes_read = read_op->size; 
     
    517464        } 
    518465 
     466        /* Unlock; from this point we don't need to hold key's mutex. */ 
     467        pj_mutex_unlock(h->mutex); 
     468 
    519469        /* Call callback. */ 
    520         if (h->cb.on_read_complete) { 
     470        if (h->cb.on_read_complete && !IS_CLOSING(h)) { 
    521471            (*h->cb.on_read_complete)(h,  
    522472                                      (pj_ioqueue_op_key_t*)read_op, 
     
    530480         * able to process the event. 
    531481         */ 
    532         //pj_mutex_unlock(h->mutex); 
    533     } 
    534  
    535     /* Unlock handle. */ 
    536     unlock_key(h, &lck_data); 
     482        pj_mutex_unlock(h->mutex); 
     483    } 
    537484} 
    538485 
     
    541488                                       pj_ioqueue_key_t *h ) 
    542489{ 
    543     key_lock_data lck_data; 
    544  
    545     lock_key(h, &lck_data); 
     490    pj_mutex_lock(h->mutex); 
    546491 
    547492    if (!h->connecting) { 
     
    550495         * it has been processed by other thread. 
    551496         */ 
    552         //pj_mutex_unlock(h->mutex); 
    553         unlock_key(h, &lck_data); 
     497        pj_mutex_unlock(h->mutex); 
    554498        return; 
     499    } 
     500 
     501    if (h->closing) { 
     502        pj_mutex_unlock(h->mutex); 
     503        return; 
    555504    } 
    556505 
     
    558507    h->connecting = 0; 
    559508 
    560     //pj_mutex_unlock(h->mutex); 
    561  
    562509    ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); 
    563510    ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); 
    564511 
     512    pj_mutex_unlock(h->mutex); 
     513 
    565514    /* Call callback. */ 
    566     if (h->cb.on_connect_complete) 
     515    if (h->cb.on_connect_complete && !IS_CLOSING(h)) 
    567516        (*h->cb.on_connect_complete)(h, -1); 
    568  
    569     unlock_key(h, &lck_data); 
    570517} 
    571518 
     
    588535    read_op = (struct read_operation*)op_key; 
    589536    read_op->op = 0; 
     537 
     538    /* Check if key is closing. */ 
     539    if (key->closing) 
     540        return PJ_ECANCELLED; 
    590541 
    591542    /* Try to see if there's data immediately available.  
     
    646597    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); 
    647598    PJ_CHECK_STACK(); 
     599 
     600    /* Check if key is closing. */ 
     601    if (key->closing) 
     602        return PJ_ECANCELLED; 
    648603 
    649604    read_op = (struct read_operation*)op_key; 
     
    710665    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); 
    711666    PJ_CHECK_STACK(); 
     667 
     668    /* Check if key is closing. */ 
     669    if (key->closing) 
     670        return PJ_ECANCELLED; 
    712671 
    713672    write_op = (struct write_operation*)op_key; 
     
    789748    PJ_CHECK_STACK(); 
    790749 
     750    /* Check if key is closing. */ 
     751    if (key->closing) 
     752        return PJ_ECANCELLED; 
     753 
    791754    write_op = (struct write_operation*)op_key; 
    792755    write_op->op = 0; 
     
    869832    /* check parameters. All must be specified! */ 
    870833    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); 
     834 
     835    /* Check if key is closing. */ 
     836    if (key->closing) 
     837        return PJ_ECANCELLED; 
    871838 
    872839    accept_op = (struct accept_operation*)op_key; 
     
    930897    /* check parameters. All must be specified! */ 
    931898    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); 
     899 
     900    /* Check if key is closing. */ 
     901    if (key->closing) 
     902        return PJ_ECANCELLED; 
    932903 
    933904    /* Check if socket has not been marked for connecting */ 
     
    987958{ 
    988959    struct generic_operation *op_rec; 
    989     key_lock_data lck_data; 
    990960 
    991961    /* 
     
    993963     * really make sure that it's still there; then call the callback. 
    994964     */ 
    995     lock_key(key, &lck_data); 
     965    pj_mutex_lock(key->mutex); 
    996966 
    997967    /* Find the operation in the pending read list. */ 
     
    1001971            pj_list_erase(op_rec); 
    1002972            op_rec->op = 0; 
    1003             //pj_mutex_unlock(key->mutex); 
     973            pj_mutex_unlock(key->mutex); 
    1004974 
    1005975            (*key->cb.on_read_complete)(key, op_key, bytes_status); 
    1006  
    1007             unlock_key(key, &lck_data); 
    1008976            return PJ_SUCCESS; 
    1009977        } 
     
    1017985            pj_list_erase(op_rec); 
    1018986            op_rec->op = 0; 
    1019             //pj_mutex_unlock(key->mutex); 
     987            pj_mutex_unlock(key->mutex); 
    1020988 
    1021989            (*key->cb.on_write_complete)(key, op_key, bytes_status); 
    1022  
    1023             unlock_key(key, &lck_data); 
    1024990            return PJ_SUCCESS; 
    1025991        } 
     
    1033999            pj_list_erase(op_rec); 
    10341000            op_rec->op = 0; 
    1035             //pj_mutex_unlock(key->mutex); 
     1001            pj_mutex_unlock(key->mutex); 
    10361002 
    10371003            (*key->cb.on_accept_complete)(key, op_key,  
    10381004                                          PJ_INVALID_SOCKET, 
    10391005                                          bytes_status); 
    1040  
    1041             unlock_key(key, &lck_data); 
    10421006            return PJ_SUCCESS; 
    10431007        } 
     
    10451009    } 
    10461010 
    1047     unlock_key(key, &lck_data); 
     1011    pj_mutex_unlock(key->mutex); 
    10481012     
    10491013    return PJ_EINVALIDOP; 
Note: See TracChangeset for help on using the changeset viewer.