Changeset 1789 for pjproject/trunk
- Timestamp:
- Feb 13, 2008 3:17:28 PM (17 years ago)
- Location:
- pjproject/trunk/pjlib
- Files:
-
- 9 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjlib/include/pj/config.h
r1788 r1789 286 286 # undef PJ_EXCEPTION_USE_WIN32_SEH 287 287 # undef PJ_HAS_ERROR_STRING 288 289 # define PJ_HAS_IPV6 1 288 290 #endif 289 291 … … 510 512 #ifndef PJ_IOQUEUE_HAS_SAFE_UNREG 511 513 # define PJ_IOQUEUE_HAS_SAFE_UNREG 1 514 #endif 515 516 517 /** 518 * Default concurrency setting for sockets/handles registered to ioqueue. 519 * This controls whether the ioqueue is allowed to call the key's callback 520 * concurrently/in parallel. The default is yes, which means that if there 521 * are more than one pending operations complete simultaneously, more 522 * than one threads may call the key's callback at the same time. This 523 * generally would promote good scalability for application, at the 524 * expense of more complexity to manage the concurrent accesses. 525 * 526 * Please see the ioqueue documentation for more info. 527 */ 528 #ifndef PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY 529 # define PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY 1 530 #endif 531 532 533 /* Sanity check: 534 * if ioqueue concurrency is disallowed, PJ_IOQUEUE_HAS_SAFE_UNREG 535 * must be enabled. 536 */ 537 #if (PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY==0) && (PJ_IOQUEUE_HAS_SAFE_UNREG==0) 538 # error PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if ioqueue concurrency \ 539 is disabled 512 540 #endif 513 541 -
pjproject/trunk/pjlib/include/pj/ioqueue.h
r1748 r1789 102 102 * \section pj_ioqueue_concurrency_sec Concurrency Rules 103 103 * 104 * The items below describe rules that must be obeyed when using the I/O 105 * queue, with regard to concurrency: 106 * - simultaneous operations (by different threads) to different key is safe. 107 * - simultaneous operations to the same key is also safe, except 108 * <b>unregistration</b>, which is described below. 109 * - <b>care must be taken when unregistering a key</b> from the 104 * The ioqueue has been fine tuned to allow multiple threads to poll the 105 * handles simultaneously, to maximize scalability when the application is 106 * running on multiprocessor systems. When more than one threads are polling 107 * the ioqueue and there are more than one handles are signaled, more than 108 * one threads will execute the callback simultaneously to serve the events. 109 * These parallel executions are completely safe when the events happen for 110 * two different handles. 111 * 112 * However, with multithreading, care must be taken when multiple events 113 * happen on the same handle, or when event is happening on a handle (and 114 * the callback is being executed) and application is performing 115 * unregistration to the handle at the same time. 116 * 117 * The treatments of above scenario differ according to the concurrency 118 * setting that are applied to the handle. 119 * 120 * \subsection pj_ioq_concur_set Concurrency Settings for Handles 121 * 122 * Concurrency can be set on per handle (key) basis, by using 123 * #pj_ioqueue_set_concurrency() function. The default key concurrency value 124 * for the handle is inherited from the key concurrency setting of the ioqueue, 125 * and the key concurrency setting for the ioqueue can be changed by using 126 * #pj_ioqueue_set_default_concurrency(). The default key concurrency setting 127 * for ioqueue itself is controlled by compile time setting 128 * PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY. 129 * 130 * Note that this key concurrency setting only controls whether multiple 131 * threads are allowed to operate <b>on the same key</b> at the same time. 132 * The ioqueue itself always allows multiple threads to enter the ioqeuue at 133 * the same time, and also simultaneous callback calls to <b>differrent 134 * keys</b> is always allowed regardless to the key concurrency setting. 135 * 136 * \subsection pj_ioq_parallel Parallel Callback Executions for the Same Handle 137 * 138 * Note that when key concurrency is enabled (i.e. parallel callback calls on 139 * the same key is allowed; this is the default setting), the ioqueue will only 140 * perform simultaneous callback executions on the same key when the key has 141 * invoked multiple pending operations. This could be done for example by 142 * calling #pj_ioqueue_recvfrom() more than once on the same key, each with 143 * the same key but different operation key (pj_ioqueue_op_key_t). With this 144 * scenario, when multiple packets arrive on the key at the same time, more 145 * than one threads may execute the callback simultaneously, each with the 146 * same key but different operation key. 147 * 148 * When there is only one pending operation on the key (e.g. there is only one 149 * #pj_ioqueue_recvfrom() invoked on the key), then events occuring to the 150 * same key will be queued by the ioqueue, thus no simultaneous callback calls 151 * will be performed. 152 * 153 * \subsection pj_ioq_allow_concur Concurrency is Enabled (Default Value) 154 * 155 * The default setting for the ioqueue is to allow multiple threads to 156 * execute callbacks for the same handle/key. This setting is selected to 157 * promote good performance and scalability for application. 158 * 159 * However this setting has a major drawback with regard to synchronization, 160 * and application MUST carefully follow the following guidelines to ensure 161 * that parallel access to the key does not cause problems: 162 * 163 * - Always note that callback may be called simultaneously for the same 164 * key. 165 * - <b>Care must be taken when unregistering a key</b> from the 110 166 * ioqueue. Application must take care that when one thread is issuing 111 * an unregistration, other thread is not simultaneously invoking an112 * operation<b>to the same key</b>.167 * an unregistration, other thread is not simultaneously invoking the 168 * callback <b>to the same key</b>. 113 169 *\n 114 170 * This happens because the ioqueue functions are working with a pointer … … 116 172 * has been rendered invalid by other threads before the ioqueue has a 117 173 * chance to acquire mutex on it. 174 * 175 * \subsection pj_ioq_disallow_concur Concurrency is Disabled 176 * 177 * Alternatively, application may disable key concurrency to make 178 * synchronization easier. As noted above, there are three ways to control 179 * key concurrency setting: 180 * - by controlling on per handle/key basis, with #pj_ioqueue_set_concurrency(). 181 * - by changing default key concurrency setting on the ioqueue, with 182 * #pj_ioqueue_set_default_concurrency(). 183 * - by changing the default concurrency on compile time, by declaring 184 * PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY macro to zero in your config_site.h 118 185 * 119 186 * \section pj_ioqeuue_examples_sec Examples … … 292 359 293 360 /** 361 * Set default concurrency policy for this ioqueue. If this function is not 362 * called, the default concurrency policy for the ioqueue is controlled by 363 * compile time setting PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY. 364 * 365 * Note that changing the concurrency setting to the ioqueue will only affect 366 * subsequent key registrations. To modify the concurrency setting for 367 * individual key, use #pj_ioqueue_set_concurrency(). 368 * 369 * @param ioqueue The ioqueue instance. 370 * @param allow Non-zero to allow concurrent callback calls, or 371 * PJ_FALSE to disallow it. 372 * 373 * @return PJ_SUCCESS on success or the appropriate error code. 374 */ 375 PJ_DECL(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue, 376 pj_bool_t allow); 377 378 /** 294 379 * Register a socket to the I/O queue framework. 295 380 * When a socket is registered to the IOQueue, it may be modified to use … … 367 452 void **old_data); 368 453 454 /** 455 * Configure whether the ioqueue is allowed to call the key's callback 456 * concurrently/in parallel. The default concurrency setting for the key 457 * is controlled by ioqueue's default concurrency value, which can be 458 * changed by calling #pj_ioqueue_set_default_concurrency(). 459 * 460 * If concurrency is allowed for the key, it means that if there are more 461 * than one pending operations complete simultaneously, more than one 462 * threads may call the key's callback at the same time. This generally 463 * would promote good scalability for application, at the expense of more 464 * complexity to manage the concurrent accesses in application's code. 465 * 466 * Alternatively application may disable the concurrent access by 467 * setting the \a allow flag to false. With concurrency disabled, only 468 * one thread can call the key's callback at one time. 469 * 470 * @param key The key that was previously obtained from registration. 471 * @param allow Set this to non-zero to allow concurrent callback calls 472 * and zero (PJ_FALSE) to disallow it. 473 * 474 * @return PJ_SUCCESS on success or the appropriate error code. 475 */ 476 PJ_DECL(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key, 477 pj_bool_t allow); 478 479 /** 480 * Acquire the key's mutex. When the key's concurrency is disabled, 481 * application may call this function to synchronize its operation 482 * with the key's callback (i.e. this function will block until the 483 * key's callback returns). 484 * 485 * @param key The key that was previously obtained from registration. 486 * 487 * @return PJ_SUCCESS on success or the appropriate error code. 488 */ 489 PJ_DECL(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key); 490 491 /** 492 * Release the lock previously acquired with pj_ioqueue_lock_key(). 493 * 494 * @param key The key that was previously obtained from registration. 495 * 496 * @return PJ_SUCCESS on success or the appropriate error code. 497 */ 498 PJ_DECL(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key); 369 499 370 500 /** -
pjproject/trunk/pjlib/src/pj/ioqueue_common_abs.c
r1405 r1789 97 97 #endif 98 98 99 rc = pj_ioqueue_set_concurrency(key, ioqueue->default_concurrency); 100 if (rc != PJ_SUCCESS) 101 return rc; 102 99 103 /* Get socket type. When socket type is datagram, some optimization 100 104 * will be performed during send to allow parallel send operations. … … 194 198 /* Completion of connect() operation */ 195 199 pj_ssize_t bytes_transfered; 200 pj_bool_t has_lock; 196 201 197 202 /* Clear operation. */ … … 247 252 #endif 248 253 249 /* Unlock; from this point we don't need to hold key's mutex. */ 250 pj_mutex_unlock(h->mutex); 254 /* Unlock; from this point we don't need to hold key's mutex 255 * (unless concurrency is disabled, which in this case we should 256 * hold the mutex while calling the callback) */ 257 if (h->allow_concurrent) { 258 /* concurrency may be changed while we're in the callback, so 259 * save it to a flag. 260 */ 261 has_lock = PJ_FALSE; 262 pj_mutex_unlock(h->mutex); 263 } else { 264 has_lock = PJ_TRUE; 265 } 251 266 252 267 /* Call callback. */ 253 268 if (h->cb.on_connect_complete && !IS_CLOSING(h)) 254 269 (*h->cb.on_connect_complete)(h, bytes_transfered); 270 271 /* Unlock if we still hold the lock */ 272 if (has_lock) { 273 pj_mutex_unlock(h->mutex); 274 } 255 275 256 276 /* Done. */ … … 318 338 h->fd_type == pj_SOCK_DGRAM()) 319 339 { 340 pj_bool_t has_lock; 320 341 321 342 write_op->op = PJ_IOQUEUE_OP_NONE; … … 331 352 } 332 353 333 /* No need to hold mutex anymore */ 334 pj_mutex_unlock(h->mutex); 354 /* Unlock; from this point we don't need to hold key's mutex 355 * (unless concurrency is disabled, which in this case we should 356 * hold the mutex while calling the callback) */ 357 if (h->allow_concurrent) { 358 /* concurrency may be changed while we're in the callback, so 359 * save it to a flag. 360 */ 361 has_lock = PJ_FALSE; 362 pj_mutex_unlock(h->mutex); 363 } else { 364 has_lock = PJ_TRUE; 365 } 335 366 336 367 /* Call callback. */ … … 340 371 write_op->written); 341 372 } 373 374 if (has_lock) { 375 pj_mutex_unlock(h->mutex); 376 } 342 377 343 378 } else { … … 372 407 373 408 struct accept_operation *accept_op; 409 pj_bool_t has_lock; 374 410 375 411 /* Get one accept operation from the list. */ … … 390 426 } 391 427 392 /* Unlock; from this point we don't need to hold key's mutex. */ 393 pj_mutex_unlock(h->mutex); 428 /* Unlock; from this point we don't need to hold key's mutex 429 * (unless concurrency is disabled, which in this case we should 430 * hold the mutex while calling the callback) */ 431 if (h->allow_concurrent) { 432 /* concurrency may be changed while we're in the callback, so 433 * save it to a flag. 434 */ 435 has_lock = PJ_FALSE; 436 pj_mutex_unlock(h->mutex); 437 } else { 438 has_lock = PJ_TRUE; 439 } 394 440 395 441 /* Call callback. */ … … 400 446 } 401 447 448 if (has_lock) { 449 pj_mutex_unlock(h->mutex); 450 } 402 451 } 403 452 else … … 406 455 struct read_operation *read_op; 407 456 pj_ssize_t bytes_read; 457 pj_bool_t has_lock; 408 458 409 459 /* Get one pending read operation from the list. */ … … 480 530 } 481 531 482 /* Unlock; from this point we don't need to hold key's mutex. */ 483 pj_mutex_unlock(h->mutex); 532 /* Unlock; from this point we don't need to hold key's mutex 533 * (unless concurrency is disabled, which in this case we should 534 * hold the mutex while calling the callback) */ 535 if (h->allow_concurrent) { 536 /* concurrency may be changed while we're in the callback, so 537 * save it to a flag. 538 */ 539 has_lock = PJ_FALSE; 540 pj_mutex_unlock(h->mutex); 541 } else { 542 has_lock = PJ_TRUE; 543 } 484 544 485 545 /* Call callback. */ … … 489 549 bytes_read); 490 550 } 551 552 if (has_lock) { 553 pj_mutex_unlock(h->mutex); 554 } 491 555 492 556 } else { … … 504 568 pj_ioqueue_key_t *h ) 505 569 { 570 pj_bool_t has_lock; 571 506 572 pj_mutex_lock(h->mutex); 507 573 … … 526 592 ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT); 527 593 528 pj_mutex_unlock(h->mutex); 594 /* Unlock; from this point we don't need to hold key's mutex 595 * (unless concurrency is disabled, which in this case we should 596 * hold the mutex while calling the callback) */ 597 if (h->allow_concurrent) { 598 /* concurrency may be changed while we're in the callback, so 599 * save it to a flag. 600 */ 601 has_lock = PJ_FALSE; 602 pj_mutex_unlock(h->mutex); 603 } else { 604 has_lock = PJ_TRUE; 605 } 529 606 530 607 /* Call callback. */ … … 542 619 543 620 (*h->cb.on_connect_complete)(h, status); 621 } 622 623 if (has_lock) { 624 pj_mutex_unlock(h->mutex); 544 625 } 545 626 } … … 1097 1178 } 1098 1179 1180 PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency( pj_ioqueue_t *ioqueue, 1181 pj_bool_t allow) 1182 { 1183 PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL); 1184 ioqueue->default_concurrency = allow; 1185 return PJ_SUCCESS; 1186 } 1187 1188 1189 PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key, 1190 pj_bool_t allow) 1191 { 1192 PJ_ASSERT_RETURN(key, PJ_EINVAL); 1193 1194 /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is 1195 * disabled. 1196 */ 1197 PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL); 1198 1199 key->allow_concurrent = allow; 1200 return PJ_SUCCESS; 1201 } 1202 1203 PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key) 1204 { 1205 return pj_mutex_lock(key->mutex); 1206 } 1207 1208 PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) 1209 { 1210 return pj_mutex_unlock(key->mutex); 1211 } 1212 -
pjproject/trunk/pjlib/src/pj/ioqueue_common_abs.h
r974 r1789 104 104 pj_bool_t inside_callback; \ 105 105 pj_bool_t destroy_requested; \ 106 pj_bool_t allow_concurrent; \ 106 107 pj_sock_t fd; \ 107 108 int fd_type; \ … … 117 118 #define DECLARE_COMMON_IOQUEUE \ 118 119 pj_lock_t *lock; \ 119 pj_bool_t auto_delete_lock; 120 pj_bool_t auto_delete_lock; \ 121 pj_bool_t default_concurrency; 120 122 121 123 -
pjproject/trunk/pjlib/src/pj/ioqueue_winnt.c
r1405 r1789 115 115 enum handle_type hnd_type; 116 116 pj_ioqueue_callback cb; 117 pj_bool_t allow_concurrent; 117 118 118 119 #if PJ_HAS_TCP … … 124 125 pj_bool_t closing; 125 126 pj_time_val free_time; 127 pj_mutex_t *mutex; 126 128 #endif 127 129 … … 136 138 pj_lock_t *lock; 137 139 pj_bool_t auto_delete_lock; 140 pj_bool_t default_concurrency; 138 141 139 142 #if PJ_IOQUEUE_HAS_SAFE_UNREG … … 152 155 #endif 153 156 }; 157 158 159 #if PJ_IOQUEUE_HAS_SAFE_UNREG 160 /* Prototype */ 161 static void scan_closing_keys(pj_ioqueue_t *ioqueue); 162 #endif 154 163 155 164 … … 316 325 317 326 /* Create IOCP mutex */ 318 rc = pj_lock_create_ simple_mutex(pool, NULL, &ioqueue->lock);327 rc = pj_lock_create_recursive_mutex(pool, NULL, &ioqueue->lock); 319 328 if (rc != PJ_SUCCESS) { 320 329 CloseHandle(ioqueue->iocp); … … 323 332 324 333 ioqueue->auto_delete_lock = PJ_TRUE; 334 ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY; 325 335 326 336 #if PJ_IOQUEUE_HAS_SAFE_UNREG … … 345 355 while (key != &ioqueue->free_list) { 346 356 pj_atomic_destroy(key->ref_count); 357 pj_mutex_destroy(key->mutex); 347 358 key = key->next; 348 359 } … … 351 362 } 352 363 364 rc = pj_mutex_create_recursive(pool, "ioqkey", &key->mutex); 365 if (rc != PJ_SUCCESS) { 366 pj_atomic_destroy(key->ref_count); 367 key = ioqueue->free_list.next; 368 while (key != &ioqueue->free_list) { 369 pj_atomic_destroy(key->ref_count); 370 pj_mutex_destroy(key->mutex); 371 key = key->next; 372 } 373 CloseHandle(ioqueue->iocp); 374 return rc; 375 } 376 353 377 pj_list_push_back(&ioqueue->free_list, key); 354 355 378 } 356 379 #endif … … 393 416 while (key != &ioqueue->active_list) { 394 417 pj_atomic_destroy(key->ref_count); 418 pj_mutex_destroy(key->mutex); 395 419 key = key->next; 396 420 } … … 399 423 while (key != &ioqueue->closing_list) { 400 424 pj_atomic_destroy(key->ref_count); 425 pj_mutex_destroy(key->mutex); 401 426 key = key->next; 402 427 } … … 405 430 while (key != &ioqueue->free_list) { 406 431 pj_atomic_destroy(key->ref_count); 432 pj_mutex_destroy(key->mutex); 407 433 key = key->next; 408 434 } … … 412 438 pj_lock_destroy(ioqueue->lock); 413 439 440 return PJ_SUCCESS; 441 } 442 443 444 PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue, 445 pj_bool_t allow) 446 { 447 PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL); 448 ioqueue->default_concurrency = allow; 414 449 return PJ_SUCCESS; 415 450 } … … 454 489 455 490 #if PJ_IOQUEUE_HAS_SAFE_UNREG 491 /* Scan closing list first to release unused keys. 492 * Must do this with lock acquired. 493 */ 494 scan_closing_keys(ioqueue); 495 456 496 /* If safe unregistration is used, then get the key record from 457 497 * the free list. … … 482 522 pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback)); 483 523 524 /* Set concurrency for this handle */ 525 rc = pj_ioqueue_set_concurrency(rec, ioqueue->default_concurrency); 526 if (rc != PJ_SUCCESS) { 527 pj_lock_release(ioqueue->lock); 528 return rc; 529 } 530 484 531 #if PJ_HAS_TCP 485 532 rec->connecting = 0; … … 586 633 */ 587 634 if (pOv) { 635 pj_bool_t has_lock; 636 588 637 /* Event was dequeued for either successfull or failed I/O */ 589 638 key = (pj_ioqueue_key_t*)dwKey; … … 601 650 return PJ_TRUE; 602 651 652 /* If concurrency is disabled, lock the key 653 * (and save the lock status to local var since app may change 654 * concurrency setting while in the callback) */ 655 if (key->allow_concurrent == PJ_FALSE) { 656 pj_mutex_lock(key->mutex); 657 has_lock = PJ_TRUE; 658 } else { 659 has_lock = PJ_FALSE; 660 } 661 662 /* Now that we get the lock, check again that key is not closing */ 663 if (key->closing) { 664 if (has_lock) { 665 pj_mutex_unlock(key->mutex); 666 } 667 return PJ_TRUE; 668 } 669 603 670 /* Increment reference counter to prevent this key from being 604 671 * deleted 605 672 */ 606 673 pj_atomic_inc(key->ref_count); 674 #else 675 PJ_UNUSED_ARG(has_lock); 607 676 #endif 608 677 … … 655 724 #if PJ_IOQUEUE_HAS_SAFE_UNREG 656 725 decrement_counter(key); 726 if (has_lock) 727 pj_mutex_unlock(key->mutex); 657 728 #endif 658 729 … … 670 741 { 671 742 unsigned i; 743 pj_bool_t has_lock; 672 744 enum { RETRY = 10 }; 673 745 … … 697 769 /* Mark key as closing before closing handle. */ 698 770 key->closing = 1; 771 772 /* If concurrency is disabled, wait until the key has finished 773 * processing the callback 774 */ 775 if (key->allow_concurrent == PJ_FALSE) { 776 pj_mutex_lock(key->mutex); 777 has_lock = PJ_TRUE; 778 } else { 779 has_lock = PJ_FALSE; 780 } 781 #else 782 PJ_UNUSED_ARG(has_lock); 699 783 #endif 700 784 … … 718 802 * Forcing context switch seems to have fixed that, but this is quite 719 803 * an ugly solution.. 804 * 805 * Update 2008/02/13: 806 * This should not happen if concurrency is disallowed for the key. 807 * So at least application has a solution for this (i.e. by disallowing 808 * concurrency in the key). 720 809 */ 721 810 //This will loop forever if unregistration is done on the callback. … … 729 818 /* Decrement reference counter to destroy the key. */ 730 819 decrement_counter(key); 820 821 if (has_lock) 822 pj_mutex_unlock(key->mutex); 731 823 #endif 732 824 733 825 return PJ_SUCCESS; 734 826 } 827 828 #if PJ_IOQUEUE_HAS_SAFE_UNREG 829 /* Scan the closing list, and put pending closing keys to free list. 830 * Must do this with ioqueue mutex held. 831 */ 832 static void scan_closing_keys(pj_ioqueue_t *ioqueue) 833 { 834 if (!pj_list_empty(&ioqueue->closing_list)) { 835 pj_time_val now; 836 pj_ioqueue_key_t *key; 837 838 pj_gettimeofday(&now); 839 840 /* Move closing keys to free list when they've finished the closing 841 * idle time. 842 */ 843 key = ioqueue->closing_list.next; 844 while (key != &ioqueue->closing_list) { 845 pj_ioqueue_key_t *next = key->next; 846 847 pj_assert(key->closing != 0); 848 849 if (PJ_TIME_VAL_GTE(now, key->free_time)) { 850 pj_list_erase(key); 851 pj_list_push_back(&ioqueue->free_list, key); 852 } 853 key = next; 854 } 855 } 856 } 857 #endif 735 858 736 859 /* … … 767 890 /* Check the closing keys only when there's no activity and when there are 768 891 * pending closing keys. 769 * blp: 770 * no, always check the list. Otherwise on busy activity, this will cause 771 * ioqueue to reject new registration. 772 */ 773 if (/*event_count == 0 &&*/ !pj_list_empty(&ioqueue->closing_list)) { 774 pj_time_val now; 775 pj_ioqueue_key_t *key; 776 777 pj_gettimeofday(&now); 778 779 /* Move closing keys to free list when they've finished the closing 780 * idle time. 781 */ 892 */ 893 if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) { 782 894 pj_lock_acquire(ioqueue->lock); 783 key = ioqueue->closing_list.next; 784 while (key != &ioqueue->closing_list) { 785 pj_ioqueue_key_t *next = key->next; 786 787 pj_assert(key->closing != 0); 788 789 if (PJ_TIME_VAL_GTE(now, key->free_time)) { 790 pj_list_erase(key); 791 pj_list_push_back(&ioqueue->free_list, key); 792 } 793 key = next; 794 } 895 scan_closing_keys(ioqueue); 795 896 pj_lock_release(ioqueue->lock); 796 897 } … … 1269 1370 } 1270 1371 1372 PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key, 1373 pj_bool_t allow) 1374 { 1375 PJ_ASSERT_RETURN(key, PJ_EINVAL); 1376 1377 /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is 1378 * disabled. 1379 */ 1380 PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL); 1381 1382 key->allow_concurrent = allow; 1383 return PJ_SUCCESS; 1384 } 1385 1386 PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key) 1387 { 1388 #if PJ_IOQUEUE_HAS_SAFE_UNREG 1389 return pj_mutex_lock(key->mutex); 1390 #else 1391 PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP); 1392 #endif 1393 } 1394 1395 PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) 1396 { 1397 #if PJ_IOQUEUE_HAS_SAFE_UNREG 1398 return pj_mutex_unlock(key->mutex); 1399 #else 1400 PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP); 1401 #endif 1402 } 1403 -
pjproject/trunk/pjlib/src/pjlib-test/ioq_perf.c
r1405 r1789 222 222 * period of time. 223 223 */ 224 static int perform_test(int sock_type, const char *type_name, 224 static int perform_test(pj_bool_t allow_concur, 225 int sock_type, const char *type_name, 225 226 unsigned thread_cnt, unsigned sockpair_cnt, 226 227 pj_size_t buffer_size, … … 261 262 } 262 263 264 rc = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur); 265 if (rc != PJ_SUCCESS) { 266 app_perror("...error: pj_ioqueue_set_default_concurrency()", rc); 267 return -16; 268 } 269 263 270 /* Initialize each producer-consumer pair. */ 264 271 for (i=0; i<sockpair_cnt; ++i) { … … 438 445 } 439 446 440 /* 441 * main test entry. 442 */ 443 int ioqueue_perf_test(void) 447 static int ioqueue_perf_test_imp(pj_bool_t allow_concur) 444 448 { 445 449 enum { BUF_SIZE = 512 }; … … 501 505 502 506 PJ_LOG(3,(THIS_FILE, " Benchmarking %s ioqueue:", pj_ioqueue_name())); 507 PJ_LOG(3,(THIS_FILE, " Testing with concurency=%d", allow_concur)); 503 508 PJ_LOG(3,(THIS_FILE, " =======================================")); 504 509 PJ_LOG(3,(THIS_FILE, " Type Threads Skt.Pairs Bandwidth")); … … 509 514 pj_size_t bandwidth; 510 515 511 rc = perform_test(test_param[i].type, 516 rc = perform_test(allow_concur, 517 test_param[i].type, 512 518 test_param[i].type_name, 513 519 test_param[i].thread_cnt, … … 538 544 } 539 545 546 /* 547 * main test entry. 548 */ 549 int ioqueue_perf_test(void) 550 { 551 int rc; 552 553 rc = ioqueue_perf_test_imp(PJ_TRUE); 554 if (rc != 0) 555 return rc; 556 557 rc = ioqueue_perf_test_imp(PJ_FALSE); 558 if (rc != 0) 559 return rc; 560 561 return 0; 562 } 563 540 564 #else 541 565 /* To prevent warning about "translation unit is empty" -
pjproject/trunk/pjlib/src/pjlib-test/ioq_tcp.c
r1405 r1789 233 233 * Compliance test for success scenario. 234 234 */ 235 static int compliance_test_0( void)235 static int compliance_test_0(pj_bool_t allow_concur) 236 236 { 237 237 pj_sock_t ssock=-1, csock0=-1, csock1=-1; … … 291 291 app_perror("...ERROR in pj_ioqueue_create()", rc); 292 292 status=-20; goto on_error; 293 } 294 295 // Concurrency 296 rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur); 297 if (rc != PJ_SUCCESS) { 298 app_perror("...ERROR in pj_ioqueue_set_default_concurrency()", rc); 299 status=-21; goto on_error; 293 300 } 294 301 … … 459 466 * In this case, the client connects to a non-existant service. 460 467 */ 461 static int compliance_test_1( void)468 static int compliance_test_1(pj_bool_t allow_concur) 462 469 { 463 470 pj_sock_t csock1=PJ_INVALID_SOCKET; … … 478 485 if (!ioque) { 479 486 status=-20; goto on_error; 487 } 488 489 // Concurrency 490 rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur); 491 if (rc != PJ_SUCCESS) { 492 status=-21; goto on_error; 480 493 } 481 494 … … 582 595 * Repeated connect/accept on the same listener socket. 583 596 */ 584 static int compliance_test_2( void)597 static int compliance_test_2(pj_bool_t allow_concur) 585 598 { 586 599 #if defined(PJ_SYMBIAN) && PJ_SYMBIAN!=0 … … 648 661 } 649 662 663 664 // Concurrency 665 rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur); 666 if (rc != PJ_SUCCESS) { 667 app_perror("...ERROR in pj_ioqueue_set_default_concurrency()", rc); 668 return -11; 669 } 650 670 651 671 // Allocate buffers for send and receive. … … 888 908 889 909 890 int tcp_ioqueue_test()910 static int tcp_ioqueue_test_impl(pj_bool_t allow_concur) 891 911 { 892 912 int status; 913 914 PJ_LOG(3,(THIS_FILE, "..testing with concurency=%d", allow_concur)); 893 915 894 916 PJ_LOG(3, (THIS_FILE, "..%s compliance test 0 (success scenario)", 895 917 pj_ioqueue_name())); 896 if ((status=compliance_test_0( )) != 0) {918 if ((status=compliance_test_0(allow_concur)) != 0) { 897 919 PJ_LOG(1, (THIS_FILE, "....FAILED (status=%d)\n", status)); 898 920 return status; … … 900 922 PJ_LOG(3, (THIS_FILE, "..%s compliance test 1 (failed scenario)", 901 923 pj_ioqueue_name())); 902 if ((status=compliance_test_1( )) != 0) {924 if ((status=compliance_test_1(allow_concur)) != 0) { 903 925 PJ_LOG(1, (THIS_FILE, "....FAILED (status=%d)\n", status)); 904 926 return status; … … 907 929 PJ_LOG(3, (THIS_FILE, "..%s compliance test 2 (repeated accept)", 908 930 pj_ioqueue_name())); 909 if ((status=compliance_test_2( )) != 0) {931 if ((status=compliance_test_2(allow_concur)) != 0) { 910 932 PJ_LOG(1, (THIS_FILE, "....FAILED (status=%d)\n", status)); 911 933 return status; 912 934 } 935 936 return 0; 937 } 938 939 int tcp_ioqueue_test() 940 { 941 int rc; 942 943 rc = tcp_ioqueue_test_impl(PJ_TRUE); 944 if (rc != 0) 945 return rc; 946 947 rc = tcp_ioqueue_test_impl(PJ_FALSE); 948 if (rc != 0) 949 return rc; 913 950 914 951 return 0; -
pjproject/trunk/pjlib/src/pjlib-test/ioq_udp.c
r1405 r1789 126 126 * data between two sockets. 127 127 */ 128 static int compliance_test( void)128 static int compliance_test(pj_bool_t allow_concur) 129 129 { 130 130 pj_sock_t ssock=-1, csock=-1; … … 177 177 if (rc != PJ_SUCCESS) { 178 178 status=-20; goto on_error; 179 } 180 181 // Set concurrency 182 TRACE_("set concurrency..."); 183 rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur); 184 if (rc != PJ_SUCCESS) { 185 status=-21; goto on_error; 179 186 } 180 187 … … 352 359 * closed. 353 360 */ 354 static int unregister_test( void)361 static int unregister_test(pj_bool_t allow_concur) 355 362 { 356 363 enum { RPORT = 50000, SPORT = 50001 }; … … 382 389 } 383 390 391 // Set concurrency 392 TRACE_("set concurrency..."); 393 status = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur); 394 if (status != PJ_SUCCESS) { 395 return -112; 396 } 397 384 398 /* Create sender socket */ 385 399 status = app_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, SPORT, &ssock); … … 513 527 * of sockets to the ioqueue. 514 528 */ 515 static int many_handles_test( void)529 static int many_handles_test(pj_bool_t allow_concur) 516 530 { 517 531 enum { MAX = PJ_IOQUEUE_MAX_HANDLES }; … … 538 552 app_perror("...error in pj_ioqueue_create", rc); 539 553 return -10; 554 } 555 556 // Set concurrency 557 rc = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur); 558 if (rc != PJ_SUCCESS) { 559 return -11; 540 560 } 541 561 … … 601 621 * Benchmarking IOQueue 602 622 */ 603 static int bench_test(int bufsize, int inactive_sock_count) 623 static int bench_test(pj_bool_t allow_concur, int bufsize, 624 int inactive_sock_count) 604 625 { 605 626 pj_sock_t ssock=-1, csock=-1; … … 649 670 if (rc != PJ_SUCCESS) { 650 671 app_perror("...error: pj_ioqueue_create()", rc); 672 goto on_error; 673 } 674 675 // Set concurrency 676 rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur); 677 if (rc != PJ_SUCCESS) { 678 app_perror("...error: pj_ioqueue_set_default_concurrency()", rc); 651 679 goto on_error; 652 680 } … … 840 868 } 841 869 842 int udp_ioqueue_test()870 static int udp_ioqueue_test_imp(pj_bool_t allow_concur) 843 871 { 844 872 int status; 845 873 int bufsize, sock_count; 846 874 875 PJ_LOG(3,(THIS_FILE, "..testing with concurency=%d", allow_concur)); 876 847 877 //goto pass1; 848 878 849 879 PJ_LOG(3, (THIS_FILE, "...compliance test (%s)", pj_ioqueue_name())); 850 if ((status=compliance_test( )) != 0) {880 if ((status=compliance_test(allow_concur)) != 0) { 851 881 return status; 852 882 } … … 855 885 856 886 PJ_LOG(3, (THIS_FILE, "...unregister test (%s)", pj_ioqueue_name())); 857 if ((status=unregister_test( )) != 0) {887 if ((status=unregister_test(allow_concur)) != 0) { 858 888 return status; 859 889 } 860 890 PJ_LOG(3, (THIS_FILE, "....unregister test ok")); 861 891 862 if ((status=many_handles_test( )) != 0) {892 if ((status=many_handles_test(allow_concur)) != 0) { 863 893 return status; 864 894 } … … 880 910 881 911 for (bufsize=BUF_MIN_SIZE; bufsize <= BUF_MAX_SIZE; bufsize *= 2) { 882 if ((status=bench_test( bufsize, SOCK_INACTIVE_MIN)) != 0)912 if ((status=bench_test(allow_concur, bufsize, SOCK_INACTIVE_MIN)) != 0) 883 913 return status; 884 914 } … … 890 920 { 891 921 //PJ_LOG(3,(THIS_FILE, "...testing with %d fds", sock_count)); 892 if ((status=bench_test( bufsize, sock_count-2)) != 0)922 if ((status=bench_test(allow_concur, bufsize, sock_count-2)) != 0) 893 923 return status; 894 924 } 925 return 0; 926 } 927 928 int udp_ioqueue_test() 929 { 930 int rc; 931 932 rc = udp_ioqueue_test_imp(PJ_TRUE); 933 if (rc != 0) 934 return rc; 935 936 rc = udp_ioqueue_test_imp(PJ_FALSE); 937 if (rc != 0) 938 return rc; 939 895 940 return 0; 896 941 } -
pjproject/trunk/pjlib/src/pjlib-test/ioq_unreg.c
r1405 r1789 287 287 } 288 288 289 int udp_ioqueue_unreg_test(void)289 static int udp_ioqueue_unreg_test_imp(pj_bool_t allow_concur) 290 290 { 291 291 enum { LOOP = 10 }; … … 294 294 pj_ioqueue_t *ioqueue; 295 295 pj_pool_t *test_pool; 296 296 297 PJ_LOG(3,(THIS_FILE, "..testing with concurency=%d", allow_concur)); 298 297 299 test_method = UNREGISTER_IN_APP; 298 300 … … 305 307 } 306 308 309 rc = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur); 310 if (rc != PJ_SUCCESS) { 311 app_perror("Error in pj_ioqueue_set_default_concurrency()", rc); 312 return -12; 313 } 307 314 308 315 PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 0/3 (%s)", … … 352 359 } 353 360 354 361 int udp_ioqueue_unreg_test(void) 362 { 363 int rc; 364 365 rc = udp_ioqueue_unreg_test_imp(PJ_TRUE); 366 if (rc != 0) 367 return rc; 368 369 rc = udp_ioqueue_unreg_test_imp(PJ_FALSE); 370 if (rc != 0) 371 return rc; 372 373 return 0; 374 } 355 375 356 376 #else
Note: See TracChangeset
for help on using the changeset viewer.