Changeset 6058


Ignore:
Timestamp:
Sep 3, 2019 2:10:45 AM (5 years ago)
Author:
ming
Message:

Fixed #2225: Timer heap refactoring

Location:
pjproject/trunk/pjlib
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjlib/include/pj/config.h

    r6031 r6058  
    533533 
    534534/** 
     535 * If enabled, when calling pj_pool_release(), the memory pool content 
     536 * will be wiped out first before released. 
     537 * 
     538 * Default: 0 
     539 */ 
     540#ifndef PJ_POOL_RELEASE_WIPE_DATA 
     541#  define PJ_POOL_RELEASE_WIPE_DATA     0 
     542#endif 
     543 
     544 
     545/** 
    535546 * Enable timer heap debugging facility. When this is enabled, application 
    536547 * can call pj_timer_heap_dump() to show the contents of the timer heap 
     
    538549 * See https://trac.pjsip.org/repos/ticket/1527 for more info. 
    539550 * 
    540  * Default: 0 
     551 * Default: 1 
    541552 */ 
    542553#ifndef PJ_TIMER_DEBUG 
    543 #  define PJ_TIMER_DEBUG            0 
     554#  define PJ_TIMER_DEBUG            1 
     555#endif 
     556 
     557 
     558/** 
     559 * If enabled, the timer heap will keep internal copies of the timer entries. 
     560 * This will increase the robustness and stability of the timer heap (against 
     561 * accidental modification or premature deallocation of the timer entries) and 
     562 * makes it easier to troubleshoot any timer related issues, with the overhead 
     563 * of additional memory space required. 
     564 * 
     565 * Note that the detection against premature deallocation only works if the 
     566 * freed memory content has changed (such as if it's been reallocated and 
     567 * overwritten by another data. Alternatively, you can enable 
     568 * PJ_POOL_RELEASE_WIPE_DATA which will erase the data first before releasing 
     569 * the memory). 
     570 * 
     571 * Default: 1 (enabled) 
     572 */ 
     573#ifndef PJ_TIMER_HEAP_USE_COPY 
     574#  define PJ_TIMER_HEAP_USE_COPY    1 
    544575#endif 
    545576 
  • pjproject/trunk/pjlib/include/pj/pool_i.h

    r5990 r6058  
    8989PJ_IDEF(void) pj_pool_release( pj_pool_t *pool ) 
    9090{ 
     91#if PJ_POOL_RELEASE_WIPE_DATA 
     92    pj_pool_block *b; 
     93 
     94    b = pool->block_list.next; 
     95    while (b != &pool->block_list) { 
     96        volatile unsigned char *p = b->buf; 
     97        while (p < b->end) *p++ = 0; 
     98        b = b->next; 
     99    } 
     100#endif 
     101 
    91102    if (pool->factory->release_pool) 
    92103        (*pool->factory->release_pool)(pool->factory, pool); 
  • pjproject/trunk/pjlib/include/pj/timer.h

    r5971 r6058  
    114114    pj_timer_id_t _timer_id; 
    115115 
     116#if !PJ_TIMER_HEAP_USE_COPY 
    116117    /**  
    117118     * The future time when the timer expires, which the value is updated 
     
    129130    const char  *src_file; 
    130131    int          src_line; 
     132#endif 
     133 
    131134#endif 
    132135} pj_timer_entry; 
  • pjproject/trunk/pjlib/src/pj/timer.c

    r5971 r6058  
    4747#define DEFAULT_MAX_TIMED_OUT_PER_POLL  (64) 
    4848 
     49/* Enable this to raise assertion in order to catch bug of timer entry 
     50 * which has been deallocated without being cancelled. If disabled, 
     51 * the timer heap will simply remove the destroyed entry (and print log) 
     52 * and resume normally. 
     53 * This setting only works if PJ_TIMER_HEAP_USE_COPY is enabled. 
     54 */ 
     55#define ASSERT_IF_ENTRY_DESTROYED (PJ_TIMER_HEAP_USE_COPY? 0: 0) 
     56 
     57 
    4958enum 
    5059{ 
     
    5463}; 
    5564 
     65#if PJ_TIMER_HEAP_USE_COPY 
     66 
     67/* Duplicate/copy of the timer entry. */ 
     68typedef struct pj_timer_entry_dup 
     69{ 
     70    /** 
     71     * The duplicate copy. 
     72     */ 
     73    pj_timer_entry  dup;                 
     74     
     75    /** 
     76     * Pointer of the original timer entry. 
     77     */ 
     78    pj_timer_entry *entry; 
     79 
     80    /**  
     81     * The future time when the timer expires, which the value is updated 
     82     * by timer heap when the timer is scheduled. 
     83     */ 
     84    pj_time_val     _timer_value; 
     85 
     86    /** 
     87     * Internal: the group lock used by this entry, set when 
     88     * pj_timer_heap_schedule_w_lock() is used. 
     89     */ 
     90    pj_grp_lock_t  *_grp_lock; 
     91 
     92#if PJ_TIMER_DEBUG 
     93    const char     *src_file; 
     94    int             src_line; 
     95#endif 
     96 
     97} pj_timer_entry_dup; 
     98 
     99#define GET_TIMER(ht, node) &ht->timer_dups[node->_timer_id] 
     100#define GET_ENTRY(node) node->entry 
     101#define GET_FIELD(node, _timer_id) node->dup._timer_id 
     102 
     103#else 
     104 
     105typedef pj_timer_entry pj_timer_entry_dup; 
     106 
     107#define GET_TIMER(ht, node) node 
     108#define GET_ENTRY(node) node 
     109#define GET_FIELD(node, _timer_id) node->_timer_id 
     110 
     111#endif 
    56112 
    57113/** 
     
    84140     * array. 
    85141     */ 
    86     pj_timer_entry **heap; 
     142    pj_timer_entry_dup **heap; 
    87143 
    88144    /** 
     
    97153     */ 
    98154    pj_timer_id_t *timer_ids; 
     155     
     156    /** 
     157     * An array of timer entry copies. 
     158     */ 
     159    pj_timer_entry_dup *timer_dups; 
    99160 
    100161    /** 
     
    127188 
    128189static void copy_node( pj_timer_heap_t *ht, pj_size_t slot,  
    129                        pj_timer_entry *moved_node ) 
     190                       pj_timer_entry_dup *moved_node ) 
    130191{ 
    131192    PJ_CHECK_STACK(); 
     
    135196     
    136197    // Update the corresponding slot in the parallel <timer_ids_> array. 
    137     ht->timer_ids[moved_node->_timer_id] = (int)slot; 
     198    ht->timer_ids[GET_FIELD(moved_node, _timer_id)] = (int)slot; 
    138199} 
    139200 
     
    165226 
    166227 
    167 static void reheap_down(pj_timer_heap_t *ht, pj_timer_entry *moved_node, 
     228static void reheap_down(pj_timer_heap_t *ht, pj_timer_entry_dup *moved_node, 
    168229                        size_t slot, size_t child) 
    169230{ 
     
    175236    { 
    176237        // Choose the smaller of the two children. 
    177         if (child + 1 < ht->cur_size 
    178             && PJ_TIME_VAL_LT(ht->heap[child + 1]->_timer_value, ht->heap[child]->_timer_value)) 
     238        if (child + 1 < ht->cur_size && 
     239            PJ_TIME_VAL_LT(ht->heap[child + 1]->_timer_value, 
     240                           ht->heap[child]->_timer_value)) 
     241        { 
    179242            child++; 
     243        } 
    180244         
    181245        // Perform a <copy> if the child has a larger timeout value than 
    182246        // the <moved_node>. 
    183         if (PJ_TIME_VAL_LT(ht->heap[child]->_timer_value, moved_node->_timer_value)) 
     247        if (PJ_TIME_VAL_LT(ht->heap[child]->_timer_value, 
     248                           moved_node->_timer_value)) 
    184249        { 
    185250            copy_node( ht, slot, ht->heap[child]); 
     
    195260} 
    196261 
    197 static void reheap_up( pj_timer_heap_t *ht, pj_timer_entry *moved_node, 
     262static void reheap_up( pj_timer_heap_t *ht, pj_timer_entry_dup *moved_node, 
    198263                       size_t slot, size_t parent) 
    199264{ 
     
    204269        // If the parent node is greater than the <moved_node> we need 
    205270        // to copy it down. 
    206         if (PJ_TIME_VAL_LT(moved_node->_timer_value, ht->heap[parent]->_timer_value)) 
     271        if (PJ_TIME_VAL_LT(moved_node->_timer_value, 
     272                           ht->heap[parent]->_timer_value)) 
    207273        { 
    208274            copy_node(ht, slot, ht->heap[parent]); 
     
    220286 
    221287 
    222 static pj_timer_entry * remove_node( pj_timer_heap_t *ht, size_t slot) 
    223 { 
    224     pj_timer_entry *removed_node = ht->heap[slot]; 
     288static pj_timer_entry_dup * remove_node( pj_timer_heap_t *ht, size_t slot) 
     289{ 
     290    pj_timer_entry_dup *removed_node = ht->heap[slot]; 
    225291     
    226292    // Return this timer id to the freelist. 
    227     push_freelist( ht, removed_node->_timer_id ); 
     293    push_freelist( ht, GET_FIELD(removed_node, _timer_id) ); 
    228294     
    229295    // Decrement the size of the heap by one since we're removing the 
     
    232298     
    233299    // Set the ID 
    234     removed_node->_timer_id = -1; 
     300    if (GET_FIELD(removed_node, _timer_id) != 
     301        GET_ENTRY(removed_node)->_timer_id) 
     302    { 
     303            PJ_LOG(3,(THIS_FILE, "Bug! Trying to remove entry %p from %s " 
     304                                 "line %d, which has been deallocated " 
     305                                 "without being cancelled", 
     306                                 GET_ENTRY(removed_node), 
     307#if PJ_TIMER_DEBUG 
     308                                 removed_node->src_file, 
     309                                 removed_node->src_line)); 
     310#else 
     311                                 "N/A", 0)); 
     312#endif 
     313#if ASSERT_IF_ENTRY_DESTROYED 
     314        pj_assert(removed_node->dup._timer_id==removed_node->entry->_timer_id); 
     315#endif 
     316    } 
     317    GET_ENTRY(removed_node)->_timer_id = -1; 
     318    GET_FIELD(removed_node, _timer_id) = -1; 
    235319 
    236320    // Only try to reheapify if we're not deleting the last entry. 
     
    239323    { 
    240324        pj_size_t parent; 
    241         pj_timer_entry *moved_node = ht->heap[ht->cur_size]; 
     325        pj_timer_entry_dup *moved_node = ht->heap[ht->cur_size]; 
    242326         
    243327        // Move the end node to the location being removed and update 
     
    249333        parent = HEAP_PARENT (slot); 
    250334         
    251         if (PJ_TIME_VAL_GTE(moved_node->_timer_value, ht->heap[parent]->_timer_value)) 
     335        if (PJ_TIME_VAL_GTE(moved_node->_timer_value, 
     336                            ht->heap[parent]->_timer_value)) 
     337        { 
    252338            reheap_down( ht, moved_node, slot, HEAP_LEFT(slot)); 
    253         else 
     339        } else { 
    254340            reheap_up( ht, moved_node, slot, parent); 
     341        } 
    255342    } 
    256343     
     
    258345} 
    259346 
    260 static void grow_heap(pj_timer_heap_t *ht) 
     347static pj_status_t grow_heap(pj_timer_heap_t *ht) 
    261348{ 
    262349    // All the containers will double in size from max_size_ 
    263350    size_t new_size = ht->max_size * 2; 
     351    pj_timer_entry_dup *new_timer_dups = 0; 
    264352    pj_timer_id_t *new_timer_ids; 
    265353    pj_size_t i; 
    266354     
     355    PJ_LOG(6,(THIS_FILE, "Growing heap size from %d to %d", 
     356                         ht->max_size, new_size)); 
     357     
    267358    // First grow the heap itself. 
    268359     
    269     pj_timer_entry **new_heap = 0; 
    270      
    271     new_heap = (pj_timer_entry**)  
    272                pj_pool_alloc(ht->pool, sizeof(pj_timer_entry*) * new_size); 
    273     memcpy(new_heap, ht->heap, ht->max_size * sizeof(pj_timer_entry*)); 
    274     //delete [] this->heap_; 
     360    pj_timer_entry_dup **new_heap = 0; 
     361     
     362    new_heap = (pj_timer_entry_dup**)  
     363               pj_pool_calloc(ht->pool, new_size, sizeof(pj_timer_entry_dup*)); 
     364    if (!new_heap) 
     365        return PJ_ENOMEM; 
     366     
     367#if PJ_TIMER_HEAP_USE_COPY 
     368    // Grow the array of timer copies. 
     369     
     370    new_timer_dups = (pj_timer_entry_dup*)  
     371                     pj_pool_alloc(ht->pool, 
     372                                   sizeof(pj_timer_entry_dup) * new_size); 
     373    if (!new_timer_dups) 
     374        return PJ_ENOMEM; 
     375 
     376    memcpy(new_timer_dups, ht->timer_dups, 
     377           ht->max_size * sizeof(pj_timer_entry_dup)); 
     378    for (i = 0; i < ht->cur_size; i++) { 
     379        int idx = ht->heap[i] - ht->timer_dups; 
     380        // Point to the address in the new array 
     381        pj_assert(idx >= 0 && idx < ht->max_size); 
     382        new_heap[i] = &new_timer_dups[idx]; 
     383    } 
     384    ht->timer_dups = new_timer_dups; 
     385#else 
     386    memcpy(new_heap, ht->heap, ht->max_size * sizeof(pj_timer_entry *)); 
     387#endif 
    275388    ht->heap = new_heap; 
    276389     
     
    280393    new_timer_ids = (pj_timer_id_t*) 
    281394                    pj_pool_alloc(ht->pool, new_size * sizeof(pj_timer_id_t)); 
    282      
     395    if (!new_timer_ids) 
     396        return PJ_ENOMEM; 
     397 
    283398    memcpy( new_timer_ids, ht->timer_ids, ht->max_size * sizeof(pj_timer_id_t)); 
    284399     
     
    291406     
    292407    ht->max_size = new_size; 
    293 } 
    294  
    295 static void insert_node(pj_timer_heap_t *ht, pj_timer_entry *new_node) 
    296 { 
    297     if (ht->cur_size + 2 >= ht->max_size) 
    298         grow_heap(ht); 
    299      
    300     reheap_up( ht, new_node, ht->cur_size, HEAP_PARENT(ht->cur_size)); 
     408     
     409    return PJ_SUCCESS; 
     410} 
     411 
     412static pj_status_t insert_node(pj_timer_heap_t *ht, 
     413                               pj_timer_entry *new_node, 
     414                               const pj_time_val *future_time) 
     415{ 
     416    pj_timer_entry_dup *timer_copy; 
     417 
     418    if (ht->cur_size + 2 >= ht->max_size) { 
     419        pj_status_t status = grow_heap(ht); 
     420        if (status != PJ_SUCCESS) 
     421            return status; 
     422    } 
     423 
     424    timer_copy = GET_TIMER(ht, new_node); 
     425#if PJ_TIMER_HEAP_USE_COPY     
     426    // Create a duplicate of the timer entry. 
     427    pj_bzero(timer_copy, sizeof(*timer_copy)); 
     428    pj_memcpy(&timer_copy->dup, new_node, sizeof(*new_node)); 
     429    timer_copy->entry = new_node; 
     430#endif 
     431    timer_copy->_timer_value = *future_time; 
     432 
     433    reheap_up( ht, timer_copy, ht->cur_size, HEAP_PARENT(ht->cur_size)); 
    301434    ht->cur_size++; 
     435 
     436    return PJ_SUCCESS; 
    302437} 
    303438 
     
    312447        // Set the entry 
    313448        entry->_timer_id = pop_freelist(ht); 
    314         entry->_timer_value = *future_time; 
    315         insert_node( ht, entry); 
    316         return 0; 
     449 
     450        return insert_node( ht, entry, future_time ); 
    317451    } 
    318452    else 
     
    325459                   unsigned flags) 
    326460{ 
    327   long timer_node_slot; 
    328  
    329   PJ_CHECK_STACK(); 
    330  
    331   // Check to see if the timer_id is out of range 
    332   if (entry->_timer_id < 0 || (pj_size_t)entry->_timer_id > ht->max_size) { 
    333     entry->_timer_id = -1; 
    334     return 0; 
    335   } 
    336  
    337   timer_node_slot = ht->timer_ids[entry->_timer_id]; 
    338  
    339   if (timer_node_slot < 0) { // Check to see if timer_id is still valid. 
    340     entry->_timer_id = -1; 
    341     return 0; 
    342   } 
    343  
    344   if (entry != ht->heap[timer_node_slot]) 
    345     { 
    346       if ((flags & F_DONT_ASSERT) == 0) 
    347           pj_assert(entry == ht->heap[timer_node_slot]); 
    348       entry->_timer_id = -1; 
    349       return 0; 
    350     } 
    351   else 
    352     { 
    353       remove_node( ht, timer_node_slot); 
    354  
    355       if ((flags & F_DONT_CALL) == 0) 
    356         // Call the close hook. 
    357         (*ht->callback)(ht, entry); 
    358       return 1; 
     461    long timer_node_slot; 
     462 
     463    PJ_CHECK_STACK(); 
     464 
     465    // Check to see if the timer_id is out of range 
     466    if (entry->_timer_id < 0 || (pj_size_t)entry->_timer_id > ht->max_size) { 
     467        entry->_timer_id = -1; 
     468        return 0; 
     469    } 
     470 
     471    timer_node_slot = ht->timer_ids[entry->_timer_id]; 
     472 
     473    if (timer_node_slot < 0) { // Check to see if timer_id is still valid. 
     474        entry->_timer_id = -1; 
     475        return 0; 
     476    } 
     477 
     478    if (entry != GET_ENTRY(ht->heap[timer_node_slot])) { 
     479        if ((flags & F_DONT_ASSERT) == 0) 
     480            pj_assert(entry == GET_ENTRY(ht->heap[timer_node_slot])); 
     481        entry->_timer_id = -1; 
     482        return 0; 
     483    } else { 
     484        remove_node( ht, timer_node_slot); 
     485 
     486        if ((flags & F_DONT_CALL) == 0) { 
     487            // Call the close hook. 
     488            (*ht->callback)(ht, entry); 
     489        } 
     490        return 1; 
    359491    } 
    360492} 
     
    369501           sizeof(pj_timer_heap_t) +  
    370502           /* size of each entry: */ 
    371            (count+2) * (sizeof(pj_timer_entry*)+sizeof(pj_timer_id_t)) + 
     503           (count+2) * (sizeof(pj_timer_entry_dup*)+sizeof(pj_timer_id_t)+ 
     504           sizeof(pj_timer_entry_dup)) + 
    372505           /* lock, pool etc: */ 
    373506           132; 
     
    392525 
    393526    /* Allocate timer heap data structure from the pool */ 
    394     ht = PJ_POOL_ALLOC_T(pool, pj_timer_heap_t); 
     527    ht = PJ_POOL_ZALLOC_T(pool, pj_timer_heap_t); 
    395528    if (!ht) 
    396529        return PJ_ENOMEM; 
     
    408541 
    409542    // Create the heap array. 
    410     ht->heap = (pj_timer_entry**) 
    411                pj_pool_alloc(pool, sizeof(pj_timer_entry*) * size); 
     543    ht->heap = (pj_timer_entry_dup**) 
     544               pj_pool_calloc(pool, size, sizeof(pj_timer_entry_dup*)); 
    412545    if (!ht->heap) 
    413546        return PJ_ENOMEM; 
     547 
     548#if PJ_TIMER_HEAP_USE_COPY 
     549    // Create the timer entry copies array. 
     550    ht->timer_dups = (pj_timer_entry_dup*) 
     551                     pj_pool_alloc(pool, sizeof(pj_timer_entry_dup) * size); 
     552    if (!ht->timer_dups) 
     553        return PJ_ENOMEM; 
     554#endif 
    414555 
    415556    // Create the parallel 
     
    468609    entry->user_data = user_data; 
    469610    entry->cb = cb; 
     611#if !PJ_TIMER_HEAP_USE_COPY 
    470612    entry->_grp_lock = NULL; 
     613#endif 
    471614 
    472615    return entry; 
     
    505648    //PJ_ASSERT_RETURN(entry->_timer_id < 1, PJ_EINVALIDOP); 
    506649 
    507 #if PJ_TIMER_DEBUG 
    508     entry->src_file = src_file; 
    509     entry->src_line = src_line; 
    510 #endif 
    511650    pj_gettickcount(&expires); 
    512651    PJ_TIME_VAL_ADD(expires, *delay); 
     
    517656    if (pj_timer_entry_running(entry)) { 
    518657        unlock_timer_heap(ht); 
    519         PJ_LOG(3,(THIS_FILE, "Bug! Rescheduling outstanding entry (%p)", 
     658        PJ_LOG(3,(THIS_FILE, "Warning! Rescheduling outstanding entry (%p)", 
    520659                  entry)); 
    521660        return PJ_EINVALIDOP; 
     
    524663    status = schedule_entry(ht, entry, &expires); 
    525664    if (status == PJ_SUCCESS) { 
     665        pj_timer_entry_dup *timer_copy = GET_TIMER(ht, entry); 
     666 
    526667        if (set_id) 
    527             entry->id = id_val; 
    528         entry->_grp_lock = grp_lock; 
    529         if (entry->_grp_lock) { 
    530             pj_grp_lock_add_ref(entry->_grp_lock); 
     668            GET_FIELD(timer_copy, id) = entry->id = id_val; 
     669        timer_copy->_grp_lock = grp_lock; 
     670        if (timer_copy->_grp_lock) { 
     671            pj_grp_lock_add_ref(timer_copy->_grp_lock); 
    531672        } 
     673#if PJ_TIMER_DEBUG 
     674        timer_copy->src_file = src_file; 
     675        timer_copy->src_line = src_line; 
     676#endif 
    532677    } 
    533678    unlock_timer_heap(ht); 
     
    584729                        int id_val) 
    585730{ 
     731    pj_timer_entry_dup *timer_copy; 
     732    pj_grp_lock_t *grp_lock; 
    586733    int count; 
    587734 
     
    589736 
    590737    lock_timer_heap(ht); 
     738    timer_copy = GET_TIMER(ht, entry); 
     739    grp_lock = timer_copy->_grp_lock; 
     740 
    591741    count = cancel(ht, entry, flags | F_DONT_CALL); 
    592742    if (count > 0) { 
     
    595745            entry->id = id_val; 
    596746        } 
    597         if (entry->_grp_lock) { 
    598             pj_grp_lock_t *grp_lock = entry->_grp_lock; 
    599             entry->_grp_lock = NULL; 
     747        if (grp_lock) { 
    600748            pj_grp_lock_dec_ref(grp_lock); 
    601749        } 
     
    641789            count < ht->max_entries_per_poll )  
    642790    { 
    643         pj_timer_entry *node = remove_node(ht, 0); 
     791        pj_timer_entry_dup *node = remove_node(ht, 0); 
     792        pj_timer_entry *entry = GET_ENTRY(node); 
    644793        /* Avoid re-use of this timer until the callback is done. */ 
    645794        ///Not necessary, even causes problem (see also #2176). 
    646795        ///pj_timer_id_t node_timer_id = pop_freelist(ht); 
    647796        pj_grp_lock_t *grp_lock; 
     797        pj_bool_t valid = PJ_TRUE; 
    648798 
    649799        ++count; 
     
    651801        grp_lock = node->_grp_lock; 
    652802        node->_grp_lock = NULL; 
     803        if (GET_FIELD(node, cb) != entry->cb || 
     804            GET_FIELD(node, user_data) != entry->user_data) 
     805        { 
     806            valid = PJ_FALSE; 
     807            PJ_LOG(3,(THIS_FILE, "Bug! Polling entry %p from %s line %d has " 
     808                                 "been deallocated without being cancelled", 
     809                                 GET_ENTRY(node), 
     810#if PJ_TIMER_DEBUG 
     811                                 node->src_file, node->src_line)); 
     812#else 
     813                                 "N/A", 0)); 
     814#endif 
     815#if ASSERT_IF_ENTRY_DESTROYED 
     816            pj_assert(node->dup.cb == entry->cb); 
     817            pj_assert(node->dup.user_data == entry->user_data); 
     818#endif 
     819        } 
    653820 
    654821        unlock_timer_heap(ht); 
     
    656823        PJ_RACE_ME(5); 
    657824 
    658         if (node->cb) 
    659             (*node->cb)(ht, node); 
    660  
    661         if (grp_lock) 
     825        if (valid && entry->cb) 
     826            (*entry->cb)(ht, entry); 
     827 
     828        if (valid && grp_lock) 
    662829            pj_grp_lock_dec_ref(grp_lock); 
    663830 
     
    720887 
    721888        for (i=0; i<(unsigned)ht->cur_size; ++i) { 
    722             pj_timer_entry *e = ht->heap[i]; 
     889            pj_timer_entry_dup *e = ht->heap[i]; 
    723890            pj_time_val delta; 
    724891 
     
    731898 
    732899            PJ_LOG(3,(THIS_FILE, "    %d\t%d\t%d.%03d\t%s:%d", 
    733                       e->_timer_id, e->id, 
     900                      GET_FIELD(e, _timer_id), GET_FIELD(e, id), 
    734901                      (int)delta.sec, (int)delta.msec, 
    735902                      e->src_file, e->src_line)); 
  • pjproject/trunk/pjlib/src/pjlib-test/timer.c

    r5933 r6058  
    189189 * Stress test * 
    190190 *************** 
    191  * Test scenario: 
     191 * Test scenario (if RANDOMIZED_TEST is 0): 
    192192 * 1. Create and schedule a number of timer entries. 
    193193 * 2. Start threads for polling (simulating normal worker thread). 
     
    196196 * 3. Start threads for cancelling random entries. Each successfully 
    197197 *    cancelled entry will be re-scheduled after some random delay. 
     198 * 
     199 * Test scenario (if RANDOMIZED_TEST is 1): 
     200 * 1. Create and schedule a number of timer entries. 
     201 * 2. Start threads which will, based on a configurable probability 
     202 *    setting, randomly perform timer scheduling, cancelling, or 
     203 *    polling (simulating normal worker thread). 
     204 * This test is considered a failure if: 
     205 * - It triggers assertion/crash. 
     206 * - There's an error message in the log, which indicates a potential 
     207 *   bug in the implementation (note that race message is ok). 
    198208 */ 
    199 #define ST_POLL_THREAD_COUNT        10 
    200 #define ST_CANCEL_THREAD_COUNT      10 
    201  
    202 #define ST_ENTRY_COUNT              1000 
    203 #define ST_ENTRY_MAX_TIMEOUT_MS     100 
     209#define RANDOMIZED_TEST 1 
     210#define SIMULATE_CRASH  PJ_TIMER_HEAP_USE_COPY 
     211 
     212#if RANDOMIZED_TEST 
     213    #define ST_STRESS_THREAD_COUNT          20 
     214    #define ST_POLL_THREAD_COUNT            0 
     215    #define ST_CANCEL_THREAD_COUNT          0 
     216#else 
     217    #define ST_STRESS_THREAD_COUNT          0 
     218    #define ST_POLL_THREAD_COUNT            10 
     219    #define ST_CANCEL_THREAD_COUNT          10 
     220#endif 
     221 
     222#define ST_ENTRY_COUNT              10000 
     223#define ST_DURATION                 30000 
     224#define ST_ENTRY_MAX_TIMEOUT_MS     ST_DURATION/10 
    204225 
    205226/* Number of group lock, may be zero, shared by timer entries, group lock 
     
    216237    pj_bool_t stopping; 
    217238    pj_timer_entry *entries; 
     239    pj_atomic_t **status; 
     240    pj_atomic_t *n_sched, *n_cancel, *n_poll; 
     241    pj_grp_lock_t **grp_locks; 
     242    int err; 
    218243 
    219244    pj_atomic_t *idx; 
     
    227252{ 
    228253    pj_time_val delay = {0}; 
    229     pj_grp_lock_t *grp_lock = (pj_grp_lock_t*)e->user_data; 
     254    pj_grp_lock_t *grp_lock = NULL; 
    230255    pj_status_t status; 
     256    struct thread_param *tparam = (struct thread_param *)e->user_data; 
     257 
     258    if (ST_ENTRY_GROUP_LOCK_COUNT && pj_rand() % 10) { 
     259        /* About 90% of entries should have group lock */ 
     260        grp_lock = tparam->grp_locks[pj_rand() % ST_ENTRY_GROUP_LOCK_COUNT]; 
     261    } 
    231262 
    232263    delay.msec = pj_rand() % ST_ENTRY_MAX_TIMEOUT_MS; 
     
    236267} 
    237268 
     269static void dummy_callback(pj_timer_heap_t *ht, pj_timer_entry *e) 
     270{ 
     271    PJ_LOG(4,("test", "dummy callback called %p %p", e, e->user_data)); 
     272} 
     273 
    238274static void st_entry_callback(pj_timer_heap_t *ht, pj_timer_entry *e) 
    239275{ 
     276    struct thread_param *tparam = (struct thread_param *)e->user_data; 
     277 
     278#if RANDOMIZED_TEST 
     279    /* Make sure the flag has been set. */ 
     280    while (pj_atomic_get(tparam->status[e - tparam->entries]) != 1) 
     281        pj_thread_sleep(10); 
     282    pj_atomic_set(tparam->status[e - tparam->entries], 0); 
     283#endif 
     284 
    240285    /* try to cancel this */ 
    241286    pj_timer_heap_cancel_if_active(ht, e, 10); 
     
    245290 
    246291    /* reschedule entry */ 
    247     st_schedule_entry(ht, e); 
     292    if (!ST_STRESS_THREAD_COUNT) 
     293        st_schedule_entry(ht, e); 
     294} 
     295 
     296/* Randomized stress worker thread function. */ 
     297static int stress_worker(void *arg) 
     298{ 
     299    /* Enumeration of possible task. */ 
     300    enum { 
     301        SCHEDULING = 0, 
     302        CANCELLING = 1, 
     303        POLLING = 2, 
     304        NOTHING = 3 
     305    }; 
     306    /* Probability of a certain task being chosen. 
     307     * The first number indicates the probability of the first task, 
     308     * the second number for the second task, and so on. 
     309     */ 
     310    int prob[3] = {75, 15, 5}; 
     311    struct thread_param *tparam = (struct thread_param*)arg; 
     312    int t_idx, i; 
     313 
     314    t_idx = pj_atomic_inc_and_get(tparam->idx); 
     315    PJ_LOG(4,("test", "...thread #%d (random) started", t_idx)); 
     316    while (!tparam->stopping) { 
     317        int job, task; 
     318        int idx, count; 
     319        pj_status_t prev_status, status; 
     320 
     321        /* Randomly choose which task to do */ 
     322        job = pj_rand() % 100; 
     323        if (job < prob[0]) task = SCHEDULING; 
     324        else if (job < (prob[0] + prob[1])) task = CANCELLING; 
     325        else if (job < (prob[0] + prob[1] + prob[2])) task = POLLING; 
     326        else task = NOTHING; 
     327     
     328        idx = pj_rand() % ST_ENTRY_COUNT; 
     329        prev_status = pj_atomic_get(tparam->status[idx]); 
     330        if (task == SCHEDULING) { 
     331            if (prev_status != 0) continue; 
     332            status = st_schedule_entry(tparam->timer, &tparam->entries[idx]); 
     333            if (prev_status == 0 && status != PJ_SUCCESS) { 
     334                /* To make sure the flag has been set. */ 
     335                pj_thread_sleep(20); 
     336                if (pj_atomic_get(tparam->status[idx]) == 1) { 
     337                    /* Race condition with another scheduling. */ 
     338                    PJ_LOG(3,("test", "race schedule-schedule %d: %p", 
     339                                      idx, &tparam->entries[idx])); 
     340                } else { 
     341                    if (tparam->err != 0) tparam->err = -210; 
     342                    PJ_LOG(3,("test", "error: failed to schedule entry %d: %p", 
     343                                      idx, &tparam->entries[idx])); 
     344                } 
     345            } else if (prev_status == 1 && status == PJ_SUCCESS) { 
     346                /* Race condition with another cancellation or 
     347                 * timer poll. 
     348                 */ 
     349                pj_thread_sleep(20); 
     350                PJ_LOG(3,("test", "race schedule-cancel/poll %d: %p", 
     351                                  idx, &tparam->entries[idx])); 
     352            } 
     353            if (status == PJ_SUCCESS) { 
     354                pj_atomic_set(tparam->status[idx], 1); 
     355                pj_atomic_inc(tparam->n_sched); 
     356            } 
     357        } else if (task == CANCELLING) { 
     358            count = pj_timer_heap_cancel_if_active(tparam->timer, 
     359                                                   &tparam->entries[idx], 10); 
     360            if (prev_status == 0 && count > 0) { 
     361                /* To make sure the flag has been set. */ 
     362                pj_thread_sleep(20); 
     363                if (pj_atomic_get(tparam->status[idx]) == 1) { 
     364                    /* Race condition with scheduling. */ 
     365                    PJ_LOG(3,("test", "race cancel-schedule %d: %p", 
     366                                      idx, &tparam->entries[idx])); 
     367                } else { 
     368                    if (tparam->err != 0) tparam->err = -220; 
     369                    PJ_LOG(3,("test", "error: cancelling invalid entry %d: %p", 
     370                                      idx, &tparam->entries[idx])); 
     371                } 
     372            } else if (prev_status == 1 && count == 0) { 
     373                /* To make sure the flag has been cleared. */ 
     374                pj_thread_sleep(20); 
     375                if (pj_atomic_get(tparam->status[idx]) == 0) { 
     376                    /* Race condition with polling. */ 
     377                    PJ_LOG(3,("test", "race cancel-poll %d: %p", 
     378                                      idx, &tparam->entries[idx])); 
     379                } else { 
     380                    if (tparam->err != 0) tparam->err = -230; 
     381                    PJ_LOG(3,("test", "error: failed to cancel entry %d: %p", 
     382                                      idx, &tparam->entries[idx])); 
     383                } 
     384            } 
     385            if (count > 0) { 
     386                /* Make sure the flag has been set. */ 
     387                while (pj_atomic_get(tparam->status[idx]) != 1) 
     388                    pj_thread_sleep(10); 
     389                pj_atomic_set(tparam->status[idx], 0); 
     390                pj_atomic_inc(tparam->n_cancel); 
     391            } 
     392        } else if (task == POLLING) { 
     393            count = pj_timer_heap_poll(tparam->timer, NULL); 
     394            for (i = 0; i < count; i++) { 
     395                pj_atomic_inc_and_get(tparam->n_poll); 
     396            } 
     397        } else { 
     398            pj_thread_sleep(10); 
     399        } 
     400    } 
     401    PJ_LOG(4,("test", "...thread #%d (poll) stopped", t_idx)); 
     402 
     403    return 0; 
    248404} 
    249405 
     
    308464static int timer_stress_test(void) 
    309465{ 
     466    unsigned count = 0, n_sched = 0, n_cancel = 0, n_poll = 0; 
    310467    int i; 
    311468    pj_timer_entry *entries = NULL; 
     469    pj_atomic_t **entries_status = NULL; 
    312470    pj_grp_lock_t **grp_locks = NULL; 
    313471    pj_pool_t *pool; 
     
    316474    pj_status_t status; 
    317475    int err=0; 
     476    pj_thread_t **stress_threads = NULL; 
    318477    pj_thread_t **poll_threads = NULL; 
    319478    pj_thread_t **cancel_threads = NULL; 
    320479    struct thread_param tparam = {0}; 
    321480    pj_time_val now; 
     481#if SIMULATE_CRASH 
     482    pj_timer_entry *entry; 
     483    pj_pool_t *tmp_pool; 
     484    pj_time_val delay = {0}; 
     485#endif 
    322486 
    323487    PJ_LOG(3,("test", "...Stress test")); 
     
    333497    } 
    334498 
    335     /* Create timer heap */ 
    336     status = pj_timer_heap_create(pool, ST_ENTRY_COUNT, &timer); 
     499    /* Create timer heap. 
     500     * Initially we only create a fraction of what's required, 
     501     * to test the timer heap growth algorithm. 
     502     */ 
     503    status = pj_timer_heap_create(pool, ST_ENTRY_COUNT/64, &timer); 
    337504    if (status != PJ_SUCCESS) { 
    338505        app_perror("...error: unable to create timer heap", status); 
     
    355522                    pj_pool_calloc(pool, ST_ENTRY_GROUP_LOCK_COUNT, 
    356523                                   sizeof(pj_grp_lock_t*)); 
     524        tparam.grp_locks = grp_locks; 
    357525    } 
    358526    for (i=0; i<ST_ENTRY_GROUP_LOCK_COUNT; ++i) {     
     
    373541        goto on_return; 
    374542    } 
    375  
     543    entries_status = (pj_atomic_t**)pj_pool_calloc(pool, ST_ENTRY_COUNT, 
     544                                                   sizeof(*entries_status)); 
     545    if (!entries_status) { 
     546        err = -55; 
     547        goto on_return; 
     548    } 
     549     
    376550    for (i=0; i<ST_ENTRY_COUNT; ++i) { 
    377         pj_grp_lock_t *grp_lock = NULL; 
    378  
    379         if (ST_ENTRY_GROUP_LOCK_COUNT && pj_rand() % 10) { 
    380             /* About 90% of entries should have group lock */ 
    381             grp_lock = grp_locks[pj_rand() % ST_ENTRY_GROUP_LOCK_COUNT]; 
    382         } 
    383  
    384         pj_timer_entry_init(&entries[i], 0, grp_lock, &st_entry_callback); 
    385         status = st_schedule_entry(timer, &entries[i]); 
     551        pj_timer_entry_init(&entries[i], 0, &tparam, &st_entry_callback); 
     552 
     553        status = pj_atomic_create(pool, -1, &entries_status[i]); 
    386554        if (status != PJ_SUCCESS) { 
    387             app_perror("...error: unable to schedule entry", status); 
    388555            err = -60; 
    389556            goto on_return; 
     557        } 
     558        pj_atomic_set(entries_status[i], 0); 
     559 
     560        /* For randomized test, we schedule the entry inside the thread */ 
     561        if (!ST_STRESS_THREAD_COUNT) { 
     562            status = st_schedule_entry(timer, &entries[i]); 
     563            if (status != PJ_SUCCESS) { 
     564                app_perror("...error: unable to schedule entry", status); 
     565                err = -60; 
     566                goto on_return; 
     567            } 
    390568        } 
    391569    } 
     
    394572    tparam.timer = timer; 
    395573    tparam.entries = entries; 
     574    tparam.status = entries_status; 
    396575    status = pj_atomic_create(pool, -1, &tparam.idx); 
    397576    if (status != PJ_SUCCESS) { 
     
    399578        err = -70; 
    400579        goto on_return; 
     580    } 
     581    status = pj_atomic_create(pool, -1, &tparam.n_sched); 
     582    pj_assert (status == PJ_SUCCESS); 
     583    pj_atomic_set(tparam.n_sched, 0); 
     584    status = pj_atomic_create(pool, -1, &tparam.n_cancel); 
     585    pj_assert (status == PJ_SUCCESS); 
     586    pj_atomic_set(tparam.n_cancel, 0); 
     587    status = pj_atomic_create(pool, -1, &tparam.n_poll); 
     588    pj_assert (status == PJ_SUCCESS); 
     589    pj_atomic_set(tparam.n_poll, 0); 
     590 
     591    /* Start stress worker threads */ 
     592    if (ST_STRESS_THREAD_COUNT) { 
     593        stress_threads = (pj_thread_t**) 
     594                        pj_pool_calloc(pool, ST_STRESS_THREAD_COUNT, 
     595                                       sizeof(pj_thread_t*)); 
     596    } 
     597    for (i=0; i<ST_STRESS_THREAD_COUNT; ++i) { 
     598        status = pj_thread_create( pool, "poll", &stress_worker, &tparam, 
     599                                   0, 0, &stress_threads[i]); 
     600        if (status != PJ_SUCCESS) { 
     601            app_perror("...error: unable to create stress thread", status); 
     602            err = -75; 
     603            goto on_return; 
     604        } 
    401605    } 
    402606 
     
    433637    } 
    434638 
    435     /* Wait 30s */ 
    436     pj_thread_sleep(30*1000); 
    437  
     639#if SIMULATE_CRASH 
     640    tmp_pool = pj_pool_create( mem, NULL, 4096, 128, NULL); 
     641    pj_assert(tmp_pool); 
     642    entry = (pj_timer_entry*)pj_pool_calloc(tmp_pool, 1, sizeof(*entry)); 
     643    pj_assert(entry); 
     644    pj_timer_entry_init(entry, 0, &tparam, &dummy_callback); 
     645    delay.sec = 6; 
     646    status = pj_timer_heap_schedule(timer, entry, &delay); 
     647    pj_assert(status == PJ_SUCCESS); 
     648    pj_thread_sleep(1000); 
     649    PJ_LOG(3,("test", "...Releasing timer entry %p without cancelling it", 
     650                      entry)); 
     651    pj_pool_secure_release(tmp_pool); 
     652    //pj_pool_release(tmp_pool); 
     653    //pj_memset(tmp_pool, 128, 4096); 
     654#endif 
     655 
     656    /* Wait */ 
     657    pj_thread_sleep(ST_DURATION); 
    438658 
    439659on_return: 
     
    442662    tparam.stopping = PJ_TRUE; 
    443663     
     664    for (i=0; i<ST_STRESS_THREAD_COUNT; ++i) { 
     665        if (!stress_threads[i]) 
     666            continue; 
     667        pj_thread_join(stress_threads[i]); 
     668        pj_thread_destroy(stress_threads[i]); 
     669    } 
     670 
    444671    for (i=0; i<ST_POLL_THREAD_COUNT; ++i) { 
    445672        if (!poll_threads[i]) 
     
    463690 
    464691    for (i=0; i<ST_ENTRY_COUNT; ++i) { 
    465         pj_timer_heap_cancel_if_active(timer, &entries[i], 10); 
     692        count += pj_timer_heap_cancel_if_active(timer, &entries[i], 10); 
     693        if (entries_status) 
     694            pj_atomic_destroy(entries_status[i]); 
    466695    } 
    467696 
     
    478707        pj_timer_heap_destroy(timer); 
    479708 
     709    PJ_LOG(3,("test", "Total memory of timer heap: %d", 
     710                      pj_timer_heap_mem_size(ST_ENTRY_COUNT))); 
     711 
    480712    if (tparam.idx) 
    481713        pj_atomic_destroy(tparam.idx); 
     714    if (tparam.n_sched) { 
     715        n_sched = pj_atomic_get(tparam.n_sched); 
     716        PJ_LOG(3,("test", "Total number of scheduled entries: %d", n_sched)); 
     717        pj_atomic_destroy(tparam.n_sched); 
     718    } 
     719    if (tparam.n_cancel) { 
     720        n_cancel = pj_atomic_get(tparam.n_cancel); 
     721        PJ_LOG(3,("test", "Total number of cancelled entries: %d", n_cancel)); 
     722        pj_atomic_destroy(tparam.n_cancel); 
     723    } 
     724    if (tparam.n_poll) { 
     725        n_poll = pj_atomic_get(tparam.n_poll); 
     726        PJ_LOG(3,("test", "Total number of polled entries: %d", n_poll)); 
     727        pj_atomic_destroy(tparam.n_poll); 
     728    } 
     729    PJ_LOG(3,("test", "Number of remaining active entries: %d", count)); 
     730    if (n_sched) { 
     731        pj_bool_t match = PJ_TRUE; 
     732 
     733#if SIMULATE_CRASH 
     734        n_sched++; 
     735#endif 
     736        if (n_sched != (n_cancel + n_poll + count)) { 
     737            if (tparam.err != 0) tparam.err = -250; 
     738            match = PJ_FALSE; 
     739        } 
     740        PJ_LOG(3,("test", "Scheduled = cancelled + polled + remaining?: %s", 
     741                          (match? "yes": "no"))); 
     742    } 
    482743 
    483744    pj_pool_safe_release(&pool); 
    484745 
    485     return err; 
     746    return (err? err: tparam.err); 
    486747} 
    487748 
Note: See TracChangeset for help on using the changeset viewer.