Changeset 2295 for pjproject/trunk/pjlib/src/pj/ioqueue_epoll.c
- Timestamp:
- Sep 18, 2008 9:22:16 PM (16 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjlib/src/pj/ioqueue_epoll.c
r2039 r2295 176 176 struct epoll_event *events; 177 177 struct queue *queue; 178 179 #if PJ_IOQUEUE_HAS_SAFE_UNREG 180 pj_mutex_t *ref_cnt_mutex; 181 pj_ioqueue_key_t active_list; 182 pj_ioqueue_key_t closing_list; 183 pj_ioqueue_key_t free_list; 184 #endif 178 185 }; 179 186 … … 182 189 */ 183 190 #include "ioqueue_common_abs.c" 191 192 #if PJ_IOQUEUE_HAS_SAFE_UNREG 193 /* Scan closing keys to be put to free list again */ 194 static void scan_closing_keys(pj_ioqueue_t *ioqueue); 195 #endif 184 196 185 197 /* … … 207 219 pj_status_t rc; 208 220 pj_lock_t *lock; 221 int i; 209 222 210 223 /* Check that arguments are valid. */ … … 223 236 ioqueue->count = 0; 224 237 pj_list_init(&ioqueue->hlist); 238 239 #if PJ_IOQUEUE_HAS_SAFE_UNREG 240 /* When safe unregistration is used (the default), we pre-create 241 * all keys and put them in the free list. 242 */ 243 244 /* Mutex to protect key's reference counter 245 * We don't want to use key's mutex or ioqueue's mutex because 246 * that would create deadlock situation in some cases. 247 */ 248 rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex); 249 if (rc != PJ_SUCCESS) 250 return rc; 251 252 253 /* Init key list */ 254 pj_list_init(&ioqueue->free_list); 255 pj_list_init(&ioqueue->closing_list); 256 257 258 /* Pre-create all keys according to max_fd */ 259 for ( i=0; i<max_fd; ++i) { 260 pj_ioqueue_key_t *key; 261 262 key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t); 263 key->ref_count = 0; 264 rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); 265 if (rc != PJ_SUCCESS) { 266 key = ioqueue->free_list.next; 267 while (key != &ioqueue->free_list) { 268 pj_mutex_destroy(key->mutex); 269 key = key->next; 270 } 271 pj_mutex_destroy(ioqueue->ref_cnt_mutex); 272 return rc; 273 } 274 275 pj_list_push_back(&ioqueue->free_list, key); 276 } 277 #endif 225 278 226 279 rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock); … … 257 310 PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) 258 311 { 312 pj_ioqueue_key_t *key; 313 259 314 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 260 315 PJ_ASSERT_RETURN(ioqueue->epfd > 0, PJ_EINVALIDOP); … … 263 318 os_close(ioqueue->epfd); 264 319 ioqueue->epfd = 0; 320 321 #if PJ_IOQUEUE_HAS_SAFE_UNREG 322 /* Destroy reference counters */ 323 key = ioqueue->active_list.next; 324 while (key != &ioqueue->active_list) { 325 pj_mutex_destroy(key->mutex); 326 key = key->next; 327 } 328 329 key = ioqueue->closing_list.next; 330 while (key != &ioqueue->closing_list) { 331 pj_mutex_destroy(key->mutex); 332 key = key->next; 333 } 334 335 key = ioqueue->free_list.next; 336 while (key != &ioqueue->free_list) { 337 pj_mutex_destroy(key->mutex); 338 key = key->next; 339 } 340 341 pj_mutex_destroy(ioqueue->ref_cnt_mutex); 342 #endif 265 343 return ioqueue_destroy(ioqueue); 266 344 } … … 304 382 } 305 383 384 /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get 385 * the key from the free list. Otherwise allocate a new one. 386 */ 387 #if PJ_IOQUEUE_HAS_SAFE_UNREG 388 389 /* Scan closing_keys first to let them come back to free_list */ 390 scan_closing_keys(ioqueue); 391 392 pj_assert(!pj_list_empty(&ioqueue->free_list)); 393 if (pj_list_empty(&ioqueue->free_list)) { 394 rc = PJ_ETOOMANY; 395 goto on_return; 396 } 397 398 key = ioqueue->free_list.next; 399 pj_list_erase(key); 400 #else 306 401 /* Create key. */ 307 402 key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 403 #endif 404 308 405 rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); 309 406 if (rc != PJ_SUCCESS) { … … 313 410 314 411 /* Create key's mutex */ 315 rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);412 /* rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); 316 413 if (rc != PJ_SUCCESS) { 317 414 key = NULL; 318 415 goto on_return; 319 416 } 320 417 */ 321 418 /* os_epoll_ctl. */ 322 419 ev.events = EPOLLIN | EPOLLERR; … … 346 443 } 347 444 445 #if PJ_IOQUEUE_HAS_SAFE_UNREG 446 /* Increment key's reference counter */ 447 static void increment_counter(pj_ioqueue_key_t *key) 448 { 449 pj_mutex_lock(key->ioqueue->ref_cnt_mutex); 450 ++key->ref_count; 451 pj_mutex_unlock(key->ioqueue->ref_cnt_mutex); 452 } 453 454 /* Decrement the key's reference counter, and when the counter reach zero, 455 * destroy the key. 456 * 457 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK. 458 */ 459 static void decrement_counter(pj_ioqueue_key_t *key) 460 { 461 pj_lock_acquire(key->ioqueue->lock); 462 pj_mutex_lock(key->ioqueue->ref_cnt_mutex); 463 --key->ref_count; 464 if (key->ref_count == 0) { 465 466 pj_assert(key->closing == 1); 467 pj_gettimeofday(&key->free_time); 468 key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY; 469 pj_time_val_normalize(&key->free_time); 470 471 pj_list_erase(key); 472 pj_list_push_back(&key->ioqueue->closing_list, key); 473 474 } 475 pj_mutex_unlock(key->ioqueue->ref_cnt_mutex); 476 pj_lock_release(key->ioqueue->lock); 477 } 478 #endif 479 348 480 /* 349 481 * pj_ioqueue_unregister() … … 364 496 pj_assert(ioqueue->count > 0); 365 497 --ioqueue->count; 498 #if !PJ_IOQUEUE_HAS_SAFE_UNREG 366 499 pj_list_erase(key); 500 #endif 367 501 368 502 ev.events = 0; … … 375 509 } 376 510 377 pj_lock_release(ioqueue->lock);378 379 511 /* Destroy the key. */ 380 512 pj_sock_close(key->fd); 513 514 pj_lock_release(ioqueue->lock); 515 516 517 #if PJ_IOQUEUE_HAS_SAFE_UNREG 518 /* Mark key is closing. */ 519 key->closing = 1; 520 521 /* Decrement counter. */ 522 decrement_counter(key); 523 524 /* Done. */ 525 pj_mutex_unlock(key->mutex); 526 #else 381 527 pj_mutex_destroy(key->mutex); 528 #endif 382 529 383 530 return PJ_SUCCESS; … … 420 567 } 421 568 } 569 570 #if PJ_IOQUEUE_HAS_SAFE_UNREG 571 /* Scan closing keys to be put to free list again */ 572 static void scan_closing_keys(pj_ioqueue_t *ioqueue) 573 { 574 pj_time_val now; 575 pj_ioqueue_key_t *h; 576 577 pj_gettimeofday(&now); 578 h = ioqueue->closing_list.next; 579 while (h != &ioqueue->closing_list) { 580 pj_ioqueue_key_t *next = h->next; 581 582 pj_assert(h->closing != 0); 583 584 if (PJ_TIME_VAL_GTE(now, h->free_time)) { 585 pj_list_erase(h); 586 pj_list_push_back(&ioqueue->free_list, h); 587 } 588 h = next; 589 } 590 } 591 #endif 422 592 423 593 /* … … 442 612 count = os_epoll_wait( ioqueue->epfd, events, ioqueue->max, msec); 443 613 if (count == 0) { 614 #if PJ_IOQUEUE_HAS_SAFE_UNREG 615 /* Check the closing keys only when there's no activity and when there are 616 * pending closing keys. 617 */ 618 if (count == 0 && !pj_list_empty(&ioqueue->closing_list)) { 619 pj_lock_acquire(ioqueue->lock); 620 scan_closing_keys(ioqueue); 621 pj_lock_release(ioqueue->lock); 622 } 623 #endif 444 624 TRACE_((THIS_FILE, "os_epoll_wait timed out")); 445 625 return count; … … 468 648 if ((events[i].events & EPOLLIN) && 469 649 (key_has_pending_read(h) || key_has_pending_accept(h))) { 650 651 #if PJ_IOQUEUE_HAS_SAFE_UNREG 652 increment_counter(h); 653 #endif 470 654 queue[processed].key = h; 471 655 queue[processed].event_type = READABLE_EVENT; … … 477 661 */ 478 662 if ((events[i].events & EPOLLOUT) && key_has_pending_write(h)) { 663 664 #if PJ_IOQUEUE_HAS_SAFE_UNREG 665 increment_counter(h); 666 #endif 479 667 queue[processed].key = h; 480 668 queue[processed].event_type = WRITEABLE_EVENT; … … 487 675 */ 488 676 if ((events[i].events & EPOLLOUT) && (h->connecting)) { 677 678 #if PJ_IOQUEUE_HAS_SAFE_UNREG 679 increment_counter(h); 680 #endif 489 681 queue[processed].key = h; 490 682 queue[processed].event_type = WRITEABLE_EVENT; … … 497 689 */ 498 690 if (events[i].events & EPOLLERR && (h->connecting)) { 691 692 #if PJ_IOQUEUE_HAS_SAFE_UNREG 693 increment_counter(h); 694 #endif 499 695 queue[processed].key = h; 500 696 queue[processed].event_type = EXCEPTION_EVENT; … … 520 716 break; 521 717 } 718 719 #if PJ_IOQUEUE_HAS_SAFE_UNREG 720 decrement_counter(queue[i].key); 721 #endif 522 722 } 523 723
Note: See TracChangeset
for help on using the changeset viewer.