Ignore:
Timestamp:
Mar 30, 2006 4:32:18 PM (18 years ago)
Author:
bennylp
Message:

Fixed race condition bug in ioqueue unregistration for select and Win32 IOCP backend

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjlib/src/pj/ioqueue_select.c

    r126 r365  
    110110 
    111111    unsigned            max, count; 
    112     pj_ioqueue_key_t    key_list; 
     112    pj_ioqueue_key_t    active_list; 
    113113    pj_fd_set_t         rfdset; 
    114114    pj_fd_set_t         wfdset; 
     
    116116    pj_fd_set_t         xfdset; 
    117117#endif 
     118 
     119#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     120    pj_mutex_t         *ref_cnt_mutex; 
     121    pj_ioqueue_key_t    closing_list; 
     122    pj_ioqueue_key_t    free_list; 
     123#endif 
    118124}; 
    119125 
     
    142148    pj_ioqueue_t *ioqueue; 
    143149    pj_lock_t *lock; 
     150    unsigned i; 
    144151    pj_status_t rc; 
    145152 
     
    153160                     sizeof(union operation_key), PJ_EBUG); 
    154161 
     162    /* Create and init common ioqueue stuffs */ 
    155163    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); 
    156  
    157164    ioqueue_init(ioqueue); 
    158165 
     
    164171    PJ_FD_ZERO(&ioqueue->xfdset); 
    165172#endif 
    166     pj_list_init(&ioqueue->key_list); 
    167  
     173    pj_list_init(&ioqueue->active_list); 
     174 
     175#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     176    /* When safe unregistration is used (the default), we pre-create 
     177     * all keys and put them in the free list. 
     178     */ 
     179 
     180    /* Mutex to protect key's reference counter  
     181     * We don't want to use key's mutex or ioqueue's mutex because 
     182     * that would create deadlock situation in some cases. 
     183     */ 
     184    rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex); 
     185    if (rc != PJ_SUCCESS) 
     186        return rc; 
     187 
     188 
     189    /* Init key list */ 
     190    pj_list_init(&ioqueue->free_list); 
     191    pj_list_init(&ioqueue->closing_list); 
     192 
     193 
     194    /* Pre-create all keys according to max_fd */ 
     195    for (i=0; i<max_fd; ++i) { 
     196        pj_ioqueue_key_t *key; 
     197 
     198        key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t)); 
     199        key->ref_count = 0; 
     200        rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); 
     201        if (rc != PJ_SUCCESS) { 
     202            key = ioqueue->free_list.next; 
     203            while (key != &ioqueue->free_list) { 
     204                pj_mutex_destroy(key->mutex); 
     205                key = key->next; 
     206            } 
     207            pj_mutex_destroy(ioqueue->ref_cnt_mutex); 
     208            return rc; 
     209        } 
     210 
     211        pj_list_push_back(&ioqueue->free_list, key); 
     212    } 
     213#endif 
     214 
     215    /* Create and init ioqueue mutex */ 
    168216    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock); 
    169217    if (rc != PJ_SUCCESS) 
     
    187235PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) 
    188236{ 
     237    pj_ioqueue_key_t *key; 
     238 
    189239    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 
    190240 
    191241    pj_lock_acquire(ioqueue->lock); 
     242 
     243#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     244    /* Destroy reference counters */ 
     245    key = ioqueue->active_list.next; 
     246    while (key != &ioqueue->active_list) { 
     247        pj_mutex_destroy(key->mutex); 
     248        key = key->next; 
     249    } 
     250 
     251    key = ioqueue->closing_list.next; 
     252    while (key != &ioqueue->closing_list) { 
     253        pj_mutex_destroy(key->mutex); 
     254        key = key->next; 
     255    } 
     256 
     257    key = ioqueue->free_list.next; 
     258    while (key != &ioqueue->free_list) { 
     259        pj_mutex_destroy(key->mutex); 
     260        key = key->next; 
     261    } 
     262 
     263    pj_mutex_destroy(ioqueue->ref_cnt_mutex); 
     264#endif 
     265 
    192266    return ioqueue_destroy(ioqueue); 
    193267} 
     
    197271 * pj_ioqueue_register_sock() 
    198272 * 
    199  * Register a handle to ioqueue. 
     273 * Register socket handle to ioqueue. 
    200274 */ 
    201275PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, 
     
    217291    if (ioqueue->count >= ioqueue->max) { 
    218292        rc = PJ_ETOOMANY; 
     293        goto on_return; 
     294    } 
     295 
     296    /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get 
     297     * the key from the free list. Otherwise allocate a new one.  
     298     */ 
     299#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     300    pj_assert(!pj_list_empty(&ioqueue->free_list)); 
     301    if (pj_list_empty(&ioqueue->free_list)) { 
     302        rc = PJ_ETOOMANY; 
     303        goto on_return; 
     304    } 
     305 
     306    key = ioqueue->free_list.next; 
     307    pj_list_erase(key); 
     308#else 
     309    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 
     310#endif 
     311 
     312    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); 
     313    if (rc != PJ_SUCCESS) { 
     314        key = NULL; 
    219315        goto on_return; 
    220316    } 
     
    232328    } 
    233329 
    234     /* Create key. */ 
    235     key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 
    236     rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); 
    237     if (rc != PJ_SUCCESS) { 
    238         key = NULL; 
    239         goto on_return; 
    240     } 
    241  
    242     /* Register */ 
    243     pj_list_insert_before(&ioqueue->key_list, key); 
     330 
     331    /* Put in active list. */ 
     332    pj_list_insert_before(&ioqueue->active_list, key); 
    244333    ++ioqueue->count; 
    245334 
     
    252341} 
    253342 
     343#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     344/* Increment key's reference counter */ 
     345static void increment_counter(pj_ioqueue_key_t *key) 
     346{ 
     347    pj_mutex_lock(key->ioqueue->ref_cnt_mutex); 
     348    ++key->ref_count; 
     349    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex); 
     350} 
     351 
     352/* Decrement the key's reference counter, and when the counter reach zero, 
     353 * destroy the key. 
     354 * 
     355 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK. 
     356 */ 
     357static void decrement_counter(pj_ioqueue_key_t *key) 
     358{ 
     359    pj_mutex_lock(key->ioqueue->ref_cnt_mutex); 
     360    --key->ref_count; 
     361    if (key->ref_count == 0) { 
     362 
     363        pj_assert(key->closing == 1); 
     364        pj_gettimeofday(&key->free_time); 
     365        key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY; 
     366        pj_time_val_normalize(&key->free_time); 
     367 
     368        pj_lock_acquire(key->ioqueue->lock); 
     369        pj_list_erase(key); 
     370        pj_list_push_back(&key->ioqueue->closing_list, key); 
     371        pj_lock_release(key->ioqueue->lock); 
     372    } 
     373    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex); 
     374} 
     375#endif 
     376 
     377 
    254378/* 
    255379 * pj_ioqueue_unregister() 
     
    265389    ioqueue = key->ioqueue; 
    266390 
     391    /* Lock the key to make sure no callback is simultaneously modifying 
     392     * the key. We need to lock the key before ioqueue here to prevent 
     393     * deadlock. 
     394     */ 
     395    pj_mutex_lock(key->mutex); 
     396 
     397    /* Also lock ioqueue */ 
    267398    pj_lock_acquire(ioqueue->lock); 
    268399 
     
    276407#endif 
    277408 
    278     /* ioqueue_destroy may try to acquire key's mutex. 
    279      * Since normally the order of locking is to lock key's mutex first 
    280      * then ioqueue's mutex, ioqueue_destroy may deadlock unless we 
    281      * release ioqueue's mutex first. 
     409    /* Close socket. */ 
     410    pj_sock_close(key->fd); 
     411 
     412    /* Clear callback */ 
     413    key->cb.on_accept_complete = NULL; 
     414    key->cb.on_connect_complete = NULL; 
     415    key->cb.on_read_complete = NULL; 
     416    key->cb.on_write_complete = NULL; 
     417 
     418    /* Must release ioqueue lock first before decrementing counter, to 
     419     * prevent deadlock. 
    282420     */ 
    283421    pj_lock_release(ioqueue->lock); 
    284422 
    285     /* Destroy the key. */ 
    286     ioqueue_destroy_key(key); 
     423#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     424    /* Mark key is closing. */ 
     425    key->closing = 1; 
     426 
     427    /* Decrement counter. */ 
     428    decrement_counter(key); 
     429 
     430    /* Done. */ 
     431    pj_mutex_unlock(key->mutex); 
     432#else 
     433    pj_mutex_destroy(key->mutex); 
     434#endif 
    287435 
    288436    return PJ_SUCCESS; 
     
    309457    pj_assert(0); 
    310458 
    311     key = ioqueue->key_list.next; 
    312     while (key != &ioqueue->key_list) { 
     459    key = ioqueue->active_list.next; 
     460    while (key != &ioqueue->active_list) { 
    313461        if (!pj_list_empty(&key->read_list) 
    314462#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 
     
    396544} 
    397545 
     546#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     547/* Scan closing keys to be put to free list again */ 
     548static void scan_closing_keys(pj_ioqueue_t *ioqueue) 
     549{ 
     550    pj_time_val now; 
     551    pj_ioqueue_key_t *h; 
     552 
     553    pj_gettimeofday(&now); 
     554    h = ioqueue->closing_list.next; 
     555    while (h != &ioqueue->closing_list) { 
     556        pj_ioqueue_key_t *next = h->next; 
     557 
     558        pj_assert(h->closing != 0); 
     559 
     560        if (PJ_TIME_VAL_GTE(now, h->free_time)) { 
     561            pj_list_erase(h); 
     562            pj_list_push_back(&ioqueue->free_list, h); 
     563        } 
     564        h = next; 
     565    } 
     566} 
     567#endif 
     568 
     569 
    398570/* 
    399571 * pj_ioqueue_poll() 
     
    436608        PJ_FD_COUNT(&ioqueue->xfdset)==0) 
    437609    { 
    438         pj_lock_release(ioqueue->lock); 
     610#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     611        scan_closing_keys(ioqueue); 
     612#endif 
     613        pj_lock_release(ioqueue->lock); 
    439614        if (timeout) 
    440615            pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout)); 
     
    476651     * coming with accept(). 
    477652     */ 
    478     h = ioqueue->key_list.next; 
    479     for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) { 
     653    h = ioqueue->active_list.next; 
     654    for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) { 
     655 
    480656        if ( (key_has_pending_write(h) || key_has_pending_connect(h)) 
    481              && PJ_FD_ISSET(h->fd, &wfdset)) 
     657             && PJ_FD_ISSET(h->fd, &wfdset) && !h->closing) 
    482658        { 
     659#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     660            increment_counter(h); 
     661#endif 
    483662            event[counter].key = h; 
    484663            event[counter].event_type = WRITEABLE_EVENT; 
     
    488667        /* Scan for readable socket. */ 
    489668        if ((key_has_pending_read(h) || key_has_pending_accept(h)) 
    490             && PJ_FD_ISSET(h->fd, &rfdset)) 
     669            && PJ_FD_ISSET(h->fd, &rfdset) && !h->closing) 
    491670        { 
     671#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     672            increment_counter(h); 
     673#endif 
    492674            event[counter].key = h; 
    493675            event[counter].event_type = READABLE_EVENT; 
     
    496678 
    497679#if PJ_HAS_TCP 
    498         if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) { 
     680        if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) && 
     681            !h->closing)  
     682        { 
     683#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     684            increment_counter(h); 
     685#endif 
    499686            event[counter].key = h; 
    500687            event[counter].event_type = EXCEPTION_EVENT; 
     
    526713            break; 
    527714        } 
    528     } 
     715 
     716#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     717        decrement_counter(event[counter].key); 
     718#endif 
     719    } 
     720 
    529721 
    530722    return count; 
Note: See TracChangeset for help on using the changeset viewer.