Changeset 1789


Ignore:
Timestamp:
Feb 13, 2008 3:17:28 PM (17 years ago)
Author:
bennylp
Message:

Ticket #474: option in ioqueue to control concurrency (to allow/disallow simultaneous/multiple callback calls)

Location:
pjproject/trunk/pjlib
Files:
9 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjlib/include/pj/config.h

    r1788 r1789  
    286286#   undef PJ_EXCEPTION_USE_WIN32_SEH 
    287287#   undef PJ_HAS_ERROR_STRING 
     288 
     289#   define PJ_HAS_IPV6  1 
    288290#endif 
    289291 
     
    510512#ifndef PJ_IOQUEUE_HAS_SAFE_UNREG 
    511513#   define PJ_IOQUEUE_HAS_SAFE_UNREG    1 
     514#endif 
     515 
     516 
     517/** 
     518 * Default concurrency setting for sockets/handles registered to ioqueue. 
     519 * This controls whether the ioqueue is allowed to call the key's callback 
     520 * concurrently/in parallel. The default is yes, which means that if there 
     521 * are more than one pending operations complete simultaneously, more 
     522 * than one threads may call the key's callback at the same time. This 
     523 * generally would promote good scalability for application, at the  
     524 * expense of more complexity to manage the concurrent accesses. 
     525 * 
     526 * Please see the ioqueue documentation for more info. 
     527 */ 
     528#ifndef PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY 
     529#   define PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY   1 
     530#endif 
     531 
     532 
     533/* Sanity check: 
     534 *  if ioqueue concurrency is disallowed, PJ_IOQUEUE_HAS_SAFE_UNREG 
     535 *  must be enabled. 
     536 */ 
     537#if (PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY==0) && (PJ_IOQUEUE_HAS_SAFE_UNREG==0) 
     538#   error PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if ioqueue concurrency \ 
     539          is disabled 
    512540#endif 
    513541 
  • pjproject/trunk/pjlib/include/pj/ioqueue.h

    r1748 r1789  
    102102 * \section pj_ioqueue_concurrency_sec Concurrency Rules 
    103103 * 
    104  * The items below describe rules that must be obeyed when using the I/O  
    105  * queue, with regard to concurrency: 
    106  *  - simultaneous operations (by different threads) to different key is safe. 
    107  *  - simultaneous operations to the same key is also safe, except 
    108  *    <b>unregistration</b>, which is described below. 
    109  *  - <b>care must be taken when unregistering a key</b> from the 
     104 * The ioqueue has been fine tuned to allow multiple threads to poll the 
     105 * handles simultaneously, to maximize scalability when the application is 
     106 * running on multiprocessor systems. When more than one threads are polling 
     107 * the ioqueue and there are more than one handles are signaled, more than 
     108 * one threads will execute the callback simultaneously to serve the events. 
     109 * These parallel executions are completely safe when the events happen for 
     110 * two different handles. 
     111 * 
     112 * However, with multithreading, care must be taken when multiple events  
     113 * happen on the same handle, or when event is happening on a handle (and  
     114 * the callback is being executed) and application is performing  
     115 * unregistration to the handle at the same time. 
     116 * 
     117 * The treatments of above scenario differ according to the concurrency 
     118 * setting that are applied to the handle. 
     119 * 
     120 * \subsection pj_ioq_concur_set Concurrency Settings for Handles 
     121 * 
     122 * Concurrency can be set on per handle (key) basis, by using 
     123 * #pj_ioqueue_set_concurrency() function. The default key concurrency value  
     124 * for the handle is inherited from the key concurrency setting of the ioqueue,  
     125 * and the key concurrency setting for the ioqueue can be changed by using 
     126 * #pj_ioqueue_set_default_concurrency(). The default key concurrency setting  
     127 * for ioqueue itself is controlled by compile time setting 
     128 * PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY. 
     129 * 
     130 * Note that this key concurrency setting only controls whether multiple 
     131 * threads are allowed to operate <b>on the same key</b> at the same time.  
     132 * The ioqueue itself always allows multiple threads to enter the ioqeuue at  
     133 * the same time, and also simultaneous callback calls to <b>differrent  
     134 * keys</b> is always allowed regardless to the key concurrency setting. 
     135 * 
     136 * \subsection pj_ioq_parallel Parallel Callback Executions for the Same Handle 
     137 * 
     138 * Note that when key concurrency is enabled (i.e. parallel callback calls on 
     139 * the same key is allowed; this is the default setting), the ioqueue will only 
     140 * perform simultaneous callback executions on the same key when the key has 
     141 * invoked multiple pending operations. This could be done for example by 
     142 * calling #pj_ioqueue_recvfrom() more than once on the same key, each with 
     143 * the same key but different operation key (pj_ioqueue_op_key_t). With this 
     144 * scenario, when multiple packets arrive on the key at the same time, more 
     145 * than one threads may execute the callback simultaneously, each with the 
     146 * same key but different operation key. 
     147 * 
     148 * When there is only one pending operation on the key (e.g. there is only one 
     149 * #pj_ioqueue_recvfrom() invoked on the key), then events occuring to the 
     150 * same key will be queued by the ioqueue, thus no simultaneous callback calls 
     151 * will be performed. 
     152 * 
     153 * \subsection pj_ioq_allow_concur Concurrency is Enabled (Default Value) 
     154 * 
     155 * The default setting for the ioqueue is to allow multiple threads to 
     156 * execute callbacks for the same handle/key. This setting is selected to 
     157 * promote good performance and scalability for application. 
     158 * 
     159 * However this setting has a major drawback with regard to synchronization, 
     160 * and application MUST carefully follow the following guidelines to ensure  
     161 * that parallel access to the key does not cause problems: 
     162 * 
     163 *  - Always note that callback may be called simultaneously for the same 
     164 *    key. 
     165 *  - <b>Care must be taken when unregistering a key</b> from the 
    110166 *    ioqueue. Application must take care that when one thread is issuing 
    111  *    an unregistration, other thread is not simultaneously invoking an 
    112  *    operation <b>to the same key</b>. 
     167 *    an unregistration, other thread is not simultaneously invoking the 
     168 *    callback <b>to the same key</b>. 
    113169 *\n 
    114170 *    This happens because the ioqueue functions are working with a pointer 
     
    116172 *    has been rendered invalid by other threads before the ioqueue has a 
    117173 *    chance to acquire mutex on it. 
     174 * 
     175 * \subsection pj_ioq_disallow_concur Concurrency is Disabled 
     176 * 
     177 * Alternatively, application may disable key concurrency to make  
     178 * synchronization easier. As noted above, there are three ways to control 
     179 * key concurrency setting: 
     180 *  - by controlling on per handle/key basis, with #pj_ioqueue_set_concurrency(). 
     181 *  - by changing default key concurrency setting on the ioqueue, with 
     182 *    #pj_ioqueue_set_default_concurrency(). 
     183 *  - by changing the default concurrency on compile time, by declaring 
     184 *    PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY macro to zero in your config_site.h 
    118185 * 
    119186 * \section pj_ioqeuue_examples_sec Examples 
     
    292359 
    293360/** 
     361 * Set default concurrency policy for this ioqueue. If this function is not 
     362 * called, the default concurrency policy for the ioqueue is controlled by  
     363 * compile time setting PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY. 
     364 * 
     365 * Note that changing the concurrency setting to the ioqueue will only affect 
     366 * subsequent key registrations. To modify the concurrency setting for 
     367 * individual key, use #pj_ioqueue_set_concurrency(). 
     368 * 
     369 * @param ioqueue       The ioqueue instance. 
     370 * @param allow         Non-zero to allow concurrent callback calls, or 
     371 *                      PJ_FALSE to disallow it. 
     372 * 
     373 * @return              PJ_SUCCESS on success or the appropriate error code. 
     374 */ 
     375PJ_DECL(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue, 
     376                                                        pj_bool_t allow); 
     377 
     378/** 
    294379 * Register a socket to the I/O queue framework.  
    295380 * When a socket is registered to the IOQueue, it may be modified to use 
     
    367452                                               void **old_data); 
    368453 
     454/** 
     455 * Configure whether the ioqueue is allowed to call the key's callback 
     456 * concurrently/in parallel. The default concurrency setting for the key 
     457 * is controlled by ioqueue's default concurrency value, which can be 
     458 * changed by calling #pj_ioqueue_set_default_concurrency(). 
     459 * 
     460 * If concurrency is allowed for the key, it means that if there are more 
     461 * than one pending operations complete simultaneously, more than one 
     462 * threads may call the key's  callback at the same time. This generally 
     463 * would promote good scalability for application, at the expense of more 
     464 * complexity to manage the concurrent accesses in application's code. 
     465 * 
     466 * Alternatively application may disable the concurrent access by 
     467 * setting the \a allow flag to false. With concurrency disabled, only 
     468 * one thread can call the key's callback at one time. 
     469 * 
     470 * @param key       The key that was previously obtained from registration. 
     471 * @param allow     Set this to non-zero to allow concurrent callback calls 
     472 *                  and zero (PJ_FALSE) to disallow it. 
     473 * 
     474 * @return          PJ_SUCCESS on success or the appropriate error code. 
     475 */ 
     476PJ_DECL(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key, 
     477                                                pj_bool_t allow); 
     478 
     479/** 
     480 * Acquire the key's mutex. When the key's concurrency is disabled,  
     481 * application may call this function to synchronize its operation 
     482 * with the key's callback (i.e. this function will block until the 
     483 * key's callback returns). 
     484 * 
     485 * @param key       The key that was previously obtained from registration. 
     486 * 
     487 * @return          PJ_SUCCESS on success or the appropriate error code. 
     488 */ 
     489PJ_DECL(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key); 
     490 
     491/** 
     492 * Release the lock previously acquired with pj_ioqueue_lock_key(). 
     493 * 
     494 * @param key       The key that was previously obtained from registration. 
     495 * 
     496 * @return          PJ_SUCCESS on success or the appropriate error code. 
     497 */ 
     498PJ_DECL(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key); 
    369499 
    370500/** 
  • pjproject/trunk/pjlib/src/pj/ioqueue_common_abs.c

    r1405 r1789  
    9797#endif 
    9898 
     99    rc = pj_ioqueue_set_concurrency(key, ioqueue->default_concurrency); 
     100    if (rc != PJ_SUCCESS) 
     101        return rc; 
     102 
    99103    /* Get socket type. When socket type is datagram, some optimization 
    100104     * will be performed during send to allow parallel send operations. 
     
    194198        /* Completion of connect() operation */ 
    195199        pj_ssize_t bytes_transfered; 
     200        pj_bool_t has_lock; 
    196201 
    197202        /* Clear operation. */ 
     
    247252#endif 
    248253 
    249         /* Unlock; from this point we don't need to hold key's mutex. */ 
    250         pj_mutex_unlock(h->mutex); 
     254        /* Unlock; from this point we don't need to hold key's mutex 
     255         * (unless concurrency is disabled, which in this case we should 
     256         * hold the mutex while calling the callback) */ 
     257        if (h->allow_concurrent) { 
     258            /* concurrency may be changed while we're in the callback, so 
     259             * save it to a flag. 
     260             */ 
     261            has_lock = PJ_FALSE; 
     262            pj_mutex_unlock(h->mutex); 
     263        } else { 
     264            has_lock = PJ_TRUE; 
     265        } 
    251266 
    252267        /* Call callback. */ 
    253268        if (h->cb.on_connect_complete && !IS_CLOSING(h)) 
    254269            (*h->cb.on_connect_complete)(h, bytes_transfered); 
     270 
     271        /* Unlock if we still hold the lock */ 
     272        if (has_lock) { 
     273            pj_mutex_unlock(h->mutex); 
     274        } 
    255275 
    256276        /* Done. */ 
     
    318338            h->fd_type == pj_SOCK_DGRAM())  
    319339        { 
     340            pj_bool_t has_lock; 
    320341 
    321342            write_op->op = PJ_IOQUEUE_OP_NONE; 
     
    331352            } 
    332353 
    333             /* No need to hold mutex anymore */ 
    334             pj_mutex_unlock(h->mutex); 
     354            /* Unlock; from this point we don't need to hold key's mutex 
     355             * (unless concurrency is disabled, which in this case we should 
     356             * hold the mutex while calling the callback) */ 
     357            if (h->allow_concurrent) { 
     358                /* concurrency may be changed while we're in the callback, so 
     359                 * save it to a flag. 
     360                 */ 
     361                has_lock = PJ_FALSE; 
     362                pj_mutex_unlock(h->mutex); 
     363            } else { 
     364                has_lock = PJ_TRUE; 
     365            } 
    335366 
    336367            /* Call callback. */ 
     
    340371                                           write_op->written); 
    341372            } 
     373 
     374            if (has_lock) { 
     375                pj_mutex_unlock(h->mutex); 
     376            } 
    342377 
    343378        } else { 
     
    372407 
    373408        struct accept_operation *accept_op; 
     409        pj_bool_t has_lock; 
    374410         
    375411        /* Get one accept operation from the list. */ 
     
    390426        } 
    391427 
    392         /* Unlock; from this point we don't need to hold key's mutex. */ 
    393         pj_mutex_unlock(h->mutex); 
     428        /* Unlock; from this point we don't need to hold key's mutex 
     429         * (unless concurrency is disabled, which in this case we should 
     430         * hold the mutex while calling the callback) */ 
     431        if (h->allow_concurrent) { 
     432            /* concurrency may be changed while we're in the callback, so 
     433             * save it to a flag. 
     434             */ 
     435            has_lock = PJ_FALSE; 
     436            pj_mutex_unlock(h->mutex); 
     437        } else { 
     438            has_lock = PJ_TRUE; 
     439        } 
    394440 
    395441        /* Call callback. */ 
     
    400446        } 
    401447 
     448        if (has_lock) { 
     449            pj_mutex_unlock(h->mutex); 
     450        } 
    402451    } 
    403452    else 
     
    406455        struct read_operation *read_op; 
    407456        pj_ssize_t bytes_read; 
     457        pj_bool_t has_lock; 
    408458 
    409459        /* Get one pending read operation from the list. */ 
     
    480530        } 
    481531 
    482         /* Unlock; from this point we don't need to hold key's mutex. */ 
    483         pj_mutex_unlock(h->mutex); 
     532        /* Unlock; from this point we don't need to hold key's mutex 
     533         * (unless concurrency is disabled, which in this case we should 
     534         * hold the mutex while calling the callback) */ 
     535        if (h->allow_concurrent) { 
     536            /* concurrency may be changed while we're in the callback, so 
     537             * save it to a flag. 
     538             */ 
     539            has_lock = PJ_FALSE; 
     540            pj_mutex_unlock(h->mutex); 
     541        } else { 
     542            has_lock = PJ_TRUE; 
     543        } 
    484544 
    485545        /* Call callback. */ 
     
    489549                                      bytes_read); 
    490550        } 
     551 
     552        if (has_lock) { 
     553            pj_mutex_unlock(h->mutex); 
     554        } 
    491555 
    492556    } else { 
     
    504568                                       pj_ioqueue_key_t *h ) 
    505569{ 
     570    pj_bool_t has_lock; 
     571 
    506572    pj_mutex_lock(h->mutex); 
    507573 
     
    526592    ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT); 
    527593 
    528     pj_mutex_unlock(h->mutex); 
     594    /* Unlock; from this point we don't need to hold key's mutex 
     595     * (unless concurrency is disabled, which in this case we should 
     596     * hold the mutex while calling the callback) */ 
     597    if (h->allow_concurrent) { 
     598        /* concurrency may be changed while we're in the callback, so 
     599         * save it to a flag. 
     600         */ 
     601        has_lock = PJ_FALSE; 
     602        pj_mutex_unlock(h->mutex); 
     603    } else { 
     604        has_lock = PJ_TRUE; 
     605    } 
    529606 
    530607    /* Call callback. */ 
     
    542619 
    543620        (*h->cb.on_connect_complete)(h, status); 
     621    } 
     622 
     623    if (has_lock) { 
     624        pj_mutex_unlock(h->mutex); 
    544625    } 
    545626} 
     
    10971178} 
    10981179 
     1180PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency( pj_ioqueue_t *ioqueue, 
     1181                                                        pj_bool_t allow) 
     1182{ 
     1183    PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL); 
     1184    ioqueue->default_concurrency = allow; 
     1185    return PJ_SUCCESS; 
     1186} 
     1187 
     1188 
     1189PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key, 
     1190                                               pj_bool_t allow) 
     1191{ 
     1192    PJ_ASSERT_RETURN(key, PJ_EINVAL); 
     1193 
     1194    /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is 
     1195     * disabled. 
     1196     */ 
     1197    PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL); 
     1198 
     1199    key->allow_concurrent = allow; 
     1200    return PJ_SUCCESS; 
     1201} 
     1202 
     1203PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key) 
     1204{ 
     1205    return pj_mutex_lock(key->mutex); 
     1206} 
     1207 
     1208PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) 
     1209{ 
     1210    return pj_mutex_unlock(key->mutex); 
     1211} 
     1212 
  • pjproject/trunk/pjlib/src/pj/ioqueue_common_abs.h

    r974 r1789  
    104104    pj_bool_t               inside_callback;        \ 
    105105    pj_bool_t               destroy_requested;      \ 
     106    pj_bool_t               allow_concurrent;       \ 
    106107    pj_sock_t               fd;                     \ 
    107108    int                     fd_type;                \ 
     
    117118#define DECLARE_COMMON_IOQUEUE                      \ 
    118119    pj_lock_t          *lock;                       \ 
    119     pj_bool_t           auto_delete_lock; 
     120    pj_bool_t           auto_delete_lock;           \ 
     121    pj_bool_t           default_concurrency; 
    120122 
    121123 
  • pjproject/trunk/pjlib/src/pj/ioqueue_winnt.c

    r1405 r1789  
    115115    enum handle_type    hnd_type; 
    116116    pj_ioqueue_callback cb; 
     117    pj_bool_t           allow_concurrent; 
    117118 
    118119#if PJ_HAS_TCP 
     
    124125    pj_bool_t           closing; 
    125126    pj_time_val         free_time; 
     127    pj_mutex_t         *mutex; 
    126128#endif 
    127129 
     
    136138    pj_lock_t        *lock; 
    137139    pj_bool_t         auto_delete_lock; 
     140    pj_bool_t         default_concurrency; 
    138141 
    139142#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     
    152155#endif 
    153156}; 
     157 
     158 
     159#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     160/* Prototype */ 
     161static void scan_closing_keys(pj_ioqueue_t *ioqueue); 
     162#endif 
    154163 
    155164 
     
    316325 
    317326    /* Create IOCP mutex */ 
    318     rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock); 
     327    rc = pj_lock_create_recursive_mutex(pool, NULL, &ioqueue->lock); 
    319328    if (rc != PJ_SUCCESS) { 
    320329        CloseHandle(ioqueue->iocp); 
     
    323332 
    324333    ioqueue->auto_delete_lock = PJ_TRUE; 
     334    ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY; 
    325335 
    326336#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     
    345355            while (key != &ioqueue->free_list) { 
    346356                pj_atomic_destroy(key->ref_count); 
     357                pj_mutex_destroy(key->mutex); 
    347358                key = key->next; 
    348359            } 
     
    351362        } 
    352363 
     364        rc = pj_mutex_create_recursive(pool, "ioqkey", &key->mutex); 
     365        if (rc != PJ_SUCCESS) { 
     366            pj_atomic_destroy(key->ref_count); 
     367            key = ioqueue->free_list.next; 
     368            while (key != &ioqueue->free_list) { 
     369                pj_atomic_destroy(key->ref_count); 
     370                pj_mutex_destroy(key->mutex); 
     371                key = key->next; 
     372            } 
     373            CloseHandle(ioqueue->iocp); 
     374            return rc; 
     375        } 
     376 
    353377        pj_list_push_back(&ioqueue->free_list, key); 
    354  
    355378    } 
    356379#endif 
     
    393416    while (key != &ioqueue->active_list) { 
    394417        pj_atomic_destroy(key->ref_count); 
     418        pj_mutex_destroy(key->mutex); 
    395419        key = key->next; 
    396420    } 
     
    399423    while (key != &ioqueue->closing_list) { 
    400424        pj_atomic_destroy(key->ref_count); 
     425        pj_mutex_destroy(key->mutex); 
    401426        key = key->next; 
    402427    } 
     
    405430    while (key != &ioqueue->free_list) { 
    406431        pj_atomic_destroy(key->ref_count); 
     432        pj_mutex_destroy(key->mutex); 
    407433        key = key->next; 
    408434    } 
     
    412438        pj_lock_destroy(ioqueue->lock); 
    413439 
     440    return PJ_SUCCESS; 
     441} 
     442 
     443 
     444PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue, 
     445                                                       pj_bool_t allow) 
     446{ 
     447    PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL); 
     448    ioqueue->default_concurrency = allow; 
    414449    return PJ_SUCCESS; 
    415450} 
     
    454489 
    455490#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     491    /* Scan closing list first to release unused keys. 
     492     * Must do this with lock acquired. 
     493     */ 
     494    scan_closing_keys(ioqueue); 
     495 
    456496    /* If safe unregistration is used, then get the key record from 
    457497     * the free list. 
     
    482522    pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback)); 
    483523 
     524    /* Set concurrency for this handle */ 
     525    rc = pj_ioqueue_set_concurrency(rec, ioqueue->default_concurrency); 
     526    if (rc != PJ_SUCCESS) { 
     527        pj_lock_release(ioqueue->lock); 
     528        return rc; 
     529    } 
     530 
    484531#if PJ_HAS_TCP 
    485532    rec->connecting = 0; 
     
    586633     */ 
    587634    if (pOv) { 
     635        pj_bool_t has_lock; 
     636 
    588637        /* Event was dequeued for either successfull or failed I/O */ 
    589638        key = (pj_ioqueue_key_t*)dwKey; 
     
    601650            return PJ_TRUE; 
    602651 
     652        /* If concurrency is disabled, lock the key  
     653         * (and save the lock status to local var since app may change 
     654         * concurrency setting while in the callback) */ 
     655        if (key->allow_concurrent == PJ_FALSE) { 
     656            pj_mutex_lock(key->mutex); 
     657            has_lock = PJ_TRUE; 
     658        } else { 
     659            has_lock = PJ_FALSE; 
     660        } 
     661 
     662        /* Now that we get the lock, check again that key is not closing */ 
     663        if (key->closing) { 
     664            if (has_lock) { 
     665                pj_mutex_unlock(key->mutex); 
     666            } 
     667            return PJ_TRUE; 
     668        } 
     669 
    603670        /* Increment reference counter to prevent this key from being 
    604671         * deleted 
    605672         */ 
    606673        pj_atomic_inc(key->ref_count); 
     674#else 
     675        PJ_UNUSED_ARG(has_lock); 
    607676#endif 
    608677 
     
    655724#if PJ_IOQUEUE_HAS_SAFE_UNREG 
    656725        decrement_counter(key); 
     726        if (has_lock) 
     727            pj_mutex_unlock(key->mutex); 
    657728#endif 
    658729 
     
    670741{ 
    671742    unsigned i; 
     743    pj_bool_t has_lock; 
    672744    enum { RETRY = 10 }; 
    673745 
     
    697769    /* Mark key as closing before closing handle. */ 
    698770    key->closing = 1; 
     771 
     772    /* If concurrency is disabled, wait until the key has finished 
     773     * processing the callback 
     774     */ 
     775    if (key->allow_concurrent == PJ_FALSE) { 
     776        pj_mutex_lock(key->mutex); 
     777        has_lock = PJ_TRUE; 
     778    } else { 
     779        has_lock = PJ_FALSE; 
     780    } 
     781#else 
     782    PJ_UNUSED_ARG(has_lock); 
    699783#endif 
    700784     
     
    718802     * Forcing context switch seems to have fixed that, but this is quite 
    719803     * an ugly solution.. 
     804     * 
     805     * Update 2008/02/13: 
     806     *  This should not happen if concurrency is disallowed for the key. 
     807     *  So at least application has a solution for this (i.e. by disallowing 
     808     *  concurrency in the key). 
    720809     */ 
    721810    //This will loop forever if unregistration is done on the callback. 
     
    729818    /* Decrement reference counter to destroy the key. */ 
    730819    decrement_counter(key); 
     820 
     821    if (has_lock) 
     822        pj_mutex_unlock(key->mutex); 
    731823#endif 
    732824 
    733825    return PJ_SUCCESS; 
    734826} 
     827 
     828#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     829/* Scan the closing list, and put pending closing keys to free list. 
     830 * Must do this with ioqueue mutex held. 
     831 */ 
     832static void scan_closing_keys(pj_ioqueue_t *ioqueue) 
     833{ 
     834    if (!pj_list_empty(&ioqueue->closing_list)) { 
     835        pj_time_val now; 
     836        pj_ioqueue_key_t *key; 
     837 
     838        pj_gettimeofday(&now); 
     839         
     840        /* Move closing keys to free list when they've finished the closing 
     841         * idle time. 
     842         */ 
     843        key = ioqueue->closing_list.next; 
     844        while (key != &ioqueue->closing_list) { 
     845            pj_ioqueue_key_t *next = key->next; 
     846 
     847            pj_assert(key->closing != 0); 
     848 
     849            if (PJ_TIME_VAL_GTE(now, key->free_time)) { 
     850                pj_list_erase(key); 
     851                pj_list_push_back(&ioqueue->free_list, key); 
     852            } 
     853            key = next; 
     854        } 
     855    } 
     856} 
     857#endif 
    735858 
    736859/* 
     
    767890    /* Check the closing keys only when there's no activity and when there are 
    768891     * pending closing keys. 
    769      * blp: 
    770      *  no, always check the list. Otherwise on busy activity, this will cause 
    771      *  ioqueue to reject new registration. 
    772      */ 
    773     if (/*event_count == 0 &&*/ !pj_list_empty(&ioqueue->closing_list)) { 
    774         pj_time_val now; 
    775         pj_ioqueue_key_t *key; 
    776  
    777         pj_gettimeofday(&now); 
    778          
    779         /* Move closing keys to free list when they've finished the closing 
    780          * idle time. 
    781          */ 
     892     */ 
     893    if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) { 
    782894        pj_lock_acquire(ioqueue->lock); 
    783         key = ioqueue->closing_list.next; 
    784         while (key != &ioqueue->closing_list) { 
    785             pj_ioqueue_key_t *next = key->next; 
    786  
    787             pj_assert(key->closing != 0); 
    788  
    789             if (PJ_TIME_VAL_GTE(now, key->free_time)) { 
    790                 pj_list_erase(key); 
    791                 pj_list_push_back(&ioqueue->free_list, key); 
    792             } 
    793             key = next; 
    794         } 
     895        scan_closing_keys(ioqueue); 
    795896        pj_lock_release(ioqueue->lock); 
    796897    } 
     
    12691370} 
    12701371 
     1372PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key, 
     1373                                               pj_bool_t allow) 
     1374{ 
     1375    PJ_ASSERT_RETURN(key, PJ_EINVAL); 
     1376 
     1377    /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is 
     1378     * disabled. 
     1379     */ 
     1380    PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL); 
     1381 
     1382    key->allow_concurrent = allow; 
     1383    return PJ_SUCCESS; 
     1384} 
     1385 
     1386PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key) 
     1387{ 
     1388#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     1389    return pj_mutex_lock(key->mutex); 
     1390#else 
     1391    PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP); 
     1392#endif 
     1393} 
     1394 
     1395PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) 
     1396{ 
     1397#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     1398    return pj_mutex_unlock(key->mutex); 
     1399#else 
     1400    PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP); 
     1401#endif 
     1402} 
     1403 
  • pjproject/trunk/pjlib/src/pjlib-test/ioq_perf.c

    r1405 r1789  
    222222 *    period of time. 
    223223 */ 
    224 static int perform_test(int sock_type, const char *type_name, 
     224static int perform_test(pj_bool_t allow_concur, 
     225                        int sock_type, const char *type_name, 
    225226                        unsigned thread_cnt, unsigned sockpair_cnt, 
    226227                        pj_size_t buffer_size,  
     
    261262    } 
    262263 
     264    rc = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur); 
     265    if (rc != PJ_SUCCESS) { 
     266        app_perror("...error: pj_ioqueue_set_default_concurrency()", rc); 
     267        return -16; 
     268    } 
     269 
    263270    /* Initialize each producer-consumer pair. */ 
    264271    for (i=0; i<sockpair_cnt; ++i) { 
     
    438445} 
    439446 
    440 /* 
    441  * main test entry. 
    442  */ 
    443 int ioqueue_perf_test(void) 
     447static int ioqueue_perf_test_imp(pj_bool_t allow_concur) 
    444448{ 
    445449    enum { BUF_SIZE = 512 }; 
     
    501505 
    502506    PJ_LOG(3,(THIS_FILE, "   Benchmarking %s ioqueue:", pj_ioqueue_name())); 
     507    PJ_LOG(3,(THIS_FILE, "   Testing with concurency=%d", allow_concur)); 
    503508    PJ_LOG(3,(THIS_FILE, "   =======================================")); 
    504509    PJ_LOG(3,(THIS_FILE, "   Type  Threads  Skt.Pairs      Bandwidth")); 
     
    509514        pj_size_t bandwidth; 
    510515 
    511         rc = perform_test(test_param[i].type,  
     516        rc = perform_test(allow_concur, 
     517                          test_param[i].type,  
    512518                          test_param[i].type_name, 
    513519                          test_param[i].thread_cnt,  
     
    538544} 
    539545 
     546/* 
     547 * main test entry. 
     548 */ 
     549int ioqueue_perf_test(void) 
     550{ 
     551    int rc; 
     552 
     553    rc = ioqueue_perf_test_imp(PJ_TRUE); 
     554    if (rc != 0) 
     555        return rc; 
     556 
     557    rc = ioqueue_perf_test_imp(PJ_FALSE); 
     558    if (rc != 0) 
     559        return rc; 
     560 
     561    return 0; 
     562} 
     563 
    540564#else 
    541565/* To prevent warning about "translation unit is empty" 
  • pjproject/trunk/pjlib/src/pjlib-test/ioq_tcp.c

    r1405 r1789  
    233233 * Compliance test for success scenario. 
    234234 */ 
    235 static int compliance_test_0(void) 
     235static int compliance_test_0(pj_bool_t allow_concur) 
    236236{ 
    237237    pj_sock_t ssock=-1, csock0=-1, csock1=-1; 
     
    291291        app_perror("...ERROR in pj_ioqueue_create()", rc); 
    292292        status=-20; goto on_error; 
     293    } 
     294 
     295    // Concurrency 
     296    rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur); 
     297    if (rc != PJ_SUCCESS) { 
     298        app_perror("...ERROR in pj_ioqueue_set_default_concurrency()", rc); 
     299        status=-21; goto on_error; 
    293300    } 
    294301 
     
    459466 * In this case, the client connects to a non-existant service. 
    460467 */ 
    461 static int compliance_test_1(void) 
     468static int compliance_test_1(pj_bool_t allow_concur) 
    462469{ 
    463470    pj_sock_t csock1=PJ_INVALID_SOCKET; 
     
    478485    if (!ioque) { 
    479486        status=-20; goto on_error; 
     487    } 
     488 
     489    // Concurrency 
     490    rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur); 
     491    if (rc != PJ_SUCCESS) { 
     492        status=-21; goto on_error; 
    480493    } 
    481494 
     
    582595 * Repeated connect/accept on the same listener socket. 
    583596 */ 
    584 static int compliance_test_2(void) 
     597static int compliance_test_2(pj_bool_t allow_concur) 
    585598{ 
    586599#if defined(PJ_SYMBIAN) && PJ_SYMBIAN!=0 
     
    648661    } 
    649662 
     663 
     664    // Concurrency 
     665    rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur); 
     666    if (rc != PJ_SUCCESS) { 
     667        app_perror("...ERROR in pj_ioqueue_set_default_concurrency()", rc); 
     668        return -11; 
     669    } 
    650670 
    651671    // Allocate buffers for send and receive. 
     
    888908 
    889909 
    890 int tcp_ioqueue_test() 
     910static int tcp_ioqueue_test_impl(pj_bool_t allow_concur) 
    891911{ 
    892912    int status; 
     913 
     914    PJ_LOG(3,(THIS_FILE, "..testing with concurency=%d", allow_concur)); 
    893915 
    894916    PJ_LOG(3, (THIS_FILE, "..%s compliance test 0 (success scenario)", 
    895917               pj_ioqueue_name())); 
    896     if ((status=compliance_test_0()) != 0) { 
     918    if ((status=compliance_test_0(allow_concur)) != 0) { 
    897919        PJ_LOG(1, (THIS_FILE, "....FAILED (status=%d)\n", status)); 
    898920        return status; 
     
    900922    PJ_LOG(3, (THIS_FILE, "..%s compliance test 1 (failed scenario)", 
    901923               pj_ioqueue_name())); 
    902     if ((status=compliance_test_1()) != 0) { 
     924    if ((status=compliance_test_1(allow_concur)) != 0) { 
    903925        PJ_LOG(1, (THIS_FILE, "....FAILED (status=%d)\n", status)); 
    904926        return status; 
     
    907929    PJ_LOG(3, (THIS_FILE, "..%s compliance test 2 (repeated accept)", 
    908930               pj_ioqueue_name())); 
    909     if ((status=compliance_test_2()) != 0) { 
     931    if ((status=compliance_test_2(allow_concur)) != 0) { 
    910932        PJ_LOG(1, (THIS_FILE, "....FAILED (status=%d)\n", status)); 
    911933        return status; 
    912934    } 
     935 
     936    return 0; 
     937} 
     938 
     939int tcp_ioqueue_test() 
     940{ 
     941    int rc; 
     942 
     943    rc = tcp_ioqueue_test_impl(PJ_TRUE); 
     944    if (rc != 0) 
     945        return rc; 
     946 
     947    rc = tcp_ioqueue_test_impl(PJ_FALSE); 
     948    if (rc != 0) 
     949        return rc; 
    913950 
    914951    return 0; 
  • pjproject/trunk/pjlib/src/pjlib-test/ioq_udp.c

    r1405 r1789  
    126126 * data between two sockets. 
    127127 */  
    128 static int compliance_test(void) 
     128static int compliance_test(pj_bool_t allow_concur) 
    129129{ 
    130130    pj_sock_t ssock=-1, csock=-1; 
     
    177177    if (rc != PJ_SUCCESS) { 
    178178        status=-20; goto on_error; 
     179    } 
     180 
     181    // Set concurrency 
     182    TRACE_("set concurrency..."); 
     183    rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur); 
     184    if (rc != PJ_SUCCESS) { 
     185        status=-21; goto on_error; 
    179186    } 
    180187 
     
    352359 * closed. 
    353360 */  
    354 static int unregister_test(void) 
     361static int unregister_test(pj_bool_t allow_concur) 
    355362{ 
    356363    enum { RPORT = 50000, SPORT = 50001 }; 
     
    382389    } 
    383390 
     391    // Set concurrency 
     392    TRACE_("set concurrency..."); 
     393    status = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur); 
     394    if (status != PJ_SUCCESS) { 
     395        return -112; 
     396    } 
     397 
    384398    /* Create sender socket */ 
    385399    status = app_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, SPORT, &ssock); 
     
    513527 * of sockets to the ioqueue. 
    514528 */ 
    515 static int many_handles_test(void) 
     529static int many_handles_test(pj_bool_t allow_concur) 
    516530{ 
    517531    enum { MAX = PJ_IOQUEUE_MAX_HANDLES }; 
     
    538552        app_perror("...error in pj_ioqueue_create", rc); 
    539553        return -10; 
     554    } 
     555 
     556    // Set concurrency 
     557    rc = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur); 
     558    if (rc != PJ_SUCCESS) { 
     559        return -11; 
    540560    } 
    541561 
     
    601621 * Benchmarking IOQueue 
    602622 */ 
    603 static int bench_test(int bufsize, int inactive_sock_count) 
     623static int bench_test(pj_bool_t allow_concur, int bufsize,  
     624                      int inactive_sock_count) 
    604625{ 
    605626    pj_sock_t ssock=-1, csock=-1; 
     
    649670    if (rc != PJ_SUCCESS) { 
    650671        app_perror("...error: pj_ioqueue_create()", rc); 
     672        goto on_error; 
     673    } 
     674 
     675    // Set concurrency 
     676    rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur); 
     677    if (rc != PJ_SUCCESS) { 
     678        app_perror("...error: pj_ioqueue_set_default_concurrency()", rc); 
    651679        goto on_error; 
    652680    } 
     
    840868} 
    841869 
    842 int udp_ioqueue_test() 
     870static int udp_ioqueue_test_imp(pj_bool_t allow_concur) 
    843871{ 
    844872    int status; 
    845873    int bufsize, sock_count; 
    846874 
     875    PJ_LOG(3,(THIS_FILE, "..testing with concurency=%d", allow_concur)); 
     876 
    847877    //goto pass1; 
    848878 
    849879    PJ_LOG(3, (THIS_FILE, "...compliance test (%s)", pj_ioqueue_name())); 
    850     if ((status=compliance_test()) != 0) { 
     880    if ((status=compliance_test(allow_concur)) != 0) { 
    851881        return status; 
    852882    } 
     
    855885 
    856886    PJ_LOG(3, (THIS_FILE, "...unregister test (%s)", pj_ioqueue_name())); 
    857     if ((status=unregister_test()) != 0) { 
     887    if ((status=unregister_test(allow_concur)) != 0) { 
    858888        return status; 
    859889    } 
    860890    PJ_LOG(3, (THIS_FILE, "....unregister test ok")); 
    861891 
    862     if ((status=many_handles_test()) != 0) { 
     892    if ((status=many_handles_test(allow_concur)) != 0) { 
    863893        return status; 
    864894    } 
     
    880910 
    881911    for (bufsize=BUF_MIN_SIZE; bufsize <= BUF_MAX_SIZE; bufsize *= 2) { 
    882         if ((status=bench_test(bufsize, SOCK_INACTIVE_MIN)) != 0) 
     912        if ((status=bench_test(allow_concur, bufsize, SOCK_INACTIVE_MIN)) != 0) 
    883913            return status; 
    884914    } 
     
    890920    { 
    891921        //PJ_LOG(3,(THIS_FILE, "...testing with %d fds", sock_count)); 
    892         if ((status=bench_test(bufsize, sock_count-2)) != 0) 
     922        if ((status=bench_test(allow_concur, bufsize, sock_count-2)) != 0) 
    893923            return status; 
    894924    } 
     925    return 0; 
     926} 
     927 
     928int udp_ioqueue_test() 
     929{ 
     930    int rc; 
     931 
     932    rc = udp_ioqueue_test_imp(PJ_TRUE); 
     933    if (rc != 0) 
     934        return rc; 
     935 
     936    rc = udp_ioqueue_test_imp(PJ_FALSE); 
     937    if (rc != 0) 
     938        return rc; 
     939 
    895940    return 0; 
    896941} 
  • pjproject/trunk/pjlib/src/pjlib-test/ioq_unreg.c

    r1405 r1789  
    287287} 
    288288 
    289 int udp_ioqueue_unreg_test(void) 
     289static int udp_ioqueue_unreg_test_imp(pj_bool_t allow_concur) 
    290290{ 
    291291    enum { LOOP = 10 }; 
     
    294294    pj_ioqueue_t *ioqueue; 
    295295    pj_pool_t *test_pool; 
    296                                
     296         
     297    PJ_LOG(3,(THIS_FILE, "..testing with concurency=%d", allow_concur)); 
     298 
    297299    test_method = UNREGISTER_IN_APP; 
    298300 
     
    305307    } 
    306308 
     309    rc = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur); 
     310    if (rc != PJ_SUCCESS) { 
     311        app_perror("Error in pj_ioqueue_set_default_concurrency()", rc); 
     312        return -12; 
     313    } 
    307314 
    308315    PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 0/3 (%s)",  
     
    352359} 
    353360 
    354  
     361int udp_ioqueue_unreg_test(void) 
     362{ 
     363    int rc; 
     364 
     365    rc = udp_ioqueue_unreg_test_imp(PJ_TRUE); 
     366    if (rc != 0) 
     367        return rc; 
     368 
     369    rc = udp_ioqueue_unreg_test_imp(PJ_FALSE); 
     370    if (rc != 0) 
     371        return rc; 
     372 
     373    return 0; 
     374} 
    355375 
    356376#else 
Note: See TracChangeset for help on using the changeset viewer.