Changeset 365 for pjproject/trunk/pjlib/src/pj/ioqueue_winnt.c
- Timestamp:
- Mar 30, 2006 4:32:18 PM (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjlib/src/pj/ioqueue_winnt.c
r349 r365 107 107 struct pj_ioqueue_key_t 108 108 { 109 PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); 110 109 111 pj_ioqueue_t *ioqueue; 110 112 HANDLE hnd; 111 113 void *user_data; 112 114 enum handle_type hnd_type; 115 pj_ioqueue_callback cb; 116 113 117 #if PJ_HAS_TCP 114 118 int connecting; 115 119 #endif 116 pj_ioqueue_callback cb; 117 pj_bool_t has_quit_signal; 120 121 #if PJ_IOQUEUE_HAS_SAFE_UNREG 122 pj_atomic_t *ref_count; 123 pj_bool_t closing; 124 pj_time_val free_time; 125 #endif 126 118 127 }; 119 128 … … 126 135 pj_lock_t *lock; 127 136 pj_bool_t auto_delete_lock; 137 138 #if PJ_IOQUEUE_HAS_SAFE_UNREG 139 pj_ioqueue_key_t active_list; 140 pj_ioqueue_key_t free_list; 141 pj_ioqueue_key_t closing_list; 142 #endif 143 144 /* These are to keep track of connecting sockets */ 145 #if PJ_HAS_TCP 128 146 unsigned event_count; 129 147 HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1]; 130 #if PJ_HAS_TCP131 148 unsigned connecting_count; 132 149 HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1]; … … 280 297 { 281 298 pj_ioqueue_t *ioqueue; 299 unsigned i; 282 300 pj_status_t rc; 283 301 … … 291 309 sizeof(union operation_key), PJ_EBUG); 292 310 311 /* Create IOCP */ 293 312 ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue)); 294 313 ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); … … 296 315 return PJ_RETURN_OS_ERROR(GetLastError()); 297 316 317 /* Create IOCP mutex */ 298 318 rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock); 299 319 if (rc != PJ_SUCCESS) { … … 304 324 ioqueue->auto_delete_lock = PJ_TRUE; 305 325 326 #if PJ_IOQUEUE_HAS_SAFE_UNREG 327 /* 328 * Create and initialize key pools. 329 */ 330 pj_list_init(&ioqueue->active_list); 331 pj_list_init(&ioqueue->free_list); 332 pj_list_init(&ioqueue->closing_list); 333 334 /* Preallocate keys according to max_fd setting, and put them 335 * in free_list. 336 */ 337 for (i=0; i<max_fd; ++i) { 338 pj_ioqueue_key_t *key; 339 340 key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t)); 341 342 rc = pj_atomic_create(pool, 0, &key->ref_count); 343 if (rc != PJ_SUCCESS) { 344 key = ioqueue->free_list.next; 345 while (key != &ioqueue->free_list) { 346 pj_atomic_destroy(key->ref_count); 347 key = key->next; 348 } 349 CloseHandle(ioqueue->iocp); 350 return rc; 351 } 352 353 pj_list_push_back(&ioqueue->free_list, key); 354 355 } 356 #endif 357 306 358 *p_ioqueue = ioqueue; 307 359 … … 316 368 { 317 369 unsigned i; 370 pj_ioqueue_key_t *key; 318 371 319 372 PJ_CHECK_STACK(); 320 373 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 321 374 375 pj_lock_acquire(ioqueue->lock); 376 377 #if PJ_HAS_TCP 322 378 /* Destroy events in the pool */ 323 379 for (i=0; i<ioqueue->event_count; ++i) { … … 325 381 } 326 382 ioqueue->event_count = 0; 383 #endif 327 384 328 385 if (CloseHandle(ioqueue->iocp) != TRUE) 329 386 return PJ_RETURN_OS_ERROR(GetLastError()); 387 388 #if PJ_IOQUEUE_HAS_SAFE_UNREG 389 /* Destroy reference counters */ 390 key = ioqueue->active_list.next; 391 while (key != &ioqueue->active_list) { 392 pj_atomic_destroy(key->ref_count); 393 key = key->next; 394 } 395 396 key = ioqueue->closing_list.next; 397 while (key != &ioqueue->closing_list) { 398 pj_atomic_destroy(key->ref_count); 399 key = key->next; 400 } 401 402 key = ioqueue->free_list.next; 403 while (key != &ioqueue->free_list) { 404 pj_atomic_destroy(key->ref_count); 405 key = key->next; 406 } 407 #endif 330 408 331 409 if (ioqueue->auto_delete_lock) … … 371 449 PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL); 372 450 451 pj_lock_acquire(ioqueue->lock); 452 453 #if PJ_IOQUEUE_HAS_SAFE_UNREG 454 /* If safe unregistration is used, then get the key record from 455 * the free list. 456 */ 457 if (pj_list_empty(&ioqueue->free_list)) { 458 pj_lock_release(ioqueue->lock); 459 return PJ_ETOOMANY; 460 } 461 462 rec = ioqueue->free_list.next; 463 pj_list_erase(rec); 464 465 /* Set initial reference count to 1 */ 466 pj_assert(pj_atomic_get(rec->ref_count) == 0); 467 pj_atomic_inc(rec->ref_count); 468 469 rec->closing = 0; 470 471 #else 472 rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 473 #endif 474 373 475 /* Build the key for this socket. */ 374 rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));375 476 rec->ioqueue = ioqueue; 376 477 rec->hnd = (HANDLE)sock; … … 379 480 pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback)); 380 481 482 #if PJ_HAS_TCP 483 rec->connecting = 0; 484 #endif 485 381 486 /* Set socket to nonblocking. */ 382 487 value = 1; 383 488 rc = ioctlsocket(sock, FIONBIO, &value); 384 489 if (rc != 0) { 490 pj_lock_release(ioqueue->lock); 385 491 return PJ_RETURN_OS_ERROR(WSAGetLastError()); 386 492 } … … 389 495 hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0); 390 496 if (!hioq) { 497 pj_lock_release(ioqueue->lock); 391 498 return PJ_RETURN_OS_ERROR(GetLastError()); 392 499 } 393 500 394 501 *key = rec; 502 503 #if PJ_IOQUEUE_HAS_SAFE_UNREG 504 pj_list_push_back(&ioqueue->active_list, rec); 505 #endif 506 507 pj_lock_release(ioqueue->lock); 508 395 509 return PJ_SUCCESS; 396 510 } … … 423 537 424 538 425 426 /* 427 * Internal function to poll the I/O Completion Port, execute callback, 539 #if PJ_IOQUEUE_HAS_SAFE_UNREG 540 /* Decrement the key's reference counter, and when the counter reach zero, 541 * destroy the key. 542 */ 543 static void decrement_counter(pj_ioqueue_key_t *key) 544 { 545 if (pj_atomic_dec_and_get(key->ref_count) == 0) { 546 547 pj_lock_acquire(key->ioqueue->lock); 548 549 pj_assert(key->closing == 1); 550 pj_gettimeofday(&key->free_time); 551 key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY; 552 pj_time_val_normalize(&key->free_time); 553 554 pj_list_erase(key); 555 pj_list_push_back(&key->ioqueue->closing_list, key); 556 557 pj_lock_release(key->ioqueue->lock); 558 } 559 } 560 #endif 561 562 /* 563 * Poll the I/O Completion Port, execute callback, 428 564 * and return the key and bytes transfered of the last operation. 429 565 */ … … 458 594 *p_key = key; 459 595 460 /* If size_status is POST_QUIT_LEN, mark the key as quitting */ 461 if (size_status == POST_QUIT_LEN) {462 key->has_quit_signal = 1;596 #if PJ_IOQUEUE_HAS_SAFE_UNREG 597 /* We shouldn't call callbacks if key is quitting. */ 598 if (key->closing) 463 599 return PJ_TRUE; 464 } 465 466 /* We shouldn't call callbacks if key is quitting. 467 * But this should have been taken care by unregister function 468 * (the unregister function should have cleared out the callbacks) 600 601 /* Increment reference counter to prevent this key from being 602 * deleted 469 603 */ 604 pj_atomic_inc(key->ref_count); 605 #endif 470 606 471 607 /* Carry out the callback */ … … 505 641 break; 506 642 } 643 644 #if PJ_IOQUEUE_HAS_SAFE_UNREG 645 decrement_counter(key); 646 #endif 647 507 648 return PJ_TRUE; 508 649 } … … 517 658 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) 518 659 { 519 pj_ssize_t polled_len;520 pj_ioqueue_key_t *polled_key;521 generic_overlapped ov;522 BOOL rc;523 524 660 PJ_ASSERT_RETURN(key, PJ_EINVAL); 525 661 … … 543 679 } 544 680 #endif 545 546 547 /* Unregistering handle from IOCP is pretty tricky. 548 * 549 * Even after the socket has been closed, GetQueuedCompletionStatus 550 * may still return events for the handle. This will likely to 551 * cause crash in pjlib, because the key associated with the handle 552 * most likely will have been destroyed. 553 * 554 * The solution is to poll the IOCP until we're sure that there are 555 * no further events for the handle. 556 */ 557 558 /* Clear up callbacks for the key. 559 * We don't want the callback to be called for this key. 560 */ 681 682 /* Close handle (the only way to disassociate handle from IOCP). 683 * We also need to close handle to make sure that no further events 684 * will come to the handle. 685 */ 686 CloseHandle(key->hnd); 687 688 /* Reset callbacks */ 689 key->cb.on_accept_complete = NULL; 690 key->cb.on_connect_complete = NULL; 561 691 key->cb.on_read_complete = NULL; 562 692 key->cb.on_write_complete = NULL; 563 key->cb.on_accept_complete = NULL; 564 key->cb.on_connect_complete = NULL; 565 566 /* Init overlapped struct */ 567 pj_memset(&ov, 0, sizeof(ov)); 568 ov.operation = PJ_IOQUEUE_OP_READ; 569 570 /* Post queued completion status with a special length. */ 571 rc = PostQueuedCompletionStatus( key->ioqueue->iocp, (DWORD)POST_QUIT_LEN, 572 (DWORD)key, &ov.overlapped); 573 574 /* Poll IOCP until has_quit_signal is set in the key. 575 * The has_quit_signal flag is set in poll_iocp() when POST_QUIT_LEN 576 * is detected. We need to have this flag because POST_QUIT_LEN may be 577 * detected by other threads. 578 */ 579 do { 580 polled_len = 0; 581 polled_key = NULL; 582 583 rc = poll_iocp(key->ioqueue->iocp, 0, &polled_len, &polled_key); 584 585 } while (rc && !key->has_quit_signal); 586 587 588 /* Close handle if this is a file. */ 589 if (key->hnd_type == HND_IS_FILE) { 590 CloseHandle(key->hnd); 591 } 693 694 #if PJ_IOQUEUE_HAS_SAFE_UNREG 695 /* Mark key as closing. */ 696 key->closing = 1; 697 698 /* Decrement reference counter. */ 699 decrement_counter(key); 700 701 /* Even after handle is closed, I suspect that IOCP may still try to 702 * do something with the handle, causing memory corruption when pool 703 * debugging is enabled. 704 * 705 * Forcing context switch seems to have fixed that, but this is quite 706 * an ugly solution.. 707 */ 708 pj_thread_sleep(0); 709 #endif 592 710 593 711 return PJ_SUCCESS; … … 603 721 DWORD dwMsec; 604 722 int connect_count = 0; 605 pj_bool_t has_event;723 int event_count = 0; 606 724 607 725 PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); 608 609 /* Check the connecting array. */610 #if PJ_HAS_TCP611 connect_count = check_connecting(ioqueue);612 #endif613 726 614 727 /* Calculate miliseconds timeout for GetQueuedCompletionStatus */ … … 616 729 617 730 /* Poll for completion status. */ 618 has_event = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL); 731 event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL); 732 733 #if PJ_HAS_TCP 734 /* Check the connecting array, only when there's no activity. */ 735 if (event_count == 0) { 736 connect_count = check_connecting(ioqueue); 737 if (connect_count > 0) 738 event_count += connect_count; 739 } 740 #endif 741 742 #if PJ_IOQUEUE_HAS_SAFE_UNREG 743 /* Check the closing keys only when there's no activity and when there are 744 * pending closing keys. 745 */ 746 if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) { 747 pj_time_val now; 748 pj_ioqueue_key_t *key; 749 750 pj_gettimeofday(&now); 751 752 /* Move closing keys to free list when they've finished the closing 753 * idle time. 754 */ 755 pj_lock_acquire(ioqueue->lock); 756 key = ioqueue->closing_list.next; 757 while (key != &ioqueue->closing_list) { 758 pj_ioqueue_key_t *next = key->next; 759 760 pj_assert(key->closing != 0); 761 762 if (PJ_TIME_VAL_GTE(now, key->free_time)) { 763 pj_list_erase(key); 764 pj_list_push_back(&ioqueue->free_list, key); 765 } 766 key = next; 767 } 768 pj_lock_release(ioqueue->lock); 769 } 770 #endif 619 771 620 772 /* Return number of events. */ 621 return connect_count + has_event;773 return event_count; 622 774 } 623 775 … … 645 797 PJ_CHECK_STACK(); 646 798 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); 799 800 #if PJ_IOQUEUE_HAS_SAFE_UNREG 801 /* Check key is not closing */ 802 if (key->closing) 803 return PJ_ECANCELLED; 804 #endif 647 805 648 806 op_key_rec = (union operation_key*)op_key->internal__; … … 716 874 PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL); 717 875 876 #if PJ_IOQUEUE_HAS_SAFE_UNREG 877 /* Check key is not closing */ 878 if (key->closing) 879 return PJ_ECANCELLED; 880 #endif 881 718 882 op_key_rec = (union operation_key*)op_key->internal__; 719 883 op_key_rec->overlapped.wsabuf.buf = buffer; … … 800 964 PJ_CHECK_STACK(); 801 965 PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL); 802 966 967 #if PJ_IOQUEUE_HAS_SAFE_UNREG 968 /* Check key is not closing */ 969 if (key->closing) 970 return PJ_ECANCELLED; 971 #endif 972 803 973 op_key_rec = (union operation_key*)op_key->internal__; 804 974 … … 873 1043 PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); 874 1044 1045 #if PJ_IOQUEUE_HAS_SAFE_UNREG 1046 /* Check key is not closing */ 1047 if (key->closing) 1048 return PJ_ECANCELLED; 1049 #endif 1050 875 1051 /* 876 1052 * See if there is a new connection immediately available. … … 962 1138 PJ_CHECK_STACK(); 963 1139 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); 1140 1141 #if PJ_IOQUEUE_HAS_SAFE_UNREG 1142 /* Check key is not closing */ 1143 if (key->closing) 1144 return PJ_ECANCELLED; 1145 #endif 964 1146 965 1147 /* Initiate connect() */
Note: See TracChangeset
for help on using the changeset viewer.