Ignore:
Timestamp:
Feb 13, 2008 3:17:28 PM (17 years ago)
Author:
bennylp
Message:

Ticket #474: option in ioqueue to control concurrency (to allow/disallow simultaneous/multiple callback calls)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjlib/src/pj/ioqueue_winnt.c

    r1405 r1789  
    115115    enum handle_type    hnd_type; 
    116116    pj_ioqueue_callback cb; 
     117    pj_bool_t           allow_concurrent; 
    117118 
    118119#if PJ_HAS_TCP 
     
    124125    pj_bool_t           closing; 
    125126    pj_time_val         free_time; 
     127    pj_mutex_t         *mutex; 
    126128#endif 
    127129 
     
    136138    pj_lock_t        *lock; 
    137139    pj_bool_t         auto_delete_lock; 
     140    pj_bool_t         default_concurrency; 
    138141 
    139142#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     
    152155#endif 
    153156}; 
     157 
     158 
     159#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     160/* Prototype */ 
     161static void scan_closing_keys(pj_ioqueue_t *ioqueue); 
     162#endif 
    154163 
    155164 
     
    316325 
    317326    /* Create IOCP mutex */ 
    318     rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock); 
     327    rc = pj_lock_create_recursive_mutex(pool, NULL, &ioqueue->lock); 
    319328    if (rc != PJ_SUCCESS) { 
    320329        CloseHandle(ioqueue->iocp); 
     
    323332 
    324333    ioqueue->auto_delete_lock = PJ_TRUE; 
     334    ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY; 
    325335 
    326336#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     
    345355            while (key != &ioqueue->free_list) { 
    346356                pj_atomic_destroy(key->ref_count); 
     357                pj_mutex_destroy(key->mutex); 
    347358                key = key->next; 
    348359            } 
     
    351362        } 
    352363 
     364        rc = pj_mutex_create_recursive(pool, "ioqkey", &key->mutex); 
     365        if (rc != PJ_SUCCESS) { 
     366            pj_atomic_destroy(key->ref_count); 
     367            key = ioqueue->free_list.next; 
     368            while (key != &ioqueue->free_list) { 
     369                pj_atomic_destroy(key->ref_count); 
     370                pj_mutex_destroy(key->mutex); 
     371                key = key->next; 
     372            } 
     373            CloseHandle(ioqueue->iocp); 
     374            return rc; 
     375        } 
     376 
    353377        pj_list_push_back(&ioqueue->free_list, key); 
    354  
    355378    } 
    356379#endif 
     
    393416    while (key != &ioqueue->active_list) { 
    394417        pj_atomic_destroy(key->ref_count); 
     418        pj_mutex_destroy(key->mutex); 
    395419        key = key->next; 
    396420    } 
     
    399423    while (key != &ioqueue->closing_list) { 
    400424        pj_atomic_destroy(key->ref_count); 
     425        pj_mutex_destroy(key->mutex); 
    401426        key = key->next; 
    402427    } 
     
    405430    while (key != &ioqueue->free_list) { 
    406431        pj_atomic_destroy(key->ref_count); 
     432        pj_mutex_destroy(key->mutex); 
    407433        key = key->next; 
    408434    } 
     
    412438        pj_lock_destroy(ioqueue->lock); 
    413439 
     440    return PJ_SUCCESS; 
     441} 
     442 
     443 
     444PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue, 
     445                                                       pj_bool_t allow) 
     446{ 
     447    PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL); 
     448    ioqueue->default_concurrency = allow; 
    414449    return PJ_SUCCESS; 
    415450} 
     
    454489 
    455490#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     491    /* Scan closing list first to release unused keys. 
     492     * Must do this with lock acquired. 
     493     */ 
     494    scan_closing_keys(ioqueue); 
     495 
    456496    /* If safe unregistration is used, then get the key record from 
    457497     * the free list. 
     
    482522    pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback)); 
    483523 
     524    /* Set concurrency for this handle */ 
     525    rc = pj_ioqueue_set_concurrency(rec, ioqueue->default_concurrency); 
     526    if (rc != PJ_SUCCESS) { 
     527        pj_lock_release(ioqueue->lock); 
     528        return rc; 
     529    } 
     530 
    484531#if PJ_HAS_TCP 
    485532    rec->connecting = 0; 
     
    586633     */ 
    587634    if (pOv) { 
     635        pj_bool_t has_lock; 
     636 
    588637        /* Event was dequeued for either successfull or failed I/O */ 
    589638        key = (pj_ioqueue_key_t*)dwKey; 
     
    601650            return PJ_TRUE; 
    602651 
     652        /* If concurrency is disabled, lock the key  
     653         * (and save the lock status to local var since app may change 
     654         * concurrency setting while in the callback) */ 
     655        if (key->allow_concurrent == PJ_FALSE) { 
     656            pj_mutex_lock(key->mutex); 
     657            has_lock = PJ_TRUE; 
     658        } else { 
     659            has_lock = PJ_FALSE; 
     660        } 
     661 
     662        /* Now that we get the lock, check again that key is not closing */ 
     663        if (key->closing) { 
     664            if (has_lock) { 
     665                pj_mutex_unlock(key->mutex); 
     666            } 
     667            return PJ_TRUE; 
     668        } 
     669 
    603670        /* Increment reference counter to prevent this key from being 
    604671         * deleted 
    605672         */ 
    606673        pj_atomic_inc(key->ref_count); 
     674#else 
     675        PJ_UNUSED_ARG(has_lock); 
    607676#endif 
    608677 
     
    655724#if PJ_IOQUEUE_HAS_SAFE_UNREG 
    656725        decrement_counter(key); 
     726        if (has_lock) 
     727            pj_mutex_unlock(key->mutex); 
    657728#endif 
    658729 
     
    670741{ 
    671742    unsigned i; 
     743    pj_bool_t has_lock; 
    672744    enum { RETRY = 10 }; 
    673745 
     
    697769    /* Mark key as closing before closing handle. */ 
    698770    key->closing = 1; 
     771 
     772    /* If concurrency is disabled, wait until the key has finished 
     773     * processing the callback 
     774     */ 
     775    if (key->allow_concurrent == PJ_FALSE) { 
     776        pj_mutex_lock(key->mutex); 
     777        has_lock = PJ_TRUE; 
     778    } else { 
     779        has_lock = PJ_FALSE; 
     780    } 
     781#else 
     782    PJ_UNUSED_ARG(has_lock); 
    699783#endif 
    700784     
     
    718802     * Forcing context switch seems to have fixed that, but this is quite 
    719803     * an ugly solution.. 
     804     * 
     805     * Update 2008/02/13: 
     806     *  This should not happen if concurrency is disallowed for the key. 
     807     *  So at least application has a solution for this (i.e. by disallowing 
     808     *  concurrency in the key). 
    720809     */ 
    721810    //This will loop forever if unregistration is done on the callback. 
     
    729818    /* Decrement reference counter to destroy the key. */ 
    730819    decrement_counter(key); 
     820 
     821    if (has_lock) 
     822        pj_mutex_unlock(key->mutex); 
    731823#endif 
    732824 
    733825    return PJ_SUCCESS; 
    734826} 
     827 
     828#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     829/* Scan the closing list, and put pending closing keys to free list. 
     830 * Must do this with ioqueue mutex held. 
     831 */ 
     832static void scan_closing_keys(pj_ioqueue_t *ioqueue) 
     833{ 
     834    if (!pj_list_empty(&ioqueue->closing_list)) { 
     835        pj_time_val now; 
     836        pj_ioqueue_key_t *key; 
     837 
     838        pj_gettimeofday(&now); 
     839         
     840        /* Move closing keys to free list when they've finished the closing 
     841         * idle time. 
     842         */ 
     843        key = ioqueue->closing_list.next; 
     844        while (key != &ioqueue->closing_list) { 
     845            pj_ioqueue_key_t *next = key->next; 
     846 
     847            pj_assert(key->closing != 0); 
     848 
     849            if (PJ_TIME_VAL_GTE(now, key->free_time)) { 
     850                pj_list_erase(key); 
     851                pj_list_push_back(&ioqueue->free_list, key); 
     852            } 
     853            key = next; 
     854        } 
     855    } 
     856} 
     857#endif 
    735858 
    736859/* 
     
    767890    /* Check the closing keys only when there's no activity and when there are 
    768891     * pending closing keys. 
    769      * blp: 
    770      *  no, always check the list. Otherwise on busy activity, this will cause 
    771      *  ioqueue to reject new registration. 
    772      */ 
    773     if (/*event_count == 0 &&*/ !pj_list_empty(&ioqueue->closing_list)) { 
    774         pj_time_val now; 
    775         pj_ioqueue_key_t *key; 
    776  
    777         pj_gettimeofday(&now); 
    778          
    779         /* Move closing keys to free list when they've finished the closing 
    780          * idle time. 
    781          */ 
     892     */ 
     893    if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) { 
    782894        pj_lock_acquire(ioqueue->lock); 
    783         key = ioqueue->closing_list.next; 
    784         while (key != &ioqueue->closing_list) { 
    785             pj_ioqueue_key_t *next = key->next; 
    786  
    787             pj_assert(key->closing != 0); 
    788  
    789             if (PJ_TIME_VAL_GTE(now, key->free_time)) { 
    790                 pj_list_erase(key); 
    791                 pj_list_push_back(&ioqueue->free_list, key); 
    792             } 
    793             key = next; 
    794         } 
     895        scan_closing_keys(ioqueue); 
    795896        pj_lock_release(ioqueue->lock); 
    796897    } 
     
    12691370} 
    12701371 
     1372PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key, 
     1373                                               pj_bool_t allow) 
     1374{ 
     1375    PJ_ASSERT_RETURN(key, PJ_EINVAL); 
     1376 
     1377    /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is 
     1378     * disabled. 
     1379     */ 
     1380    PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL); 
     1381 
     1382    key->allow_concurrent = allow; 
     1383    return PJ_SUCCESS; 
     1384} 
     1385 
     1386PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key) 
     1387{ 
     1388#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     1389    return pj_mutex_lock(key->mutex); 
     1390#else 
     1391    PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP); 
     1392#endif 
     1393} 
     1394 
     1395PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) 
     1396{ 
     1397#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     1398    return pj_mutex_unlock(key->mutex); 
     1399#else 
     1400    PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP); 
     1401#endif 
     1402} 
     1403 
Note: See TracChangeset for help on using the changeset viewer.