Ignore:
Timestamp:
Mar 30, 2006 4:32:18 PM (18 years ago)
Author:
bennylp
Message:

Fixed race condition bug in ioqueue unregistration for select and Win32 IOCP backend

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjlib/src/pj/ioqueue_winnt.c

    r349 r365  
    107107struct pj_ioqueue_key_t 
    108108{ 
     109    PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); 
     110 
    109111    pj_ioqueue_t       *ioqueue; 
    110112    HANDLE              hnd; 
    111113    void               *user_data; 
    112114    enum handle_type    hnd_type; 
     115    pj_ioqueue_callback cb; 
     116 
    113117#if PJ_HAS_TCP 
    114118    int                 connecting; 
    115119#endif 
    116     pj_ioqueue_callback cb; 
    117     pj_bool_t           has_quit_signal; 
     120 
     121#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     122    pj_atomic_t        *ref_count; 
     123    pj_bool_t           closing; 
     124    pj_time_val         free_time; 
     125#endif 
     126 
    118127}; 
    119128 
     
    126135    pj_lock_t        *lock; 
    127136    pj_bool_t         auto_delete_lock; 
     137 
     138#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     139    pj_ioqueue_key_t  active_list; 
     140    pj_ioqueue_key_t  free_list; 
     141    pj_ioqueue_key_t  closing_list; 
     142#endif 
     143 
     144    /* These are to keep track of connecting sockets */ 
     145#if PJ_HAS_TCP 
    128146    unsigned          event_count; 
    129147    HANDLE            event_pool[MAXIMUM_WAIT_OBJECTS+1]; 
    130 #if PJ_HAS_TCP 
    131148    unsigned          connecting_count; 
    132149    HANDLE            connecting_handles[MAXIMUM_WAIT_OBJECTS+1]; 
     
    280297{ 
    281298    pj_ioqueue_t *ioqueue; 
     299    unsigned i; 
    282300    pj_status_t rc; 
    283301 
     
    291309                     sizeof(union operation_key), PJ_EBUG); 
    292310 
     311    /* Create IOCP */ 
    293312    ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue)); 
    294313    ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); 
     
    296315        return PJ_RETURN_OS_ERROR(GetLastError()); 
    297316 
     317    /* Create IOCP mutex */ 
    298318    rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock); 
    299319    if (rc != PJ_SUCCESS) { 
     
    304324    ioqueue->auto_delete_lock = PJ_TRUE; 
    305325 
     326#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     327    /* 
     328     * Create and initialize key pools. 
     329     */ 
     330    pj_list_init(&ioqueue->active_list); 
     331    pj_list_init(&ioqueue->free_list); 
     332    pj_list_init(&ioqueue->closing_list); 
     333 
     334    /* Preallocate keys according to max_fd setting, and put them 
     335     * in free_list. 
     336     */ 
     337    for (i=0; i<max_fd; ++i) { 
     338        pj_ioqueue_key_t *key; 
     339 
     340        key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t)); 
     341 
     342        rc = pj_atomic_create(pool, 0, &key->ref_count); 
     343        if (rc != PJ_SUCCESS) { 
     344            key = ioqueue->free_list.next; 
     345            while (key != &ioqueue->free_list) { 
     346                pj_atomic_destroy(key->ref_count); 
     347                key = key->next; 
     348            } 
     349            CloseHandle(ioqueue->iocp); 
     350            return rc; 
     351        } 
     352 
     353        pj_list_push_back(&ioqueue->free_list, key); 
     354 
     355    } 
     356#endif 
     357 
    306358    *p_ioqueue = ioqueue; 
    307359 
     
    316368{ 
    317369    unsigned i; 
     370    pj_ioqueue_key_t *key; 
    318371 
    319372    PJ_CHECK_STACK(); 
    320373    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 
    321374 
     375    pj_lock_acquire(ioqueue->lock); 
     376 
     377#if PJ_HAS_TCP 
    322378    /* Destroy events in the pool */ 
    323379    for (i=0; i<ioqueue->event_count; ++i) { 
     
    325381    } 
    326382    ioqueue->event_count = 0; 
     383#endif 
    327384 
    328385    if (CloseHandle(ioqueue->iocp) != TRUE) 
    329386        return PJ_RETURN_OS_ERROR(GetLastError()); 
     387 
     388#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     389    /* Destroy reference counters */ 
     390    key = ioqueue->active_list.next; 
     391    while (key != &ioqueue->active_list) { 
     392        pj_atomic_destroy(key->ref_count); 
     393        key = key->next; 
     394    } 
     395 
     396    key = ioqueue->closing_list.next; 
     397    while (key != &ioqueue->closing_list) { 
     398        pj_atomic_destroy(key->ref_count); 
     399        key = key->next; 
     400    } 
     401 
     402    key = ioqueue->free_list.next; 
     403    while (key != &ioqueue->free_list) { 
     404        pj_atomic_destroy(key->ref_count); 
     405        key = key->next; 
     406    } 
     407#endif 
    330408 
    331409    if (ioqueue->auto_delete_lock) 
     
    371449    PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL); 
    372450 
     451    pj_lock_acquire(ioqueue->lock); 
     452 
     453#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     454    /* If safe unregistration is used, then get the key record from 
     455     * the free list. 
     456     */ 
     457    if (pj_list_empty(&ioqueue->free_list)) { 
     458        pj_lock_release(ioqueue->lock); 
     459        return PJ_ETOOMANY; 
     460    } 
     461 
     462    rec = ioqueue->free_list.next; 
     463    pj_list_erase(rec); 
     464 
     465    /* Set initial reference count to 1 */ 
     466    pj_assert(pj_atomic_get(rec->ref_count) == 0); 
     467    pj_atomic_inc(rec->ref_count); 
     468 
     469    rec->closing = 0; 
     470 
     471#else 
     472    rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 
     473#endif 
     474 
    373475    /* Build the key for this socket. */ 
    374     rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 
    375476    rec->ioqueue = ioqueue; 
    376477    rec->hnd = (HANDLE)sock; 
     
    379480    pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback)); 
    380481 
     482#if PJ_HAS_TCP 
     483    rec->connecting = 0; 
     484#endif 
     485 
    381486    /* Set socket to nonblocking. */ 
    382487    value = 1; 
    383488    rc = ioctlsocket(sock, FIONBIO, &value); 
    384489    if (rc != 0) { 
     490        pj_lock_release(ioqueue->lock); 
    385491        return PJ_RETURN_OS_ERROR(WSAGetLastError()); 
    386492    } 
     
    389495    hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0); 
    390496    if (!hioq) { 
     497        pj_lock_release(ioqueue->lock); 
    391498        return PJ_RETURN_OS_ERROR(GetLastError()); 
    392499    } 
    393500 
    394501    *key = rec; 
     502 
     503#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     504    pj_list_push_back(&ioqueue->active_list, rec); 
     505#endif 
     506 
     507    pj_lock_release(ioqueue->lock); 
     508 
    395509    return PJ_SUCCESS; 
    396510} 
     
    423537 
    424538 
    425  
    426 /* 
    427  * Internal function to poll the I/O Completion Port, execute callback,  
     539#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     540/* Decrement the key's reference counter, and when the counter reach zero, 
     541 * destroy the key. 
     542 */ 
     543static void decrement_counter(pj_ioqueue_key_t *key) 
     544{ 
     545    if (pj_atomic_dec_and_get(key->ref_count) == 0) { 
     546 
     547        pj_lock_acquire(key->ioqueue->lock); 
     548 
     549        pj_assert(key->closing == 1); 
     550        pj_gettimeofday(&key->free_time); 
     551        key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY; 
     552        pj_time_val_normalize(&key->free_time); 
     553 
     554        pj_list_erase(key); 
     555        pj_list_push_back(&key->ioqueue->closing_list, key); 
     556 
     557        pj_lock_release(key->ioqueue->lock); 
     558    } 
     559} 
     560#endif 
     561 
     562/* 
     563 * Poll the I/O Completion Port, execute callback,  
    428564 * and return the key and bytes transfered of the last operation. 
    429565 */ 
     
    458594            *p_key = key; 
    459595 
    460         /* If size_status is POST_QUIT_LEN, mark the key as quitting */ 
    461         if (size_status == POST_QUIT_LEN) { 
    462             key->has_quit_signal = 1; 
     596#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     597        /* We shouldn't call callbacks if key is quitting. */ 
     598        if (key->closing) 
    463599            return PJ_TRUE; 
    464         } 
    465  
    466         /* We shouldn't call callbacks if key is quitting.  
    467          * But this should have been taken care by unregister function 
    468          * (the unregister function should have cleared out the callbacks) 
     600 
     601        /* Increment reference counter to prevent this key from being 
     602         * deleted 
    469603         */ 
     604        pj_atomic_inc(key->ref_count); 
     605#endif 
    470606 
    471607        /* Carry out the callback */ 
     
    505641            break; 
    506642        } 
     643 
     644#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     645        decrement_counter(key); 
     646#endif 
     647 
    507648        return PJ_TRUE; 
    508649    } 
     
    517658PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) 
    518659{ 
    519     pj_ssize_t polled_len; 
    520     pj_ioqueue_key_t *polled_key; 
    521     generic_overlapped ov; 
    522     BOOL rc; 
    523  
    524660    PJ_ASSERT_RETURN(key, PJ_EINVAL); 
    525661 
     
    543679    } 
    544680#endif 
    545  
    546  
    547     /* Unregistering handle from IOCP is pretty tricky. 
    548      * 
    549      * Even after the socket has been closed, GetQueuedCompletionStatus 
    550      * may still return events for the handle. This will likely to 
    551      * cause crash in pjlib, because the key associated with the handle 
    552      * most likely will have been destroyed. 
    553      * 
    554      * The solution is to poll the IOCP until we're sure that there are 
    555      * no further events for the handle. 
    556      */ 
    557  
    558     /* Clear up callbacks for the key.  
    559      * We don't want the callback to be called for this key. 
    560      */ 
     681     
     682    /* Close handle (the only way to disassociate handle from IOCP).  
     683     * We also need to close handle to make sure that no further events 
     684     * will come to the handle. 
     685     */ 
     686    CloseHandle(key->hnd); 
     687 
     688    /* Reset callbacks */ 
     689    key->cb.on_accept_complete = NULL; 
     690    key->cb.on_connect_complete = NULL; 
    561691    key->cb.on_read_complete = NULL; 
    562692    key->cb.on_write_complete = NULL; 
    563     key->cb.on_accept_complete = NULL; 
    564     key->cb.on_connect_complete = NULL; 
    565  
    566     /* Init overlapped struct */ 
    567     pj_memset(&ov, 0, sizeof(ov)); 
    568     ov.operation = PJ_IOQUEUE_OP_READ; 
    569  
    570     /* Post queued completion status with a special length. */ 
    571     rc = PostQueuedCompletionStatus( key->ioqueue->iocp, (DWORD)POST_QUIT_LEN, 
    572                                      (DWORD)key, &ov.overlapped); 
    573  
    574     /* Poll IOCP until has_quit_signal is set in the key. 
    575      * The has_quit_signal flag is set in poll_iocp() when POST_QUIT_LEN 
    576      * is detected. We need to have this flag because POST_QUIT_LEN may be 
    577      * detected by other threads. 
    578      */ 
    579     do { 
    580         polled_len = 0; 
    581         polled_key = NULL; 
    582  
    583         rc = poll_iocp(key->ioqueue->iocp, 0, &polled_len, &polled_key); 
    584  
    585     } while (rc && !key->has_quit_signal); 
    586  
    587  
    588     /* Close handle if this is a file. */ 
    589     if (key->hnd_type == HND_IS_FILE) { 
    590         CloseHandle(key->hnd); 
    591     } 
     693 
     694#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     695    /* Mark key as closing. */ 
     696    key->closing = 1; 
     697 
     698    /* Decrement reference counter. */ 
     699    decrement_counter(key); 
     700 
     701    /* Even after handle is closed, I suspect that IOCP may still try to 
     702     * do something with the handle, causing memory corruption when pool 
     703     * debugging is enabled. 
     704     * 
     705     * Forcing context switch seems to have fixed that, but this is quite 
     706     * an ugly solution.. 
     707     */ 
     708    pj_thread_sleep(0); 
     709#endif 
    592710 
    593711    return PJ_SUCCESS; 
     
    603721    DWORD dwMsec; 
    604722    int connect_count = 0; 
    605     pj_bool_t has_event; 
     723    int event_count = 0; 
    606724 
    607725    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); 
    608  
    609     /* Check the connecting array. */ 
    610 #if PJ_HAS_TCP 
    611     connect_count = check_connecting(ioqueue); 
    612 #endif 
    613726 
    614727    /* Calculate miliseconds timeout for GetQueuedCompletionStatus */ 
     
    616729 
    617730    /* Poll for completion status. */ 
    618     has_event = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL); 
     731    event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL); 
     732 
     733#if PJ_HAS_TCP 
     734    /* Check the connecting array, only when there's no activity. */ 
     735    if (event_count == 0) { 
     736        connect_count = check_connecting(ioqueue); 
     737        if (connect_count > 0) 
     738            event_count += connect_count; 
     739    } 
     740#endif 
     741 
     742#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     743    /* Check the closing keys only when there's no activity and when there are 
     744     * pending closing keys. 
     745     */ 
     746    if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) { 
     747        pj_time_val now; 
     748        pj_ioqueue_key_t *key; 
     749 
     750        pj_gettimeofday(&now); 
     751         
     752        /* Move closing keys to free list when they've finished the closing 
     753         * idle time. 
     754         */ 
     755        pj_lock_acquire(ioqueue->lock); 
     756        key = ioqueue->closing_list.next; 
     757        while (key != &ioqueue->closing_list) { 
     758            pj_ioqueue_key_t *next = key->next; 
     759 
     760            pj_assert(key->closing != 0); 
     761 
     762            if (PJ_TIME_VAL_GTE(now, key->free_time)) { 
     763                pj_list_erase(key); 
     764                pj_list_push_back(&ioqueue->free_list, key); 
     765            } 
     766            key = next; 
     767        } 
     768        pj_lock_release(ioqueue->lock); 
     769    } 
     770#endif 
    619771 
    620772    /* Return number of events. */ 
    621     return connect_count + has_event; 
     773    return event_count; 
    622774} 
    623775 
     
    645797    PJ_CHECK_STACK(); 
    646798    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); 
     799 
     800#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     801    /* Check key is not closing */ 
     802    if (key->closing) 
     803        return PJ_ECANCELLED; 
     804#endif 
    647805 
    648806    op_key_rec = (union operation_key*)op_key->internal__; 
     
    716874    PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL); 
    717875 
     876#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     877    /* Check key is not closing */ 
     878    if (key->closing) 
     879        return PJ_ECANCELLED; 
     880#endif 
     881 
    718882    op_key_rec = (union operation_key*)op_key->internal__; 
    719883    op_key_rec->overlapped.wsabuf.buf = buffer; 
     
    800964    PJ_CHECK_STACK(); 
    801965    PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL); 
    802      
     966 
     967#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     968    /* Check key is not closing */ 
     969    if (key->closing) 
     970        return PJ_ECANCELLED; 
     971#endif 
     972 
    803973    op_key_rec = (union operation_key*)op_key->internal__; 
    804974 
     
    8731043    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); 
    8741044 
     1045#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     1046    /* Check key is not closing */ 
     1047    if (key->closing) 
     1048        return PJ_ECANCELLED; 
     1049#endif 
     1050 
    8751051    /* 
    8761052     * See if there is a new connection immediately available. 
     
    9621138    PJ_CHECK_STACK(); 
    9631139    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); 
     1140 
     1141#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     1142    /* Check key is not closing */ 
     1143    if (key->closing) 
     1144        return PJ_ECANCELLED; 
     1145#endif 
    9641146 
    9651147    /* Initiate connect() */ 
Note: See TracChangeset for help on using the changeset viewer.