Ignore:
Timestamp:
Nov 6, 2015 4:18:46 AM (8 years ago)
Author:
nanang
Message:

Close #1894: Improve ioqueue performance on multithreadeded environment.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjlib/src/pj/ioqueue_epoll.c

    r4704 r5194  
    652652PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) 
    653653{ 
    654     int i, count, processed; 
     654    int i, count, event_cnt, processed_cnt; 
    655655    int msec; 
    656656    //struct epoll_event *events = ioqueue->events; 
    657657    //struct queue *queue = ioqueue->queue; 
    658     struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; 
    659     struct queue queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; 
     658    enum { MAX_EVENTS = PJ_IOQUEUE_MAX_CAND_EVENTS }; 
     659    struct epoll_event events[MAX_EVENTS]; 
     660    struct queue queue[MAX_EVENTS]; 
    660661    pj_timestamp t1, t2; 
    661662     
     
    668669  
    669670    //count = os_epoll_wait( ioqueue->epfd, events, ioqueue->max, msec); 
    670     count = os_epoll_wait( ioqueue->epfd, events, PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL, msec); 
     671    count = os_epoll_wait( ioqueue->epfd, events, MAX_EVENTS, msec); 
    671672    if (count == 0) { 
    672673#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     
    695696    pj_lock_acquire(ioqueue->lock); 
    696697 
    697     for (processed=0, i=0; i<count; ++i) { 
     698    for (event_cnt=0, i=0; i<count; ++i) { 
    698699        pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type) 
    699700                                events[i].epoll_data; 
     
    710711            increment_counter(h); 
    711712#endif 
    712             queue[processed].key = h; 
    713             queue[processed].event_type = READABLE_EVENT; 
    714             ++processed; 
     713            queue[event_cnt].key = h; 
     714            queue[event_cnt].event_type = READABLE_EVENT; 
     715            ++event_cnt; 
    715716            continue; 
    716717        } 
     
    724725            increment_counter(h); 
    725726#endif 
    726             queue[processed].key = h; 
    727             queue[processed].event_type = WRITEABLE_EVENT; 
    728             ++processed; 
     727            queue[event_cnt].key = h; 
     728            queue[event_cnt].event_type = WRITEABLE_EVENT; 
     729            ++event_cnt; 
    729730            continue; 
    730731        } 
     
    739740            increment_counter(h); 
    740741#endif 
    741             queue[processed].key = h; 
    742             queue[processed].event_type = WRITEABLE_EVENT; 
    743             ++processed; 
     742            queue[event_cnt].key = h; 
     743            queue[event_cnt].event_type = WRITEABLE_EVENT; 
     744            ++event_cnt; 
    744745            continue; 
    745746        } 
     
    759760                increment_counter(h); 
    760761#endif 
    761                 queue[processed].key = h; 
    762                 queue[processed].event_type = EXCEPTION_EVENT; 
    763                 ++processed; 
     762                queue[event_cnt].key = h; 
     763                queue[event_cnt].event_type = EXCEPTION_EVENT; 
     764                ++event_cnt; 
    764765            } else if (key_has_pending_read(h) || key_has_pending_accept(h)) { 
    765766#if PJ_IOQUEUE_HAS_SAFE_UNREG 
    766767                increment_counter(h); 
    767768#endif 
    768                 queue[processed].key = h; 
    769                 queue[processed].event_type = READABLE_EVENT; 
    770                 ++processed; 
     769                queue[event_cnt].key = h; 
     770                queue[event_cnt].event_type = READABLE_EVENT; 
     771                ++event_cnt; 
    771772            } 
    772773            continue; 
    773774        } 
    774775    } 
    775     for (i=0; i<processed; ++i) { 
     776    for (i=0; i<event_cnt; ++i) { 
    776777        if (queue[i].key->grp_lock) 
    777778            pj_grp_lock_add_ref_dbg(queue[i].key->grp_lock, "ioqueue", 0); 
     
    784785    PJ_RACE_ME(5); 
    785786 
     787    processed_cnt = 0; 
     788 
    786789    /* Now process the events. */ 
    787     for (i=0; i<processed; ++i) { 
    788         switch (queue[i].event_type) { 
    789         case READABLE_EVENT: 
    790             ioqueue_dispatch_read_event(ioqueue, queue[i].key); 
    791             break; 
    792         case WRITEABLE_EVENT: 
    793             ioqueue_dispatch_write_event(ioqueue, queue[i].key); 
    794             break; 
    795         case EXCEPTION_EVENT: 
    796             ioqueue_dispatch_exception_event(ioqueue, queue[i].key); 
    797             break; 
    798         case NO_EVENT: 
    799             pj_assert(!"Invalid event!"); 
    800             break; 
    801         } 
     790    for (i=0; i<event_cnt; ++i) { 
     791 
     792        /* Just do not exceed PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL */ 
     793        if (processed_cnt < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) { 
     794            switch (queue[i].event_type) { 
     795            case READABLE_EVENT: 
     796                if (ioqueue_dispatch_read_event(ioqueue, queue[i].key)) 
     797                    ++processed_cnt; 
     798                break; 
     799            case WRITEABLE_EVENT: 
     800                if (ioqueue_dispatch_write_event(ioqueue, queue[i].key)) 
     801                    ++processed_cnt; 
     802                break; 
     803            case EXCEPTION_EVENT: 
     804                if (ioqueue_dispatch_exception_event(ioqueue, queue[i].key)) 
     805                    ++processed_cnt; 
     806                break; 
     807            case NO_EVENT: 
     808                pj_assert(!"Invalid event!"); 
     809                break; 
     810            } 
     811        } 
    802812 
    803813#if PJ_IOQUEUE_HAS_SAFE_UNREG 
     
    813823     * When epoll returns > 0 but no descriptors are actually set! 
    814824     */ 
    815     if (count > 0 && !processed && msec > 0) { 
     825    if (count > 0 && !event_cnt && msec > 0) { 
    816826        pj_thread_sleep(msec); 
    817827    } 
     828 
     829    TRACE_((THIS_FILE, "     poll: count=%d events=%d processed=%d", 
     830                       count, event_cnt, processed_cnt)); 
    818831 
    819832    pj_get_timestamp(&t1); 
     
    821834                       processed, pj_elapsed_usec(&t2, &t1))); 
    822835 
    823     return processed; 
    824 } 
    825  
     836    return processed_cnt; 
     837} 
     838 
Note: See TracChangeset for help on using the changeset viewer.