Ignore:
Timestamp:
Feb 13, 2008 3:17:28 PM (16 years ago)
Author:
bennylp
Message:

Ticket #474: option in ioqueue to control concurrency (to allow/disallow simultaneous/multiple callback calls)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjlib/src/pj/ioqueue_common_abs.c

    r1405 r1789  
    9797#endif 
    9898 
     99    rc = pj_ioqueue_set_concurrency(key, ioqueue->default_concurrency); 
     100    if (rc != PJ_SUCCESS) 
     101        return rc; 
     102 
    99103    /* Get socket type. When socket type is datagram, some optimization 
    100104     * will be performed during send to allow parallel send operations. 
     
    194198        /* Completion of connect() operation */ 
    195199        pj_ssize_t bytes_transfered; 
     200        pj_bool_t has_lock; 
    196201 
    197202        /* Clear operation. */ 
     
    247252#endif 
    248253 
    249         /* Unlock; from this point we don't need to hold key's mutex. */ 
    250         pj_mutex_unlock(h->mutex); 
     254        /* Unlock; from this point we don't need to hold key's mutex 
     255         * (unless concurrency is disabled, which in this case we should 
     256         * hold the mutex while calling the callback) */ 
     257        if (h->allow_concurrent) { 
     258            /* concurrency may be changed while we're in the callback, so 
     259             * save it to a flag. 
     260             */ 
     261            has_lock = PJ_FALSE; 
     262            pj_mutex_unlock(h->mutex); 
     263        } else { 
     264            has_lock = PJ_TRUE; 
     265        } 
    251266 
    252267        /* Call callback. */ 
    253268        if (h->cb.on_connect_complete && !IS_CLOSING(h)) 
    254269            (*h->cb.on_connect_complete)(h, bytes_transfered); 
     270 
     271        /* Unlock if we still hold the lock */ 
     272        if (has_lock) { 
     273            pj_mutex_unlock(h->mutex); 
     274        } 
    255275 
    256276        /* Done. */ 
     
    318338            h->fd_type == pj_SOCK_DGRAM())  
    319339        { 
     340            pj_bool_t has_lock; 
    320341 
    321342            write_op->op = PJ_IOQUEUE_OP_NONE; 
     
    331352            } 
    332353 
    333             /* No need to hold mutex anymore */ 
    334             pj_mutex_unlock(h->mutex); 
     354            /* Unlock; from this point we don't need to hold key's mutex 
     355             * (unless concurrency is disabled, which in this case we should 
     356             * hold the mutex while calling the callback) */ 
     357            if (h->allow_concurrent) { 
     358                /* concurrency may be changed while we're in the callback, so 
     359                 * save it to a flag. 
     360                 */ 
     361                has_lock = PJ_FALSE; 
     362                pj_mutex_unlock(h->mutex); 
     363            } else { 
     364                has_lock = PJ_TRUE; 
     365            } 
    335366 
    336367            /* Call callback. */ 
     
    340371                                           write_op->written); 
    341372            } 
     373 
     374            if (has_lock) { 
     375                pj_mutex_unlock(h->mutex); 
     376            } 
    342377 
    343378        } else { 
     
    372407 
    373408        struct accept_operation *accept_op; 
     409        pj_bool_t has_lock; 
    374410         
    375411        /* Get one accept operation from the list. */ 
     
    390426        } 
    391427 
    392         /* Unlock; from this point we don't need to hold key's mutex. */ 
    393         pj_mutex_unlock(h->mutex); 
     428        /* Unlock; from this point we don't need to hold key's mutex 
     429         * (unless concurrency is disabled, which in this case we should 
     430         * hold the mutex while calling the callback) */ 
     431        if (h->allow_concurrent) { 
     432            /* concurrency may be changed while we're in the callback, so 
     433             * save it to a flag. 
     434             */ 
     435            has_lock = PJ_FALSE; 
     436            pj_mutex_unlock(h->mutex); 
     437        } else { 
     438            has_lock = PJ_TRUE; 
     439        } 
    394440 
    395441        /* Call callback. */ 
     
    400446        } 
    401447 
     448        if (has_lock) { 
     449            pj_mutex_unlock(h->mutex); 
     450        } 
    402451    } 
    403452    else 
     
    406455        struct read_operation *read_op; 
    407456        pj_ssize_t bytes_read; 
     457        pj_bool_t has_lock; 
    408458 
    409459        /* Get one pending read operation from the list. */ 
     
    480530        } 
    481531 
    482         /* Unlock; from this point we don't need to hold key's mutex. */ 
    483         pj_mutex_unlock(h->mutex); 
     532        /* Unlock; from this point we don't need to hold key's mutex 
     533         * (unless concurrency is disabled, which in this case we should 
     534         * hold the mutex while calling the callback) */ 
     535        if (h->allow_concurrent) { 
     536            /* concurrency may be changed while we're in the callback, so 
     537             * save it to a flag. 
     538             */ 
     539            has_lock = PJ_FALSE; 
     540            pj_mutex_unlock(h->mutex); 
     541        } else { 
     542            has_lock = PJ_TRUE; 
     543        } 
    484544 
    485545        /* Call callback. */ 
     
    489549                                      bytes_read); 
    490550        } 
     551 
     552        if (has_lock) { 
     553            pj_mutex_unlock(h->mutex); 
     554        } 
    491555 
    492556    } else { 
     
    504568                                       pj_ioqueue_key_t *h ) 
    505569{ 
     570    pj_bool_t has_lock; 
     571 
    506572    pj_mutex_lock(h->mutex); 
    507573 
     
    526592    ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT); 
    527593 
    528     pj_mutex_unlock(h->mutex); 
     594    /* Unlock; from this point we don't need to hold key's mutex 
     595     * (unless concurrency is disabled, which in this case we should 
     596     * hold the mutex while calling the callback) */ 
     597    if (h->allow_concurrent) { 
     598        /* concurrency may be changed while we're in the callback, so 
     599         * save it to a flag. 
     600         */ 
     601        has_lock = PJ_FALSE; 
     602        pj_mutex_unlock(h->mutex); 
     603    } else { 
     604        has_lock = PJ_TRUE; 
     605    } 
    529606 
    530607    /* Call callback. */ 
     
    542619 
    543620        (*h->cb.on_connect_complete)(h, status); 
     621    } 
     622 
     623    if (has_lock) { 
     624        pj_mutex_unlock(h->mutex); 
    544625    } 
    545626} 
     
    10971178} 
    10981179 
     1180PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency( pj_ioqueue_t *ioqueue, 
     1181                                                        pj_bool_t allow) 
     1182{ 
     1183    PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL); 
     1184    ioqueue->default_concurrency = allow; 
     1185    return PJ_SUCCESS; 
     1186} 
     1187 
     1188 
     1189PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key, 
     1190                                               pj_bool_t allow) 
     1191{ 
     1192    PJ_ASSERT_RETURN(key, PJ_EINVAL); 
     1193 
     1194    /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is 
     1195     * disabled. 
     1196     */ 
     1197    PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL); 
     1198 
     1199    key->allow_concurrent = allow; 
     1200    return PJ_SUCCESS; 
     1201} 
     1202 
     1203PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key) 
     1204{ 
     1205    return pj_mutex_lock(key->mutex); 
     1206} 
     1207 
     1208PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) 
     1209{ 
     1210    return pj_mutex_unlock(key->mutex); 
     1211} 
     1212 
Note: See TracChangeset for help on using the changeset viewer.