Changeset 365 for pjproject/trunk/pjlib/src/pj/ioqueue_select.c
- Timestamp:
- Mar 30, 2006 4:32:18 PM (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjlib/src/pj/ioqueue_select.c
r126 r365 110 110 111 111 unsigned max, count; 112 pj_ioqueue_key_t key_list;112 pj_ioqueue_key_t active_list; 113 113 pj_fd_set_t rfdset; 114 114 pj_fd_set_t wfdset; … … 116 116 pj_fd_set_t xfdset; 117 117 #endif 118 119 #if PJ_IOQUEUE_HAS_SAFE_UNREG 120 pj_mutex_t *ref_cnt_mutex; 121 pj_ioqueue_key_t closing_list; 122 pj_ioqueue_key_t free_list; 123 #endif 118 124 }; 119 125 … … 142 148 pj_ioqueue_t *ioqueue; 143 149 pj_lock_t *lock; 150 unsigned i; 144 151 pj_status_t rc; 145 152 … … 153 160 sizeof(union operation_key), PJ_EBUG); 154 161 162 /* Create and init common ioqueue stuffs */ 155 163 ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); 156 157 164 ioqueue_init(ioqueue); 158 165 … … 164 171 PJ_FD_ZERO(&ioqueue->xfdset); 165 172 #endif 166 pj_list_init(&ioqueue->key_list); 167 173 pj_list_init(&ioqueue->active_list); 174 175 #if PJ_IOQUEUE_HAS_SAFE_UNREG 176 /* When safe unregistration is used (the default), we pre-create 177 * all keys and put them in the free list. 178 */ 179 180 /* Mutex to protect key's reference counter 181 * We don't want to use key's mutex or ioqueue's mutex because 182 * that would create deadlock situation in some cases. 183 */ 184 rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex); 185 if (rc != PJ_SUCCESS) 186 return rc; 187 188 189 /* Init key list */ 190 pj_list_init(&ioqueue->free_list); 191 pj_list_init(&ioqueue->closing_list); 192 193 194 /* Pre-create all keys according to max_fd */ 195 for (i=0; i<max_fd; ++i) { 196 pj_ioqueue_key_t *key; 197 198 key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t)); 199 key->ref_count = 0; 200 rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); 201 if (rc != PJ_SUCCESS) { 202 key = ioqueue->free_list.next; 203 while (key != &ioqueue->free_list) { 204 pj_mutex_destroy(key->mutex); 205 key = key->next; 206 } 207 pj_mutex_destroy(ioqueue->ref_cnt_mutex); 208 return rc; 209 } 210 211 pj_list_push_back(&ioqueue->free_list, key); 212 } 213 #endif 214 215 /* Create and init ioqueue mutex */ 168 216 rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock); 169 217 if (rc != PJ_SUCCESS) … … 187 235 PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) 188 236 { 237 pj_ioqueue_key_t *key; 238 189 239 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 190 240 191 241 pj_lock_acquire(ioqueue->lock); 242 243 #if PJ_IOQUEUE_HAS_SAFE_UNREG 244 /* Destroy reference counters */ 245 key = ioqueue->active_list.next; 246 while (key != &ioqueue->active_list) { 247 pj_mutex_destroy(key->mutex); 248 key = key->next; 249 } 250 251 key = ioqueue->closing_list.next; 252 while (key != &ioqueue->closing_list) { 253 pj_mutex_destroy(key->mutex); 254 key = key->next; 255 } 256 257 key = ioqueue->free_list.next; 258 while (key != &ioqueue->free_list) { 259 pj_mutex_destroy(key->mutex); 260 key = key->next; 261 } 262 263 pj_mutex_destroy(ioqueue->ref_cnt_mutex); 264 #endif 265 192 266 return ioqueue_destroy(ioqueue); 193 267 } … … 197 271 * pj_ioqueue_register_sock() 198 272 * 199 * Register ahandle to ioqueue.273 * Register socket handle to ioqueue. 200 274 */ 201 275 PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, … … 217 291 if (ioqueue->count >= ioqueue->max) { 218 292 rc = PJ_ETOOMANY; 293 goto on_return; 294 } 295 296 /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get 297 * the key from the free list. Otherwise allocate a new one. 298 */ 299 #if PJ_IOQUEUE_HAS_SAFE_UNREG 300 pj_assert(!pj_list_empty(&ioqueue->free_list)); 301 if (pj_list_empty(&ioqueue->free_list)) { 302 rc = PJ_ETOOMANY; 303 goto on_return; 304 } 305 306 key = ioqueue->free_list.next; 307 pj_list_erase(key); 308 #else 309 key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 310 #endif 311 312 rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); 313 if (rc != PJ_SUCCESS) { 314 key = NULL; 219 315 goto on_return; 220 316 } … … 232 328 } 233 329 234 /* Create key. */ 235 key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 236 rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); 237 if (rc != PJ_SUCCESS) { 238 key = NULL; 239 goto on_return; 240 } 241 242 /* Register */ 243 pj_list_insert_before(&ioqueue->key_list, key); 330 331 /* Put in active list. */ 332 pj_list_insert_before(&ioqueue->active_list, key); 244 333 ++ioqueue->count; 245 334 … … 252 341 } 253 342 343 #if PJ_IOQUEUE_HAS_SAFE_UNREG 344 /* Increment key's reference counter */ 345 static void increment_counter(pj_ioqueue_key_t *key) 346 { 347 pj_mutex_lock(key->ioqueue->ref_cnt_mutex); 348 ++key->ref_count; 349 pj_mutex_unlock(key->ioqueue->ref_cnt_mutex); 350 } 351 352 /* Decrement the key's reference counter, and when the counter reach zero, 353 * destroy the key. 354 * 355 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK. 356 */ 357 static void decrement_counter(pj_ioqueue_key_t *key) 358 { 359 pj_mutex_lock(key->ioqueue->ref_cnt_mutex); 360 --key->ref_count; 361 if (key->ref_count == 0) { 362 363 pj_assert(key->closing == 1); 364 pj_gettimeofday(&key->free_time); 365 key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY; 366 pj_time_val_normalize(&key->free_time); 367 368 pj_lock_acquire(key->ioqueue->lock); 369 pj_list_erase(key); 370 pj_list_push_back(&key->ioqueue->closing_list, key); 371 pj_lock_release(key->ioqueue->lock); 372 } 373 pj_mutex_unlock(key->ioqueue->ref_cnt_mutex); 374 } 375 #endif 376 377 254 378 /* 255 379 * pj_ioqueue_unregister() … … 265 389 ioqueue = key->ioqueue; 266 390 391 /* Lock the key to make sure no callback is simultaneously modifying 392 * the key. We need to lock the key before ioqueue here to prevent 393 * deadlock. 394 */ 395 pj_mutex_lock(key->mutex); 396 397 /* Also lock ioqueue */ 267 398 pj_lock_acquire(ioqueue->lock); 268 399 … … 276 407 #endif 277 408 278 /* ioqueue_destroy may try to acquire key's mutex. 279 * Since normally the order of locking is to lock key's mutex first 280 * then ioqueue's mutex, ioqueue_destroy may deadlock unless we 281 * release ioqueue's mutex first. 409 /* Close socket. */ 410 pj_sock_close(key->fd); 411 412 /* Clear callback */ 413 key->cb.on_accept_complete = NULL; 414 key->cb.on_connect_complete = NULL; 415 key->cb.on_read_complete = NULL; 416 key->cb.on_write_complete = NULL; 417 418 /* Must release ioqueue lock first before decrementing counter, to 419 * prevent deadlock. 282 420 */ 283 421 pj_lock_release(ioqueue->lock); 284 422 285 /* Destroy the key. */ 286 ioqueue_destroy_key(key); 423 #if PJ_IOQUEUE_HAS_SAFE_UNREG 424 /* Mark key is closing. */ 425 key->closing = 1; 426 427 /* Decrement counter. */ 428 decrement_counter(key); 429 430 /* Done. */ 431 pj_mutex_unlock(key->mutex); 432 #else 433 pj_mutex_destroy(key->mutex); 434 #endif 287 435 288 436 return PJ_SUCCESS; … … 309 457 pj_assert(0); 310 458 311 key = ioqueue-> key_list.next;312 while (key != &ioqueue-> key_list) {459 key = ioqueue->active_list.next; 460 while (key != &ioqueue->active_list) { 313 461 if (!pj_list_empty(&key->read_list) 314 462 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 … … 396 544 } 397 545 546 #if PJ_IOQUEUE_HAS_SAFE_UNREG 547 /* Scan closing keys to be put to free list again */ 548 static void scan_closing_keys(pj_ioqueue_t *ioqueue) 549 { 550 pj_time_val now; 551 pj_ioqueue_key_t *h; 552 553 pj_gettimeofday(&now); 554 h = ioqueue->closing_list.next; 555 while (h != &ioqueue->closing_list) { 556 pj_ioqueue_key_t *next = h->next; 557 558 pj_assert(h->closing != 0); 559 560 if (PJ_TIME_VAL_GTE(now, h->free_time)) { 561 pj_list_erase(h); 562 pj_list_push_back(&ioqueue->free_list, h); 563 } 564 h = next; 565 } 566 } 567 #endif 568 569 398 570 /* 399 571 * pj_ioqueue_poll() … … 436 608 PJ_FD_COUNT(&ioqueue->xfdset)==0) 437 609 { 438 pj_lock_release(ioqueue->lock); 610 #if PJ_IOQUEUE_HAS_SAFE_UNREG 611 scan_closing_keys(ioqueue); 612 #endif 613 pj_lock_release(ioqueue->lock); 439 614 if (timeout) 440 615 pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout)); … … 476 651 * coming with accept(). 477 652 */ 478 h = ioqueue->key_list.next; 479 for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) { 653 h = ioqueue->active_list.next; 654 for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) { 655 480 656 if ( (key_has_pending_write(h) || key_has_pending_connect(h)) 481 && PJ_FD_ISSET(h->fd, &wfdset) )657 && PJ_FD_ISSET(h->fd, &wfdset) && !h->closing) 482 658 { 659 #if PJ_IOQUEUE_HAS_SAFE_UNREG 660 increment_counter(h); 661 #endif 483 662 event[counter].key = h; 484 663 event[counter].event_type = WRITEABLE_EVENT; … … 488 667 /* Scan for readable socket. */ 489 668 if ((key_has_pending_read(h) || key_has_pending_accept(h)) 490 && PJ_FD_ISSET(h->fd, &rfdset) )669 && PJ_FD_ISSET(h->fd, &rfdset) && !h->closing) 491 670 { 671 #if PJ_IOQUEUE_HAS_SAFE_UNREG 672 increment_counter(h); 673 #endif 492 674 event[counter].key = h; 493 675 event[counter].event_type = READABLE_EVENT; … … 496 678 497 679 #if PJ_HAS_TCP 498 if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) { 680 if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) && 681 !h->closing) 682 { 683 #if PJ_IOQUEUE_HAS_SAFE_UNREG 684 increment_counter(h); 685 #endif 499 686 event[counter].key = h; 500 687 event[counter].event_type = EXCEPTION_EVENT; … … 526 713 break; 527 714 } 528 } 715 716 #if PJ_IOQUEUE_HAS_SAFE_UNREG 717 decrement_counter(event[counter].key); 718 #endif 719 } 720 529 721 530 722 return count;
Note: See TracChangeset
for help on using the changeset viewer.