Changeset 6058
- Timestamp:
- Sep 3, 2019 2:10:45 AM (5 years ago)
- Location:
- pjproject/trunk/pjlib
- Files:
-
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjlib/include/pj/config.h
r6031 r6058 533 533 534 534 /** 535 * If enabled, when calling pj_pool_release(), the memory pool content 536 * will be wiped out first before released. 537 * 538 * Default: 0 539 */ 540 #ifndef PJ_POOL_RELEASE_WIPE_DATA 541 # define PJ_POOL_RELEASE_WIPE_DATA 0 542 #endif 543 544 545 /** 535 546 * Enable timer heap debugging facility. When this is enabled, application 536 547 * can call pj_timer_heap_dump() to show the contents of the timer heap … … 538 549 * See https://trac.pjsip.org/repos/ticket/1527 for more info. 539 550 * 540 * Default: 0551 * Default: 1 541 552 */ 542 553 #ifndef PJ_TIMER_DEBUG 543 # define PJ_TIMER_DEBUG 0 554 # define PJ_TIMER_DEBUG 1 555 #endif 556 557 558 /** 559 * If enabled, the timer heap will keep internal copies of the timer entries. 560 * This will increase the robustness and stability of the timer heap (against 561 * accidental modification or premature deallocation of the timer entries) and 562 * makes it easier to troubleshoot any timer related issues, with the overhead 563 * of additional memory space required. 564 * 565 * Note that the detection against premature deallocation only works if the 566 * freed memory content has changed (such as if it's been reallocated and 567 * overwritten by another data. Alternatively, you can enable 568 * PJ_POOL_RELEASE_WIPE_DATA which will erase the data first before releasing 569 * the memory). 570 * 571 * Default: 1 (enabled) 572 */ 573 #ifndef PJ_TIMER_HEAP_USE_COPY 574 # define PJ_TIMER_HEAP_USE_COPY 1 544 575 #endif 545 576 -
pjproject/trunk/pjlib/include/pj/pool_i.h
r5990 r6058 89 89 PJ_IDEF(void) pj_pool_release( pj_pool_t *pool ) 90 90 { 91 #if PJ_POOL_RELEASE_WIPE_DATA 92 pj_pool_block *b; 93 94 b = pool->block_list.next; 95 while (b != &pool->block_list) { 96 volatile unsigned char *p = b->buf; 97 while (p < b->end) *p++ = 0; 98 b = b->next; 99 } 100 #endif 101 91 102 if (pool->factory->release_pool) 92 103 (*pool->factory->release_pool)(pool->factory, pool); -
pjproject/trunk/pjlib/include/pj/timer.h
r5971 r6058 114 114 pj_timer_id_t _timer_id; 115 115 116 #if !PJ_TIMER_HEAP_USE_COPY 116 117 /** 117 118 * The future time when the timer expires, which the value is updated … … 129 130 const char *src_file; 130 131 int src_line; 132 #endif 133 131 134 #endif 132 135 } pj_timer_entry; -
pjproject/trunk/pjlib/src/pj/timer.c
r5971 r6058 47 47 #define DEFAULT_MAX_TIMED_OUT_PER_POLL (64) 48 48 49 /* Enable this to raise assertion in order to catch bug of timer entry 50 * which has been deallocated without being cancelled. If disabled, 51 * the timer heap will simply remove the destroyed entry (and print log) 52 * and resume normally. 53 * This setting only works if PJ_TIMER_HEAP_USE_COPY is enabled. 54 */ 55 #define ASSERT_IF_ENTRY_DESTROYED (PJ_TIMER_HEAP_USE_COPY? 0: 0) 56 57 49 58 enum 50 59 { … … 54 63 }; 55 64 65 #if PJ_TIMER_HEAP_USE_COPY 66 67 /* Duplicate/copy of the timer entry. */ 68 typedef struct pj_timer_entry_dup 69 { 70 /** 71 * The duplicate copy. 72 */ 73 pj_timer_entry dup; 74 75 /** 76 * Pointer of the original timer entry. 77 */ 78 pj_timer_entry *entry; 79 80 /** 81 * The future time when the timer expires, which the value is updated 82 * by timer heap when the timer is scheduled. 83 */ 84 pj_time_val _timer_value; 85 86 /** 87 * Internal: the group lock used by this entry, set when 88 * pj_timer_heap_schedule_w_lock() is used. 89 */ 90 pj_grp_lock_t *_grp_lock; 91 92 #if PJ_TIMER_DEBUG 93 const char *src_file; 94 int src_line; 95 #endif 96 97 } pj_timer_entry_dup; 98 99 #define GET_TIMER(ht, node) &ht->timer_dups[node->_timer_id] 100 #define GET_ENTRY(node) node->entry 101 #define GET_FIELD(node, _timer_id) node->dup._timer_id 102 103 #else 104 105 typedef pj_timer_entry pj_timer_entry_dup; 106 107 #define GET_TIMER(ht, node) node 108 #define GET_ENTRY(node) node 109 #define GET_FIELD(node, _timer_id) node->_timer_id 110 111 #endif 56 112 57 113 /** … … 84 140 * array. 85 141 */ 86 pj_timer_entry **heap;142 pj_timer_entry_dup **heap; 87 143 88 144 /** … … 97 153 */ 98 154 pj_timer_id_t *timer_ids; 155 156 /** 157 * An array of timer entry copies. 
158 */ 159 pj_timer_entry_dup *timer_dups; 99 160 100 161 /** … … 127 188 128 189 static void copy_node( pj_timer_heap_t *ht, pj_size_t slot, 129 pj_timer_entry *moved_node )190 pj_timer_entry_dup *moved_node ) 130 191 { 131 192 PJ_CHECK_STACK(); … … 135 196 136 197 // Update the corresponding slot in the parallel <timer_ids_> array. 137 ht->timer_ids[ moved_node->_timer_id] = (int)slot;198 ht->timer_ids[GET_FIELD(moved_node, _timer_id)] = (int)slot; 138 199 } 139 200 … … 165 226 166 227 167 static void reheap_down(pj_timer_heap_t *ht, pj_timer_entry *moved_node,228 static void reheap_down(pj_timer_heap_t *ht, pj_timer_entry_dup *moved_node, 168 229 size_t slot, size_t child) 169 230 { … … 175 236 { 176 237 // Choose the smaller of the two children. 177 if (child + 1 < ht->cur_size 178 && PJ_TIME_VAL_LT(ht->heap[child + 1]->_timer_value, ht->heap[child]->_timer_value)) 238 if (child + 1 < ht->cur_size && 239 PJ_TIME_VAL_LT(ht->heap[child + 1]->_timer_value, 240 ht->heap[child]->_timer_value)) 241 { 179 242 child++; 243 } 180 244 181 245 // Perform a <copy> if the child has a larger timeout value than 182 246 // the <moved_node>. 183 if (PJ_TIME_VAL_LT(ht->heap[child]->_timer_value, moved_node->_timer_value)) 247 if (PJ_TIME_VAL_LT(ht->heap[child]->_timer_value, 248 moved_node->_timer_value)) 184 249 { 185 250 copy_node( ht, slot, ht->heap[child]); … … 195 260 } 196 261 197 static void reheap_up( pj_timer_heap_t *ht, pj_timer_entry *moved_node,262 static void reheap_up( pj_timer_heap_t *ht, pj_timer_entry_dup *moved_node, 198 263 size_t slot, size_t parent) 199 264 { … … 204 269 // If the parent node is greater than the <moved_node> we need 205 270 // to copy it down. 
206 if (PJ_TIME_VAL_LT(moved_node->_timer_value, ht->heap[parent]->_timer_value)) 271 if (PJ_TIME_VAL_LT(moved_node->_timer_value, 272 ht->heap[parent]->_timer_value)) 207 273 { 208 274 copy_node(ht, slot, ht->heap[parent]); … … 220 286 221 287 222 static pj_timer_entry * remove_node( pj_timer_heap_t *ht, size_t slot)223 { 224 pj_timer_entry *removed_node = ht->heap[slot];288 static pj_timer_entry_dup * remove_node( pj_timer_heap_t *ht, size_t slot) 289 { 290 pj_timer_entry_dup *removed_node = ht->heap[slot]; 225 291 226 292 // Return this timer id to the freelist. 227 push_freelist( ht, removed_node->_timer_id);293 push_freelist( ht, GET_FIELD(removed_node, _timer_id) ); 228 294 229 295 // Decrement the size of the heap by one since we're removing the … … 232 298 233 299 // Set the ID 234 removed_node->_timer_id = -1; 300 if (GET_FIELD(removed_node, _timer_id) != 301 GET_ENTRY(removed_node)->_timer_id) 302 { 303 PJ_LOG(3,(THIS_FILE, "Bug! Trying to remove entry %p from %s " 304 "line %d, which has been deallocated " 305 "without being cancelled", 306 GET_ENTRY(removed_node), 307 #if PJ_TIMER_DEBUG 308 removed_node->src_file, 309 removed_node->src_line)); 310 #else 311 "N/A", 0)); 312 #endif 313 #if ASSERT_IF_ENTRY_DESTROYED 314 pj_assert(removed_node->dup._timer_id==removed_node->entry->_timer_id); 315 #endif 316 } 317 GET_ENTRY(removed_node)->_timer_id = -1; 318 GET_FIELD(removed_node, _timer_id) = -1; 235 319 236 320 // Only try to reheapify if we're not deleting the last entry. 
… … 239 323 { 240 324 pj_size_t parent; 241 pj_timer_entry *moved_node = ht->heap[ht->cur_size];325 pj_timer_entry_dup *moved_node = ht->heap[ht->cur_size]; 242 326 243 327 // Move the end node to the location being removed and update … … 249 333 parent = HEAP_PARENT (slot); 250 334 251 if (PJ_TIME_VAL_GTE(moved_node->_timer_value, ht->heap[parent]->_timer_value)) 335 if (PJ_TIME_VAL_GTE(moved_node->_timer_value, 336 ht->heap[parent]->_timer_value)) 337 { 252 338 reheap_down( ht, moved_node, slot, HEAP_LEFT(slot)); 253 else339 } else { 254 340 reheap_up( ht, moved_node, slot, parent); 341 } 255 342 } 256 343 … … 258 345 } 259 346 260 static voidgrow_heap(pj_timer_heap_t *ht)347 static pj_status_t grow_heap(pj_timer_heap_t *ht) 261 348 { 262 349 // All the containers will double in size from max_size_ 263 350 size_t new_size = ht->max_size * 2; 351 pj_timer_entry_dup *new_timer_dups = 0; 264 352 pj_timer_id_t *new_timer_ids; 265 353 pj_size_t i; 266 354 355 PJ_LOG(6,(THIS_FILE, "Growing heap size from %d to %d", 356 ht->max_size, new_size)); 357 267 358 // First grow the heap itself. 268 359 269 pj_timer_entry **new_heap = 0; 270 271 new_heap = (pj_timer_entry**) 272 pj_pool_alloc(ht->pool, sizeof(pj_timer_entry*) * new_size); 273 memcpy(new_heap, ht->heap, ht->max_size * sizeof(pj_timer_entry*)); 274 //delete [] this->heap_; 360 pj_timer_entry_dup **new_heap = 0; 361 362 new_heap = (pj_timer_entry_dup**) 363 pj_pool_calloc(ht->pool, new_size, sizeof(pj_timer_entry_dup*)); 364 if (!new_heap) 365 return PJ_ENOMEM; 366 367 #if PJ_TIMER_HEAP_USE_COPY 368 // Grow the array of timer copies. 
369 370 new_timer_dups = (pj_timer_entry_dup*) 371 pj_pool_alloc(ht->pool, 372 sizeof(pj_timer_entry_dup) * new_size); 373 if (!new_timer_dups) 374 return PJ_ENOMEM; 375 376 memcpy(new_timer_dups, ht->timer_dups, 377 ht->max_size * sizeof(pj_timer_entry_dup)); 378 for (i = 0; i < ht->cur_size; i++) { 379 int idx = ht->heap[i] - ht->timer_dups; 380 // Point to the address in the new array 381 pj_assert(idx >= 0 && idx < ht->max_size); 382 new_heap[i] = &new_timer_dups[idx]; 383 } 384 ht->timer_dups = new_timer_dups; 385 #else 386 memcpy(new_heap, ht->heap, ht->max_size * sizeof(pj_timer_entry *)); 387 #endif 275 388 ht->heap = new_heap; 276 389 … … 280 393 new_timer_ids = (pj_timer_id_t*) 281 394 pj_pool_alloc(ht->pool, new_size * sizeof(pj_timer_id_t)); 282 395 if (!new_timer_ids) 396 return PJ_ENOMEM; 397 283 398 memcpy( new_timer_ids, ht->timer_ids, ht->max_size * sizeof(pj_timer_id_t)); 284 399 … … 291 406 292 407 ht->max_size = new_size; 293 } 294 295 static void insert_node(pj_timer_heap_t *ht, pj_timer_entry *new_node) 296 { 297 if (ht->cur_size + 2 >= ht->max_size) 298 grow_heap(ht); 299 300 reheap_up( ht, new_node, ht->cur_size, HEAP_PARENT(ht->cur_size)); 408 409 return PJ_SUCCESS; 410 } 411 412 static pj_status_t insert_node(pj_timer_heap_t *ht, 413 pj_timer_entry *new_node, 414 const pj_time_val *future_time) 415 { 416 pj_timer_entry_dup *timer_copy; 417 418 if (ht->cur_size + 2 >= ht->max_size) { 419 pj_status_t status = grow_heap(ht); 420 if (status != PJ_SUCCESS) 421 return status; 422 } 423 424 timer_copy = GET_TIMER(ht, new_node); 425 #if PJ_TIMER_HEAP_USE_COPY 426 // Create a duplicate of the timer entry. 
427 pj_bzero(timer_copy, sizeof(*timer_copy)); 428 pj_memcpy(&timer_copy->dup, new_node, sizeof(*new_node)); 429 timer_copy->entry = new_node; 430 #endif 431 timer_copy->_timer_value = *future_time; 432 433 reheap_up( ht, timer_copy, ht->cur_size, HEAP_PARENT(ht->cur_size)); 301 434 ht->cur_size++; 435 436 return PJ_SUCCESS; 302 437 } 303 438 … … 312 447 // Set the entry 313 448 entry->_timer_id = pop_freelist(ht); 314 entry->_timer_value = *future_time; 315 insert_node( ht, entry); 316 return 0; 449 450 return insert_node( ht, entry, future_time ); 317 451 } 318 452 else … … 325 459 unsigned flags) 326 460 { 327 long timer_node_slot; 328 329 PJ_CHECK_STACK(); 330 331 // Check to see if the timer_id is out of range 332 if (entry->_timer_id < 0 || (pj_size_t)entry->_timer_id > ht->max_size) { 333 entry->_timer_id = -1; 334 return 0; 335 } 336 337 timer_node_slot = ht->timer_ids[entry->_timer_id]; 338 339 if (timer_node_slot < 0) { // Check to see if timer_id is still valid. 340 entry->_timer_id = -1; 341 return 0; 342 } 343 344 if (entry != ht->heap[timer_node_slot]) 345 { 346 if ((flags & F_DONT_ASSERT) == 0) 347 pj_assert(entry == ht->heap[timer_node_slot]); 348 entry->_timer_id = -1; 349 return 0; 350 } 351 else 352 { 353 remove_node( ht, timer_node_slot); 354 355 if ((flags & F_DONT_CALL) == 0) 356 // Call the close hook. 357 (*ht->callback)(ht, entry); 358 return 1; 461 long timer_node_slot; 462 463 PJ_CHECK_STACK(); 464 465 // Check to see if the timer_id is out of range 466 if (entry->_timer_id < 0 || (pj_size_t)entry->_timer_id > ht->max_size) { 467 entry->_timer_id = -1; 468 return 0; 469 } 470 471 timer_node_slot = ht->timer_ids[entry->_timer_id]; 472 473 if (timer_node_slot < 0) { // Check to see if timer_id is still valid. 
474 entry->_timer_id = -1; 475 return 0; 476 } 477 478 if (entry != GET_ENTRY(ht->heap[timer_node_slot])) { 479 if ((flags & F_DONT_ASSERT) == 0) 480 pj_assert(entry == GET_ENTRY(ht->heap[timer_node_slot])); 481 entry->_timer_id = -1; 482 return 0; 483 } else { 484 remove_node( ht, timer_node_slot); 485 486 if ((flags & F_DONT_CALL) == 0) { 487 // Call the close hook. 488 (*ht->callback)(ht, entry); 489 } 490 return 1; 359 491 } 360 492 } … … 369 501 sizeof(pj_timer_heap_t) + 370 502 /* size of each entry: */ 371 (count+2) * (sizeof(pj_timer_entry*)+sizeof(pj_timer_id_t)) + 503 (count+2) * (sizeof(pj_timer_entry_dup*)+sizeof(pj_timer_id_t)+ 504 sizeof(pj_timer_entry_dup)) + 372 505 /* lock, pool etc: */ 373 506 132; … … 392 525 393 526 /* Allocate timer heap data structure from the pool */ 394 ht = PJ_POOL_ ALLOC_T(pool, pj_timer_heap_t);527 ht = PJ_POOL_ZALLOC_T(pool, pj_timer_heap_t); 395 528 if (!ht) 396 529 return PJ_ENOMEM; … … 408 541 409 542 // Create the heap array. 410 ht->heap = (pj_timer_entry **)411 pj_pool_ alloc(pool, sizeof(pj_timer_entry*) * size);543 ht->heap = (pj_timer_entry_dup**) 544 pj_pool_calloc(pool, size, sizeof(pj_timer_entry_dup*)); 412 545 if (!ht->heap) 413 546 return PJ_ENOMEM; 547 548 #if PJ_TIMER_HEAP_USE_COPY 549 // Create the timer entry copies array. 
550 ht->timer_dups = (pj_timer_entry_dup*) 551 pj_pool_alloc(pool, sizeof(pj_timer_entry_dup) * size); 552 if (!ht->timer_dups) 553 return PJ_ENOMEM; 554 #endif 414 555 415 556 // Create the parallel … … 468 609 entry->user_data = user_data; 469 610 entry->cb = cb; 611 #if !PJ_TIMER_HEAP_USE_COPY 470 612 entry->_grp_lock = NULL; 613 #endif 471 614 472 615 return entry; … … 505 648 //PJ_ASSERT_RETURN(entry->_timer_id < 1, PJ_EINVALIDOP); 506 649 507 #if PJ_TIMER_DEBUG508 entry->src_file = src_file;509 entry->src_line = src_line;510 #endif511 650 pj_gettickcount(&expires); 512 651 PJ_TIME_VAL_ADD(expires, *delay); … … 517 656 if (pj_timer_entry_running(entry)) { 518 657 unlock_timer_heap(ht); 519 PJ_LOG(3,(THIS_FILE, " Bug! Rescheduling outstanding entry (%p)",658 PJ_LOG(3,(THIS_FILE, "Warning! Rescheduling outstanding entry (%p)", 520 659 entry)); 521 660 return PJ_EINVALIDOP; … … 524 663 status = schedule_entry(ht, entry, &expires); 525 664 if (status == PJ_SUCCESS) { 665 pj_timer_entry_dup *timer_copy = GET_TIMER(ht, entry); 666 526 667 if (set_id) 527 entry->id = id_val;528 entry->_grp_lock = grp_lock;529 if ( entry->_grp_lock) {530 pj_grp_lock_add_ref( entry->_grp_lock);668 GET_FIELD(timer_copy, id) = entry->id = id_val; 669 timer_copy->_grp_lock = grp_lock; 670 if (timer_copy->_grp_lock) { 671 pj_grp_lock_add_ref(timer_copy->_grp_lock); 531 672 } 673 #if PJ_TIMER_DEBUG 674 timer_copy->src_file = src_file; 675 timer_copy->src_line = src_line; 676 #endif 532 677 } 533 678 unlock_timer_heap(ht); … … 584 729 int id_val) 585 730 { 731 pj_timer_entry_dup *timer_copy; 732 pj_grp_lock_t *grp_lock; 586 733 int count; 587 734 … … 589 736 590 737 lock_timer_heap(ht); 738 timer_copy = GET_TIMER(ht, entry); 739 grp_lock = timer_copy->_grp_lock; 740 591 741 count = cancel(ht, entry, flags | F_DONT_CALL); 592 742 if (count > 0) { … … 595 745 entry->id = id_val; 596 746 } 597 if (entry->_grp_lock) { 598 pj_grp_lock_t *grp_lock = entry->_grp_lock; 599 entry->_grp_lock = NULL; 
747 if (grp_lock) { 600 748 pj_grp_lock_dec_ref(grp_lock); 601 749 } … … 641 789 count < ht->max_entries_per_poll ) 642 790 { 643 pj_timer_entry *node = remove_node(ht, 0); 791 pj_timer_entry_dup *node = remove_node(ht, 0); 792 pj_timer_entry *entry = GET_ENTRY(node); 644 793 /* Avoid re-use of this timer until the callback is done. */ 645 794 ///Not necessary, even causes problem (see also #2176). 646 795 ///pj_timer_id_t node_timer_id = pop_freelist(ht); 647 796 pj_grp_lock_t *grp_lock; 797 pj_bool_t valid = PJ_TRUE; 648 798 649 799 ++count; … … 651 801 grp_lock = node->_grp_lock; 652 802 node->_grp_lock = NULL; 803 if (GET_FIELD(node, cb) != entry->cb || 804 GET_FIELD(node, user_data) != entry->user_data) 805 { 806 valid = PJ_FALSE; 807 PJ_LOG(3,(THIS_FILE, "Bug! Polling entry %p from %s line %d has " 808 "been deallocated without being cancelled", 809 GET_ENTRY(node), 810 #if PJ_TIMER_DEBUG 811 node->src_file, node->src_line)); 812 #else 813 "N/A", 0)); 814 #endif 815 #if ASSERT_IF_ENTRY_DESTROYED 816 pj_assert(node->dup.cb == entry->cb); 817 pj_assert(node->dup.user_data == entry->user_data); 818 #endif 819 } 653 820 654 821 unlock_timer_heap(ht); … … 656 823 PJ_RACE_ME(5); 657 824 658 if ( node->cb)659 (* node->cb)(ht, node);660 661 if ( grp_lock)825 if (valid && entry->cb) 826 (*entry->cb)(ht, entry); 827 828 if (valid && grp_lock) 662 829 pj_grp_lock_dec_ref(grp_lock); 663 830 … … 720 887 721 888 for (i=0; i<(unsigned)ht->cur_size; ++i) { 722 pj_timer_entry *e = ht->heap[i];889 pj_timer_entry_dup *e = ht->heap[i]; 723 890 pj_time_val delta; 724 891 … … 731 898 732 899 PJ_LOG(3,(THIS_FILE, " %d\t%d\t%d.%03d\t%s:%d", 733 e->_timer_id, e->id,900 GET_FIELD(e, _timer_id), GET_FIELD(e, id), 734 901 (int)delta.sec, (int)delta.msec, 735 902 e->src_file, e->src_line)); -
pjproject/trunk/pjlib/src/pjlib-test/timer.c
r5933 r6058 189 189 * Stress test * 190 190 *************** 191 * Test scenario :191 * Test scenario (if RANDOMIZED_TEST is 0): 192 192 * 1. Create and schedule a number of timer entries. 193 193 * 2. Start threads for polling (simulating normal worker thread). … … 196 196 * 3. Start threads for cancelling random entries. Each successfully 197 197 * cancelled entry will be re-scheduled after some random delay. 198 * 199 * Test scenario (if RANDOMIZED_TEST is 1): 200 * 1. Create and schedule a number of timer entries. 201 * 2. Start threads which will, based on a configurable probability 202 * setting, randomly perform timer scheduling, cancelling, or 203 * polling (simulating normal worker thread). 204 * This test is considered a failure if: 205 * - It triggers assertion/crash. 206 * - There's an error message in the log, which indicates a potential 207 * bug in the implementation (note that race message is ok). 198 208 */ 199 #define ST_POLL_THREAD_COUNT 10 200 #define ST_CANCEL_THREAD_COUNT 10 201 202 #define ST_ENTRY_COUNT 1000 203 #define ST_ENTRY_MAX_TIMEOUT_MS 100 209 #define RANDOMIZED_TEST 1 210 #define SIMULATE_CRASH PJ_TIMER_HEAP_USE_COPY 211 212 #if RANDOMIZED_TEST 213 #define ST_STRESS_THREAD_COUNT 20 214 #define ST_POLL_THREAD_COUNT 0 215 #define ST_CANCEL_THREAD_COUNT 0 216 #else 217 #define ST_STRESS_THREAD_COUNT 0 218 #define ST_POLL_THREAD_COUNT 10 219 #define ST_CANCEL_THREAD_COUNT 10 220 #endif 221 222 #define ST_ENTRY_COUNT 10000 223 #define ST_DURATION 30000 224 #define ST_ENTRY_MAX_TIMEOUT_MS ST_DURATION/10 204 225 205 226 /* Number of group lock, may be zero, shared by timer entries, group lock … … 216 237 pj_bool_t stopping; 217 238 pj_timer_entry *entries; 239 pj_atomic_t **status; 240 pj_atomic_t *n_sched, *n_cancel, *n_poll; 241 pj_grp_lock_t **grp_locks; 242 int err; 218 243 219 244 pj_atomic_t *idx; … … 227 252 { 228 253 pj_time_val delay = {0}; 229 pj_grp_lock_t *grp_lock = (pj_grp_lock_t*)e->user_data;254 pj_grp_lock_t *grp_lock = 
NULL; 230 255 pj_status_t status; 256 struct thread_param *tparam = (struct thread_param *)e->user_data; 257 258 if (ST_ENTRY_GROUP_LOCK_COUNT && pj_rand() % 10) { 259 /* About 90% of entries should have group lock */ 260 grp_lock = tparam->grp_locks[pj_rand() % ST_ENTRY_GROUP_LOCK_COUNT]; 261 } 231 262 232 263 delay.msec = pj_rand() % ST_ENTRY_MAX_TIMEOUT_MS; … … 236 267 } 237 268 269 static void dummy_callback(pj_timer_heap_t *ht, pj_timer_entry *e) 270 { 271 PJ_LOG(4,("test", "dummy callback called %p %p", e, e->user_data)); 272 } 273 238 274 static void st_entry_callback(pj_timer_heap_t *ht, pj_timer_entry *e) 239 275 { 276 struct thread_param *tparam = (struct thread_param *)e->user_data; 277 278 #if RANDOMIZED_TEST 279 /* Make sure the flag has been set. */ 280 while (pj_atomic_get(tparam->status[e - tparam->entries]) != 1) 281 pj_thread_sleep(10); 282 pj_atomic_set(tparam->status[e - tparam->entries], 0); 283 #endif 284 240 285 /* try to cancel this */ 241 286 pj_timer_heap_cancel_if_active(ht, e, 10); … … 245 290 246 291 /* reschedule entry */ 247 st_schedule_entry(ht, e); 292 if (!ST_STRESS_THREAD_COUNT) 293 st_schedule_entry(ht, e); 294 } 295 296 /* Randomized stress worker thread function. */ 297 static int stress_worker(void *arg) 298 { 299 /* Enumeration of possible task. */ 300 enum { 301 SCHEDULING = 0, 302 CANCELLING = 1, 303 POLLING = 2, 304 NOTHING = 3 305 }; 306 /* Probability of a certain task being chosen. 307 * The first number indicates the probability of the first task, 308 * the second number for the second task, and so on. 
309 */ 310 int prob[3] = {75, 15, 5}; 311 struct thread_param *tparam = (struct thread_param*)arg; 312 int t_idx, i; 313 314 t_idx = pj_atomic_inc_and_get(tparam->idx); 315 PJ_LOG(4,("test", "...thread #%d (random) started", t_idx)); 316 while (!tparam->stopping) { 317 int job, task; 318 int idx, count; 319 pj_status_t prev_status, status; 320 321 /* Randomly choose which task to do */ 322 job = pj_rand() % 100; 323 if (job < prob[0]) task = SCHEDULING; 324 else if (job < (prob[0] + prob[1])) task = CANCELLING; 325 else if (job < (prob[0] + prob[1] + prob[2])) task = POLLING; 326 else task = NOTHING; 327 328 idx = pj_rand() % ST_ENTRY_COUNT; 329 prev_status = pj_atomic_get(tparam->status[idx]); 330 if (task == SCHEDULING) { 331 if (prev_status != 0) continue; 332 status = st_schedule_entry(tparam->timer, &tparam->entries[idx]); 333 if (prev_status == 0 && status != PJ_SUCCESS) { 334 /* To make sure the flag has been set. */ 335 pj_thread_sleep(20); 336 if (pj_atomic_get(tparam->status[idx]) == 1) { 337 /* Race condition with another scheduling. */ 338 PJ_LOG(3,("test", "race schedule-schedule %d: %p", 339 idx, &tparam->entries[idx])); 340 } else { 341 if (tparam->err != 0) tparam->err = -210; 342 PJ_LOG(3,("test", "error: failed to schedule entry %d: %p", 343 idx, &tparam->entries[idx])); 344 } 345 } else if (prev_status == 1 && status == PJ_SUCCESS) { 346 /* Race condition with another cancellation or 347 * timer poll. 348 */ 349 pj_thread_sleep(20); 350 PJ_LOG(3,("test", "race schedule-cancel/poll %d: %p", 351 idx, &tparam->entries[idx])); 352 } 353 if (status == PJ_SUCCESS) { 354 pj_atomic_set(tparam->status[idx], 1); 355 pj_atomic_inc(tparam->n_sched); 356 } 357 } else if (task == CANCELLING) { 358 count = pj_timer_heap_cancel_if_active(tparam->timer, 359 &tparam->entries[idx], 10); 360 if (prev_status == 0 && count > 0) { 361 /* To make sure the flag has been set. 
*/ 362 pj_thread_sleep(20); 363 if (pj_atomic_get(tparam->status[idx]) == 1) { 364 /* Race condition with scheduling. */ 365 PJ_LOG(3,("test", "race cancel-schedule %d: %p", 366 idx, &tparam->entries[idx])); 367 } else { 368 if (tparam->err != 0) tparam->err = -220; 369 PJ_LOG(3,("test", "error: cancelling invalid entry %d: %p", 370 idx, &tparam->entries[idx])); 371 } 372 } else if (prev_status == 1 && count == 0) { 373 /* To make sure the flag has been cleared. */ 374 pj_thread_sleep(20); 375 if (pj_atomic_get(tparam->status[idx]) == 0) { 376 /* Race condition with polling. */ 377 PJ_LOG(3,("test", "race cancel-poll %d: %p", 378 idx, &tparam->entries[idx])); 379 } else { 380 if (tparam->err != 0) tparam->err = -230; 381 PJ_LOG(3,("test", "error: failed to cancel entry %d: %p", 382 idx, &tparam->entries[idx])); 383 } 384 } 385 if (count > 0) { 386 /* Make sure the flag has been set. */ 387 while (pj_atomic_get(tparam->status[idx]) != 1) 388 pj_thread_sleep(10); 389 pj_atomic_set(tparam->status[idx], 0); 390 pj_atomic_inc(tparam->n_cancel); 391 } 392 } else if (task == POLLING) { 393 count = pj_timer_heap_poll(tparam->timer, NULL); 394 for (i = 0; i < count; i++) { 395 pj_atomic_inc_and_get(tparam->n_poll); 396 } 397 } else { 398 pj_thread_sleep(10); 399 } 400 } 401 PJ_LOG(4,("test", "...thread #%d (poll) stopped", t_idx)); 402 403 return 0; 248 404 } 249 405 … … 308 464 static int timer_stress_test(void) 309 465 { 466 unsigned count = 0, n_sched = 0, n_cancel = 0, n_poll = 0; 310 467 int i; 311 468 pj_timer_entry *entries = NULL; 469 pj_atomic_t **entries_status = NULL; 312 470 pj_grp_lock_t **grp_locks = NULL; 313 471 pj_pool_t *pool; … … 316 474 pj_status_t status; 317 475 int err=0; 476 pj_thread_t **stress_threads = NULL; 318 477 pj_thread_t **poll_threads = NULL; 319 478 pj_thread_t **cancel_threads = NULL; 320 479 struct thread_param tparam = {0}; 321 480 pj_time_val now; 481 #if SIMULATE_CRASH 482 pj_timer_entry *entry; 483 pj_pool_t *tmp_pool; 484 
pj_time_val delay = {0}; 485 #endif 322 486 323 487 PJ_LOG(3,("test", "...Stress test")); … … 333 497 } 334 498 335 /* Create timer heap */ 336 status = pj_timer_heap_create(pool, ST_ENTRY_COUNT, &timer); 499 /* Create timer heap. 500 * Initially we only create a fraction of what's required, 501 * to test the timer heap growth algorithm. 502 */ 503 status = pj_timer_heap_create(pool, ST_ENTRY_COUNT/64, &timer); 337 504 if (status != PJ_SUCCESS) { 338 505 app_perror("...error: unable to create timer heap", status); … … 355 522 pj_pool_calloc(pool, ST_ENTRY_GROUP_LOCK_COUNT, 356 523 sizeof(pj_grp_lock_t*)); 524 tparam.grp_locks = grp_locks; 357 525 } 358 526 for (i=0; i<ST_ENTRY_GROUP_LOCK_COUNT; ++i) { … … 373 541 goto on_return; 374 542 } 375 543 entries_status = (pj_atomic_t**)pj_pool_calloc(pool, ST_ENTRY_COUNT, 544 sizeof(*entries_status)); 545 if (!entries_status) { 546 err = -55; 547 goto on_return; 548 } 549 376 550 for (i=0; i<ST_ENTRY_COUNT; ++i) { 377 pj_grp_lock_t *grp_lock = NULL; 378 379 if (ST_ENTRY_GROUP_LOCK_COUNT && pj_rand() % 10) { 380 /* About 90% of entries should have group lock */ 381 grp_lock = grp_locks[pj_rand() % ST_ENTRY_GROUP_LOCK_COUNT]; 382 } 383 384 pj_timer_entry_init(&entries[i], 0, grp_lock, &st_entry_callback); 385 status = st_schedule_entry(timer, &entries[i]); 551 pj_timer_entry_init(&entries[i], 0, &tparam, &st_entry_callback); 552 553 status = pj_atomic_create(pool, -1, &entries_status[i]); 386 554 if (status != PJ_SUCCESS) { 387 app_perror("...error: unable to schedule entry", status);388 555 err = -60; 389 556 goto on_return; 557 } 558 pj_atomic_set(entries_status[i], 0); 559 560 /* For randomized test, we schedule the entry inside the thread */ 561 if (!ST_STRESS_THREAD_COUNT) { 562 status = st_schedule_entry(timer, &entries[i]); 563 if (status != PJ_SUCCESS) { 564 app_perror("...error: unable to schedule entry", status); 565 err = -60; 566 goto on_return; 567 } 390 568 } 391 569 } … … 394 572 tparam.timer = timer; 395 573 
tparam.entries = entries; 574 tparam.status = entries_status; 396 575 status = pj_atomic_create(pool, -1, &tparam.idx); 397 576 if (status != PJ_SUCCESS) { … … 399 578 err = -70; 400 579 goto on_return; 580 } 581 status = pj_atomic_create(pool, -1, &tparam.n_sched); 582 pj_assert (status == PJ_SUCCESS); 583 pj_atomic_set(tparam.n_sched, 0); 584 status = pj_atomic_create(pool, -1, &tparam.n_cancel); 585 pj_assert (status == PJ_SUCCESS); 586 pj_atomic_set(tparam.n_cancel, 0); 587 status = pj_atomic_create(pool, -1, &tparam.n_poll); 588 pj_assert (status == PJ_SUCCESS); 589 pj_atomic_set(tparam.n_poll, 0); 590 591 /* Start stress worker threads */ 592 if (ST_STRESS_THREAD_COUNT) { 593 stress_threads = (pj_thread_t**) 594 pj_pool_calloc(pool, ST_STRESS_THREAD_COUNT, 595 sizeof(pj_thread_t*)); 596 } 597 for (i=0; i<ST_STRESS_THREAD_COUNT; ++i) { 598 status = pj_thread_create( pool, "poll", &stress_worker, &tparam, 599 0, 0, &stress_threads[i]); 600 if (status != PJ_SUCCESS) { 601 app_perror("...error: unable to create stress thread", status); 602 err = -75; 603 goto on_return; 604 } 401 605 } 402 606 … … 433 637 } 434 638 435 /* Wait 30s */ 436 pj_thread_sleep(30*1000); 437 639 #if SIMULATE_CRASH 640 tmp_pool = pj_pool_create( mem, NULL, 4096, 128, NULL); 641 pj_assert(tmp_pool); 642 entry = (pj_timer_entry*)pj_pool_calloc(tmp_pool, 1, sizeof(*entry)); 643 pj_assert(entry); 644 pj_timer_entry_init(entry, 0, &tparam, &dummy_callback); 645 delay.sec = 6; 646 status = pj_timer_heap_schedule(timer, entry, &delay); 647 pj_assert(status == PJ_SUCCESS); 648 pj_thread_sleep(1000); 649 PJ_LOG(3,("test", "...Releasing timer entry %p without cancelling it", 650 entry)); 651 pj_pool_secure_release(tmp_pool); 652 //pj_pool_release(tmp_pool); 653 //pj_memset(tmp_pool, 128, 4096); 654 #endif 655 656 /* Wait */ 657 pj_thread_sleep(ST_DURATION); 438 658 439 659 on_return: … … 442 662 tparam.stopping = PJ_TRUE; 443 663 664 for (i=0; i<ST_STRESS_THREAD_COUNT; ++i) { 665 if 
(!stress_threads[i]) 666 continue; 667 pj_thread_join(stress_threads[i]); 668 pj_thread_destroy(stress_threads[i]); 669 } 670 444 671 for (i=0; i<ST_POLL_THREAD_COUNT; ++i) { 445 672 if (!poll_threads[i]) … … 463 690 464 691 for (i=0; i<ST_ENTRY_COUNT; ++i) { 465 pj_timer_heap_cancel_if_active(timer, &entries[i], 10); 692 count += pj_timer_heap_cancel_if_active(timer, &entries[i], 10); 693 if (entries_status) 694 pj_atomic_destroy(entries_status[i]); 466 695 } 467 696 … … 478 707 pj_timer_heap_destroy(timer); 479 708 709 PJ_LOG(3,("test", "Total memory of timer heap: %d", 710 pj_timer_heap_mem_size(ST_ENTRY_COUNT))); 711 480 712 if (tparam.idx) 481 713 pj_atomic_destroy(tparam.idx); 714 if (tparam.n_sched) { 715 n_sched = pj_atomic_get(tparam.n_sched); 716 PJ_LOG(3,("test", "Total number of scheduled entries: %d", n_sched)); 717 pj_atomic_destroy(tparam.n_sched); 718 } 719 if (tparam.n_cancel) { 720 n_cancel = pj_atomic_get(tparam.n_cancel); 721 PJ_LOG(3,("test", "Total number of cancelled entries: %d", n_cancel)); 722 pj_atomic_destroy(tparam.n_cancel); 723 } 724 if (tparam.n_poll) { 725 n_poll = pj_atomic_get(tparam.n_poll); 726 PJ_LOG(3,("test", "Total number of polled entries: %d", n_poll)); 727 pj_atomic_destroy(tparam.n_poll); 728 } 729 PJ_LOG(3,("test", "Number of remaining active entries: %d", count)); 730 if (n_sched) { 731 pj_bool_t match = PJ_TRUE; 732 733 #if SIMULATE_CRASH 734 n_sched++; 735 #endif 736 if (n_sched != (n_cancel + n_poll + count)) { 737 if (tparam.err != 0) tparam.err = -250; 738 match = PJ_FALSE; 739 } 740 PJ_LOG(3,("test", "Scheduled = cancelled + polled + remaining?: %s", 741 (match? "yes": "no"))); 742 } 482 743 483 744 pj_pool_safe_release(&pool); 484 745 485 return err;746 return (err? err: tparam.err); 486 747 } 487 748
Note: See TracChangeset for help on using the changeset viewer.