Ignore:
Timestamp:
Feb 21, 2013 11:18:36 AM (11 years ago)
Author:
bennylp
Message:

Fixed #1616: Implementation of Group lock and other foundation in PJLIB for fixing synchronization issues

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjlib/src/pj/ioqueue_common_abs.c

    r3666 r4359  
    7171                                     pj_ioqueue_key_t *key, 
    7272                                     pj_sock_t sock, 
     73                                     pj_grp_lock_t *grp_lock, 
    7374                                     void *user_data, 
    7475                                     const pj_ioqueue_callback *cb) 
     
    115116    /* Create mutex for the key. */ 
    116117#if !PJ_IOQUEUE_HAS_SAFE_UNREG 
    117     rc = pj_mutex_create_simple(pool, NULL, &key->mutex); 
     118    rc = pj_lock_create_simple_mutex(poll, NULL, &key->lock); 
    118119#endif 
     120    if (rc != PJ_SUCCESS) 
     121        return rc; 
     122 
     123    /* Group lock */ 
     124    key->grp_lock = grp_lock; 
     125    if (key->grp_lock) { 
     126        pj_grp_lock_add_ref_dbg(key->grp_lock, "ioqueue", 0); 
     127    } 
    119128     
    120     return rc; 
     129    return PJ_SUCCESS; 
    121130} 
    122131 
     
    190199{ 
    191200    /* Lock the key. */ 
    192     pj_mutex_lock(h->mutex); 
     201    pj_ioqueue_lock_key(h); 
    193202 
    194203    if (IS_CLOSING(h)) { 
    195         pj_mutex_unlock(h->mutex); 
     204        pj_ioqueue_unlock_key(h); 
    196205        return; 
    197206    } 
     
    262271             */ 
    263272            has_lock = PJ_FALSE; 
    264             pj_mutex_unlock(h->mutex); 
     273            pj_ioqueue_unlock_key(h); 
    265274        } else { 
    266275            has_lock = PJ_TRUE; 
     
    273282        /* Unlock if we still hold the lock */ 
    274283        if (has_lock) { 
    275             pj_mutex_unlock(h->mutex); 
     284            pj_ioqueue_unlock_key(h); 
    276285        } 
    277286 
     
    380389                 */ 
    381390                has_lock = PJ_FALSE; 
    382                 pj_mutex_unlock(h->mutex); 
     391                pj_ioqueue_unlock_key(h); 
     392                PJ_RACE_ME(5); 
    383393            } else { 
    384394                has_lock = PJ_TRUE; 
     
    393403 
    394404            if (has_lock) { 
    395                 pj_mutex_unlock(h->mutex); 
     405                pj_ioqueue_unlock_key(h); 
    396406            } 
    397407 
    398408        } else { 
    399             pj_mutex_unlock(h->mutex); 
     409            pj_ioqueue_unlock_key(h); 
    400410        } 
    401411 
     
    407417         * able to process the event. 
    408418         */ 
    409         pj_mutex_unlock(h->mutex); 
     419        pj_ioqueue_unlock_key(h); 
    410420    } 
    411421} 
     
    416426 
    417427    /* Lock the key. */ 
    418     pj_mutex_lock(h->mutex); 
     428    pj_ioqueue_lock_key(h); 
    419429 
    420430    if (IS_CLOSING(h)) { 
    421         pj_mutex_unlock(h->mutex); 
     431        pj_ioqueue_unlock_key(h); 
    422432        return; 
    423433    } 
     
    454464             */ 
    455465            has_lock = PJ_FALSE; 
    456             pj_mutex_unlock(h->mutex); 
     466            pj_ioqueue_unlock_key(h); 
     467            PJ_RACE_ME(5); 
    457468        } else { 
    458469            has_lock = PJ_TRUE; 
     
    467478 
    468479        if (has_lock) { 
    469             pj_mutex_unlock(h->mutex); 
     480            pj_ioqueue_unlock_key(h); 
    470481        } 
    471482    } 
     
    568579             */ 
    569580            has_lock = PJ_FALSE; 
    570             pj_mutex_unlock(h->mutex); 
     581            pj_ioqueue_unlock_key(h); 
     582            PJ_RACE_ME(5); 
    571583        } else { 
    572584            has_lock = PJ_TRUE; 
     
    581593 
    582594        if (has_lock) { 
    583             pj_mutex_unlock(h->mutex); 
     595            pj_ioqueue_unlock_key(h); 
    584596        } 
    585597 
     
    590602         * able to process the event. 
    591603         */ 
    592         pj_mutex_unlock(h->mutex); 
     604        pj_ioqueue_unlock_key(h); 
    593605    } 
    594606} 
     
    600612    pj_bool_t has_lock; 
    601613 
    602     pj_mutex_lock(h->mutex); 
     614    pj_ioqueue_lock_key(h); 
    603615 
    604616    if (!h->connecting) { 
     
    607619         * it has been processed by other thread. 
    608620         */ 
    609         pj_mutex_unlock(h->mutex); 
     621        pj_ioqueue_unlock_key(h); 
    610622        return; 
    611623    } 
    612624 
    613625    if (IS_CLOSING(h)) { 
    614         pj_mutex_unlock(h->mutex); 
     626        pj_ioqueue_unlock_key(h); 
    615627        return; 
    616628    } 
     
    630642         */ 
    631643        has_lock = PJ_FALSE; 
    632         pj_mutex_unlock(h->mutex); 
     644        pj_ioqueue_unlock_key(h); 
     645        PJ_RACE_ME(5); 
    633646    } else { 
    634647        has_lock = PJ_TRUE; 
     
    652665 
    653666    if (has_lock) { 
    654         pj_mutex_unlock(h->mutex); 
     667        pj_ioqueue_unlock_key(h); 
    655668    } 
    656669} 
     
    714727    read_op->flags = flags; 
    715728 
    716     pj_mutex_lock(key->mutex); 
     729    pj_ioqueue_lock_key(key); 
    717730    /* Check again. Handle may have been closed after the previous check 
    718731     * in multithreaded app. If we add bad handle to the set it will 
     
    720733     */ 
    721734    if (IS_CLOSING(key)) { 
    722         pj_mutex_unlock(key->mutex); 
     735        pj_ioqueue_unlock_key(key); 
    723736        return PJ_ECANCELLED; 
    724737    } 
    725738    pj_list_insert_before(&key->read_list, read_op); 
    726739    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT); 
    727     pj_mutex_unlock(key->mutex); 
     740    pj_ioqueue_unlock_key(key); 
    728741 
    729742    return PJ_EPENDING; 
     
    790803    read_op->rmt_addrlen = addrlen; 
    791804 
    792     pj_mutex_lock(key->mutex); 
     805    pj_ioqueue_lock_key(key); 
    793806    /* Check again. Handle may have been closed after the previous check 
    794807     * in multithreaded app. If we add bad handle to the set it will 
     
    796809     */ 
    797810    if (IS_CLOSING(key)) { 
    798         pj_mutex_unlock(key->mutex); 
     811        pj_ioqueue_unlock_key(key); 
    799812        return PJ_ECANCELLED; 
    800813    } 
    801814    pj_list_insert_before(&key->read_list, read_op); 
    802815    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT); 
    803     pj_mutex_unlock(key->mutex); 
     816    pj_ioqueue_unlock_key(key); 
    804817 
    805818    return PJ_EPENDING; 
     
    904917    write_op->flags = flags; 
    905918     
    906     pj_mutex_lock(key->mutex); 
     919    pj_ioqueue_lock_key(key); 
    907920    /* Check again. Handle may have been closed after the previous check 
    908921     * in multithreaded app. If we add bad handle to the set it will 
     
    910923     */ 
    911924    if (IS_CLOSING(key)) { 
    912         pj_mutex_unlock(key->mutex); 
     925        pj_ioqueue_unlock_key(key); 
    913926        return PJ_ECANCELLED; 
    914927    } 
    915928    pj_list_insert_before(&key->write_list, write_op); 
    916929    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT); 
    917     pj_mutex_unlock(key->mutex); 
     930    pj_ioqueue_unlock_key(key); 
    918931 
    919932    return PJ_EPENDING; 
     
    10511064    write_op->rmt_addrlen = addrlen; 
    10521065     
    1053     pj_mutex_lock(key->mutex); 
     1066    pj_ioqueue_lock_key(key); 
    10541067    /* Check again. Handle may have been closed after the previous check 
    10551068     * in multithreaded app. If we add bad handle to the set it will 
     
    10571070     */ 
    10581071    if (IS_CLOSING(key)) { 
    1059         pj_mutex_unlock(key->mutex); 
     1072        pj_ioqueue_unlock_key(key); 
    10601073        return PJ_ECANCELLED; 
    10611074    } 
    10621075    pj_list_insert_before(&key->write_list, write_op); 
    10631076    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT); 
    1064     pj_mutex_unlock(key->mutex); 
     1077    pj_ioqueue_unlock_key(key); 
    10651078 
    10661079    return PJ_EPENDING; 
     
    11281141    accept_op->local_addr = local; 
    11291142 
    1130     pj_mutex_lock(key->mutex); 
     1143    pj_ioqueue_lock_key(key); 
    11311144    /* Check again. Handle may have been closed after the previous check 
    11321145     * in multithreaded app. If we add bad handle to the set it will 
     
    11341147     */ 
    11351148    if (IS_CLOSING(key)) { 
    1136         pj_mutex_unlock(key->mutex); 
     1149        pj_ioqueue_unlock_key(key); 
    11371150        return PJ_ECANCELLED; 
    11381151    } 
    11391152    pj_list_insert_before(&key->accept_list, accept_op); 
    11401153    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT); 
    1141     pj_mutex_unlock(key->mutex); 
     1154    pj_ioqueue_unlock_key(key); 
    11421155 
    11431156    return PJ_EPENDING; 
     
    11721185        if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) { 
    11731186            /* Pending! */ 
    1174             pj_mutex_lock(key->mutex); 
     1187            pj_ioqueue_lock_key(key); 
    11751188            /* Check again. Handle may have been closed after the previous  
    11761189             * check in multithreaded app. See #913 
    11771190             */ 
    11781191            if (IS_CLOSING(key)) { 
    1179                 pj_mutex_unlock(key->mutex); 
     1192                pj_ioqueue_unlock_key(key); 
    11801193                return PJ_ECANCELLED; 
    11811194            } 
     
    11831196            ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT); 
    11841197            ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT); 
    1185             pj_mutex_unlock(key->mutex); 
     1198            pj_ioqueue_unlock_key(key); 
    11861199            return PJ_EPENDING; 
    11871200        } else { 
     
    12291242     * really make sure that it's still there; then call the callback. 
    12301243     */ 
    1231     pj_mutex_lock(key->mutex); 
     1244    pj_ioqueue_lock_key(key); 
    12321245 
    12331246    /* Find the operation in the pending read list. */ 
     
    12371250            pj_list_erase(op_rec); 
    12381251            op_rec->op = PJ_IOQUEUE_OP_NONE; 
    1239             pj_mutex_unlock(key->mutex); 
     1252            pj_ioqueue_unlock_key(key); 
    12401253 
    12411254            (*key->cb.on_read_complete)(key, op_key, bytes_status); 
     
    12511264            pj_list_erase(op_rec); 
    12521265            op_rec->op = PJ_IOQUEUE_OP_NONE; 
    1253             pj_mutex_unlock(key->mutex); 
     1266            pj_ioqueue_unlock_key(key); 
    12541267 
    12551268            (*key->cb.on_write_complete)(key, op_key, bytes_status); 
     
    12651278            pj_list_erase(op_rec); 
    12661279            op_rec->op = PJ_IOQUEUE_OP_NONE; 
    1267             pj_mutex_unlock(key->mutex); 
     1280            pj_ioqueue_unlock_key(key); 
    12681281 
    12691282            (*key->cb.on_accept_complete)(key, op_key,  
     
    12751288    } 
    12761289 
    1277     pj_mutex_unlock(key->mutex); 
     1290    pj_ioqueue_unlock_key(key); 
    12781291     
    12791292    return PJ_EINVALIDOP; 
     
    13051318PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key) 
    13061319{ 
    1307     return pj_mutex_lock(key->mutex); 
     1320    if (key->grp_lock) 
     1321        return pj_grp_lock_acquire(key->grp_lock); 
     1322    else 
     1323        return pj_lock_acquire(key->lock); 
    13081324} 
    13091325 
    13101326PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) 
    13111327{ 
    1312     return pj_mutex_unlock(key->mutex); 
    1313 } 
    1314  
     1328    if (key->grp_lock) 
     1329        return pj_grp_lock_release(key->grp_lock); 
     1330    else 
     1331        return pj_lock_release(key->lock); 
     1332} 
     1333 
     1334 
Note: See TracChangeset for help on using the changeset viewer.