Ignore:
Timestamp:
Sep 18, 2008 9:22:16 PM (16 years ago)
Author:
bennylp
Message:

Ticket #622: initial integration of ioqueue_epoll patch, updated the configure script

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjlib/src/pj/ioqueue_epoll.c

    r2039 r2295  
    176176    struct epoll_event *events; 
    177177    struct queue       *queue; 
     178 
     179#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     180    pj_mutex_t         *ref_cnt_mutex; 
     181    pj_ioqueue_key_t    active_list; 
     182    pj_ioqueue_key_t    closing_list; 
     183    pj_ioqueue_key_t    free_list; 
     184#endif 
    178185}; 
    179186 
     
    182189 */ 
    183190#include "ioqueue_common_abs.c" 
     191 
     192#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     193/* Scan closing keys to be put to free list again */ 
     194static void scan_closing_keys(pj_ioqueue_t *ioqueue); 
     195#endif 
    184196 
    185197/* 
     
    207219    pj_status_t rc; 
    208220    pj_lock_t *lock; 
     221    int i; 
    209222 
    210223    /* Check that arguments are valid. */ 
     
    223236    ioqueue->count = 0; 
    224237    pj_list_init(&ioqueue->hlist); 
     238 
     239#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     240    /* When safe unregistration is used (the default), we pre-create 
     241     * all keys and put them in the free list. 
     242     */ 
     243 
     244    /* Mutex to protect key's reference counter  
     245     * We don't want to use key's mutex or ioqueue's mutex because 
     246     * that would create deadlock situation in some cases. 
     247     */ 
     248    rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex); 
     249    if (rc != PJ_SUCCESS) 
     250        return rc; 
     251 
     252 
     253    /* Init key list */ 
     254    pj_list_init(&ioqueue->free_list); 
     255    pj_list_init(&ioqueue->closing_list); 
     256 
     257 
     258    /* Pre-create all keys according to max_fd */ 
     259    for ( i=0; i<max_fd; ++i) { 
     260        pj_ioqueue_key_t *key; 
     261 
     262        key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t); 
     263        key->ref_count = 0; 
     264        rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); 
     265        if (rc != PJ_SUCCESS) { 
     266            key = ioqueue->free_list.next; 
     267            while (key != &ioqueue->free_list) { 
     268                pj_mutex_destroy(key->mutex); 
     269                key = key->next; 
     270            } 
     271            pj_mutex_destroy(ioqueue->ref_cnt_mutex); 
     272            return rc; 
     273        } 
     274 
     275        pj_list_push_back(&ioqueue->free_list, key); 
     276    } 
     277#endif 
    225278 
    226279    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock); 
     
    257310PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) 
    258311{ 
     312    pj_ioqueue_key_t *key; 
     313 
    259314    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 
    260315    PJ_ASSERT_RETURN(ioqueue->epfd > 0, PJ_EINVALIDOP); 
     
    263318    os_close(ioqueue->epfd); 
    264319    ioqueue->epfd = 0; 
     320 
     321#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     322    /* Destroy reference counters */ 
     323    key = ioqueue->active_list.next; 
     324    while (key != &ioqueue->active_list) { 
     325        pj_mutex_destroy(key->mutex); 
     326        key = key->next; 
     327    } 
     328 
     329    key = ioqueue->closing_list.next; 
     330    while (key != &ioqueue->closing_list) { 
     331        pj_mutex_destroy(key->mutex); 
     332        key = key->next; 
     333    } 
     334 
     335    key = ioqueue->free_list.next; 
     336    while (key != &ioqueue->free_list) { 
     337        pj_mutex_destroy(key->mutex); 
     338        key = key->next; 
     339    } 
     340 
     341    pj_mutex_destroy(ioqueue->ref_cnt_mutex); 
     342#endif 
    265343    return ioqueue_destroy(ioqueue); 
    266344} 
     
    304382    } 
    305383 
     384    /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get 
     385     * the key from the free list. Otherwise allocate a new one.  
     386     */ 
     387#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     388 
     389    /* Scan closing_keys first to let them come back to free_list */ 
     390    scan_closing_keys(ioqueue); 
     391 
     392    pj_assert(!pj_list_empty(&ioqueue->free_list)); 
     393    if (pj_list_empty(&ioqueue->free_list)) { 
     394        rc = PJ_ETOOMANY; 
     395        goto on_return; 
     396    } 
     397 
     398    key = ioqueue->free_list.next; 
     399    pj_list_erase(key); 
     400#else 
    306401    /* Create key. */ 
    307402    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 
     403#endif 
     404 
    308405    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); 
    309406    if (rc != PJ_SUCCESS) { 
     
    313410 
    314411    /* Create key's mutex */ 
    315     rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); 
     412 /*   rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); 
    316413    if (rc != PJ_SUCCESS) { 
    317414        key = NULL; 
    318415        goto on_return; 
    319416    } 
    320  
     417*/ 
    321418    /* os_epoll_ctl. */ 
    322419    ev.events = EPOLLIN | EPOLLERR; 
     
    346443} 
    347444 
     445#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     446/* Increment key's reference counter */ 
     447static void increment_counter(pj_ioqueue_key_t *key) 
     448{ 
     449    pj_mutex_lock(key->ioqueue->ref_cnt_mutex); 
     450    ++key->ref_count; 
     451    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex); 
     452} 
     453 
     454/* Decrement the key's reference counter, and when the counter reach zero, 
     455 * destroy the key. 
     456 * 
     457 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK. 
     458 */ 
     459static void decrement_counter(pj_ioqueue_key_t *key) 
     460{ 
     461    pj_lock_acquire(key->ioqueue->lock); 
     462    pj_mutex_lock(key->ioqueue->ref_cnt_mutex); 
     463    --key->ref_count; 
     464    if (key->ref_count == 0) { 
     465 
     466        pj_assert(key->closing == 1); 
     467        pj_gettimeofday(&key->free_time); 
     468        key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY; 
     469        pj_time_val_normalize(&key->free_time); 
     470 
     471        pj_list_erase(key); 
     472        pj_list_push_back(&key->ioqueue->closing_list, key); 
     473 
     474    } 
     475    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex); 
     476    pj_lock_release(key->ioqueue->lock); 
     477} 
     478#endif 
     479 
    348480/* 
    349481 * pj_ioqueue_unregister() 
     
    364496    pj_assert(ioqueue->count > 0); 
    365497    --ioqueue->count; 
     498#if !PJ_IOQUEUE_HAS_SAFE_UNREG 
    366499    pj_list_erase(key); 
     500#endif 
    367501 
    368502    ev.events = 0; 
     
    375509    } 
    376510 
    377     pj_lock_release(ioqueue->lock); 
    378  
    379511    /* Destroy the key. */ 
    380512    pj_sock_close(key->fd); 
     513 
     514    pj_lock_release(ioqueue->lock); 
     515 
     516 
     517#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     518    /* Mark key is closing. */ 
     519    key->closing = 1; 
     520 
     521    /* Decrement counter. */ 
     522    decrement_counter(key); 
     523 
     524    /* Done. */ 
     525    pj_mutex_unlock(key->mutex); 
     526#else 
    381527    pj_mutex_destroy(key->mutex); 
     528#endif 
    382529 
    383530    return PJ_SUCCESS; 
     
    420567    }    
    421568} 
     569 
     570#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     571/* Scan closing keys to be put to free list again */ 
     572static void scan_closing_keys(pj_ioqueue_t *ioqueue) 
     573{ 
     574    pj_time_val now; 
     575    pj_ioqueue_key_t *h; 
     576 
     577    pj_gettimeofday(&now); 
     578    h = ioqueue->closing_list.next; 
     579    while (h != &ioqueue->closing_list) { 
     580        pj_ioqueue_key_t *next = h->next; 
     581 
     582        pj_assert(h->closing != 0); 
     583 
     584        if (PJ_TIME_VAL_GTE(now, h->free_time)) { 
     585            pj_list_erase(h); 
     586            pj_list_push_back(&ioqueue->free_list, h); 
     587        } 
     588        h = next; 
     589    } 
     590} 
     591#endif 
    422592 
    423593/* 
     
    442612    count = os_epoll_wait( ioqueue->epfd, events, ioqueue->max, msec); 
    443613    if (count == 0) { 
     614#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     615    /* Check the closing keys only when there's no activity and when there are 
     616     * pending closing keys. 
     617     */ 
     618    if (count == 0 && !pj_list_empty(&ioqueue->closing_list)) { 
     619        pj_lock_acquire(ioqueue->lock); 
     620        scan_closing_keys(ioqueue); 
     621        pj_lock_release(ioqueue->lock); 
     622    } 
     623#endif 
    444624        TRACE_((THIS_FILE, "os_epoll_wait timed out")); 
    445625        return count; 
     
    468648        if ((events[i].events & EPOLLIN) &&  
    469649            (key_has_pending_read(h) || key_has_pending_accept(h))) { 
     650 
     651#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     652            increment_counter(h); 
     653#endif 
    470654            queue[processed].key = h; 
    471655            queue[processed].event_type = READABLE_EVENT; 
     
    477661         */ 
    478662        if ((events[i].events & EPOLLOUT) && key_has_pending_write(h)) { 
     663 
     664#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     665            increment_counter(h); 
     666#endif 
    479667            queue[processed].key = h; 
    480668            queue[processed].event_type = WRITEABLE_EVENT; 
     
    487675         */ 
    488676        if ((events[i].events & EPOLLOUT) && (h->connecting)) { 
     677 
     678#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     679            increment_counter(h); 
     680#endif 
    489681            queue[processed].key = h; 
    490682            queue[processed].event_type = WRITEABLE_EVENT; 
     
    497689         */ 
    498690        if (events[i].events & EPOLLERR && (h->connecting)) { 
     691                 
     692#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     693            increment_counter(h); 
     694#endif           
    499695            queue[processed].key = h; 
    500696            queue[processed].event_type = EXCEPTION_EVENT; 
     
    520716            break; 
    521717        } 
     718 
     719#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     720        decrement_counter(queue[i].key); 
     721#endif 
    522722    } 
    523723 
Note: See TracChangeset for help on using the changeset viewer.