Ignore:
Timestamp:
Feb 13, 2019 6:41:34 AM (3 months ago)
Author:
nanang
Message:

Re #2176: added timer stress test into pjlib-test.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjlib/src/pjlib-test/timer.c

    r5170 r5933  
    6363    pj_size_t size; 
    6464    unsigned count; 
     65 
     66    PJ_LOG(3,("test", "...Basic test")); 
    6567 
    6668    size = pj_timer_heap_mem_size(MAX_COUNT)+MAX_COUNT*sizeof(pj_timer_entry); 
     
    184186 
    185187 
     188/*************** 
     189 * Stress test * 
     190 *************** 
     191 * Test scenario: 
     192 * 1. Create and schedule a number of timer entries. 
     193 * 2. Start threads for polling (simulating normal worker thread). 
     194 *    Each expired entry will try to cancel and re-schedule itself 
     195 *    from within the callback. 
     196 * 3. Start threads for cancelling random entries. Each successfully 
     197 *    cancelled entry will be re-scheduled after some random delay. 
     198 */ 
     199#define ST_POLL_THREAD_COUNT        10 
     200#define ST_CANCEL_THREAD_COUNT      10 
     201 
     202#define ST_ENTRY_COUNT              1000 
     203#define ST_ENTRY_MAX_TIMEOUT_MS     100 
     204 
     205/* Number of group lock, may be zero, shared by timer entries, group lock 
     206 * can be useful to evaluate poll vs cancel race condition scenario, i.e: 
     207 * each group lock must have ref count==1 at the end of the test, otherwise 
     208 * assertion will raise. 
     209 */ 
     210#define ST_ENTRY_GROUP_LOCK_COUNT   1 
     211 
     212 
     213struct thread_param 
     214{ 
     215    pj_timer_heap_t *timer; 
     216    pj_bool_t stopping; 
     217    pj_timer_entry *entries; 
     218 
     219    pj_atomic_t *idx; 
     220    struct { 
     221        pj_bool_t is_poll; 
     222        unsigned cnt; 
     223    } stat[ST_POLL_THREAD_COUNT + ST_CANCEL_THREAD_COUNT]; 
     224}; 
     225 
     226static pj_status_t st_schedule_entry(pj_timer_heap_t *ht, pj_timer_entry *e) 
     227{ 
     228    pj_time_val delay = {0}; 
     229    pj_grp_lock_t *grp_lock = (pj_grp_lock_t*)e->user_data; 
     230    pj_status_t status; 
     231 
     232    delay.msec = pj_rand() % ST_ENTRY_MAX_TIMEOUT_MS; 
     233    pj_time_val_normalize(&delay); 
     234    status = pj_timer_heap_schedule_w_grp_lock(ht, e, &delay, 1, grp_lock); 
     235    return status; 
     236} 
     237 
     238static void st_entry_callback(pj_timer_heap_t *ht, pj_timer_entry *e) 
     239{ 
     240    /* try to cancel this */ 
     241    pj_timer_heap_cancel_if_active(ht, e, 10); 
     242     
     243    /* busy doing something */ 
     244    pj_thread_sleep(pj_rand() % 50); 
     245 
     246    /* reschedule entry */ 
     247    st_schedule_entry(ht, e); 
     248} 
     249 
     250/* Poll worker thread function. */ 
     251static int poll_worker(void *arg) 
     252{ 
     253    struct thread_param *tparam = (struct thread_param*)arg; 
     254    int idx; 
     255 
     256    idx = pj_atomic_inc_and_get(tparam->idx); 
     257    tparam->stat[idx].is_poll = PJ_TRUE; 
     258 
     259    PJ_LOG(4,("test", "...thread #%d (poll) started", idx)); 
     260    while (!tparam->stopping) { 
     261        unsigned count; 
     262        count = pj_timer_heap_poll(tparam->timer, NULL); 
     263        if (count > 0) { 
     264            /* Count expired entries */ 
     265            PJ_LOG(5,("test", "...thread #%d called %d entries", 
     266                      idx, count)); 
     267            tparam->stat[idx].cnt += count; 
     268        } else { 
     269            pj_thread_sleep(10); 
     270        } 
     271    } 
     272    PJ_LOG(4,("test", "...thread #%d (poll) stopped", idx)); 
     273 
     274    return 0; 
     275} 
     276 
     277/* Cancel worker thread function. */ 
     278static int cancel_worker(void *arg) 
     279{ 
     280    struct thread_param *tparam = (struct thread_param*)arg; 
     281    int idx; 
     282 
     283    idx = pj_atomic_inc_and_get(tparam->idx); 
     284    tparam->stat[idx].is_poll = PJ_FALSE; 
     285 
     286    PJ_LOG(4,("test", "...thread #%d (cancel) started", idx)); 
     287    while (!tparam->stopping) { 
     288        int count; 
     289        pj_timer_entry *e = &tparam->entries[pj_rand() % ST_ENTRY_COUNT]; 
     290 
     291        count = pj_timer_heap_cancel_if_active(tparam->timer, e, 2); 
     292        if (count > 0) { 
     293            /* Count cancelled entries */ 
     294            PJ_LOG(5,("test", "...thread #%d cancelled %d entries", 
     295                      idx, count)); 
     296            tparam->stat[idx].cnt += count; 
     297 
     298            /* Reschedule entry after some delay */ 
     299            pj_thread_sleep(pj_rand() % 100); 
     300            st_schedule_entry(tparam->timer, e); 
     301        } 
     302    } 
     303    PJ_LOG(4,("test", "...thread #%d (cancel) stopped", idx)); 
     304 
     305    return 0; 
     306} 
     307 
     308static int timer_stress_test(void) 
     309{ 
     310    int i; 
     311    pj_timer_entry *entries = NULL; 
     312    pj_grp_lock_t **grp_locks = NULL; 
     313    pj_pool_t *pool; 
     314    pj_timer_heap_t *timer = NULL; 
     315    pj_lock_t *timer_lock; 
     316    pj_status_t status; 
     317    int err=0; 
     318    pj_thread_t **poll_threads = NULL; 
     319    pj_thread_t **cancel_threads = NULL; 
     320    struct thread_param tparam = {0}; 
     321    pj_time_val now; 
     322 
     323    PJ_LOG(3,("test", "...Stress test")); 
     324 
     325    pj_gettimeofday(&now); 
     326    pj_srand(now.sec); 
     327 
     328    pool = pj_pool_create( mem, NULL, 128, 128, NULL); 
     329    if (!pool) { 
     330        PJ_LOG(3,("test", "...error: unable to create pool")); 
     331        err = -10; 
     332        goto on_return; 
     333    } 
     334 
     335    /* Create timer heap */ 
     336    status = pj_timer_heap_create(pool, ST_ENTRY_COUNT, &timer); 
     337    if (status != PJ_SUCCESS) { 
     338        app_perror("...error: unable to create timer heap", status); 
     339        err = -20; 
     340        goto on_return; 
     341    } 
     342 
     343    /* Set recursive lock for the timer heap. */ 
     344    status = pj_lock_create_recursive_mutex( pool, "lock", &timer_lock); 
     345    if (status != PJ_SUCCESS) { 
     346        app_perror("...error: unable to create lock", status); 
     347        err = -30; 
     348        goto on_return; 
     349    } 
     350    pj_timer_heap_set_lock(timer, timer_lock, PJ_TRUE); 
     351 
     352    /* Create group locks for the timer entry. */ 
     353    if (ST_ENTRY_GROUP_LOCK_COUNT) { 
     354        grp_locks = (pj_grp_lock_t**) 
     355                    pj_pool_calloc(pool, ST_ENTRY_GROUP_LOCK_COUNT, 
     356                                   sizeof(pj_grp_lock_t*)); 
     357    } 
     358    for (i=0; i<ST_ENTRY_GROUP_LOCK_COUNT; ++i) {     
     359        status = pj_grp_lock_create(pool, NULL, &grp_locks[i]); 
     360        if (status != PJ_SUCCESS) { 
     361            app_perror("...error: unable to create group lock", status); 
     362            err = -40; 
     363            goto on_return; 
     364        } 
     365        pj_grp_lock_add_ref(grp_locks[i]); 
     366    } 
     367 
     368    /* Create and schedule timer entries */ 
     369    entries = (pj_timer_entry*)pj_pool_calloc(pool, ST_ENTRY_COUNT, 
     370                                              sizeof(*entries)); 
     371    if (!entries) { 
     372        err = -50; 
     373        goto on_return; 
     374    } 
     375 
     376    for (i=0; i<ST_ENTRY_COUNT; ++i) { 
     377        pj_grp_lock_t *grp_lock = NULL; 
     378 
     379        if (ST_ENTRY_GROUP_LOCK_COUNT && pj_rand() % 10) { 
     380            /* About 90% of entries should have group lock */ 
     381            grp_lock = grp_locks[pj_rand() % ST_ENTRY_GROUP_LOCK_COUNT]; 
     382        } 
     383 
     384        pj_timer_entry_init(&entries[i], 0, grp_lock, &st_entry_callback); 
     385        status = st_schedule_entry(timer, &entries[i]); 
     386        if (status != PJ_SUCCESS) { 
     387            app_perror("...error: unable to schedule entry", status); 
     388            err = -60; 
     389            goto on_return; 
     390        } 
     391    } 
     392 
     393    tparam.stopping = PJ_FALSE; 
     394    tparam.timer = timer; 
     395    tparam.entries = entries; 
     396    status = pj_atomic_create(pool, -1, &tparam.idx); 
     397    if (status != PJ_SUCCESS) { 
     398        app_perror("...error: unable to create atomic", status); 
     399        err = -70; 
     400        goto on_return; 
     401    } 
     402 
     403    /* Start poll worker threads */ 
     404    if (ST_POLL_THREAD_COUNT) { 
     405        poll_threads = (pj_thread_t**) 
     406                        pj_pool_calloc(pool, ST_POLL_THREAD_COUNT, 
     407                                       sizeof(pj_thread_t*)); 
     408    } 
     409    for (i=0; i<ST_POLL_THREAD_COUNT; ++i) { 
     410        status = pj_thread_create( pool, "poll", &poll_worker, &tparam, 
     411                                   0, 0, &poll_threads[i]); 
     412        if (status != PJ_SUCCESS) { 
     413            app_perror("...error: unable to create poll thread", status); 
     414            err = -80; 
     415            goto on_return; 
     416        } 
     417    } 
     418 
     419    /* Start cancel worker threads */ 
     420    if (ST_CANCEL_THREAD_COUNT) { 
     421        cancel_threads = (pj_thread_t**) 
     422                          pj_pool_calloc(pool, ST_CANCEL_THREAD_COUNT, 
     423                                         sizeof(pj_thread_t*)); 
     424    } 
     425    for (i=0; i<ST_CANCEL_THREAD_COUNT; ++i) { 
     426        status = pj_thread_create( pool, "cancel", &cancel_worker, &tparam, 
     427                                   0, 0, &cancel_threads[i]); 
     428        if (status != PJ_SUCCESS) { 
     429            app_perror("...error: unable to create cancel thread", status); 
     430            err = -90; 
     431            goto on_return; 
     432        } 
     433    } 
     434 
     435    /* Wait 30s */ 
     436    pj_thread_sleep(30*1000); 
     437 
     438 
     439on_return: 
     440     
     441    PJ_LOG(3,("test", "...Cleaning up resources")); 
     442    tparam.stopping = PJ_TRUE; 
     443     
     444    for (i=0; i<ST_POLL_THREAD_COUNT; ++i) { 
     445        if (!poll_threads[i]) 
     446            continue; 
     447        pj_thread_join(poll_threads[i]); 
     448        pj_thread_destroy(poll_threads[i]); 
     449    } 
     450     
     451    for (i=0; i<ST_CANCEL_THREAD_COUNT; ++i) { 
     452        if (!cancel_threads[i]) 
     453            continue; 
     454        pj_thread_join(cancel_threads[i]); 
     455        pj_thread_destroy(cancel_threads[i]); 
     456    } 
     457     
     458    for (i=0; i<ST_POLL_THREAD_COUNT+ST_CANCEL_THREAD_COUNT; ++i) { 
     459        PJ_LOG(3,("test", "...Thread #%d (%s) executed %d entries", 
     460                  i, (tparam.stat[i].is_poll? "poll":"cancel"), 
     461                  tparam.stat[i].cnt)); 
     462    } 
     463 
     464    for (i=0; i<ST_ENTRY_COUNT; ++i) { 
     465        pj_timer_heap_cancel_if_active(timer, &entries[i], 10); 
     466    } 
     467 
     468    for (i=0; i<ST_ENTRY_GROUP_LOCK_COUNT; ++i) { 
     469        /* Ref count must be equal to 1 */ 
     470        if (pj_grp_lock_get_ref(grp_locks[i]) != 1) { 
     471            pj_assert(!"Group lock ref count must be equal to 1"); 
     472            if (!err) err = -100; 
     473        } 
     474        pj_grp_lock_dec_ref(grp_locks[i]); 
     475    } 
     476 
     477    if (timer) 
     478        pj_timer_heap_destroy(timer); 
     479 
     480    if (tparam.idx) 
     481        pj_atomic_destroy(tparam.idx); 
     482 
     483    pj_pool_safe_release(&pool); 
     484 
     485    return err; 
     486} 
     487 
    186488int timer_test() 
    187489{ 
    188     return test_timer_heap(); 
     490    int rc; 
     491 
     492    rc = test_timer_heap(); 
     493    if (rc != 0) 
     494        return rc; 
     495 
     496    rc = timer_stress_test(); 
     497    if (rc != 0) 
     498        return rc; 
     499 
     500    return 0; 
    189501} 
    190502 
Note: See TracChangeset for help on using the changeset viewer.