Ticket #622: ioqueue_epoll.c

File ioqueue_epoll.c, 18.0 KB (added by bennylp, 16 years ago)

Updated version (thanks Simon Chen for the file)
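
For context, here is a minimal sketch of how an application might drive this backend through the public ioqueue API shown in the file below. It is illustrative only: the poll_loop() wrapper is hypothetical, and PJ_IOQUEUE_MAX_HANDLES is assumed to be pjlib's usual capacity constant.

    #include <pj/ioqueue.h>
    #include <pj/pool.h>

    static void poll_loop(pj_pool_t *pool)
    {
        pj_ioqueue_t *ioq;
        pj_time_val timeout = { 0, 500 };   /* poll in 500 msec slices */

        if (pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioq) != PJ_SUCCESS)
            return;

        /* Sockets would be registered with pj_ioqueue_register_sock();
         * each pj_ioqueue_poll() call then dispatches the read/write
         * callbacks for whatever descriptors became ready.
         */
        for (;;) {
            if (pj_ioqueue_poll(ioq, &timeout) < 0)
                break;
        }

        pj_ioqueue_destroy(ioq);
    }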

/* $Id: ioqueue_epoll.c 2039 2008-06-20 22:44:47Z bennylp $ */
/*
 * Copyright (C)2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
/*
 * ioqueue_epoll.c
 *
 * This is the implementation of the IOQueue framework using the
 * /dev/epoll API, in _both_ Linux user mode and kernel mode.
 */

#include <pj/ioqueue.h>
#include <pj/os.h>
#include <pj/lock.h>
#include <pj/log.h>
#include <pj/list.h>
#include <pj/pool.h>
#include <pj/string.h>
#include <pj/assert.h>
#include <pj/errno.h>
#include <pj/sock.h>
#include <pj/compat/socket.h>

#if !defined(PJ_LINUX_KERNEL) || PJ_LINUX_KERNEL==0
    /*
     * Linux user mode
     */
#   include <sys/epoll.h>
#   include <errno.h>
#   include <unistd.h>

#   define epoll_data           data.ptr
#   define epoll_data_type      void*
#   define ioctl_val_type       unsigned long
#   define getsockopt_val_ptr   int*
#   define os_getsockopt        getsockopt
#   define os_ioctl             ioctl
#   define os_read              read
#   define os_close             close
#   define os_epoll_create      epoll_create
#   define os_epoll_ctl         epoll_ctl
#   define os_epoll_wait        epoll_wait
#else
    /*
     * Linux kernel mode.
     */
#   include <linux/config.h>
#   include <linux/version.h>
#   if defined(MODVERSIONS)
#       include <linux/modversions.h>
#   endif
#   include <linux/kernel.h>
#   include <linux/poll.h>
#   include <linux/eventpoll.h>
#   include <linux/syscalls.h>
#   include <linux/errno.h>
#   include <linux/unistd.h>
#   include <asm/ioctls.h>
    enum EPOLL_EVENTS
    {
        EPOLLIN = 0x001,
        EPOLLOUT = 0x004,
        EPOLLERR = 0x008,
    };
#   define os_epoll_create              sys_epoll_create
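    /* The sys_* syscalls verify that their pointer arguments lie in user
     * space, so each wrapper below temporarily lifts the address limit
     * with set_fs(KERNEL_DS) around the call and restores it afterwards.
     */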
    static int os_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
    {
        long rc;
        mm_segment_t oldfs = get_fs();
        set_fs(KERNEL_DS);
        rc = sys_epoll_ctl(epfd, op, fd, event);
        set_fs(oldfs);
        if (rc) {
            errno = -rc;
            return -1;
        } else {
            return 0;
        }
    }
    static int os_epoll_wait(int epfd, struct epoll_event *events,
                             int maxevents, int timeout)
    {
        int count;
        mm_segment_t oldfs = get_fs();
        set_fs(KERNEL_DS);
        count = sys_epoll_wait(epfd, events, maxevents, timeout);
        set_fs(oldfs);
        return count;
    }
#   define os_close             sys_close
#   define os_getsockopt        pj_sock_getsockopt
    static int os_read(int fd, void *buf, size_t len)
    {
        long rc;
        mm_segment_t oldfs = get_fs();
        set_fs(KERNEL_DS);
        rc = sys_read(fd, buf, len);
        set_fs(oldfs);
        if (rc < 0) {
            /* sys_read() returns a negative errno on failure; a zero or
             * positive value is the number of bytes read.
             */
            errno = -rc;
            return -1;
        } else {
            return rc;
        }
    }
#   define socklen_t            unsigned
#   define ioctl_val_type       unsigned long
    int ioctl(int fd, int opt, ioctl_val_type value);
    static int os_ioctl(int fd, int opt, ioctl_val_type value)
    {
        int rc;
        mm_segment_t oldfs = get_fs();
        set_fs(KERNEL_DS);
        rc = ioctl(fd, opt, value);
        set_fs(oldfs);
        if (rc < 0)
            errno = -rc;
        return rc;
    }
#   define getsockopt_val_ptr   char*

#   define epoll_data           data
#   define epoll_data_type      __u32
#endif

#define THIS_FILE   "ioq_epoll"

//#define TRACE_(expr) PJ_LOG(3,expr)
#define TRACE_(expr)

/*
 * Include common ioqueue abstraction.
 */
#include "ioqueue_common_abs.h"

/*
 * This describes each key.
 */
struct pj_ioqueue_key_t
{
    DECLARE_COMMON_KEY
};

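/* A pending event recorded during pj_ioqueue_poll(): events are first
 * collected into an array of these entries while the ioqueue lock is
 * held, then dispatched once the lock has been released.
 */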
struct queue
{
    pj_ioqueue_key_t        *key;
    enum ioqueue_event_type  event_type;
};

/*
 * This describes the I/O queue.
 */
struct pj_ioqueue_t
{
    DECLARE_COMMON_IOQUEUE

    unsigned            max, count;
    pj_ioqueue_key_t    hlist;
    int                 epfd;
    struct epoll_event *events;
    struct queue       *queue;

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    pj_mutex_t         *ref_cnt_mutex;
    pj_ioqueue_key_t    active_list;
    pj_ioqueue_key_t    closing_list;
    pj_ioqueue_key_t    free_list;
#endif
};

/* Include implementation for common abstraction after we declare
 * pj_ioqueue_key_t and pj_ioqueue_t.
 */
#include "ioqueue_common_abs.c"

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Scan closing keys to be put to free list again */
static void scan_closing_keys(pj_ioqueue_t *ioqueue);
#endif

/*
 * pj_ioqueue_name()
 */
PJ_DEF(const char*) pj_ioqueue_name(void)
{
#if defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL!=0
    return "epoll-kernel";
#else
    return "epoll";
#endif
}

/*
 * pj_ioqueue_create()
 *
 * Create epoll ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
                                       pj_size_t max_fd,
                                       pj_ioqueue_t **p_ioqueue)
{
    pj_ioqueue_t *ioqueue;
    pj_status_t rc;
    pj_lock_t *lock;
    int i;

    /* Check that arguments are valid. */
    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
                     max_fd > 0, PJ_EINVAL);

    /* Check that size of pj_ioqueue_op_key_t is sufficient */
    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
                     sizeof(union operation_key), PJ_EBUG);

    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));

    ioqueue_init(ioqueue);

    ioqueue->max = max_fd;
    ioqueue->count = 0;
    pj_list_init(&ioqueue->hlist);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* When safe unregistration is used (the default), we pre-create
     * all keys and put them in the free list.
     */

    /* Mutex to protect the keys' reference counters.
     * We don't want to use the key's mutex or the ioqueue's mutex here,
     * because that could create a deadlock in some cases.
     */
    rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex);
    if (rc != PJ_SUCCESS)
        return rc;

    /* Init key lists */
    pj_list_init(&ioqueue->free_list);
    pj_list_init(&ioqueue->closing_list);

    /* Pre-create all keys according to max_fd */
    for (i=0; i<max_fd; ++i) {
        pj_ioqueue_key_t *key;

        key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t);
        key->ref_count = 0;
        rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
        if (rc != PJ_SUCCESS) {
            key = ioqueue->free_list.next;
            while (key != &ioqueue->free_list) {
                pj_mutex_destroy(key->mutex);
                key = key->next;
            }
            pj_mutex_destroy(ioqueue->ref_cnt_mutex);
            return rc;
        }

        pj_list_push_back(&ioqueue->free_list, key);
    }
#endif

    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
    if (rc != PJ_SUCCESS)
        return rc;

    rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
    if (rc != PJ_SUCCESS)
        return rc;

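    /* Create the epoll descriptor. Note that the size argument of
     * epoll_create() is only a hint (and is ignored by modern kernels),
     * but we pass max_fd anyway.
     */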
    ioqueue->epfd = os_epoll_create(max_fd);
    if (ioqueue->epfd < 0) {
        ioqueue_destroy(ioqueue);
        return PJ_RETURN_OS_ERROR(pj_get_native_os_error());
    }

    ioqueue->events = pj_pool_calloc(pool, max_fd, sizeof(struct epoll_event));
    PJ_ASSERT_RETURN(ioqueue->events != NULL, PJ_ENOMEM);

    ioqueue->queue = pj_pool_calloc(pool, max_fd, sizeof(struct queue));
    PJ_ASSERT_RETURN(ioqueue->queue != NULL, PJ_ENOMEM);

    PJ_LOG(4, ("pjlib", "epoll I/O Queue created (%p)", ioqueue));

    *p_ioqueue = ioqueue;
    return PJ_SUCCESS;
}

/*
 * pj_ioqueue_destroy()
 *
 * Destroy ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    pj_ioqueue_key_t *key;

    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
    PJ_ASSERT_RETURN(ioqueue->epfd > 0, PJ_EINVALIDOP);

    pj_lock_acquire(ioqueue->lock);
    os_close(ioqueue->epfd);
    ioqueue->epfd = 0;

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Destroy reference counters */
    key = ioqueue->active_list.next;
    while (key != &ioqueue->active_list) {
        pj_mutex_destroy(key->mutex);
        key = key->next;
    }

    key = ioqueue->closing_list.next;
    while (key != &ioqueue->closing_list) {
        pj_mutex_destroy(key->mutex);
        key = key->next;
    }

    key = ioqueue->free_list.next;
    while (key != &ioqueue->free_list) {
        pj_mutex_destroy(key->mutex);
        key = key->next;
    }

    pj_mutex_destroy(ioqueue->ref_cnt_mutex);
#endif
    return ioqueue_destroy(ioqueue);
}

/*
 * pj_ioqueue_register_sock()
 *
 * Register a socket to ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
                                              pj_ioqueue_t *ioqueue,
                                              pj_sock_t sock,
                                              void *user_data,
                                              const pj_ioqueue_callback *cb,
                                              pj_ioqueue_key_t **p_key)
{
    pj_ioqueue_key_t *key = NULL;
    pj_uint32_t value;
    struct epoll_event ev;
    int status;
    pj_status_t rc = PJ_SUCCESS;

    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
                     cb && p_key, PJ_EINVAL);

    pj_lock_acquire(ioqueue->lock);

    if (ioqueue->count >= ioqueue->max) {
        rc = PJ_ETOOMANY;
        TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: too many files"));
        goto on_return;
    }

    /* Set socket to nonblocking. */
    value = 1;
    if ((rc=os_ioctl(sock, FIONBIO, (ioctl_val_type)&value))) {
        TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: ioctl rc=%d",
                rc));
        rc = pj_get_netos_error();
        goto on_return;
    }

    /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get
     * the key from the free list. Otherwise allocate a new one.
     */
#if PJ_IOQUEUE_HAS_SAFE_UNREG

    /* Scan closing_keys first to let them come back to free_list */
    scan_closing_keys(ioqueue);

    pj_assert(!pj_list_empty(&ioqueue->free_list));
    if (pj_list_empty(&ioqueue->free_list)) {
        rc = PJ_ETOOMANY;
        goto on_return;
    }

    key = ioqueue->free_list.next;
    pj_list_erase(key);
#else
    /* Create key. */
    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
#endif

    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
    if (rc != PJ_SUCCESS) {
        key = NULL;
        goto on_return;
    }

    /* Create key's mutex */
    /*
    rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
    if (rc != PJ_SUCCESS) {
        key = NULL;
        goto on_return;
    }
    */
    /* Register the socket to epoll, initially armed for input and error
     * events; the key pointer is stored in epoll_data so the poll loop
     * can recover the key from each returned event.
     */
    ev.events = EPOLLIN | EPOLLERR;
    ev.epoll_data = (epoll_data_type)key;
    status = os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_ADD, sock, &ev);
    if (status < 0) {
        rc = pj_get_os_error();
        pj_mutex_destroy(key->mutex);
        key = NULL;
        TRACE_((THIS_FILE,
                "pj_ioqueue_register_sock error: os_epoll_ctl rc=%d",
                status));
        goto on_return;
    }

    /* Register */
    pj_list_insert_before(&ioqueue->hlist, key);
    ++ioqueue->count;

    //TRACE_((THIS_FILE, "socket registered, count=%d", ioqueue->count));

on_return:
    *p_key = key;
    pj_lock_release(ioqueue->lock);

    return rc;
}

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Increment key's reference counter */
static void increment_counter(pj_ioqueue_key_t *key)
{
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    ++key->ref_count;
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
}
/* Decrement the key's reference counter, and when the counter reaches
 * zero, destroy the key.
 *
 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK.
 */
static void decrement_counter(pj_ioqueue_key_t *key)
{
    pj_lock_acquire(key->ioqueue->lock);
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    --key->ref_count;
    if (key->ref_count == 0) {

        pj_assert(key->closing == 1);
        pj_gettimeofday(&key->free_time);
        key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
        pj_time_val_normalize(&key->free_time);

        pj_list_erase(key);
        pj_list_push_back(&key->ioqueue->closing_list, key);
    }
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
    pj_lock_release(key->ioqueue->lock);
}
#endif

/*
 * pj_ioqueue_unregister()
 *
 * Unregister handle from ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
{
    pj_ioqueue_t *ioqueue;
    struct epoll_event ev;
    int status;

    PJ_ASSERT_RETURN(key != NULL, PJ_EINVAL);

    ioqueue = key->ioqueue;

    /* Lock the key so that no callback is simultaneously modifying it
     * while we unregister it. The key must be locked before the ioqueue
     * lock to prevent deadlock; this also balances the
     * pj_mutex_unlock() in the safe-unregistration path below.
     */
    pj_mutex_lock(key->mutex);

    pj_lock_acquire(ioqueue->lock);

    pj_assert(ioqueue->count > 0);
    --ioqueue->count;
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
    pj_list_erase(key);
#endif

    ev.events = 0;
    ev.epoll_data = (epoll_data_type)key;
    status = os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_DEL, key->fd, &ev);
    if (status != 0) {
        pj_status_t rc = pj_get_os_error();
        pj_lock_release(ioqueue->lock);
        pj_mutex_unlock(key->mutex);
        return rc;
    }

    /* Destroy the key. */
    pj_sock_close(key->fd);

    pj_lock_release(ioqueue->lock);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Mark the key as closing. */
    key->closing = 1;

    /* Decrement counter. */
    decrement_counter(key);

    /* Done. */
    pj_mutex_unlock(key->mutex);
#else
    pj_mutex_destroy(key->mutex);
#endif

    return PJ_SUCCESS;
}

/* ioqueue_remove_from_set()
 * This function is called from ioqueue_dispatch_event() to instruct
 * the ioqueue to remove the specified descriptor from the ioqueue's
 * descriptor set for the specified event.
 */
static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
                                     pj_ioqueue_key_t *key,
                                     enum ioqueue_event_type event_type)
{
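    /* Only the EPOLLOUT bit is toggled on demand; EPOLLIN and EPOLLERR
     * remain armed for the lifetime of the registration, so readable
     * events require no epoll_ctl() call here.
     */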
    if (event_type == WRITEABLE_EVENT) {
        struct epoll_event ev;

        ev.events = EPOLLIN | EPOLLERR;
        ev.epoll_data = (epoll_data_type)key;
        os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_MOD, key->fd, &ev);
    }
}

/*
 * ioqueue_add_to_set()
 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc.
 * to instruct the ioqueue to add the specified handle to the ioqueue's
 * descriptor set for the specified event.
 */
static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
                                pj_ioqueue_key_t *key,
                                enum ioqueue_event_type event_type )
{
    if (event_type == WRITEABLE_EVENT) {
        struct epoll_event ev;

        ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;
        ev.epoll_data = (epoll_data_type)key;
        os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_MOD, key->fd, &ev);
    }
}

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Scan closing keys to be put to free list again */
static void scan_closing_keys(pj_ioqueue_t *ioqueue)
{
    pj_time_val now;
    pj_ioqueue_key_t *h;

    pj_gettimeofday(&now);
    h = ioqueue->closing_list.next;
    while (h != &ioqueue->closing_list) {
        pj_ioqueue_key_t *next = h->next;

        pj_assert(h->closing != 0);

        if (PJ_TIME_VAL_GTE(now, h->free_time)) {
            pj_list_erase(h);
            pj_list_push_back(&ioqueue->free_list, h);
        }
        h = next;
    }
}
#endif

/*
 * pj_ioqueue_poll()
 */
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
    int i, count, processed;
    int msec;
    struct epoll_event *events = ioqueue->events;
    struct queue *queue = ioqueue->queue;
    pj_timestamp t1, t2;

    PJ_CHECK_STACK();

    msec = timeout ? PJ_TIME_VAL_MSEC(*timeout) : 9000;

    TRACE_((THIS_FILE, "start os_epoll_wait, msec=%d", msec));
    pj_get_timestamp(&t1);

    count = os_epoll_wait( ioqueue->epfd, events, ioqueue->max, msec);
    if (count == 0) {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
        /* Check the closing keys only when there's no activity and when
         * there are pending closing keys.
         */
        if (!pj_list_empty(&ioqueue->closing_list)) {
            pj_lock_acquire(ioqueue->lock);
            scan_closing_keys(ioqueue);
            pj_lock_release(ioqueue->lock);
        }
#endif
        TRACE_((THIS_FILE, "os_epoll_wait timed out"));
        return count;
    }
    else if (count < 0) {
        TRACE_((THIS_FILE, "os_epoll_wait error"));
        return -pj_get_netos_error();
    }

    pj_get_timestamp(&t2);
    TRACE_((THIS_FILE, "os_epoll_wait returns %d, time=%d usec",
                       count, pj_elapsed_usec(&t1, &t2)));

    /* Lock the ioqueue while scanning the events and filling the dispatch
     * queue. Callbacks are invoked only after the lock is released, so a
     * callback may safely re-enter the ioqueue.
     */
    pj_lock_acquire(ioqueue->lock);

    for (processed=0, i=0; i<count; ++i) {
        pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type)
                              events[i].epoll_data;

        TRACE_((THIS_FILE, "event %d: events=%d", i, events[i].events));

        /*
         * Check readability.
         */
        if ((events[i].events & EPOLLIN) &&
            (key_has_pending_read(h) || key_has_pending_accept(h))) {

#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            queue[processed].key = h;
            queue[processed].event_type = READABLE_EVENT;
            ++processed;
        }

        /*
         * Check for writeability.
         */
        if ((events[i].events & EPOLLOUT) && key_has_pending_write(h)) {

#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            queue[processed].key = h;
            queue[processed].event_type = WRITEABLE_EVENT;
            ++processed;
        }

#if PJ_HAS_TCP
        /*
         * Check for completion of connect() operation.
         */
        if ((events[i].events & EPOLLOUT) && (h->connecting)) {

#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            queue[processed].key = h;
            queue[processed].event_type = WRITEABLE_EVENT;
            ++processed;
        }
#endif /* PJ_HAS_TCP */

        /*
         * Check for error condition.
         */
        if ((events[i].events & EPOLLERR) && (h->connecting)) {

#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            queue[processed].key = h;
            queue[processed].event_type = EXCEPTION_EVENT;
            ++processed;
        }
    }
    pj_lock_release(ioqueue->lock);

    /* Now process the events. */
    for (i=0; i<processed; ++i) {
        switch (queue[i].event_type) {
        case READABLE_EVENT:
            ioqueue_dispatch_read_event(ioqueue, queue[i].key);
            break;
        case WRITEABLE_EVENT:
            ioqueue_dispatch_write_event(ioqueue, queue[i].key);
            break;
        case EXCEPTION_EVENT:
            ioqueue_dispatch_exception_event(ioqueue, queue[i].key);
            break;
        case NO_EVENT:
            pj_assert(!"Invalid event!");
            break;
        }

#if PJ_IOQUEUE_HAS_SAFE_UNREG
        decrement_counter(queue[i].key);
#endif
    }

    /* Special case: epoll may return > 0 even though none of the events
     * matched a pending operation, so nothing was queued above. Sleep for
     * the requested timeout to avoid spinning in a busy loop.
     */
    if (count > 0 && !processed && msec > 0) {
        pj_thread_sleep(msec);
    }

    pj_get_timestamp(&t1);
    TRACE_((THIS_FILE, "ioqueue_poll() returns %d, time=%d usec",
                       processed, pj_elapsed_usec(&t2, &t1)));

    return processed;
}
737