Ignore:
Timestamp:
Sep 3, 2019 2:10:45 AM (5 years ago)
Author:
ming
Message:

Fixed #2225: Timer heap refactoring

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjlib/src/pjlib-test/timer.c

    r5933 r6058  
    189189 * Stress test * 
    190190 *************** 
    191  * Test scenario: 
     191 * Test scenario (if RANDOMIZED_TEST is 0): 
    192192 * 1. Create and schedule a number of timer entries. 
    193193 * 2. Start threads for polling (simulating normal worker thread). 
     
    196196 * 3. Start threads for cancelling random entries. Each successfully 
    197197 *    cancelled entry will be re-scheduled after some random delay. 
     198 * 
     199 * Test scenario (if RANDOMIZED_TEST is 1): 
     200 * 1. Create and schedule a number of timer entries. 
     201 * 2. Start threads which will, based on a configurable probability 
     202 *    setting, randomly perform timer scheduling, cancelling, or 
     203 *    polling (simulating normal worker thread). 
     204 * This test is considered a failure if: 
     205 * - It triggers assertion/crash. 
     206 * - There's an error message in the log, which indicates a potential 
     207 *   bug in the implementation (note that race message is ok). 
    198208 */ 
    199 #define ST_POLL_THREAD_COUNT        10 
    200 #define ST_CANCEL_THREAD_COUNT      10 
    201  
    202 #define ST_ENTRY_COUNT              1000 
    203 #define ST_ENTRY_MAX_TIMEOUT_MS     100 
     209#define RANDOMIZED_TEST 1 
     210#define SIMULATE_CRASH  PJ_TIMER_HEAP_USE_COPY 
     211 
     212#if RANDOMIZED_TEST 
     213    #define ST_STRESS_THREAD_COUNT          20 
     214    #define ST_POLL_THREAD_COUNT            0 
     215    #define ST_CANCEL_THREAD_COUNT          0 
     216#else 
     217    #define ST_STRESS_THREAD_COUNT          0 
     218    #define ST_POLL_THREAD_COUNT            10 
     219    #define ST_CANCEL_THREAD_COUNT          10 
     220#endif 
     221 
     222#define ST_ENTRY_COUNT              10000 
     223#define ST_DURATION                 30000 
     224#define ST_ENTRY_MAX_TIMEOUT_MS     ST_DURATION/10 
    204225 
    205226/* Number of group lock, may be zero, shared by timer entries, group lock 
     
    216237    pj_bool_t stopping; 
    217238    pj_timer_entry *entries; 
     239    pj_atomic_t **status; 
     240    pj_atomic_t *n_sched, *n_cancel, *n_poll; 
     241    pj_grp_lock_t **grp_locks; 
     242    int err; 
    218243 
    219244    pj_atomic_t *idx; 
     
    227252{ 
    228253    pj_time_val delay = {0}; 
    229     pj_grp_lock_t *grp_lock = (pj_grp_lock_t*)e->user_data; 
     254    pj_grp_lock_t *grp_lock = NULL; 
    230255    pj_status_t status; 
     256    struct thread_param *tparam = (struct thread_param *)e->user_data; 
     257 
     258    if (ST_ENTRY_GROUP_LOCK_COUNT && pj_rand() % 10) { 
     259        /* About 90% of entries should have group lock */ 
     260        grp_lock = tparam->grp_locks[pj_rand() % ST_ENTRY_GROUP_LOCK_COUNT]; 
     261    } 
    231262 
    232263    delay.msec = pj_rand() % ST_ENTRY_MAX_TIMEOUT_MS; 
     
    236267} 
    237268 
     269static void dummy_callback(pj_timer_heap_t *ht, pj_timer_entry *e) 
     270{ 
     271    PJ_LOG(4,("test", "dummy callback called %p %p", e, e->user_data)); 
     272} 
     273 
    238274static void st_entry_callback(pj_timer_heap_t *ht, pj_timer_entry *e) 
    239275{ 
     276    struct thread_param *tparam = (struct thread_param *)e->user_data; 
     277 
     278#if RANDOMIZED_TEST 
     279    /* Make sure the flag has been set. */ 
     280    while (pj_atomic_get(tparam->status[e - tparam->entries]) != 1) 
     281        pj_thread_sleep(10); 
     282    pj_atomic_set(tparam->status[e - tparam->entries], 0); 
     283#endif 
     284 
    240285    /* try to cancel this */ 
    241286    pj_timer_heap_cancel_if_active(ht, e, 10); 
     
    245290 
    246291    /* reschedule entry */ 
    247     st_schedule_entry(ht, e); 
     292    if (!ST_STRESS_THREAD_COUNT) 
     293        st_schedule_entry(ht, e); 
     294} 
     295 
     296/* Randomized stress worker thread function. */ 
     297static int stress_worker(void *arg) 
     298{ 
     299    /* Enumeration of possible task. */ 
     300    enum { 
     301        SCHEDULING = 0, 
     302        CANCELLING = 1, 
     303        POLLING = 2, 
     304        NOTHING = 3 
     305    }; 
     306    /* Probability of a certain task being chosen. 
     307     * The first number indicates the probability of the first task, 
     308     * the second number for the second task, and so on. 
     309     */ 
     310    int prob[3] = {75, 15, 5}; 
     311    struct thread_param *tparam = (struct thread_param*)arg; 
     312    int t_idx, i; 
     313 
     314    t_idx = pj_atomic_inc_and_get(tparam->idx); 
     315    PJ_LOG(4,("test", "...thread #%d (random) started", t_idx)); 
     316    while (!tparam->stopping) { 
     317        int job, task; 
     318        int idx, count; 
     319        pj_status_t prev_status, status; 
     320 
     321        /* Randomly choose which task to do */ 
     322        job = pj_rand() % 100; 
     323        if (job < prob[0]) task = SCHEDULING; 
     324        else if (job < (prob[0] + prob[1])) task = CANCELLING; 
     325        else if (job < (prob[0] + prob[1] + prob[2])) task = POLLING; 
     326        else task = NOTHING; 
     327     
     328        idx = pj_rand() % ST_ENTRY_COUNT; 
     329        prev_status = pj_atomic_get(tparam->status[idx]); 
     330        if (task == SCHEDULING) { 
     331            if (prev_status != 0) continue; 
     332            status = st_schedule_entry(tparam->timer, &tparam->entries[idx]); 
     333            if (prev_status == 0 && status != PJ_SUCCESS) { 
     334                /* To make sure the flag has been set. */ 
     335                pj_thread_sleep(20); 
     336                if (pj_atomic_get(tparam->status[idx]) == 1) { 
     337                    /* Race condition with another scheduling. */ 
     338                    PJ_LOG(3,("test", "race schedule-schedule %d: %p", 
     339                                      idx, &tparam->entries[idx])); 
     340                } else { 
     341                    if (tparam->err != 0) tparam->err = -210; 
     342                    PJ_LOG(3,("test", "error: failed to schedule entry %d: %p", 
     343                                      idx, &tparam->entries[idx])); 
     344                } 
     345            } else if (prev_status == 1 && status == PJ_SUCCESS) { 
     346                /* Race condition with another cancellation or 
     347                 * timer poll. 
     348                 */ 
     349                pj_thread_sleep(20); 
     350                PJ_LOG(3,("test", "race schedule-cancel/poll %d: %p", 
     351                                  idx, &tparam->entries[idx])); 
     352            } 
     353            if (status == PJ_SUCCESS) { 
     354                pj_atomic_set(tparam->status[idx], 1); 
     355                pj_atomic_inc(tparam->n_sched); 
     356            } 
     357        } else if (task == CANCELLING) { 
     358            count = pj_timer_heap_cancel_if_active(tparam->timer, 
     359                                                   &tparam->entries[idx], 10); 
     360            if (prev_status == 0 && count > 0) { 
     361                /* To make sure the flag has been set. */ 
     362                pj_thread_sleep(20); 
     363                if (pj_atomic_get(tparam->status[idx]) == 1) { 
     364                    /* Race condition with scheduling. */ 
     365                    PJ_LOG(3,("test", "race cancel-schedule %d: %p", 
     366                                      idx, &tparam->entries[idx])); 
     367                } else { 
     368                    if (tparam->err != 0) tparam->err = -220; 
     369                    PJ_LOG(3,("test", "error: cancelling invalid entry %d: %p", 
     370                                      idx, &tparam->entries[idx])); 
     371                } 
     372            } else if (prev_status == 1 && count == 0) { 
     373                /* To make sure the flag has been cleared. */ 
     374                pj_thread_sleep(20); 
     375                if (pj_atomic_get(tparam->status[idx]) == 0) { 
     376                    /* Race condition with polling. */ 
     377                    PJ_LOG(3,("test", "race cancel-poll %d: %p", 
     378                                      idx, &tparam->entries[idx])); 
     379                } else { 
     380                    if (tparam->err != 0) tparam->err = -230; 
     381                    PJ_LOG(3,("test", "error: failed to cancel entry %d: %p", 
     382                                      idx, &tparam->entries[idx])); 
     383                } 
     384            } 
     385            if (count > 0) { 
     386                /* Make sure the flag has been set. */ 
     387                while (pj_atomic_get(tparam->status[idx]) != 1) 
     388                    pj_thread_sleep(10); 
     389                pj_atomic_set(tparam->status[idx], 0); 
     390                pj_atomic_inc(tparam->n_cancel); 
     391            } 
     392        } else if (task == POLLING) { 
     393            count = pj_timer_heap_poll(tparam->timer, NULL); 
     394            for (i = 0; i < count; i++) { 
     395                pj_atomic_inc_and_get(tparam->n_poll); 
     396            } 
     397        } else { 
     398            pj_thread_sleep(10); 
     399        } 
     400    } 
     401    PJ_LOG(4,("test", "...thread #%d (poll) stopped", t_idx)); 
     402 
     403    return 0; 
    248404} 
    249405 
     
    308464static int timer_stress_test(void) 
    309465{ 
     466    unsigned count = 0, n_sched = 0, n_cancel = 0, n_poll = 0; 
    310467    int i; 
    311468    pj_timer_entry *entries = NULL; 
     469    pj_atomic_t **entries_status = NULL; 
    312470    pj_grp_lock_t **grp_locks = NULL; 
    313471    pj_pool_t *pool; 
     
    316474    pj_status_t status; 
    317475    int err=0; 
     476    pj_thread_t **stress_threads = NULL; 
    318477    pj_thread_t **poll_threads = NULL; 
    319478    pj_thread_t **cancel_threads = NULL; 
    320479    struct thread_param tparam = {0}; 
    321480    pj_time_val now; 
     481#if SIMULATE_CRASH 
     482    pj_timer_entry *entry; 
     483    pj_pool_t *tmp_pool; 
     484    pj_time_val delay = {0}; 
     485#endif 
    322486 
    323487    PJ_LOG(3,("test", "...Stress test")); 
     
    333497    } 
    334498 
    335     /* Create timer heap */ 
    336     status = pj_timer_heap_create(pool, ST_ENTRY_COUNT, &timer); 
     499    /* Create timer heap. 
     500     * Initially we only create a fraction of what's required, 
     501     * to test the timer heap growth algorithm. 
     502     */ 
     503    status = pj_timer_heap_create(pool, ST_ENTRY_COUNT/64, &timer); 
    337504    if (status != PJ_SUCCESS) { 
    338505        app_perror("...error: unable to create timer heap", status); 
     
    355522                    pj_pool_calloc(pool, ST_ENTRY_GROUP_LOCK_COUNT, 
    356523                                   sizeof(pj_grp_lock_t*)); 
     524        tparam.grp_locks = grp_locks; 
    357525    } 
    358526    for (i=0; i<ST_ENTRY_GROUP_LOCK_COUNT; ++i) {     
     
    373541        goto on_return; 
    374542    } 
    375  
     543    entries_status = (pj_atomic_t**)pj_pool_calloc(pool, ST_ENTRY_COUNT, 
     544                                                   sizeof(*entries_status)); 
     545    if (!entries_status) { 
     546        err = -55; 
     547        goto on_return; 
     548    } 
     549     
    376550    for (i=0; i<ST_ENTRY_COUNT; ++i) { 
    377         pj_grp_lock_t *grp_lock = NULL; 
    378  
    379         if (ST_ENTRY_GROUP_LOCK_COUNT && pj_rand() % 10) { 
    380             /* About 90% of entries should have group lock */ 
    381             grp_lock = grp_locks[pj_rand() % ST_ENTRY_GROUP_LOCK_COUNT]; 
    382         } 
    383  
    384         pj_timer_entry_init(&entries[i], 0, grp_lock, &st_entry_callback); 
    385         status = st_schedule_entry(timer, &entries[i]); 
     551        pj_timer_entry_init(&entries[i], 0, &tparam, &st_entry_callback); 
     552 
     553        status = pj_atomic_create(pool, -1, &entries_status[i]); 
    386554        if (status != PJ_SUCCESS) { 
    387             app_perror("...error: unable to schedule entry", status); 
    388555            err = -60; 
    389556            goto on_return; 
     557        } 
     558        pj_atomic_set(entries_status[i], 0); 
     559 
     560        /* For randomized test, we schedule the entry inside the thread */ 
     561        if (!ST_STRESS_THREAD_COUNT) { 
     562            status = st_schedule_entry(timer, &entries[i]); 
     563            if (status != PJ_SUCCESS) { 
     564                app_perror("...error: unable to schedule entry", status); 
     565                err = -60; 
     566                goto on_return; 
     567            } 
    390568        } 
    391569    } 
     
    394572    tparam.timer = timer; 
    395573    tparam.entries = entries; 
     574    tparam.status = entries_status; 
    396575    status = pj_atomic_create(pool, -1, &tparam.idx); 
    397576    if (status != PJ_SUCCESS) { 
     
    399578        err = -70; 
    400579        goto on_return; 
     580    } 
     581    status = pj_atomic_create(pool, -1, &tparam.n_sched); 
     582    pj_assert (status == PJ_SUCCESS); 
     583    pj_atomic_set(tparam.n_sched, 0); 
     584    status = pj_atomic_create(pool, -1, &tparam.n_cancel); 
     585    pj_assert (status == PJ_SUCCESS); 
     586    pj_atomic_set(tparam.n_cancel, 0); 
     587    status = pj_atomic_create(pool, -1, &tparam.n_poll); 
     588    pj_assert (status == PJ_SUCCESS); 
     589    pj_atomic_set(tparam.n_poll, 0); 
     590 
     591    /* Start stress worker threads */ 
     592    if (ST_STRESS_THREAD_COUNT) { 
     593        stress_threads = (pj_thread_t**) 
     594                        pj_pool_calloc(pool, ST_STRESS_THREAD_COUNT, 
     595                                       sizeof(pj_thread_t*)); 
     596    } 
     597    for (i=0; i<ST_STRESS_THREAD_COUNT; ++i) { 
     598        status = pj_thread_create( pool, "poll", &stress_worker, &tparam, 
     599                                   0, 0, &stress_threads[i]); 
     600        if (status != PJ_SUCCESS) { 
     601            app_perror("...error: unable to create stress thread", status); 
     602            err = -75; 
     603            goto on_return; 
     604        } 
    401605    } 
    402606 
     
    433637    } 
    434638 
    435     /* Wait 30s */ 
    436     pj_thread_sleep(30*1000); 
    437  
     639#if SIMULATE_CRASH 
     640    tmp_pool = pj_pool_create( mem, NULL, 4096, 128, NULL); 
     641    pj_assert(tmp_pool); 
     642    entry = (pj_timer_entry*)pj_pool_calloc(tmp_pool, 1, sizeof(*entry)); 
     643    pj_assert(entry); 
     644    pj_timer_entry_init(entry, 0, &tparam, &dummy_callback); 
     645    delay.sec = 6; 
     646    status = pj_timer_heap_schedule(timer, entry, &delay); 
     647    pj_assert(status == PJ_SUCCESS); 
     648    pj_thread_sleep(1000); 
     649    PJ_LOG(3,("test", "...Releasing timer entry %p without cancelling it", 
     650                      entry)); 
     651    pj_pool_secure_release(tmp_pool); 
     652    //pj_pool_release(tmp_pool); 
     653    //pj_memset(tmp_pool, 128, 4096); 
     654#endif 
     655 
     656    /* Wait */ 
     657    pj_thread_sleep(ST_DURATION); 
    438658 
    439659on_return: 
     
    442662    tparam.stopping = PJ_TRUE; 
    443663     
     664    for (i=0; i<ST_STRESS_THREAD_COUNT; ++i) { 
     665        if (!stress_threads[i]) 
     666            continue; 
     667        pj_thread_join(stress_threads[i]); 
     668        pj_thread_destroy(stress_threads[i]); 
     669    } 
     670 
    444671    for (i=0; i<ST_POLL_THREAD_COUNT; ++i) { 
    445672        if (!poll_threads[i]) 
     
    463690 
    464691    for (i=0; i<ST_ENTRY_COUNT; ++i) { 
    465         pj_timer_heap_cancel_if_active(timer, &entries[i], 10); 
     692        count += pj_timer_heap_cancel_if_active(timer, &entries[i], 10); 
     693        if (entries_status) 
     694            pj_atomic_destroy(entries_status[i]); 
    466695    } 
    467696 
     
    478707        pj_timer_heap_destroy(timer); 
    479708 
     709    PJ_LOG(3,("test", "Total memory of timer heap: %d", 
     710                      pj_timer_heap_mem_size(ST_ENTRY_COUNT))); 
     711 
    480712    if (tparam.idx) 
    481713        pj_atomic_destroy(tparam.idx); 
     714    if (tparam.n_sched) { 
     715        n_sched = pj_atomic_get(tparam.n_sched); 
     716        PJ_LOG(3,("test", "Total number of scheduled entries: %d", n_sched)); 
     717        pj_atomic_destroy(tparam.n_sched); 
     718    } 
     719    if (tparam.n_cancel) { 
     720        n_cancel = pj_atomic_get(tparam.n_cancel); 
     721        PJ_LOG(3,("test", "Total number of cancelled entries: %d", n_cancel)); 
     722        pj_atomic_destroy(tparam.n_cancel); 
     723    } 
     724    if (tparam.n_poll) { 
     725        n_poll = pj_atomic_get(tparam.n_poll); 
     726        PJ_LOG(3,("test", "Total number of polled entries: %d", n_poll)); 
     727        pj_atomic_destroy(tparam.n_poll); 
     728    } 
     729    PJ_LOG(3,("test", "Number of remaining active entries: %d", count)); 
     730    if (n_sched) { 
     731        pj_bool_t match = PJ_TRUE; 
     732 
     733#if SIMULATE_CRASH 
     734        n_sched++; 
     735#endif 
     736        if (n_sched != (n_cancel + n_poll + count)) { 
     737            if (tparam.err != 0) tparam.err = -250; 
     738            match = PJ_FALSE; 
     739        } 
     740        PJ_LOG(3,("test", "Scheduled = cancelled + polled + remaining?: %s", 
     741                          (match? "yes": "no"))); 
     742    } 
    482743 
    483744    pj_pool_safe_release(&pool); 
    484745 
    485     return err; 
     746    return (err? err: tparam.err); 
    486747} 
    487748 
Note: See TracChangeset for help on using the changeset viewer.