Changeset 5194 for pjproject/trunk


Ignore:
Timestamp:
Nov 6, 2015 4:18:46 AM (9 years ago)
Author:
nanang
Message:

Close #1894: Improve ioqueue performance on multithreadeded environment.

Location:
pjproject/trunk/pjlib
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjlib/include/pj/ioqueue.h

    r4724 r5194  
    304304#endif 
    305305 
     306 
     307/** 
     308 * This macro specifies the maximum event candidates collected by each 
     309 * polling thread to be able to reach maximum number of processed events 
     310 * (i.e: PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) in each poll cycle. 
     311 * An event candidate will be dispatched to application as event unless 
     312 * it is already being dispatched by other polling thread. So in order to 
     313 * anticipate such race condition, each poll operation should collects its 
     314 * event candidates more than PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL, the 
     315 * recommended value is (PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL * 
     316 * number of polling threads). 
     317 * 
     318 * The value is only meaningfull when specified during PJLIB build and 
     319 * is only effective on multiple polling threads environment. 
     320 */ 
     321#if !defined(PJ_IOQUEUE_MAX_CAND_EVENTS) || \ 
     322    PJ_IOQUEUE_MAX_CAND_EVENTS < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL 
     323#   undef  PJ_IOQUEUE_MAX_CAND_EVENTS 
     324#   define PJ_IOQUEUE_MAX_CAND_EVENTS   PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL 
     325#endif 
     326 
     327 
    306328/** 
    307329 * When this flag is specified in ioqueue's recv() or send() operations, 
     
    503525 */ 
    504526PJ_DECL(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key); 
     527 
     528/** 
     529 * Try to acquire the key's mutex. When the key's concurrency is disabled,  
     530 * application may call this function to synchronize its operation 
     531 * with the key's callback. 
     532 * 
     533 * @param key       The key that was previously obtained from registration. 
     534 * 
     535 * @return          PJ_SUCCESS on success or the appropriate error code. 
     536 */ 
     537PJ_DECL(pj_status_t) pj_ioqueue_trylock_key(pj_ioqueue_key_t *key); 
    505538 
    506539/** 
  • pjproject/trunk/pjlib/src/pj/ioqueue_common_abs.c

    r4890 r5194  
    196196 * framework. 
    197197 */ 
    198 void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) 
    199 { 
    200     /* Lock the key. */ 
    201     pj_ioqueue_lock_key(h); 
     198pj_bool_t ioqueue_dispatch_write_event( pj_ioqueue_t *ioqueue, 
     199                                        pj_ioqueue_key_t *h) 
     200{ 
     201    pj_status_t rc; 
     202 
     203    /* Try lock the key. */ 
     204    rc = pj_ioqueue_trylock_key(h); 
     205    if (rc != PJ_SUCCESS) { 
     206        return PJ_FALSE; 
     207    } 
    202208 
    203209    if (IS_CLOSING(h)) { 
    204210        pj_ioqueue_unlock_key(h); 
    205         return; 
     211        return PJ_TRUE; 
    206212    } 
    207213 
     
    418424         */ 
    419425        pj_ioqueue_unlock_key(h); 
    420     } 
    421 } 
    422  
    423 void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) 
     426 
     427        return PJ_FALSE; 
     428    } 
     429 
     430    return PJ_TRUE; 
     431} 
     432 
     433pj_bool_t ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, 
     434                                       pj_ioqueue_key_t *h ) 
    424435{ 
    425436    pj_status_t rc; 
    426437 
    427     /* Lock the key. */ 
    428     pj_ioqueue_lock_key(h); 
     438    /* Try lock the key. */ 
     439    rc = pj_ioqueue_trylock_key(h); 
     440    if (rc != PJ_SUCCESS) { 
     441        return PJ_FALSE; 
     442    } 
    429443 
    430444    if (IS_CLOSING(h)) { 
    431445        pj_ioqueue_unlock_key(h); 
    432         return; 
     446        return PJ_TRUE; 
    433447    } 
    434448 
     
    605619         */ 
    606620        pj_ioqueue_unlock_key(h); 
    607     } 
    608 } 
    609  
    610  
    611 void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,  
    612                                        pj_ioqueue_key_t *h ) 
     621 
     622        return PJ_FALSE; 
     623    } 
     624 
     625    return PJ_TRUE; 
     626} 
     627 
     628 
     629pj_bool_t ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, 
     630                                            pj_ioqueue_key_t *h ) 
    613631{ 
    614632    pj_bool_t has_lock; 
    615  
    616     pj_ioqueue_lock_key(h); 
     633    pj_status_t rc; 
     634 
     635    /* Try lock the key. */ 
     636    rc = pj_ioqueue_trylock_key(h); 
     637    if (rc != PJ_SUCCESS) { 
     638        return PJ_FALSE; 
     639    } 
    617640 
    618641    if (!h->connecting) { 
     
    622645         */ 
    623646        pj_ioqueue_unlock_key(h); 
    624         return; 
     647        return PJ_TRUE; 
    625648    } 
    626649 
    627650    if (IS_CLOSING(h)) { 
    628651        pj_ioqueue_unlock_key(h); 
    629         return; 
     652        return PJ_TRUE; 
    630653    } 
    631654 
     
    669692        pj_ioqueue_unlock_key(h); 
    670693    } 
     694 
     695    return PJ_TRUE; 
    671696} 
    672697 
     
    13251350} 
    13261351 
     1352PJ_DEF(pj_status_t) pj_ioqueue_trylock_key(pj_ioqueue_key_t *key) 
     1353{ 
     1354    if (key->grp_lock) 
     1355        return pj_grp_lock_tryacquire(key->grp_lock); 
     1356    else 
     1357        return pj_lock_tryacquire(key->lock); 
     1358} 
     1359 
    13271360PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) 
    13281361{ 
  • pjproject/trunk/pjlib/src/pj/ioqueue_epoll.c

    r4704 r5194  
    652652PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) 
    653653{ 
    654     int i, count, processed; 
     654    int i, count, event_cnt, processed_cnt; 
    655655    int msec; 
    656656    //struct epoll_event *events = ioqueue->events; 
    657657    //struct queue *queue = ioqueue->queue; 
    658     struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; 
    659     struct queue queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; 
     658    enum { MAX_EVENTS = PJ_IOQUEUE_MAX_CAND_EVENTS }; 
     659    struct epoll_event events[MAX_EVENTS]; 
     660    struct queue queue[MAX_EVENTS]; 
    660661    pj_timestamp t1, t2; 
    661662     
     
    668669  
    669670    //count = os_epoll_wait( ioqueue->epfd, events, ioqueue->max, msec); 
    670     count = os_epoll_wait( ioqueue->epfd, events, PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL, msec); 
     671    count = os_epoll_wait( ioqueue->epfd, events, MAX_EVENTS, msec); 
    671672    if (count == 0) { 
    672673#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     
    695696    pj_lock_acquire(ioqueue->lock); 
    696697 
    697     for (processed=0, i=0; i<count; ++i) { 
     698    for (event_cnt=0, i=0; i<count; ++i) { 
    698699        pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type) 
    699700                                events[i].epoll_data; 
     
    710711            increment_counter(h); 
    711712#endif 
    712             queue[processed].key = h; 
    713             queue[processed].event_type = READABLE_EVENT; 
    714             ++processed; 
     713            queue[event_cnt].key = h; 
     714            queue[event_cnt].event_type = READABLE_EVENT; 
     715            ++event_cnt; 
    715716            continue; 
    716717        } 
     
    724725            increment_counter(h); 
    725726#endif 
    726             queue[processed].key = h; 
    727             queue[processed].event_type = WRITEABLE_EVENT; 
    728             ++processed; 
     727            queue[event_cnt].key = h; 
     728            queue[event_cnt].event_type = WRITEABLE_EVENT; 
     729            ++event_cnt; 
    729730            continue; 
    730731        } 
     
    739740            increment_counter(h); 
    740741#endif 
    741             queue[processed].key = h; 
    742             queue[processed].event_type = WRITEABLE_EVENT; 
    743             ++processed; 
     742            queue[event_cnt].key = h; 
     743            queue[event_cnt].event_type = WRITEABLE_EVENT; 
     744            ++event_cnt; 
    744745            continue; 
    745746        } 
     
    759760                increment_counter(h); 
    760761#endif 
    761                 queue[processed].key = h; 
    762                 queue[processed].event_type = EXCEPTION_EVENT; 
    763                 ++processed; 
     762                queue[event_cnt].key = h; 
     763                queue[event_cnt].event_type = EXCEPTION_EVENT; 
     764                ++event_cnt; 
    764765            } else if (key_has_pending_read(h) || key_has_pending_accept(h)) { 
    765766#if PJ_IOQUEUE_HAS_SAFE_UNREG 
    766767                increment_counter(h); 
    767768#endif 
    768                 queue[processed].key = h; 
    769                 queue[processed].event_type = READABLE_EVENT; 
    770                 ++processed; 
     769                queue[event_cnt].key = h; 
     770                queue[event_cnt].event_type = READABLE_EVENT; 
     771                ++event_cnt; 
    771772            } 
    772773            continue; 
    773774        } 
    774775    } 
    775     for (i=0; i<processed; ++i) { 
     776    for (i=0; i<event_cnt; ++i) { 
    776777        if (queue[i].key->grp_lock) 
    777778            pj_grp_lock_add_ref_dbg(queue[i].key->grp_lock, "ioqueue", 0); 
     
    784785    PJ_RACE_ME(5); 
    785786 
     787    processed_cnt = 0; 
     788 
    786789    /* Now process the events. */ 
    787     for (i=0; i<processed; ++i) { 
    788         switch (queue[i].event_type) { 
    789         case READABLE_EVENT: 
    790             ioqueue_dispatch_read_event(ioqueue, queue[i].key); 
    791             break; 
    792         case WRITEABLE_EVENT: 
    793             ioqueue_dispatch_write_event(ioqueue, queue[i].key); 
    794             break; 
    795         case EXCEPTION_EVENT: 
    796             ioqueue_dispatch_exception_event(ioqueue, queue[i].key); 
    797             break; 
    798         case NO_EVENT: 
    799             pj_assert(!"Invalid event!"); 
    800             break; 
    801         } 
     790    for (i=0; i<event_cnt; ++i) { 
     791 
     792        /* Just do not exceed PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL */ 
     793        if (processed_cnt < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) { 
     794            switch (queue[i].event_type) { 
     795            case READABLE_EVENT: 
     796                if (ioqueue_dispatch_read_event(ioqueue, queue[i].key)) 
     797                    ++processed_cnt; 
     798                break; 
     799            case WRITEABLE_EVENT: 
     800                if (ioqueue_dispatch_write_event(ioqueue, queue[i].key)) 
     801                    ++processed_cnt; 
     802                break; 
     803            case EXCEPTION_EVENT: 
     804                if (ioqueue_dispatch_exception_event(ioqueue, queue[i].key)) 
     805                    ++processed_cnt; 
     806                break; 
     807            case NO_EVENT: 
     808                pj_assert(!"Invalid event!"); 
     809                break; 
     810            } 
     811        } 
    802812 
    803813#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     
    813823     * When epoll returns > 0 but no descriptors are actually set! 
    814824     */ 
    815     if (count > 0 && !processed && msec > 0) { 
     825    if (count > 0 && !event_cnt && msec > 0) { 
    816826        pj_thread_sleep(msec); 
    817827    } 
     828 
     829    TRACE_((THIS_FILE, "     poll: count=%d events=%d processed=%d", 
     830                       count, event_cnt, processed_cnt)); 
    818831 
    819832    pj_get_timestamp(&t1); 
     
    821834                       processed, pj_elapsed_usec(&t2, &t1))); 
    822835 
    823     return processed; 
    824 } 
    825  
     836    return processed_cnt; 
     837} 
     838 
  • pjproject/trunk/pjlib/src/pj/ioqueue_select.c

    r4991 r5194  
    334334    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET && 
    335335                     cb && p_key, PJ_EINVAL); 
     336 
     337    /* On platforms with fd_set containing fd bitmap such as *nix family, 
     338     * avoid potential memory corruption caused by select() when given 
     339     * an fd that is higher than FD_SETSIZE. 
     340     */ 
     341    if (sizeof(fd_set) < FD_SETSIZE && sock >= PJ_IOQUEUE_MAX_HANDLES) 
     342        return PJ_ETOOBIG; 
    336343 
    337344    pj_lock_acquire(ioqueue->lock); 
     
    832839    pj_fd_set_t rfdset, wfdset, xfdset; 
    833840    int nfds; 
    834     int count, i, counter; 
     841    int i, count, event_cnt, processed_cnt; 
    835842    pj_ioqueue_key_t *h; 
     843    enum { MAX_EVENTS = PJ_IOQUEUE_MAX_CAND_EVENTS }; 
    836844    struct event 
    837845    { 
    838846        pj_ioqueue_key_t        *key; 
    839847        enum ioqueue_event_type  event_type; 
    840     } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; 
     848    } event[MAX_EVENTS]; 
    841849 
    842850    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); 
     
    890898    else if (count < 0) 
    891899        return -pj_get_netos_error(); 
    892     else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) 
    893         count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL; 
    894900 
    895901    /* Scan descriptor sets for event and add the events in the event 
     
    899905    pj_lock_acquire(ioqueue->lock); 
    900906 
    901     counter = 0; 
     907    event_cnt = 0; 
    902908 
    903909    /* Scan for writable sockets first to handle piggy-back data 
    904910     * coming with accept(). 
    905911     */ 
    906     h = ioqueue->active_list.next; 
    907     for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) { 
     912    for (h = ioqueue->active_list.next; 
     913         h != &ioqueue->active_list && event_cnt < MAX_EVENTS; 
     914         h = h->next) 
     915    { 
    908916 
    909917        if ( (key_has_pending_write(h) || key_has_pending_connect(h)) 
     
    913921            increment_counter(h); 
    914922#endif 
    915             event[counter].key = h; 
    916             event[counter].event_type = WRITEABLE_EVENT; 
    917             ++counter; 
     923            event[event_cnt].key = h; 
     924            event[event_cnt].event_type = WRITEABLE_EVENT; 
     925            ++event_cnt; 
    918926        } 
    919927 
     
    921929        if ((key_has_pending_read(h) || key_has_pending_accept(h)) 
    922930            && PJ_FD_ISSET(h->fd, &rfdset) && !IS_CLOSING(h) && 
    923             counter<count) 
     931            event_cnt < MAX_EVENTS) 
    924932        { 
    925933#if PJ_IOQUEUE_HAS_SAFE_UNREG 
    926934            increment_counter(h); 
    927935#endif 
    928             event[counter].key = h; 
    929             event[counter].event_type = READABLE_EVENT; 
    930             ++counter; 
     936            event[event_cnt].key = h; 
     937            event[event_cnt].event_type = READABLE_EVENT; 
     938            ++event_cnt; 
    931939        } 
    932940 
    933941#if PJ_HAS_TCP 
    934942        if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) && 
    935             !IS_CLOSING(h) && counter<count)  
     943            !IS_CLOSING(h) && event_cnt < MAX_EVENTS) 
    936944        { 
    937945#if PJ_IOQUEUE_HAS_SAFE_UNREG 
    938946            increment_counter(h); 
    939947#endif 
    940             event[counter].key = h; 
    941             event[counter].event_type = EXCEPTION_EVENT; 
    942             ++counter; 
     948            event[event_cnt].key = h; 
     949            event[event_cnt].event_type = EXCEPTION_EVENT; 
     950            ++event_cnt; 
    943951        } 
    944952#endif 
    945953    } 
    946954 
    947     for (i=0; i<counter; ++i) { 
     955    for (i=0; i<event_cnt; ++i) { 
    948956        if (event[i].key->grp_lock) 
    949957            pj_grp_lock_add_ref_dbg(event[i].key->grp_lock, "ioqueue", 0); 
     
    956964    PJ_RACE_ME(5); 
    957965 
    958     count = counter; 
     966    processed_cnt = 0; 
    959967 
    960968    /* Now process all events. The dispatch functions will take care 
    961969     * of locking in each of the key 
    962970     */ 
    963     for (counter=0; counter<count; ++counter) { 
    964         switch (event[counter].event_type) { 
    965         case READABLE_EVENT: 
    966             ioqueue_dispatch_read_event(ioqueue, event[counter].key); 
    967             break; 
    968         case WRITEABLE_EVENT: 
    969             ioqueue_dispatch_write_event(ioqueue, event[counter].key); 
    970             break; 
    971         case EXCEPTION_EVENT: 
    972             ioqueue_dispatch_exception_event(ioqueue, event[counter].key); 
    973             break; 
    974         case NO_EVENT: 
    975             pj_assert(!"Invalid event!"); 
    976             break; 
    977         } 
    978  
    979 #if PJ_IOQUEUE_HAS_SAFE_UNREG 
    980         decrement_counter(event[counter].key); 
    981 #endif 
    982  
    983         if (event[counter].key->grp_lock) 
    984             pj_grp_lock_dec_ref_dbg(event[counter].key->grp_lock, 
     971    for (i=0; i<event_cnt; ++i) { 
     972 
     973        /* Just do not exceed PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL */ 
     974        if (processed_cnt < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) { 
     975            switch (event[i].event_type) { 
     976            case READABLE_EVENT: 
     977                if (ioqueue_dispatch_read_event(ioqueue, event[i].key)) 
     978                    ++processed_cnt; 
     979                break; 
     980            case WRITEABLE_EVENT: 
     981                if (ioqueue_dispatch_write_event(ioqueue, event[i].key)) 
     982                    ++processed_cnt; 
     983                break; 
     984            case EXCEPTION_EVENT: 
     985                if (ioqueue_dispatch_exception_event(ioqueue, event[i].key)) 
     986                    ++processed_cnt; 
     987                break; 
     988            case NO_EVENT: 
     989                pj_assert(!"Invalid event!"); 
     990                break; 
     991            } 
     992        } 
     993 
     994#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     995        decrement_counter(event[i].key); 
     996#endif 
     997 
     998        if (event[i].key->grp_lock) 
     999            pj_grp_lock_dec_ref_dbg(event[i].key->grp_lock, 
    9851000                                    "ioqueue", 0); 
    9861001    } 
    9871002 
    988  
    989     return count; 
    990 } 
    991  
     1003    TRACE__((THIS_FILE, "     poll: count=%d events=%d processed=%d", 
     1004             count, event_cnt, processed_cnt)); 
     1005 
     1006    return processed_cnt; 
     1007} 
     1008 
Note: See TracChangeset for help on using the changeset viewer.