Changeset 1789 for pjproject/trunk/pjlib/src/pj/ioqueue_winnt.c
- Timestamp:
- Feb 13, 2008 3:17:28 PM (17 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjlib/src/pj/ioqueue_winnt.c
r1405 r1789 115 115 enum handle_type hnd_type; 116 116 pj_ioqueue_callback cb; 117 pj_bool_t allow_concurrent; 117 118 118 119 #if PJ_HAS_TCP … … 124 125 pj_bool_t closing; 125 126 pj_time_val free_time; 127 pj_mutex_t *mutex; 126 128 #endif 127 129 … … 136 138 pj_lock_t *lock; 137 139 pj_bool_t auto_delete_lock; 140 pj_bool_t default_concurrency; 138 141 139 142 #if PJ_IOQUEUE_HAS_SAFE_UNREG … … 152 155 #endif 153 156 }; 157 158 159 #if PJ_IOQUEUE_HAS_SAFE_UNREG 160 /* Prototype */ 161 static void scan_closing_keys(pj_ioqueue_t *ioqueue); 162 #endif 154 163 155 164 … … 316 325 317 326 /* Create IOCP mutex */ 318 rc = pj_lock_create_ simple_mutex(pool, NULL, &ioqueue->lock);327 rc = pj_lock_create_recursive_mutex(pool, NULL, &ioqueue->lock); 319 328 if (rc != PJ_SUCCESS) { 320 329 CloseHandle(ioqueue->iocp); … … 323 332 324 333 ioqueue->auto_delete_lock = PJ_TRUE; 334 ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY; 325 335 326 336 #if PJ_IOQUEUE_HAS_SAFE_UNREG … … 345 355 while (key != &ioqueue->free_list) { 346 356 pj_atomic_destroy(key->ref_count); 357 pj_mutex_destroy(key->mutex); 347 358 key = key->next; 348 359 } … … 351 362 } 352 363 364 rc = pj_mutex_create_recursive(pool, "ioqkey", &key->mutex); 365 if (rc != PJ_SUCCESS) { 366 pj_atomic_destroy(key->ref_count); 367 key = ioqueue->free_list.next; 368 while (key != &ioqueue->free_list) { 369 pj_atomic_destroy(key->ref_count); 370 pj_mutex_destroy(key->mutex); 371 key = key->next; 372 } 373 CloseHandle(ioqueue->iocp); 374 return rc; 375 } 376 353 377 pj_list_push_back(&ioqueue->free_list, key); 354 355 378 } 356 379 #endif … … 393 416 while (key != &ioqueue->active_list) { 394 417 pj_atomic_destroy(key->ref_count); 418 pj_mutex_destroy(key->mutex); 395 419 key = key->next; 396 420 } … … 399 423 while (key != &ioqueue->closing_list) { 400 424 pj_atomic_destroy(key->ref_count); 425 pj_mutex_destroy(key->mutex); 401 426 key = key->next; 402 427 } … … 405 430 while (key != &ioqueue->free_list) { 406 431 pj_atomic_destroy(key->ref_count); 432 pj_mutex_destroy(key->mutex); 407 433 key = key->next; 408 434 } … … 412 438 pj_lock_destroy(ioqueue->lock); 413 439 440 return PJ_SUCCESS; 441 } 442 443 444 PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue, 445 pj_bool_t allow) 446 { 447 PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL); 448 ioqueue->default_concurrency = allow; 414 449 return PJ_SUCCESS; 415 450 } … … 454 489 455 490 #if PJ_IOQUEUE_HAS_SAFE_UNREG 491 /* Scan closing list first to release unused keys. 492 * Must do this with lock acquired. 493 */ 494 scan_closing_keys(ioqueue); 495 456 496 /* If safe unregistration is used, then get the key record from 457 497 * the free list. … … 482 522 pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback)); 483 523 524 /* Set concurrency for this handle */ 525 rc = pj_ioqueue_set_concurrency(rec, ioqueue->default_concurrency); 526 if (rc != PJ_SUCCESS) { 527 pj_lock_release(ioqueue->lock); 528 return rc; 529 } 530 484 531 #if PJ_HAS_TCP 485 532 rec->connecting = 0; … … 586 633 */ 587 634 if (pOv) { 635 pj_bool_t has_lock; 636 588 637 /* Event was dequeued for either successfull or failed I/O */ 589 638 key = (pj_ioqueue_key_t*)dwKey; … … 601 650 return PJ_TRUE; 602 651 652 /* If concurrency is disabled, lock the key 653 * (and save the lock status to local var since app may change 654 * concurrency setting while in the callback) */ 655 if (key->allow_concurrent == PJ_FALSE) { 656 pj_mutex_lock(key->mutex); 657 has_lock = PJ_TRUE; 658 } else { 659 has_lock = PJ_FALSE; 660 } 661 662 /* Now that we get the lock, check again that key is not closing */ 663 if (key->closing) { 664 if (has_lock) { 665 pj_mutex_unlock(key->mutex); 666 } 667 return PJ_TRUE; 668 } 669 603 670 /* Increment reference counter to prevent this key from being 604 671 * deleted 605 672 */ 606 673 pj_atomic_inc(key->ref_count); 674 #else 675 PJ_UNUSED_ARG(has_lock); 607 676 #endif 608 677 … … 655 724 #if PJ_IOQUEUE_HAS_SAFE_UNREG 656 725 decrement_counter(key); 726 if (has_lock) 727 pj_mutex_unlock(key->mutex); 657 728 #endif 658 729 … … 670 741 { 671 742 unsigned i; 743 pj_bool_t has_lock; 672 744 enum { RETRY = 10 }; 673 745 … … 697 769 /* Mark key as closing before closing handle. */ 698 770 key->closing = 1; 771 772 /* If concurrency is disabled, wait until the key has finished 773 * processing the callback 774 */ 775 if (key->allow_concurrent == PJ_FALSE) { 776 pj_mutex_lock(key->mutex); 777 has_lock = PJ_TRUE; 778 } else { 779 has_lock = PJ_FALSE; 780 } 781 #else 782 PJ_UNUSED_ARG(has_lock); 699 783 #endif 700 784 … … 718 802 * Forcing context switch seems to have fixed that, but this is quite 719 803 * an ugly solution.. 804 * 805 * Update 2008/02/13: 806 * This should not happen if concurrency is disallowed for the key. 807 * So at least application has a solution for this (i.e. by disallowing 808 * concurrency in the key). 720 809 */ 721 810 //This will loop forever if unregistration is done on the callback. … … 729 818 /* Decrement reference counter to destroy the key. */ 730 819 decrement_counter(key); 820 821 if (has_lock) 822 pj_mutex_unlock(key->mutex); 731 823 #endif 732 824 733 825 return PJ_SUCCESS; 734 826 } 827 828 #if PJ_IOQUEUE_HAS_SAFE_UNREG 829 /* Scan the closing list, and put pending closing keys to free list. 830 * Must do this with ioqueue mutex held. 831 */ 832 static void scan_closing_keys(pj_ioqueue_t *ioqueue) 833 { 834 if (!pj_list_empty(&ioqueue->closing_list)) { 835 pj_time_val now; 836 pj_ioqueue_key_t *key; 837 838 pj_gettimeofday(&now); 839 840 /* Move closing keys to free list when they've finished the closing 841 * idle time. 842 */ 843 key = ioqueue->closing_list.next; 844 while (key != &ioqueue->closing_list) { 845 pj_ioqueue_key_t *next = key->next; 846 847 pj_assert(key->closing != 0); 848 849 if (PJ_TIME_VAL_GTE(now, key->free_time)) { 850 pj_list_erase(key); 851 pj_list_push_back(&ioqueue->free_list, key); 852 } 853 key = next; 854 } 855 } 856 } 857 #endif 735 858 736 859 /* … … 767 890 /* Check the closing keys only when there's no activity and when there are 768 891 * pending closing keys. 769 * blp: 770 * no, always check the list. Otherwise on busy activity, this will cause 771 * ioqueue to reject new registration. 772 */ 773 if (/*event_count == 0 &&*/ !pj_list_empty(&ioqueue->closing_list)) { 774 pj_time_val now; 775 pj_ioqueue_key_t *key; 776 777 pj_gettimeofday(&now); 778 779 /* Move closing keys to free list when they've finished the closing 780 * idle time. 781 */ 892 */ 893 if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) { 782 894 pj_lock_acquire(ioqueue->lock); 783 key = ioqueue->closing_list.next; 784 while (key != &ioqueue->closing_list) { 785 pj_ioqueue_key_t *next = key->next; 786 787 pj_assert(key->closing != 0); 788 789 if (PJ_TIME_VAL_GTE(now, key->free_time)) { 790 pj_list_erase(key); 791 pj_list_push_back(&ioqueue->free_list, key); 792 } 793 key = next; 794 } 895 scan_closing_keys(ioqueue); 795 896 pj_lock_release(ioqueue->lock); 796 897 } … … 1269 1370 } 1270 1371 1372 PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key, 1373 pj_bool_t allow) 1374 { 1375 PJ_ASSERT_RETURN(key, PJ_EINVAL); 1376 1377 /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is 1378 * disabled. 1379 */ 1380 PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL); 1381 1382 key->allow_concurrent = allow; 1383 return PJ_SUCCESS; 1384 } 1385 1386 PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key) 1387 { 1388 #if PJ_IOQUEUE_HAS_SAFE_UNREG 1389 return pj_mutex_lock(key->mutex); 1390 #else 1391 PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP); 1392 #endif 1393 } 1394 1395 PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) 1396 { 1397 #if PJ_IOQUEUE_HAS_SAFE_UNREG 1398 return pj_mutex_unlock(key->mutex); 1399 #else 1400 PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP); 1401 #endif 1402 } 1403
Note: See TracChangeset
for help on using the changeset viewer.