- Timestamp:
- Feb 13, 2019 6:41:34 AM (6 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjlib/src/pjlib-test/timer.c
r5170 r5933 63 63 pj_size_t size; 64 64 unsigned count; 65 66 PJ_LOG(3,("test", "...Basic test")); 65 67 66 68 size = pj_timer_heap_mem_size(MAX_COUNT)+MAX_COUNT*sizeof(pj_timer_entry); … … 184 186 185 187 188 /*************** 189 * Stress test * 190 *************** 191 * Test scenario: 192 * 1. Create and schedule a number of timer entries. 193 * 2. Start threads for polling (simulating normal worker thread). 194 * Each expired entry will try to cancel and re-schedule itself 195 * from within the callback. 196 * 3. Start threads for cancelling random entries. Each successfully 197 * cancelled entry will be re-scheduled after some random delay. 198 */ 199 #define ST_POLL_THREAD_COUNT 10 200 #define ST_CANCEL_THREAD_COUNT 10 201 202 #define ST_ENTRY_COUNT 1000 203 #define ST_ENTRY_MAX_TIMEOUT_MS 100 204 205 /* Number of group lock, may be zero, shared by timer entries, group lock 206 * can be useful to evaluate poll vs cancel race condition scenario, i.e: 207 * each group lock must have ref count==1 at the end of the test, otherwise 208 * assertion will raise. 209 */ 210 #define ST_ENTRY_GROUP_LOCK_COUNT 1 211 212 213 struct thread_param 214 { 215 pj_timer_heap_t *timer; 216 pj_bool_t stopping; 217 pj_timer_entry *entries; 218 219 pj_atomic_t *idx; 220 struct { 221 pj_bool_t is_poll; 222 unsigned cnt; 223 } stat[ST_POLL_THREAD_COUNT + ST_CANCEL_THREAD_COUNT]; 224 }; 225 226 static pj_status_t st_schedule_entry(pj_timer_heap_t *ht, pj_timer_entry *e) 227 { 228 pj_time_val delay = {0}; 229 pj_grp_lock_t *grp_lock = (pj_grp_lock_t*)e->user_data; 230 pj_status_t status; 231 232 delay.msec = pj_rand() % ST_ENTRY_MAX_TIMEOUT_MS; 233 pj_time_val_normalize(&delay); 234 status = pj_timer_heap_schedule_w_grp_lock(ht, e, &delay, 1, grp_lock); 235 return status; 236 } 237 238 static void st_entry_callback(pj_timer_heap_t *ht, pj_timer_entry *e) 239 { 240 /* try to cancel this */ 241 pj_timer_heap_cancel_if_active(ht, e, 10); 242 243 /* busy doing something */ 244 pj_thread_sleep(pj_rand() % 50); 245 246 /* reschedule entry */ 247 st_schedule_entry(ht, e); 248 } 249 250 /* Poll worker thread function. */ 251 static int poll_worker(void *arg) 252 { 253 struct thread_param *tparam = (struct thread_param*)arg; 254 int idx; 255 256 idx = pj_atomic_inc_and_get(tparam->idx); 257 tparam->stat[idx].is_poll = PJ_TRUE; 258 259 PJ_LOG(4,("test", "...thread #%d (poll) started", idx)); 260 while (!tparam->stopping) { 261 unsigned count; 262 count = pj_timer_heap_poll(tparam->timer, NULL); 263 if (count > 0) { 264 /* Count expired entries */ 265 PJ_LOG(5,("test", "...thread #%d called %d entries", 266 idx, count)); 267 tparam->stat[idx].cnt += count; 268 } else { 269 pj_thread_sleep(10); 270 } 271 } 272 PJ_LOG(4,("test", "...thread #%d (poll) stopped", idx)); 273 274 return 0; 275 } 276 277 /* Cancel worker thread function. */ 278 static int cancel_worker(void *arg) 279 { 280 struct thread_param *tparam = (struct thread_param*)arg; 281 int idx; 282 283 idx = pj_atomic_inc_and_get(tparam->idx); 284 tparam->stat[idx].is_poll = PJ_FALSE; 285 286 PJ_LOG(4,("test", "...thread #%d (cancel) started", idx)); 287 while (!tparam->stopping) { 288 int count; 289 pj_timer_entry *e = &tparam->entries[pj_rand() % ST_ENTRY_COUNT]; 290 291 count = pj_timer_heap_cancel_if_active(tparam->timer, e, 2); 292 if (count > 0) { 293 /* Count cancelled entries */ 294 PJ_LOG(5,("test", "...thread #%d cancelled %d entries", 295 idx, count)); 296 tparam->stat[idx].cnt += count; 297 298 /* Reschedule entry after some delay */ 299 pj_thread_sleep(pj_rand() % 100); 300 st_schedule_entry(tparam->timer, e); 301 } 302 } 303 PJ_LOG(4,("test", "...thread #%d (cancel) stopped", idx)); 304 305 return 0; 306 } 307 308 static int timer_stress_test(void) 309 { 310 int i; 311 pj_timer_entry *entries = NULL; 312 pj_grp_lock_t **grp_locks = NULL; 313 pj_pool_t *pool; 314 pj_timer_heap_t *timer = NULL; 315 pj_lock_t *timer_lock; 316 pj_status_t status; 317 int err=0; 318 pj_thread_t **poll_threads = NULL; 319 pj_thread_t **cancel_threads = NULL; 320 struct thread_param tparam = {0}; 321 pj_time_val now; 322 323 PJ_LOG(3,("test", "...Stress test")); 324 325 pj_gettimeofday(&now); 326 pj_srand(now.sec); 327 328 pool = pj_pool_create( mem, NULL, 128, 128, NULL); 329 if (!pool) { 330 PJ_LOG(3,("test", "...error: unable to create pool")); 331 err = -10; 332 goto on_return; 333 } 334 335 /* Create timer heap */ 336 status = pj_timer_heap_create(pool, ST_ENTRY_COUNT, &timer); 337 if (status != PJ_SUCCESS) { 338 app_perror("...error: unable to create timer heap", status); 339 err = -20; 340 goto on_return; 341 } 342 343 /* Set recursive lock for the timer heap. */ 344 status = pj_lock_create_recursive_mutex( pool, "lock", &timer_lock); 345 if (status != PJ_SUCCESS) { 346 app_perror("...error: unable to create lock", status); 347 err = -30; 348 goto on_return; 349 } 350 pj_timer_heap_set_lock(timer, timer_lock, PJ_TRUE); 351 352 /* Create group locks for the timer entry. */ 353 if (ST_ENTRY_GROUP_LOCK_COUNT) { 354 grp_locks = (pj_grp_lock_t**) 355 pj_pool_calloc(pool, ST_ENTRY_GROUP_LOCK_COUNT, 356 sizeof(pj_grp_lock_t*)); 357 } 358 for (i=0; i<ST_ENTRY_GROUP_LOCK_COUNT; ++i) { 359 status = pj_grp_lock_create(pool, NULL, &grp_locks[i]); 360 if (status != PJ_SUCCESS) { 361 app_perror("...error: unable to create group lock", status); 362 err = -40; 363 goto on_return; 364 } 365 pj_grp_lock_add_ref(grp_locks[i]); 366 } 367 368 /* Create and schedule timer entries */ 369 entries = (pj_timer_entry*)pj_pool_calloc(pool, ST_ENTRY_COUNT, 370 sizeof(*entries)); 371 if (!entries) { 372 err = -50; 373 goto on_return; 374 } 375 376 for (i=0; i<ST_ENTRY_COUNT; ++i) { 377 pj_grp_lock_t *grp_lock = NULL; 378 379 if (ST_ENTRY_GROUP_LOCK_COUNT && pj_rand() % 10) { 380 /* About 90% of entries should have group lock */ 381 grp_lock = grp_locks[pj_rand() % ST_ENTRY_GROUP_LOCK_COUNT]; 382 } 383 384 pj_timer_entry_init(&entries[i], 0, grp_lock, &st_entry_callback); 385 status = st_schedule_entry(timer, &entries[i]); 386 if (status != PJ_SUCCESS) { 387 app_perror("...error: unable to schedule entry", status); 388 err = -60; 389 goto on_return; 390 } 391 } 392 393 tparam.stopping = PJ_FALSE; 394 tparam.timer = timer; 395 tparam.entries = entries; 396 status = pj_atomic_create(pool, -1, &tparam.idx); 397 if (status != PJ_SUCCESS) { 398 app_perror("...error: unable to create atomic", status); 399 err = -70; 400 goto on_return; 401 } 402 403 /* Start poll worker threads */ 404 if (ST_POLL_THREAD_COUNT) { 405 poll_threads = (pj_thread_t**) 406 pj_pool_calloc(pool, ST_POLL_THREAD_COUNT, 407 sizeof(pj_thread_t*)); 408 } 409 for (i=0; i<ST_POLL_THREAD_COUNT; ++i) { 410 status = pj_thread_create( pool, "poll", &poll_worker, &tparam, 411 0, 0, &poll_threads[i]); 412 if (status != PJ_SUCCESS) { 413 app_perror("...error: unable to create poll thread", status); 414 err = -80; 415 goto on_return; 416 } 417 } 418 419 /* Start cancel worker threads */ 420 if (ST_CANCEL_THREAD_COUNT) { 421 cancel_threads = (pj_thread_t**) 422 pj_pool_calloc(pool, ST_CANCEL_THREAD_COUNT, 423 sizeof(pj_thread_t*)); 424 } 425 for (i=0; i<ST_CANCEL_THREAD_COUNT; ++i) { 426 status = pj_thread_create( pool, "cancel", &cancel_worker, &tparam, 427 0, 0, &cancel_threads[i]); 428 if (status != PJ_SUCCESS) { 429 app_perror("...error: unable to create cancel thread", status); 430 err = -90; 431 goto on_return; 432 } 433 } 434 435 /* Wait 30s */ 436 pj_thread_sleep(30*1000); 437 438 439 on_return: 440 441 PJ_LOG(3,("test", "...Cleaning up resources")); 442 tparam.stopping = PJ_TRUE; 443 444 for (i=0; i<ST_POLL_THREAD_COUNT; ++i) { 445 if (!poll_threads[i]) 446 continue; 447 pj_thread_join(poll_threads[i]); 448 pj_thread_destroy(poll_threads[i]); 449 } 450 451 for (i=0; i<ST_CANCEL_THREAD_COUNT; ++i) { 452 if (!cancel_threads[i]) 453 continue; 454 pj_thread_join(cancel_threads[i]); 455 pj_thread_destroy(cancel_threads[i]); 456 } 457 458 for (i=0; i<ST_POLL_THREAD_COUNT+ST_CANCEL_THREAD_COUNT; ++i) { 459 PJ_LOG(3,("test", "...Thread #%d (%s) executed %d entries", 460 i, (tparam.stat[i].is_poll? "poll":"cancel"), 461 tparam.stat[i].cnt)); 462 } 463 464 for (i=0; i<ST_ENTRY_COUNT; ++i) { 465 pj_timer_heap_cancel_if_active(timer, &entries[i], 10); 466 } 467 468 for (i=0; i<ST_ENTRY_GROUP_LOCK_COUNT; ++i) { 469 /* Ref count must be equal to 1 */ 470 if (pj_grp_lock_get_ref(grp_locks[i]) != 1) { 471 pj_assert(!"Group lock ref count must be equal to 1"); 472 if (!err) err = -100; 473 } 474 pj_grp_lock_dec_ref(grp_locks[i]); 475 } 476 477 if (timer) 478 pj_timer_heap_destroy(timer); 479 480 if (tparam.idx) 481 pj_atomic_destroy(tparam.idx); 482 483 pj_pool_safe_release(&pool); 484 485 return err; 486 } 487 186 488 int timer_test() 187 489 { 188 return test_timer_heap(); 490 int rc; 491 492 rc = test_timer_heap(); 493 if (rc != 0) 494 return rc; 495 496 rc = timer_stress_test(); 497 if (rc != 0) 498 return rc; 499 500 return 0; 189 501 } 190 502
Note: See TracChangeset
for help on using the changeset viewer.