Ignore:
Timestamp:
Nov 6, 2015 4:18:46 AM (8 years ago)
Author:
nanang
Message:

Close #1894: Improve ioqueue performance on multithreadeded environment.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjlib/src/pj/ioqueue_select.c

    r4991 r5194  
    334334    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET && 
    335335                     cb && p_key, PJ_EINVAL); 
     336 
     337    /* On platforms with fd_set containing fd bitmap such as *nix family, 
     338     * avoid potential memory corruption caused by select() when given 
     339     * an fd that is higher than FD_SETSIZE. 
     340     */ 
     341    if (sizeof(fd_set) < FD_SETSIZE && sock >= PJ_IOQUEUE_MAX_HANDLES) 
     342        return PJ_ETOOBIG; 
    336343 
    337344    pj_lock_acquire(ioqueue->lock); 
     
    832839    pj_fd_set_t rfdset, wfdset, xfdset; 
    833840    int nfds; 
    834     int count, i, counter; 
     841    int i, count, event_cnt, processed_cnt; 
    835842    pj_ioqueue_key_t *h; 
     843    enum { MAX_EVENTS = PJ_IOQUEUE_MAX_CAND_EVENTS }; 
    836844    struct event 
    837845    { 
    838846        pj_ioqueue_key_t        *key; 
    839847        enum ioqueue_event_type  event_type; 
    840     } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; 
     848    } event[MAX_EVENTS]; 
    841849 
    842850    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); 
     
    890898    else if (count < 0) 
    891899        return -pj_get_netos_error(); 
    892     else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) 
    893         count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL; 
    894900 
    895901    /* Scan descriptor sets for event and add the events in the event 
     
    899905    pj_lock_acquire(ioqueue->lock); 
    900906 
    901     counter = 0; 
     907    event_cnt = 0; 
    902908 
    903909    /* Scan for writable sockets first to handle piggy-back data 
    904910     * coming with accept(). 
    905911     */ 
    906     h = ioqueue->active_list.next; 
    907     for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) { 
     912    for (h = ioqueue->active_list.next; 
     913         h != &ioqueue->active_list && event_cnt < MAX_EVENTS; 
     914         h = h->next) 
     915    { 
    908916 
    909917        if ( (key_has_pending_write(h) || key_has_pending_connect(h)) 
     
    913921            increment_counter(h); 
    914922#endif 
    915             event[counter].key = h; 
    916             event[counter].event_type = WRITEABLE_EVENT; 
    917             ++counter; 
     923            event[event_cnt].key = h; 
     924            event[event_cnt].event_type = WRITEABLE_EVENT; 
     925            ++event_cnt; 
    918926        } 
    919927 
     
    921929        if ((key_has_pending_read(h) || key_has_pending_accept(h)) 
    922930            && PJ_FD_ISSET(h->fd, &rfdset) && !IS_CLOSING(h) && 
    923             counter<count) 
     931            event_cnt < MAX_EVENTS) 
    924932        { 
    925933#if PJ_IOQUEUE_HAS_SAFE_UNREG 
    926934            increment_counter(h); 
    927935#endif 
    928             event[counter].key = h; 
    929             event[counter].event_type = READABLE_EVENT; 
    930             ++counter; 
     936            event[event_cnt].key = h; 
     937            event[event_cnt].event_type = READABLE_EVENT; 
     938            ++event_cnt; 
    931939        } 
    932940 
    933941#if PJ_HAS_TCP 
    934942        if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) && 
    935             !IS_CLOSING(h) && counter<count)  
     943            !IS_CLOSING(h) && event_cnt < MAX_EVENTS) 
    936944        { 
    937945#if PJ_IOQUEUE_HAS_SAFE_UNREG 
    938946            increment_counter(h); 
    939947#endif 
    940             event[counter].key = h; 
    941             event[counter].event_type = EXCEPTION_EVENT; 
    942             ++counter; 
     948            event[event_cnt].key = h; 
     949            event[event_cnt].event_type = EXCEPTION_EVENT; 
     950            ++event_cnt; 
    943951        } 
    944952#endif 
    945953    } 
    946954 
    947     for (i=0; i<counter; ++i) { 
     955    for (i=0; i<event_cnt; ++i) { 
    948956        if (event[i].key->grp_lock) 
    949957            pj_grp_lock_add_ref_dbg(event[i].key->grp_lock, "ioqueue", 0); 
     
    956964    PJ_RACE_ME(5); 
    957965 
    958     count = counter; 
     966    processed_cnt = 0; 
    959967 
    960968    /* Now process all events. The dispatch functions will take care 
    961969     * of locking in each of the key 
    962970     */ 
    963     for (counter=0; counter<count; ++counter) { 
    964         switch (event[counter].event_type) { 
    965         case READABLE_EVENT: 
    966             ioqueue_dispatch_read_event(ioqueue, event[counter].key); 
    967             break; 
    968         case WRITEABLE_EVENT: 
    969             ioqueue_dispatch_write_event(ioqueue, event[counter].key); 
    970             break; 
    971         case EXCEPTION_EVENT: 
    972             ioqueue_dispatch_exception_event(ioqueue, event[counter].key); 
    973             break; 
    974         case NO_EVENT: 
    975             pj_assert(!"Invalid event!"); 
    976             break; 
    977         } 
    978  
    979 #if PJ_IOQUEUE_HAS_SAFE_UNREG 
    980         decrement_counter(event[counter].key); 
    981 #endif 
    982  
    983         if (event[counter].key->grp_lock) 
    984             pj_grp_lock_dec_ref_dbg(event[counter].key->grp_lock, 
     971    for (i=0; i<event_cnt; ++i) { 
     972 
     973        /* Just do not exceed PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL */ 
     974        if (processed_cnt < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) { 
     975            switch (event[i].event_type) { 
     976            case READABLE_EVENT: 
     977                if (ioqueue_dispatch_read_event(ioqueue, event[i].key)) 
     978                    ++processed_cnt; 
     979                break; 
     980            case WRITEABLE_EVENT: 
     981                if (ioqueue_dispatch_write_event(ioqueue, event[i].key)) 
     982                    ++processed_cnt; 
     983                break; 
     984            case EXCEPTION_EVENT: 
     985                if (ioqueue_dispatch_exception_event(ioqueue, event[i].key)) 
     986                    ++processed_cnt; 
     987                break; 
     988            case NO_EVENT: 
     989                pj_assert(!"Invalid event!"); 
     990                break; 
     991            } 
     992        } 
     993 
     994#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     995        decrement_counter(event[i].key); 
     996#endif 
     997 
     998        if (event[i].key->grp_lock) 
     999            pj_grp_lock_dec_ref_dbg(event[i].key->grp_lock, 
    9851000                                    "ioqueue", 0); 
    9861001    } 
    9871002 
    988  
    989     return count; 
    990 } 
    991  
     1003    TRACE__((THIS_FILE, "     poll: count=%d events=%d processed=%d", 
     1004             count, event_cnt, processed_cnt)); 
     1005 
     1006    return processed_cnt; 
     1007} 
     1008 
Note: See TracChangeset for help on using the changeset viewer.