Ignore:
Timestamp:
Feb 21, 2013 11:18:36 AM (11 years ago)
Author:
bennylp
Message:

Fixed #1616: Implementation of Group lock and other foundation in PJLIB for fixing synchronization issues

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjlib/src/pj/ioqueue_select.c

    r3553 r4359  
    4040#include <pj/sock_qos.h> 
    4141#include <pj/errno.h> 
     42#include <pj/rand.h> 
    4243 
    4344/* Now that we have access to OS'es <sys/select>, lets check again that 
     
    238239        key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t); 
    239240        key->ref_count = 0; 
    240         rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); 
     241        rc = pj_lock_create_recursive_mutex(pool, NULL, &key->lock); 
    241242        if (rc != PJ_SUCCESS) { 
    242243            key = ioqueue->free_list.next; 
    243244            while (key != &ioqueue->free_list) { 
    244                 pj_mutex_destroy(key->mutex); 
     245                pj_lock_destroy(key->lock); 
    245246                key = key->next; 
    246247            } 
     
    285286    key = ioqueue->active_list.next; 
    286287    while (key != &ioqueue->active_list) { 
    287         pj_mutex_destroy(key->mutex); 
     288        pj_lock_destroy(key->lock); 
    288289        key = key->next; 
    289290    } 
     
    291292    key = ioqueue->closing_list.next; 
    292293    while (key != &ioqueue->closing_list) { 
    293         pj_mutex_destroy(key->mutex); 
     294        pj_lock_destroy(key->lock); 
    294295        key = key->next; 
    295296    } 
     
    297298    key = ioqueue->free_list.next; 
    298299    while (key != &ioqueue->free_list) { 
    299         pj_mutex_destroy(key->mutex); 
     300        pj_lock_destroy(key->lock); 
    300301        key = key->next; 
    301302    } 
     
    313314 * Register socket handle to ioqueue. 
    314315 */ 
    315 PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, 
     316PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool, 
    316317                                              pj_ioqueue_t *ioqueue, 
    317318                                              pj_sock_t sock, 
     319                                              pj_grp_lock_t *grp_lock, 
    318320                                              void *user_data, 
    319321                                              const pj_ioqueue_callback *cb, 
     
    359361#endif 
    360362 
    361     rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); 
     363    rc = ioqueue_init_key(pool, ioqueue, key, sock, grp_lock, user_data, cb); 
    362364    if (rc != PJ_SUCCESS) { 
    363365        key = NULL; 
     
    387389on_return: 
    388390    /* On error, socket may be left in non-blocking mode. */ 
     391    if (rc != PJ_SUCCESS) { 
     392        if (key->grp_lock) 
     393            pj_grp_lock_dec_ref_dbg(key->grp_lock, "ioqueue", 0); 
     394    } 
    389395    *p_key = key; 
    390396    pj_lock_release(ioqueue->lock); 
     
    393399} 
    394400 
     401PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, 
     402                                              pj_ioqueue_t *ioqueue, 
     403                                              pj_sock_t sock, 
     404                                              void *user_data, 
     405                                              const pj_ioqueue_callback *cb, 
     406                                              pj_ioqueue_key_t **p_key) 
     407{ 
     408    return pj_ioqueue_register_sock2(pool, ioqueue, sock, NULL, user_data, 
     409                                     cb, p_key); 
     410} 
     411 
    395412#if PJ_IOQUEUE_HAS_SAFE_UNREG 
    396413/* Increment key's reference counter */ 
     
    447464     * deadlock. 
    448465     */ 
    449     pj_mutex_lock(key->mutex); 
     466    pj_ioqueue_lock_key(key); 
    450467 
    451468    /* Also lock ioqueue */ 
     
    486503 
    487504    /* Done. */ 
    488     pj_mutex_unlock(key->mutex); 
     505    if (key->grp_lock) { 
     506        /* just dec_ref and unlock. we will set grp_lock to NULL 
     507         * elsewhere */ 
     508        pj_grp_lock_t *grp_lock = key->grp_lock; 
     509        // Don't set grp_lock to NULL otherwise the other thread 
     510        // will crash. Just leave it as dangling pointer, but this 
     511        // should be safe 
     512        //key->grp_lock = NULL; 
     513        pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0); 
     514        pj_grp_lock_release(grp_lock); 
     515    } else { 
     516        pj_ioqueue_unlock_key(key); 
     517    } 
    489518#else 
    490     pj_mutex_destroy(key->mutex); 
     519    if (key->grp_lock) { 
     520        /* set grp_lock to NULL and unlock */ 
     521        pj_grp_lock_t *grp_lock = key->grp_lock; 
     522        // Don't set grp_lock to NULL otherwise the other thread 
     523        // will crash. Just leave it as dangling pointer, but this 
     524        // should be safe 
     525        //key->grp_lock = NULL; 
     526        pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0); 
     527        pj_grp_lock_release(grp_lock); 
     528    } else { 
     529        pj_ioqueue_unlock_key(key); 
     530    } 
     531 
     532    pj_lock_destroy(key->lock); 
    491533#endif 
    492534 
     
    621663        if (PJ_TIME_VAL_GTE(now, h->free_time)) { 
    622664            pj_list_erase(h); 
     665            // Don't set grp_lock to NULL otherwise the other thread 
     666            // will crash. Just leave it as dangling pointer, but this 
     667            // should be safe 
     668            //h->grp_lock = NULL; 
    623669            pj_list_push_back(&ioqueue->free_list, h); 
    624670        } 
     
    782828{ 
    783829    pj_fd_set_t rfdset, wfdset, xfdset; 
    784     int count, counter; 
     830    int count, i, counter; 
    785831    pj_ioqueue_key_t *h; 
    786832    struct event 
     
    893939    } 
    894940 
     941    for (i=0; i<counter; ++i) { 
     942        if (event[i].key->grp_lock) 
     943            pj_grp_lock_add_ref_dbg(event[i].key->grp_lock, "ioqueue", 0); 
     944    } 
     945 
     946    PJ_RACE_ME(5); 
     947 
    895948    pj_lock_release(ioqueue->lock); 
     949 
     950    PJ_RACE_ME(5); 
    896951 
    897952    count = counter; 
     
    919974        decrement_counter(event[counter].key); 
    920975#endif 
     976 
     977        if (event[counter].key->grp_lock) 
     978            pj_grp_lock_dec_ref_dbg(event[counter].key->grp_lock, 
     979                                    "ioqueue", 0); 
    921980    } 
    922981 
Note: See TracChangeset for help on using the changeset viewer.