Changeset 6058 for pjproject/trunk/pjlib/src/pjlib-test/timer.c
Timestamp: Sep 3, 2019 2:10:45 AM
File: pjproject/trunk/pjlib/src/pjlib-test/timer.c (1 edited)
Legend: unprefixed lines are unmodified context; lines prefixed with + were added, lines prefixed with - were removed; a lone … marks elided unchanged lines.
pjproject/trunk/pjlib/src/pjlib-test/timer.c
--- pjlib/src/pjlib-test/timer.c (r5933)
+++ pjlib/src/pjlib-test/timer.c (r6058)

…
  * Stress test *
  ***************
- * Test scenario :
+ * Test scenario (if RANDOMIZED_TEST is 0):
  * 1. Create and schedule a number of timer entries.
  * 2. Start threads for polling (simulating normal worker thread).
…
  * 3. Start threads for cancelling random entries. Each successfully
  *    cancelled entry will be re-scheduled after some random delay.
+ *
+ * Test scenario (if RANDOMIZED_TEST is 1):
+ * 1. Create and schedule a number of timer entries.
+ * 2. Start threads which will, based on a configurable probability
+ *    setting, randomly perform timer scheduling, cancelling, or
+ *    polling (simulating normal worker thread).
+ * This test is considered a failure if:
+ * - It triggers assertion/crash.
+ * - There's an error message in the log, which indicates a potential
+ *   bug in the implementation (note that race message is ok).
  */
-#define ST_POLL_THREAD_COUNT        10
-#define ST_CANCEL_THREAD_COUNT      10
-
-#define ST_ENTRY_COUNT              1000
-#define ST_ENTRY_MAX_TIMEOUT_MS     100
+#define RANDOMIZED_TEST             1
+#define SIMULATE_CRASH              PJ_TIMER_HEAP_USE_COPY
+
+#if RANDOMIZED_TEST
+#define ST_STRESS_THREAD_COUNT      20
+#define ST_POLL_THREAD_COUNT        0
+#define ST_CANCEL_THREAD_COUNT      0
+#else
+#define ST_STRESS_THREAD_COUNT      0
+#define ST_POLL_THREAD_COUNT        10
+#define ST_CANCEL_THREAD_COUNT      10
+#endif
+
+#define ST_ENTRY_COUNT              10000
+#define ST_DURATION                 30000
+#define ST_ENTRY_MAX_TIMEOUT_MS     ST_DURATION/10

 /* Number of group lock, may be zero, shared by timer entries, group lock
…
     pj_bool_t stopping;
     pj_timer_entry *entries;
+    pj_atomic_t **status;
+    pj_atomic_t *n_sched, *n_cancel, *n_poll;
+    pj_grp_lock_t **grp_locks;
+    int err;

     pj_atomic_t *idx;
…
 {
     pj_time_val delay = {0};
-    pj_grp_lock_t *grp_lock = (pj_grp_lock_t*)e->user_data;
+    pj_grp_lock_t *grp_lock = NULL;
     pj_status_t status;
+    struct thread_param *tparam = (struct thread_param *)e->user_data;
+
+    if (ST_ENTRY_GROUP_LOCK_COUNT && pj_rand() % 10) {
+        /* About 90% of entries should have group lock */
+        grp_lock = tparam->grp_locks[pj_rand() % ST_ENTRY_GROUP_LOCK_COUNT];
+    }

     delay.msec = pj_rand() % ST_ENTRY_MAX_TIMEOUT_MS;
…
 }
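The hunk above moves the random group-lock selection into st_schedule_entry() itself; user_data now carries the shared thread_param rather than the group lock. The lines that actually schedule the entry are elided from this hunk, so the following is only a rough sketch of the PJLIB scheduling pattern the function relies on (the helper name, fixed id value, and delay handling are illustrative, not part of the changeset):

{{{
/* Sketch: schedule a timer entry under an optional group lock.
 * When a group lock is given, the heap holds a reference to it while
 * the entry is scheduled, so shared resources stay alive until the
 * callback fires or the entry is cancelled.
 */
#include <pjlib.h>

static pj_status_t schedule_one(pj_timer_heap_t *ht, pj_timer_entry *e,
                                pj_grp_lock_t *grp_lock, unsigned msec)
{
    pj_time_val delay = {0, 0};

    delay.msec = msec;
    pj_time_val_normalize(&delay);

    if (grp_lock) {
        /* The id value (1) just marks the entry as active. */
        return pj_timer_heap_schedule_w_grp_lock(ht, e, &delay, 1, grp_lock);
    }
    return pj_timer_heap_schedule(ht, e, &delay);
}
}}}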

+static void dummy_callback(pj_timer_heap_t *ht, pj_timer_entry *e)
+{
+    PJ_LOG(4,("test", "dummy callback called %p %p", e, e->user_data));
+}
+
 static void st_entry_callback(pj_timer_heap_t *ht, pj_timer_entry *e)
 {
+    struct thread_param *tparam = (struct thread_param *)e->user_data;
+
+#if RANDOMIZED_TEST
+    /* Make sure the flag has been set. */
+    while (pj_atomic_get(tparam->status[e - tparam->entries]) != 1)
+        pj_thread_sleep(10);
+    pj_atomic_set(tparam->status[e - tparam->entries], 0);
+#endif
+
     /* try to cancel this */
     pj_timer_heap_cancel_if_active(ht, e, 10);
…

     /* reschedule entry */
-    st_schedule_entry(ht, e);
+    if (!ST_STRESS_THREAD_COUNT)
+        st_schedule_entry(ht, e);
+}
+
+/* Randomized stress worker thread function. */
+static int stress_worker(void *arg)
+{
+    /* Enumeration of possible task. */
+    enum {
+        SCHEDULING = 0,
+        CANCELLING = 1,
+        POLLING = 2,
+        NOTHING = 3
+    };
+    /* Probability of a certain task being chosen.
+     * The first number indicates the probability of the first task,
+     * the second number for the second task, and so on.
+     */
+    int prob[3] = {75, 15, 5};
+    struct thread_param *tparam = (struct thread_param*)arg;
+    int t_idx, i;
+
+    t_idx = pj_atomic_inc_and_get(tparam->idx);
+    PJ_LOG(4,("test", "...thread #%d (random) started", t_idx));
+    while (!tparam->stopping) {
+        int job, task;
+        int idx, count;
+        pj_status_t prev_status, status;
+
+        /* Randomly choose which task to do */
+        job = pj_rand() % 100;
+        if (job < prob[0]) task = SCHEDULING;
+        else if (job < (prob[0] + prob[1])) task = CANCELLING;
+        else if (job < (prob[0] + prob[1] + prob[2])) task = POLLING;
+        else task = NOTHING;
+
+        idx = pj_rand() % ST_ENTRY_COUNT;
+        prev_status = pj_atomic_get(tparam->status[idx]);
+        if (task == SCHEDULING) {
+            if (prev_status != 0) continue;
+            status = st_schedule_entry(tparam->timer, &tparam->entries[idx]);
+            if (prev_status == 0 && status != PJ_SUCCESS) {
+                /* To make sure the flag has been set. */
+                pj_thread_sleep(20);
+                if (pj_atomic_get(tparam->status[idx]) == 1) {
+                    /* Race condition with another scheduling. */
+                    PJ_LOG(3,("test", "race schedule-schedule %d: %p",
+                              idx, &tparam->entries[idx]));
+                } else {
+                    if (tparam->err != 0) tparam->err = -210;
+                    PJ_LOG(3,("test", "error: failed to schedule entry %d: %p",
+                              idx, &tparam->entries[idx]));
+                }
+            } else if (prev_status == 1 && status == PJ_SUCCESS) {
+                /* Race condition with another cancellation or
+                 * timer poll.
+                 */
+                pj_thread_sleep(20);
+                PJ_LOG(3,("test", "race schedule-cancel/poll %d: %p",
+                          idx, &tparam->entries[idx]));
+            }
+            if (status == PJ_SUCCESS) {
+                pj_atomic_set(tparam->status[idx], 1);
+                pj_atomic_inc(tparam->n_sched);
+            }
+        } else if (task == CANCELLING) {
+            count = pj_timer_heap_cancel_if_active(tparam->timer,
+                                                   &tparam->entries[idx], 10);
+            if (prev_status == 0 && count > 0) {
+                /* To make sure the flag has been set. */
+                pj_thread_sleep(20);
+                if (pj_atomic_get(tparam->status[idx]) == 1) {
+                    /* Race condition with scheduling. */
+                    PJ_LOG(3,("test", "race cancel-schedule %d: %p",
+                              idx, &tparam->entries[idx]));
+                } else {
+                    if (tparam->err != 0) tparam->err = -220;
+                    PJ_LOG(3,("test", "error: cancelling invalid entry %d: %p",
+                              idx, &tparam->entries[idx]));
+                }
+            } else if (prev_status == 1 && count == 0) {
+                /* To make sure the flag has been cleared. */
+                pj_thread_sleep(20);
+                if (pj_atomic_get(tparam->status[idx]) == 0) {
+                    /* Race condition with polling. */
+                    PJ_LOG(3,("test", "race cancel-poll %d: %p",
+                              idx, &tparam->entries[idx]));
+                } else {
+                    if (tparam->err != 0) tparam->err = -230;
+                    PJ_LOG(3,("test", "error: failed to cancel entry %d: %p",
+                              idx, &tparam->entries[idx]));
+                }
+            }
+            if (count > 0) {
+                /* Make sure the flag has been set. */
+                while (pj_atomic_get(tparam->status[idx]) != 1)
+                    pj_thread_sleep(10);
+                pj_atomic_set(tparam->status[idx], 0);
+                pj_atomic_inc(tparam->n_cancel);
+            }
+        } else if (task == POLLING) {
+            count = pj_timer_heap_poll(tparam->timer, NULL);
+            for (i = 0; i < count; i++) {
+                pj_atomic_inc_and_get(tparam->n_poll);
+            }
+        } else {
+            pj_thread_sleep(10);
+        }
+    }
+    PJ_LOG(4,("test", "...thread #%d (poll) stopped", t_idx));
+
+    return 0;
 }
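Worth noting is the handshake that makes the race checks in stress_worker() and st_entry_callback() possible: a successful schedule sets the entry's status flag to 1, and whichever side consumes the entry (the fired callback or a successful cancel) spins until it observes the 1 before clearing it back to 0. A stripped-down sketch of that protocol, with hypothetical names (`flag` stands for tparam->status[idx] in the real code):

{{{
#include <pjlib.h>

/* Producer side: runs right after a schedule succeeds. */
static void mark_scheduled(pj_atomic_t *flag, pj_atomic_t *n_sched)
{
    pj_atomic_set(flag, 1);      /* entry is now owned by the heap */
    pj_atomic_inc(n_sched);
}

/* Consumer side: runs in the timer callback or after a successful
 * cancel. The spin covers the window where the schedule has already
 * succeeded but the scheduling thread has not yet set the flag.
 */
static void consume_scheduled(pj_atomic_t *flag, pj_atomic_t *n_done)
{
    while (pj_atomic_get(flag) != 1)
        pj_thread_sleep(10);
    pj_atomic_set(flag, 0);      /* entry may be scheduled again */
    pj_atomic_inc(n_done);
}
}}}

Because every consumed entry passes through exactly one such pair, the per-task counters can be reconciled when the test ends.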
…
 static int timer_stress_test(void)
 {
+    unsigned count = 0, n_sched = 0, n_cancel = 0, n_poll = 0;
     int i;
     pj_timer_entry *entries = NULL;
+    pj_atomic_t **entries_status = NULL;
     pj_grp_lock_t **grp_locks = NULL;
     pj_pool_t *pool;
…
     pj_status_t status;
     int err=0;
+    pj_thread_t **stress_threads = NULL;
     pj_thread_t **poll_threads = NULL;
     pj_thread_t **cancel_threads = NULL;
     struct thread_param tparam = {0};
     pj_time_val now;
+#if SIMULATE_CRASH
+    pj_timer_entry *entry;
+    pj_pool_t *tmp_pool;
+    pj_time_val delay = {0};
+#endif

     PJ_LOG(3,("test", "...Stress test"));
…
     }

-    /* Create timer heap */
-    status = pj_timer_heap_create(pool, ST_ENTRY_COUNT, &timer);
+    /* Create timer heap.
+     * Initially we only create a fraction of what's required,
+     * to test the timer heap growth algorithm.
+     */
+    status = pj_timer_heap_create(pool, ST_ENTRY_COUNT/64, &timer);
     if (status != PJ_SUCCESS) {
         app_perror("...error: unable to create timer heap", status);
…
                     pj_pool_calloc(pool, ST_ENTRY_GROUP_LOCK_COUNT,
                                    sizeof(pj_grp_lock_t*));
+        tparam.grp_locks = grp_locks;
     }
     for (i=0; i<ST_ENTRY_GROUP_LOCK_COUNT; ++i) {
…
         goto on_return;
     }
+    entries_status = (pj_atomic_t**)pj_pool_calloc(pool, ST_ENTRY_COUNT,
+                                                   sizeof(*entries_status));
+    if (!entries_status) {
+        err = -55;
+        goto on_return;
+    }
+
     for (i=0; i<ST_ENTRY_COUNT; ++i) {
-        pj_grp_lock_t *grp_lock = NULL;
-
-        if (ST_ENTRY_GROUP_LOCK_COUNT && pj_rand() % 10) {
-            /* About 90% of entries should have group lock */
-            grp_lock = grp_locks[pj_rand() % ST_ENTRY_GROUP_LOCK_COUNT];
-        }
-
-        pj_timer_entry_init(&entries[i], 0, grp_lock, &st_entry_callback);
-        status = st_schedule_entry(timer, &entries[i]);
+        pj_timer_entry_init(&entries[i], 0, &tparam, &st_entry_callback);
+
+        status = pj_atomic_create(pool, -1, &entries_status[i]);
         if (status != PJ_SUCCESS) {
-            app_perror("...error: unable to schedule entry", status);
             err = -60;
             goto on_return;
+        }
+        pj_atomic_set(entries_status[i], 0);
+
+        /* For randomized test, we schedule the entry inside the thread */
+        if (!ST_STRESS_THREAD_COUNT) {
+            status = st_schedule_entry(timer, &entries[i]);
+            if (status != PJ_SUCCESS) {
+                app_perror("...error: unable to schedule entry", status);
+                err = -60;
+                goto on_return;
+            }
         }
     }
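The heap is now created with only ST_ENTRY_COUNT/64 slots, so scheduling all ST_ENTRY_COUNT entries forces the heap to grow several times during the test. A self-contained sketch of the same trick (the counts, pool sizes, and callback here are illustrative, not from the changeset):

{{{
#include <pjlib.h>

static void demo_cb(pj_timer_heap_t *ht, pj_timer_entry *e)
{
    PJ_UNUSED_ARG(ht);
    PJ_UNUSED_ARG(e);
}

/* Schedule far more entries than the heap's initial capacity to
 * exercise its growth path.
 */
static void heap_growth_demo(pj_pool_factory *pf)
{
    enum { N = 256 };
    static pj_timer_entry entries[N];
    pj_pool_t *pool = pj_pool_create(pf, "growth", 4096, 4096, NULL);
    pj_timer_heap_t *ht;
    pj_time_val delay = {1, 0};
    int i;

    /* Room for only N/64 entries at first... */
    pj_timer_heap_create(pool, N/64, &ht);

    /* ...then schedule N; the heap must reallocate as it fills. */
    for (i = 0; i < N; ++i) {
        pj_timer_entry_init(&entries[i], i, NULL, &demo_cb);
        pj_timer_heap_schedule(ht, &entries[i], &delay);
    }

    pj_timer_heap_destroy(ht);
    pj_pool_release(pool);
}
}}}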
…
     tparam.timer = timer;
     tparam.entries = entries;
+    tparam.status = entries_status;
     status = pj_atomic_create(pool, -1, &tparam.idx);
     if (status != PJ_SUCCESS) {
…
         err = -70;
         goto on_return;
+    }
+    status = pj_atomic_create(pool, -1, &tparam.n_sched);
+    pj_assert (status == PJ_SUCCESS);
+    pj_atomic_set(tparam.n_sched, 0);
+    status = pj_atomic_create(pool, -1, &tparam.n_cancel);
+    pj_assert (status == PJ_SUCCESS);
+    pj_atomic_set(tparam.n_cancel, 0);
+    status = pj_atomic_create(pool, -1, &tparam.n_poll);
+    pj_assert (status == PJ_SUCCESS);
+    pj_atomic_set(tparam.n_poll, 0);
+
+    /* Start stress worker threads */
+    if (ST_STRESS_THREAD_COUNT) {
+        stress_threads = (pj_thread_t**)
+                         pj_pool_calloc(pool, ST_STRESS_THREAD_COUNT,
+                                        sizeof(pj_thread_t*));
+    }
+    for (i=0; i<ST_STRESS_THREAD_COUNT; ++i) {
+        status = pj_thread_create( pool, "poll", &stress_worker, &tparam,
+                                   0, 0, &stress_threads[i]);
+        if (status != PJ_SUCCESS) {
+            app_perror("...error: unable to create stress thread", status);
+            err = -75;
+            goto on_return;
+        }
     }
…
     }

-    /* Wait 30s */
-    pj_thread_sleep(30*1000);
-
+#if SIMULATE_CRASH
+    tmp_pool = pj_pool_create( mem, NULL, 4096, 128, NULL);
+    pj_assert(tmp_pool);
+    entry = (pj_timer_entry*)pj_pool_calloc(tmp_pool, 1, sizeof(*entry));
+    pj_assert(entry);
+    pj_timer_entry_init(entry, 0, &tparam, &dummy_callback);
+    delay.sec = 6;
+    status = pj_timer_heap_schedule(timer, entry, &delay);
+    pj_assert(status == PJ_SUCCESS);
+    pj_thread_sleep(1000);
+    PJ_LOG(3,("test", "...Releasing timer entry %p without cancelling it",
+              entry));
+    pj_pool_secure_release(tmp_pool);
+    //pj_pool_release(tmp_pool);
+    //pj_memset(tmp_pool, 128, 4096);
+#endif
+
+    /* Wait */
+    pj_thread_sleep(ST_DURATION);

 on_return:
…
     tparam.stopping = PJ_TRUE;

+    for (i=0; i<ST_STRESS_THREAD_COUNT; ++i) {
+        if (!stress_threads[i])
+            continue;
+        pj_thread_join(stress_threads[i]);
+        pj_thread_destroy(stress_threads[i]);
+    }
+
     for (i=0; i<ST_POLL_THREAD_COUNT; ++i) {
         if (!poll_threads[i])
…
     for (i=0; i<ST_ENTRY_COUNT; ++i) {
-        pj_timer_heap_cancel_if_active(timer, &entries[i], 10);
+        count += pj_timer_heap_cancel_if_active(timer, &entries[i], 10);
+        if (entries_status)
+            pj_atomic_destroy(entries_status[i]);
     }
…
     pj_timer_heap_destroy(timer);

+    PJ_LOG(3,("test", "Total memory of timer heap: %d",
+              pj_timer_heap_mem_size(ST_ENTRY_COUNT)));
+
     if (tparam.idx)
         pj_atomic_destroy(tparam.idx);
+    if (tparam.n_sched) {
+        n_sched = pj_atomic_get(tparam.n_sched);
+        PJ_LOG(3,("test", "Total number of scheduled entries: %d", n_sched));
+        pj_atomic_destroy(tparam.n_sched);
+    }
+    if (tparam.n_cancel) {
+        n_cancel = pj_atomic_get(tparam.n_cancel);
+        PJ_LOG(3,("test", "Total number of cancelled entries: %d", n_cancel));
+        pj_atomic_destroy(tparam.n_cancel);
+    }
+    if (tparam.n_poll) {
+        n_poll = pj_atomic_get(tparam.n_poll);
+        PJ_LOG(3,("test", "Total number of polled entries: %d", n_poll));
+        pj_atomic_destroy(tparam.n_poll);
+    }
+    PJ_LOG(3,("test", "Number of remaining active entries: %d", count));
+    if (n_sched) {
+        pj_bool_t match = PJ_TRUE;
+
+#if SIMULATE_CRASH
+        n_sched++;
+#endif
+        if (n_sched != (n_cancel + n_poll + count)) {
+            if (tparam.err != 0) tparam.err = -250;
+            match = PJ_FALSE;
+        }
+        PJ_LOG(3,("test", "Scheduled = cancelled + polled + remaining?: %s",
+                  (match? "yes": "no")));
+    }

     pj_pool_safe_release(&pool);

-    return err;
+    return (err? err: tparam.err);
 }
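For context, the timer-heap API this test hammers concurrently can be driven standalone as below. This is a minimal sketch (pool sizes, names, ids, and delays are illustrative); the id value 10 passed to pj_timer_heap_cancel_if_active() mirrors what st_entry_callback() uses to mark cancelled entries:

{{{
#include <pjlib.h>

static void on_timer(pj_timer_heap_t *ht, pj_timer_entry *e)
{
    PJ_UNUSED_ARG(ht);
    PJ_LOG(3,("demo", "timer %d fired", e->id));
}

int main(void)
{
    pj_caching_pool cp;
    pj_pool_t *pool;
    pj_timer_heap_t *ht;
    pj_timer_entry entry;
    pj_time_val delay = {0, 0};

    pj_init();
    pj_caching_pool_init(&cp, NULL, 0);
    pool = pj_pool_create(&cp.factory, "demo", 4096, 4096, NULL);

    pj_timer_heap_create(pool, 16, &ht);
    pj_timer_entry_init(&entry, 1, NULL, &on_timer);
    delay.msec = 50;
    pj_timer_heap_schedule(ht, &entry, &delay);

    pj_thread_sleep(100);
    pj_timer_heap_poll(ht, NULL);              /* fires on_timer */

    /* No-op here since the entry already fired; the return value is
     * the number of entries actually cancelled.
     */
    pj_timer_heap_cancel_if_active(ht, &entry, 10);

    pj_timer_heap_destroy(ht);
    pj_pool_release(pool);
    pj_caching_pool_destroy(&cp);
    pj_shutdown();
    return 0;
}
}}}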