Changeset 11


Ignore:
Timestamp:
Nov 6, 2005 9:37:47 AM (19 years ago)
Author:
bennylp
Message:

Changed ioqueue to allow simultaneous operations on the same key

Location:
pjproject/main/pjlib
Files:
1 added
23 edited

Legend:

Unmodified
Added
Removed
  • pjproject/main/pjlib/build/pjlib.dsp

    r1 r11  
    392392# Begin Source File 
    393393 
     394SOURCE=..\include\pj\compat\os_sunos.h 
     395# End Source File 
     396# Begin Source File 
     397 
    394398SOURCE=..\include\pj\compat\os_win32.h 
    395399# End Source File 
  • pjproject/main/pjlib/build/pjlib_test.dsp

    r1 r11  
    197197# Begin Source File 
    198198 
     199SOURCE="..\src\pjlib-test\udp_echo_srv_ioqueue.c" 
     200# End Source File 
     201# Begin Source File 
     202 
    199203SOURCE="..\src\pjlib-test\udp_echo_srv_sync.c" 
    200204# End Source File 
  • pjproject/main/pjlib/include/pj/compat/os_linux.h

    r4 r11  
    5757#define PJ_SOCK_HAS_INET_ATON       1 
    5858 
     59/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return 
     60 * the status of non-blocking connect() operation. 
     61 */ 
     62#define PJ_HAS_SO_ERROR             1 
     63 
     64/* This value specifies the value set in errno by the OS when a non-blocking 
     65 * socket recv() can not return immediate daata. 
     66 */ 
     67#define PJ_BLOCKING_ERROR_VAL       EAGAIN 
     68 
     69/* This value specifies the value set in errno by the OS when a non-blocking 
     70 * socket connect() can not get connected immediately. 
     71 */ 
     72#define PJ_BLOCKING_CONNECT_ERROR_VAL   EINPROGRESS 
     73 
    5974/* Default threading is enabled, unless it's overridden. */ 
    6075#ifndef PJ_HAS_THREADS 
  • pjproject/main/pjlib/include/pj/compat/os_linux_kernel.h

    r4 r11  
    5454#define PJ_SOCK_HAS_INET_ATON       0 
    5555 
     56/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return 
     57 * the status of non-blocking connect() operation. 
     58 */ 
     59#define PJ_HAS_SO_ERROR             1 
     60 
     61/* This value specifies the value set in errno by the OS when a non-blocking 
     62 * socket recv() can not return immediate daata. 
     63 */ 
     64#define PJ_BLOCKING_ERROR_VAL       EAGAIN 
     65 
     66/* This value specifies the value set in errno by the OS when a non-blocking 
     67 * socket connect() can not get connected immediately. 
     68 */ 
     69#define PJ_BLOCKING_CONNECT_ERROR_VAL   EINPROGRESS 
     70 
    5671#ifndef PJ_HAS_THREADS 
    5772#  define PJ_HAS_THREADS            (1) 
  • pjproject/main/pjlib/include/pj/compat/os_palmos.h

    r4 r11  
    4646#define PJ_SOCK_HAS_INET_ATON       0 
    4747 
     48/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return 
     49 * the status of non-blocking connect() operation. 
     50 */ 
     51#define PJ_HAS_SO_ERROR             0 
     52 
     53/* This value specifies the value set in errno by the OS when a non-blocking 
     54 * socket recv() can not return immediate daata. 
     55 */ 
     56#define PJ_BLOCKING_ERROR_VAL       xxx 
     57 
     58/* This value specifies the value set in errno by the OS when a non-blocking 
     59 * socket connect() can not get connected immediately. 
     60 */ 
     61#define PJ_BLOCKING_CONNECT_ERROR_VAL   xxx 
     62 
    4863/* Default threading is enabled, unless it's overridden. */ 
    4964#ifndef PJ_HAS_THREADS 
  • pjproject/main/pjlib/include/pj/compat/os_sunos.h

    r4 r11  
    4141#define PJ_SOCK_HAS_INET_ATON       0 
    4242 
     43/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return 
     44 * the status of non-blocking connect() operation. 
     45 */ 
     46#define PJ_HAS_SO_ERROR             0 
     47 
     48/* This value specifies the value set in errno by the OS when a non-blocking 
     49 * socket recv() can not return immediate daata. 
     50 */ 
     51#define PJ_BLOCKING_ERROR_VAL       EWOULDBLOCK 
     52 
     53/* This value specifies the value set in errno by the OS when a non-blocking 
     54 * socket connect() can not get connected immediately. 
     55 */ 
     56#define PJ_BLOCKING_CONNECT_ERROR_VAL   EINPROGRESS 
     57 
    4358/* Default threading is enabled, unless it's overridden. */ 
    4459#ifndef PJ_HAS_THREADS 
  • pjproject/main/pjlib/include/pj/compat/os_win32.h

    r4 r11  
    6161#define PJ_SOCK_HAS_INET_ATON       0 
    6262 
     63/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return 
     64 * the status of non-blocking connect() operation. 
     65 */ 
     66#define PJ_HAS_SO_ERROR             0 
     67 
     68/* This value specifies the value set in errno by the OS when a non-blocking 
     69 * socket recv() or send() can not return immediately. 
     70 */ 
     71#define PJ_BLOCKING_ERROR_VAL       WSAEWOULDBLOCK 
     72 
     73/* This value specifies the value set in errno by the OS when a non-blocking 
     74 * socket connect() can not get connected immediately. 
     75 */ 
     76#define PJ_BLOCKING_CONNECT_ERROR_VAL   WSAEWOULDBLOCK 
     77 
    6378/* Default threading is enabled, unless it's overridden. */ 
    6479#ifndef PJ_HAS_THREADS 
  • pjproject/main/pjlib/include/pj/doxygen.h

    r8 r11  
    8989 * <b>PJLIB Page Documentation</b> on navigation pane of your PDF reader. 
    9090 * 
    91  * 
    92  * 
     91 *  - <b>How to Submit Code to PJLIB Project</b> 
     92 *\n 
     93 * Please read \ref pjlib_coding_convention_page before submitting 
     94 * your code. Send your code as patch against current Subversion tree 
     95 * to the appropriate mailing list. 
    9396 * 
    9497 * 
     
    395398 
    396399 
     400 
     401/*////////////////////////////////////////////////////////////////////////// */ 
     402/* 
     403         CODING CONVENTION 
     404 */ 
     405 
     406/** 
     407 * @page pjlib_coding_convention_page Coding Convention 
     408 * 
     409 * Before you submit your code/patches to be included with PJLIB, you must 
     410 * make sure that your code is compliant with PJLIB coding convention. 
     411 * <b>This is very important!</b> Otherwise we would not accept your code. 
     412 * 
     413 * @section coding_conv_editor_sec Editor Settings 
     414 * 
     415 * The single most important thing in the whole coding convention is editor  
     416 * settings. It's more important than the correctness of your code (bugs will 
     417 * only crash the system, but incorrect tab size is mental!). 
     418 * 
     419 * Kindly set your editor as follows: 
     420 *  - tab size to \b 8. 
     421 *  - indentation to \b 4. 
     422 * 
     423 * With \c vi, you can do it with: 
     424 * <pre> 
     425 *  :se ts=8 
     426 *  :se sts=4 
     427 * </pre> 
     428 * 
     429 * You should replace tab with eight spaces. 
     430 * 
     431 * @section coding_conv_detail_sec Coding Style 
     432 * 
     433 * Coding style MUST strictly follow K&R style. The rest of coding style 
     434 * must follow current style. You SHOULD be able to observe the style 
     435 * currently used by PJLIB from PJLIB sources, and apply the style to your  
     436 * code. If you're not able to do simple thing like to observe PJLIB 
     437 * coding style from the sources, then logic dictates that your ability to 
     438 * observe more difficult area in PJLIB such as memory allocation strategy,  
     439 * concurrency, etc is questionable. 
     440 * 
     441 * @section coding_conv_comment_sec Commenting Your Code 
     442 * 
     443 * Public API (e.g. in header files) MUST have doxygen compliant comments. 
     444 * 
     445 */ 
    397446 
    398447 
  • pjproject/main/pjlib/include/pj/ioqueue.h

    r4 r11  
    11/* $Id$ 
    2  * 
    32 */ 
    43 
     
    4948 * @{ 
    5049 * 
    51  * This file provides abstraction for various event dispatching mechanisms.  
    52  * The interfaces for event dispatching vary alot, even in a single 
    53  * operating system. The abstraction here hopefully is suitable for most of 
    54  * the event dispatching available. 
    55  * 
    56  * Currently, the I/O Queue supports: 
    57  * - select(), as the common denominator, but the least efficient. 
    58  * - I/O Completion ports in Windows NT/2000/XP, which is the most efficient 
    59  *      way to dispatch events in Windows NT based OSes, and most importantly, 
    60  *      it doesn't have the limit on how many handles to monitor. And it works 
    61  *      with files (not only sockets) as well. 
     50 * I/O Queue provides API for performing asynchronous I/O operations. It 
     51 * conforms to proactor pattern, which allows application to submit an 
     52 * asynchronous operation and to be notified later when the operation has 
     53 * completed. 
     54 * 
     55 * The framework works natively in platforms where asynchronous operation API 
     56 * exists, such as in Windows NT with IoCompletionPort/IOCP. In other  
     57 * platforms, the I/O queue abstracts the operating system's event poll API 
     58 * to provide semantics similar to IoCompletionPort with minimal penalties 
     59 * (i.e. per ioqueue and per handle mutex protection). 
     60 * 
     61 * The I/O queue provides more than just unified abstraction. It also: 
     62 *  - makes sure that the operation uses the most effective way to utilize 
     63 *    the underlying mechanism, to provide the maximum theoritical 
     64 *    throughput possible on a given platform. 
     65 *  - choose the most efficient mechanism for event polling on a given 
     66 *    platform. 
     67 * 
     68 * Currently, the I/O Queue is implemented using: 
     69 *  - <tt><b>select()</b></tt>, as the common denominator, but the least  
     70 *    efficient. Also the number of descriptor is limited to  
     71 *    \c PJ_IOQUEUE_MAX_HANDLES (which by default is 64). 
     72 *  - <tt><b>/dev/epoll</b></tt> on Linux (user mode and kernel mode),  
     73 *    a much faster replacement for select() on Linux (and more importantly 
     74 *    doesn't have limitation on number of descriptors). 
     75 *  - <b>I/O Completion ports</b> on Windows NT/2000/XP, which is the most  
     76 *    efficient way to dispatch events in Windows NT based OSes, and most  
     77 *    importantly, it doesn't have the limit on how many handles to monitor. 
     78 *    And it works with files (not only sockets) as well. 
     79 * 
     80 * 
     81 * \section pj_ioqueue_concurrency_sec Concurrency Rules 
     82 * 
     83 * The items below describe rules that must be obeyed when using the I/O  
     84 * queue, with regard to concurrency: 
     85 *  - in general, the I/O queue is thread safe (assuming the lock strategy 
     86 *    is not changed to disable mutex protection). All operations, except 
     87 *    unregistration which is described below, can be safely invoked  
     88 *    simultaneously by multiple  threads. 
     89 *  - however, <b>care must be taken when unregistering a key</b> from the 
     90 *    ioqueue. Application must take care that when one thread is issuing 
     91 *    an unregistration, other thread is not simultaneously invoking an 
     92 *    operation <b>to the same key</b>. 
     93 *\n 
     94 *    This happens because the ioqueue functions are working with a pointer 
     95 *    to the key, and there is a possible race condition where the pointer 
     96 *    has been rendered invalid by other threads before the ioqueue has a 
     97 *    chance to acquire mutex on it. 
    6298 * 
    6399 * \section pj_ioqeuue_examples_sec Examples 
     
    70106 */ 
    71107 
    72  /** 
    73   * This structure describes the callbacks to be called when I/O operation 
    74   * completes. 
    75   */ 
     108 
     109 
     110/** 
     111 * This structure describes operation specific key to be submitted to 
     112 * I/O Queue when performing the asynchronous operation. This key will 
     113 * be returned to the application when completion callback is called. 
     114 * 
     115 * Application normally wants to attach it's specific data in the 
     116 * \c user_data field so that it can keep track of which operation has 
     117 * completed when the callback is called. Alternatively, application can 
     118 * also extend this struct to include its data, because the pointer that 
     119 * is returned in the completion callback will be exactly the same as 
     120 * the pointer supplied when the asynchronous function is called. 
     121 */ 
     122typedef struct pj_ioqueue_op_key_t 
     123{  
     124    void *internal__[32];           /**< Internal I/O Queue data.   */ 
     125    void *user_data;                /**< Application data.          */ 
     126} pj_ioqueue_op_key_t; 
     127 
     128/** 
     129 * This structure describes the callbacks to be called when I/O operation 
     130 * completes. 
     131 */ 
    76132typedef struct pj_ioqueue_callback 
    77133{ 
    78134    /** 
    79      * This callback is called when #pj_ioqueue_read or #pj_ioqueue_recvfrom 
     135     * This callback is called when #pj_ioqueue_recv or #pj_ioqueue_recvfrom 
    80136     * completes. 
    81137     * 
    82138     * @param key           The key. 
    83      * @param bytes_read    The size of data that has just been read. 
     139     * @param op_key        Operation key. 
     140     * @param bytes_read    >= 0 to indicate the amount of data read,  
     141     *                      otherwise negative value containing the error 
     142     *                      code. To obtain the pj_status_t error code, use 
     143     *                      (pj_status_t code = -bytes_read). 
    84144     */ 
    85     void (*on_read_complete)(pj_ioqueue_key_t *key, pj_ssize_t bytes_read); 
     145    void (*on_read_complete)(pj_ioqueue_key_t *key,  
     146                             pj_ioqueue_op_key_t *op_key,  
     147                             pj_ssize_t bytes_read); 
    86148 
    87149    /** 
     
    90152     * 
    91153     * @param key           The key. 
    92      * @param bytes_read    The size of data that has just been read. 
     154     * @param op_key        Operation key. 
     155     * @param bytes_sent    >= 0 to indicate the amount of data written,  
     156     *                      otherwise negative value containing the error 
     157     *                      code. To obtain the pj_status_t error code, use 
     158     *                      (pj_status_t code = -bytes_sent). 
    93159     */ 
    94     void (*on_write_complete)(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent); 
     160    void (*on_write_complete)(pj_ioqueue_key_t *key,  
     161                              pj_ioqueue_op_key_t *op_key,  
     162                              pj_ssize_t bytes_sent); 
    95163 
    96164    /** 
     
    98166     * 
    99167     * @param key           The key. 
     168     * @param op_key        Operation key. 
    100169     * @param sock          Newly connected socket. 
    101170     * @param status        Zero if the operation completes successfully. 
    102171     */ 
    103     void (*on_accept_complete)(pj_ioqueue_key_t *key, pj_sock_t sock,  
    104                                int status); 
     172    void (*on_accept_complete)(pj_ioqueue_key_t *key,  
     173                               pj_ioqueue_op_key_t *op_key,  
     174                               pj_sock_t sock,  
     175                               pj_status_t status); 
    105176 
    106177    /** 
     
    108179     * 
    109180     * @param key           The key. 
    110      * @param status        Zero if the operation completes successfully. 
     181     * @param status        PJ_SUCCESS if the operation completes successfully. 
    111182     */ 
    112     void (*on_connect_complete)(pj_ioqueue_key_t *key, int status); 
     183    void (*on_connect_complete)(pj_ioqueue_key_t *key,  
     184                                pj_status_t status); 
    113185} pj_ioqueue_callback; 
    114186 
    115187 
    116188/** 
    117  * Types of I/O Queue operation. 
     189 * Types of pending I/O Queue operation. This enumeration is only used 
     190 * internally within the ioqueue. 
    118191 */ 
    119192typedef enum pj_ioqueue_operation_e 
     
    139212#define PJ_IOQUEUE_DEFAULT_THREADS  0 
    140213 
    141  
    142214/** 
    143215 * Create a new I/O Queue framework. 
     
    146218 * @param max_fd        The maximum number of handles to be supported, which  
    147219 *                      should not exceed PJ_IOQUEUE_MAX_HANDLES. 
    148  * @param max_threads   The maximum number of threads that are allowed to 
    149  *                      operate on a single descriptor simultaneously. If 
    150  *                      the value is zero, the framework will set it 
    151  *                      to a reasonable value. 
    152220 * @param ioqueue       Pointer to hold the newly created I/O Queue. 
    153221 * 
     
    156224PJ_DECL(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,  
    157225                                        pj_size_t max_fd, 
    158                                         int max_threads, 
    159226                                        pj_ioqueue_t **ioqueue); 
    160227 
     
    201268 *                  retrieved later. 
    202269 * @param cb        Callback to be called when I/O operation completes.  
    203  * @param key       Pointer to receive the returned key. 
     270 * @param key       Pointer to receive the key to be associated with this 
     271 *                  socket. Subsequent I/O queue operation will need this 
     272 *                  key. 
    204273 * 
    205274 * @return          PJ_SUCCESS on success, or the error code. 
     
    210279                                               void *user_data, 
    211280                                               const pj_ioqueue_callback *cb, 
    212                                                pj_ioqueue_key_t **key); 
    213  
    214 /** 
    215  * Unregister a handle from the I/O Queue framework. 
    216  * 
    217  * @param ioque     The I/O Queue. 
    218  * @param key       The key that uniquely identifies the handle, which is  
    219  *                  returned from the function #pj_ioqueue_register_sock() 
    220  *                  or other registration functions. 
     281                                               pj_ioqueue_key_t **key ); 
     282 
     283/** 
     284 * Unregister from the I/O Queue framework. Caller must make sure that 
     285 * the key doesn't have any pending operation before calling this function, 
     286 * or otherwise the behaviour is undefined (either callback will be called 
     287 * later when the data is sent/received, or the callback will not be called, 
     288 * or even something else). 
     289 * 
     290 * @param key       The key that was previously obtained from registration. 
    221291 * 
    222292 * @return          PJ_SUCCESS on success or the error code. 
    223293 */ 
    224 PJ_DECL(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque, 
    225                                             pj_ioqueue_key_t *key ); 
    226  
    227  
    228 /** 
    229  * Get user data associated with the I/O Queue key. 
    230  * 
    231  * @param key       The key previously associated with the socket/handle with 
    232  *                  #pj_ioqueue_register_sock() (or other registration  
    233  *                  functions). 
    234  * 
    235  * @return          The user data associated with the key, or NULL on error 
    236  *                  of if no data is associated with the key during  
     294PJ_DECL(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ); 
     295 
     296 
     297/** 
     298 * Get user data associated with an ioqueue key. 
     299 * 
     300 * @param key       The key that was previously obtained from registration. 
     301 * 
     302 * @return          The user data associated with the descriptor, or NULL  
     303 *                  on error or if no data is associated with the key during 
    237304 *                  registration. 
    238305 */ 
    239306PJ_DECL(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ); 
    240307 
     308/** 
     309 * Set or change the user data to be associated with the file descriptor or 
     310 * handle or socket descriptor. 
     311 * 
     312 * @param key       The key that was previously obtained from registration. 
     313 * @param user_data User data to be associated with the descriptor. 
     314 * @param old_data  Optional parameter to retrieve the old user data. 
     315 * 
     316 * @return          PJ_SUCCESS on success or the error code. 
     317 */ 
     318PJ_DECL(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, 
     319                                               void *user_data, 
     320                                               void **old_data); 
     321 
    241322 
    242323#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 
    243324/** 
    244  * Instruct I/O Queue to wait for incoming connections on the specified  
    245  * listening socket. This function will return 
    246  * immediately (i.e. non-blocking) regardless whether some data has been  
    247  * transfered. If the function can't complete immediately, and the caller will 
    248  * be notified about the completion when it calls pj_ioqueue_poll(). 
    249  * 
    250  * @param ioqueue   The I/O Queue 
     325 * Instruct I/O Queue to accept incoming connection on the specified  
     326 * listening socket. This function will return immediately (i.e. non-blocking) 
     327 * regardless whether a connection is immediately available. If the function 
     328 * can't complete immediately, the caller will be notified about the incoming 
     329 * connection when it calls pj_ioqueue_poll(). If a new connection is 
     330 * immediately available, the function returns PJ_SUCCESS with the new 
     331 * connection; in this case, the callback WILL NOT be called. 
     332 * 
    251333 * @param key       The key which registered to the server socket. 
    252  * @param sock      Argument which contain pointer to receive  
    253  *                  the socket for the incoming connection. 
     334 * @param op_key    An operation specific key to be associated with the 
     335 *                  pending operation, so that application can keep track of 
     336 *                  which operation has been completed when the callback is 
     337 *                  called. 
     338 * @param new_sock  Argument which contain pointer to receive the new socket 
     339 *                  for the incoming connection. 
    254340 * @param local     Optional argument which contain pointer to variable to  
    255341 *                  receive local address. 
     
    260346 *                  address. This argument is optional. 
    261347 * @return 
    262  *  - PJ_SUCCESS    If there's a connection available immediately, which  
    263  *                  in this case the callback should have been called before  
    264  *                  the function returns. 
    265  *  - PJ_EPENDING   If accept is queued, or  
     348 *  - PJ_SUCCESS    When connection is available immediately, and the  
     349 *                  parameters will be updated to contain information about  
     350 *                  the new connection. In this case, a completion callback 
     351 *                  WILL NOT be called. 
     352 *  - PJ_EPENDING   If no connection is available immediately. When a new 
     353 *                  connection arrives, the callback will be called. 
    266354 *  - non-zero      which indicates the appropriate error code. 
    267355 */ 
    268 PJ_DECL(pj_status_t) pj_ioqueue_accept( pj_ioqueue_t *ioqueue, 
    269                                         pj_ioqueue_key_t *key, 
     356PJ_DECL(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, 
     357                                        pj_ioqueue_op_key_t *op_key, 
    270358                                        pj_sock_t *sock, 
    271359                                        pj_sockaddr_t *local, 
     
    275363/** 
    276364 * Initiate non-blocking socket connect. If the socket can NOT be connected 
    277  * immediately, the result will be reported during poll. 
    278  * 
    279  * @param ioqueue   The ioqueue 
     365 * immediately, asynchronous connect() will be scheduled and caller will be 
     366 * notified via completion callback when it calls pj_ioqueue_poll(). If 
     367 * socket is connected immediately, the function returns PJ_SUCCESS and 
     368 * completion callback WILL NOT be called. 
     369 * 
    280370 * @param key       The key associated with TCP socket 
    281371 * @param addr      The remote address. 
     
    283373 * 
    284374 * @return 
    285  *  - PJ_SUCCESS    If socket is connected immediately, which in this case  
    286  *                  the callback should have been called. 
     375 *  - PJ_SUCCESS    If socket is connected immediately. In this case, the 
     376 *                  completion callback WILL NOT be called. 
    287377 *  - PJ_EPENDING   If operation is queued, or  
    288378 *  - non-zero      Indicates the error code. 
    289379 */ 
    290 PJ_DECL(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue, 
    291                                          pj_ioqueue_key_t *key, 
     380PJ_DECL(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, 
    292381                                         const pj_sockaddr_t *addr, 
    293382                                         int addrlen ); 
     
    310399                              const pj_time_val *timeout); 
    311400 
     401 
    312402/** 
    313403 * Instruct the I/O Queue to read from the specified handle. This function 
    314404 * returns immediately (i.e. non-blocking) regardless whether some data has  
    315405 * been transfered. If the operation can't complete immediately, caller will  
    316  * be notified about the completion when it calls pj_ioqueue_poll(). 
    317  * 
    318  * @param ioque     The I/O Queue. 
     406 * be notified about the completion when it calls pj_ioqueue_poll(). If data 
     407 * is immediately available, the function will return PJ_SUCCESS and the 
     408 * callback WILL NOT be called. 
     409 * 
    319410 * @param key       The key that uniquely identifies the handle. 
     411 * @param op_key    An operation specific key to be associated with the 
     412 *                  pending operation, so that application can keep track of 
     413 *                  which operation has been completed when the callback is 
     414 *                  called. Caller must make sure that this key remains  
     415 *                  valid until the function completes. 
    320416 * @param buffer    The buffer to hold the read data. The caller MUST make sure 
    321417 *                  that this buffer remain valid until the framework completes 
    322418 *                  reading the handle. 
    323  * @param buflen    The maximum size to be read. 
     419 * @param length    On input, it specifies the size of the buffer. If data is 
     420 *                  available to be read immediately, the function returns 
     421 *                  PJ_SUCCESS and this argument will be filled with the 
     422 *                  amount of data read. If the function is pending, caller 
     423 *                  will be notified about the amount of data read in the 
     424 *                  callback. This parameter can point to local variable in 
     425 *                  caller's stack and doesn't have to remain valid for the 
     426 *                  duration of pending operation. 
     427 * @param flags     Recv flag. 
     428 * 
     429 * @return 
     430 *  - PJ_SUCCESS    If immediate data has been received in the buffer. In this 
     431 *                  case, the callback WILL NOT be called. 
     432 *  - PJ_EPENDING   If the operation has been queued, and the callback will be 
     433 *                  called when data has been received. 
     434 *  - non-zero      The return value indicates the error code. 
     435 */ 
     436PJ_DECL(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, 
     437                                      pj_ioqueue_op_key_t *op_key, 
     438                                      void *buffer, 
     439                                      pj_ssize_t *length, 
     440                                      unsigned flags ); 
     441 
     442/** 
     443 * This function behaves similarly as #pj_ioqueue_recv(), except that it is 
     444 * normally called for socket, and the remote address will also be returned 
     445 * along with the data. Caller MUST make sure that both buffer and addr 
     446 * remain valid until the framework completes reading the data. 
     447 * 
     448 * @param key       The key that uniquely identifies the handle. 
     449 * @param op_key    An operation specific key to be associated with the 
     450 *                  pending operation, so that application can keep track of 
     451 *                  which operation has been completed when the callback is 
     452 *                  called. 
     453 * @param buffer    The buffer to hold the read data. The caller MUST make sure 
     454 *                  that this buffer remain valid until the framework completes 
     455 *                  reading the handle. 
     456 * @param length    On input, it specifies the size of the buffer. If data is 
     457 *                  available to be read immediately, the function returns 
     458 *                  PJ_SUCCESS and this argument will be filled with the 
     459 *                  amount of data read. If the function is pending, caller 
     460 *                  will be notified about the amount of data read in the 
     461 *                  callback. This parameter can point to local variable in 
     462 *                  caller's stack and doesn't have to remain valid for the 
     463 *                  duration of pending operation. 
     464 * @param flags     Recv flag. 
     465 * @param addr      Optional Pointer to buffer to receive the address. 
     466 * @param addrlen   On input, specifies the length of the address buffer. 
     467 *                  On output, it will be filled with the actual length of 
     468 *                  the address. This argument can be NULL if \c addr is not 
     469 *                  specified. 
    324470 * 
    325471 * @return 
     
    330476 *  - non-zero      The return value indicates the error code. 
    331477 */ 
    332 PJ_DECL(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque, 
    333                                       pj_ioqueue_key_t *key, 
    334                                       void *buffer, 
    335                                       pj_size_t buflen); 
    336  
    337  
    338 /** 
    339  * This function behaves similarly as #pj_ioqueue_read(), except that it is 
    340  * normally called for socket. 
    341  * 
    342  * @param ioque     The I/O Queue. 
    343  * @param key       The key that uniquely identifies the handle. 
    344  * @param buffer    The buffer to hold the read data. The caller MUST make sure 
    345  *                  that this buffer remain valid until the framework completes 
    346  *                  reading the handle. 
    347  * @param buflen    The maximum size to be read. 
    348  * @param flags     Recv flag. 
    349  * 
    350  * @return 
    351  *  - PJ_SUCCESS    If immediate data has been received. In this case, the  
    352  *                  callback must have been called before this function  
    353  *                  returns, and no pending operation is scheduled. 
    354  *  - PJ_EPENDING   If the operation has been queued. 
    355  *  - non-zero      The return value indicates the error code. 
    356  */ 
    357 PJ_DECL(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque, 
    358                                       pj_ioqueue_key_t *key, 
    359                                       void *buffer, 
    360                                       pj_size_t buflen, 
    361                                       unsigned flags ); 
    362  
    363 /** 
    364  * This function behaves similarly as #pj_ioqueue_read(), except that it is 
    365  * normally called for socket, and the remote address will also be returned 
    366  * along with the data. Caller MUST make sure that both buffer and addr 
    367  * remain valid until the framework completes reading the data. 
    368  * 
    369  * @param ioque     The I/O Queue. 
    370  * @param key       The key that uniquely identifies the handle. 
    371  * @param buffer    The buffer to hold the read data. The caller MUST make sure 
    372  *                  that this buffer remain valid until the framework completes 
    373  *                  reading the handle. 
    374  * @param buflen    The maximum size to be read. 
    375  * @param flags     Recv flag. 
    376  * @param addr      Pointer to buffer to receive the address, or NULL. 
    377  * @param addrlen   On input, specifies the length of the address buffer. 
    378  *                  On output, it will be filled with the actual length of 
    379  *                  the address. 
    380  * 
    381  * @return 
    382  *  - PJ_SUCCESS    If immediate data has been received. In this case, the  
    383  *                  callback must have been called before this function  
    384  *                  returns, and no pending operation is scheduled. 
    385  *  - PJ_EPENDING   If the operation has been queued. 
    386  *  - non-zero      The return value indicates the error code. 
    387  */ 
    388 PJ_DECL(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque, 
    389                                           pj_ioqueue_key_t *key, 
     478PJ_DECL(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, 
     479                                          pj_ioqueue_op_key_t *op_key, 
    390480                                          void *buffer, 
    391                                           pj_size_t buflen, 
     481                                          pj_ssize_t *length, 
    392482                                          unsigned flags, 
    393483                                          pj_sockaddr_t *addr, 
    394484                                          int *addrlen); 
    395485 
     486 
    396487/** 
    397488 * Instruct the I/O Queue to write to the handle. This function will return 
    398489 * immediately (i.e. non-blocking) regardless whether some data has been  
    399  * transfered. If the function can't complete immediately, and the caller will 
    400  * be notified about the completion when it calls pj_ioqueue_poll(). 
    401  * 
    402  * @param ioque     the I/O Queue. 
     490 * transfered. If the function can't complete immediately, the caller will 
     491 * be notified about the completion when it calls pj_ioqueue_poll(). If  
     492 * operation completes immediately and data has been transfered, the function 
     493 * returns PJ_SUCCESS and the callback will NOT be called. 
     494 * 
     495 * @param key       The key that identifies the handle. 
     496 * @param op_key    An operation specific key to be associated with the 
     497 *                  pending operation, so that application can keep track of 
     498 *                  which operation has been completed when the callback is 
     499 *                  called. 
     500 * @param data      The data to send. Caller MUST make sure that this buffer  
     501 *                  remains valid until the write operation completes. 
     502 * @param length    On input, it specifies the length of data to send. When 
     503 *                  data was sent immediately, this function returns PJ_SUCCESS 
     504 *                  and this parameter contains the length of data sent. If 
     505 *                  data can not be sent immediately, an asynchronous operation 
     506 *                  is scheduled and caller will be notified via callback the 
     507 *                  number of bytes sent. This parameter can point to local  
     508 *                  variable on caller's stack and doesn't have to remain  
     509 *                  valid until the operation has completed. 
     510 * @param flags     Send flags. 
     511 * 
     512 * @return 
     513 *  - PJ_SUCCESS    If data was immediately transfered. In this case, no 
     514 *                  pending operation has been scheduled and the callback 
     515 *                  WILL NOT be called. 
     516 *  - PJ_EPENDING   If the operation has been queued. Once data base been 
     517 *                  transfered, the callback will be called. 
     518 *  - non-zero      The return value indicates the error code. 
     519 */ 
     520PJ_DECL(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, 
     521                                      pj_ioqueue_op_key_t *op_key, 
     522                                      const void *data, 
     523                                      pj_ssize_t *length, 
     524                                      unsigned flags ); 
     525 
     526 
     527/** 
     528 * This function behaves similarly as #pj_ioqueue_write(), except that 
     529 * pj_sock_sendto() (or equivalent) will be called to send the data. 
     530 * 
    403531 * @param key       the key that identifies the handle. 
     532 * @param op_key    An operation specific key to be associated with the 
     533 *                  pending operation, so that application can keep track of 
     534 *                  which operation has been completed when the callback is 
     535 *                  called. 
    404536 * @param data      the data to send. Caller MUST make sure that this buffer  
    405537 *                  remains valid until the write operation completes. 
    406  * @param datalen   the length of the data. 
     538 * @param length    On input, it specifies the length of data to send. When 
     539 *                  data was sent immediately, this function returns PJ_SUCCESS 
     540 *                  and this parameter contains the length of data sent. If 
     541 *                  data can not be sent immediately, an asynchronous operation 
     542 *                  is scheduled and caller will be notified via callback the 
     543 *                  number of bytes sent. This parameter can point to local  
     544 *                  variable on caller's stack and doesn't have to remain  
     545 *                  valid until the operation has completed. 
     546 * @param flags     send flags. 
     547 * @param addr      Optional remote address. 
     548 * @param addrlen   Remote address length, \c addr is specified. 
    407549 * 
    408550 * @return 
     
    411553 *  - non-zero      The return value indicates the error code. 
    412554 */ 
    413 PJ_DECL(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque, 
    414                                        pj_ioqueue_key_t *key, 
    415                                        const void *data, 
    416                                        pj_size_t datalen); 
    417  
    418 /** 
    419  * This function behaves similarly as #pj_ioqueue_write(), except that 
    420  * pj_sock_send() (or equivalent) will be called to send the data. 
    421  * 
    422  * @param ioque     the I/O Queue. 
    423  * @param key       the key that identifies the handle. 
    424  * @param data      the data to send. Caller MUST make sure that this buffer  
    425  *                  remains valid until the write operation completes. 
    426  * @param datalen   the length of the data. 
    427  * @param flags     send flags. 
    428  * 
    429  * @return 
    430  *  - PJ_SUCCESS    If data was immediately written. 
    431  *  - PJ_EPENDING   If the operation has been queued. 
    432  *  - non-zero      The return value indicates the error code. 
    433  */ 
    434 PJ_DECL(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque, 
    435                                       pj_ioqueue_key_t *key, 
    436                                       const void *data, 
    437                                       pj_size_t datalen, 
    438                                       unsigned flags ); 
    439  
    440  
    441 /** 
    442  * This function behaves similarly as #pj_ioqueue_write(), except that 
    443  * pj_sock_sendto() (or equivalent) will be called to send the data. 
    444  * 
    445  * @param ioque     the I/O Queue. 
    446  * @param key       the key that identifies the handle. 
    447  * @param data      the data to send. Caller MUST make sure that this buffer  
    448  *                  remains valid until the write operation completes. 
    449  * @param datalen   the length of the data. 
    450  * @param flags     send flags. 
    451  * @param addr      remote address. 
    452  * @param addrlen   remote address length. 
    453  * 
    454  * @return 
    455  *  - PJ_SUCCESS    If data was immediately written. 
    456  *  - PJ_EPENDING   If the operation has been queued. 
    457  *  - non-zero      The return value indicates the error code. 
    458  */ 
    459 PJ_DECL(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque, 
    460                                         pj_ioqueue_key_t *key, 
     555PJ_DECL(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, 
     556                                        pj_ioqueue_op_key_t *op_key, 
    461557                                        const void *data, 
    462                                         pj_size_t datalen, 
     558                                        pj_ssize_t *length, 
    463559                                        unsigned flags, 
    464560                                        const pj_sockaddr_t *addr, 
  • pjproject/main/pjlib/include/pj/list.h

    r4 r11  
    4747 */ 
    4848#define PJ_DECL_LIST_MEMBER(type)  type *prev; /** List @a prev. */ \ 
    49                                    type *next; /** List @a next. */  
     49                                   type *next  /** List @a next. */  
    5050 
    5151 
     
    5757struct pj_list 
    5858{ 
    59     PJ_DECL_LIST_MEMBER(void) 
     59    PJ_DECL_LIST_MEMBER(void); 
    6060}; 
    6161 
  • pjproject/main/pjlib/include/pj/pool.h

    r4 r11  
    114114typedef struct pj_pool_block 
    115115{ 
    116     PJ_DECL_LIST_MEMBER(struct pj_pool_block)   /**< List's prev and next.  */ 
     116    PJ_DECL_LIST_MEMBER(struct pj_pool_block);  /**< List's prev and next.  */ 
    117117    unsigned char    *buf;                      /**< Start of buffer.       */ 
    118118    unsigned char    *cur;                      /**< Current alloc ptr.     */ 
     
    127127struct pj_pool_t 
    128128{ 
    129     PJ_DECL_LIST_MEMBER(struct pj_pool_t) 
     129    PJ_DECL_LIST_MEMBER(struct pj_pool_t); 
    130130 
    131131    /** Pool name */ 
  • pjproject/main/pjlib/include/pj/sock_select.h

    r4 r11  
    9898 
    9999/** 
    100  * Get the number of descriptors in the set. 
    101  * 
    102  * @param fdsetp    The descriptor set. 
    103  * 
    104  * @return          Number of descriptors in the set. 
    105  */ 
    106 PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp); 
    107  
    108  
    109 /** 
    110100 * This function wait for a number of file  descriptors to change status. 
    111101 * The behaviour is the same as select() function call which appear in 
  • pjproject/main/pjlib/include/pj/xml.h

    r4 r11  
    3131struct pj_xml_attr 
    3232{ 
    33     PJ_DECL_LIST_MEMBER(pj_xml_attr) 
     33    PJ_DECL_LIST_MEMBER(pj_xml_attr); 
    3434    pj_str_t    name;       /**< Attribute name. */ 
    3535    pj_str_t    value;      /**< Attribute value. */ 
     
    4040typedef struct pj_xml_node_head 
    4141{ 
    42     PJ_DECL_LIST_MEMBER(pj_xml_node) 
     42    PJ_DECL_LIST_MEMBER(pj_xml_node); 
    4343} pj_xml_node_head; 
    4444 
     
    4646struct pj_xml_node 
    4747{ 
    48     PJ_DECL_LIST_MEMBER(pj_xml_node)    /** List @a prev and @a next member */ 
     48    PJ_DECL_LIST_MEMBER(pj_xml_node);   /** List @a prev and @a next member */ 
    4949    pj_str_t            name;           /** Node name. */ 
    5050    pj_xml_attr         attr_head;      /** Attribute list. */ 
  • pjproject/main/pjlib/src/pj/config.c

    r6 r11  
    55 
    66static const char *id = "config.c"; 
    7 const char *PJ_VERSION = "0.3.0-pre1"; 
     7const char *PJ_VERSION = "0.3.0-pre4"; 
    88 
    99PJ_DEF(void) pj_dump_config(void) 
  • pjproject/main/pjlib/src/pj/ioqueue_select.c

    r6 r11  
    3535#define THIS_FILE   "ioq_select" 
    3636 
    37 #define PJ_IOQUEUE_IS_READ_OP(op)   ((op & PJ_IOQUEUE_OP_READ) || \ 
    38                                      (op & PJ_IOQUEUE_OP_RECV) || \ 
    39                                      (op & PJ_IOQUEUE_OP_RECV_FROM)) 
    40 #define PJ_IOQUEUE_IS_WRITE_OP(op)  ((op & PJ_IOQUEUE_OP_WRITE) || \ 
    41                                      (op & PJ_IOQUEUE_OP_SEND) || \ 
    42                                      (op & PJ_IOQUEUE_OP_SEND_TO)) 
    43  
    44  
    45 #if PJ_HAS_TCP 
    46 #  define PJ_IOQUEUE_IS_ACCEPT_OP(op)   (op & PJ_IOQUEUE_OP_ACCEPT) 
    47 #  define PJ_IOQUEUE_IS_CONNECT_OP(op)  (op & PJ_IOQUEUE_OP_CONNECT) 
    48 #else 
    49 #  define PJ_IOQUEUE_IS_ACCEPT_OP(op)   0 
    50 #  define PJ_IOQUEUE_IS_CONNECT_OP(op)  0 
    51 #endif 
     37/* 
     38 * The select ioqueue relies on socket functions (pj_sock_xxx()) to return 
     39 * the correct error code. 
     40 */ 
     41#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100) 
     42#   error "Error reporting must be enabled for this function to work!" 
     43#endif 
     44 
     45/** 
     46 * Get the number of descriptors in the set. This is defined in sock_select.c 
     47 * This function will only return the number of sockets set from PJ_FD_SET 
     48 * operation. When the set is modified by other means (such as by select()), 
     49 * the count will not be reflected here. 
     50 * 
     51 * That's why don't export this function in the header file, to avoid 
     52 * misunderstanding. 
     53 * 
     54 * @param fdsetp    The descriptor set. 
     55 * 
     56 * @return          Number of descriptors in the set. 
     57 */ 
     58PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp); 
     59 
     60 
     61 
    5262 
    5363/* 
     
    6171#endif 
    6272 
     73struct generic_operation 
     74{ 
     75    PJ_DECL_LIST_MEMBER(struct generic_operation); 
     76    pj_ioqueue_operation_e  op; 
     77}; 
     78 
     79struct read_operation 
     80{ 
     81    PJ_DECL_LIST_MEMBER(struct read_operation); 
     82    pj_ioqueue_operation_e  op; 
     83 
     84    void                   *buf; 
     85    pj_size_t               size; 
     86    unsigned                flags; 
     87    pj_sockaddr_t          *rmt_addr; 
     88    int                    *rmt_addrlen; 
     89}; 
     90 
     91struct write_operation 
     92{ 
     93    PJ_DECL_LIST_MEMBER(struct write_operation); 
     94    pj_ioqueue_operation_e  op; 
     95 
     96    char                   *buf; 
     97    pj_size_t               size; 
     98    pj_ssize_t              written; 
     99    unsigned                flags; 
     100    pj_sockaddr_in          rmt_addr; 
     101    int                     rmt_addrlen; 
     102}; 
     103 
     104#if PJ_HAS_TCP 
     105struct accept_operation 
     106{ 
     107    PJ_DECL_LIST_MEMBER(struct accept_operation); 
     108    pj_ioqueue_operation_e  op; 
     109 
     110    pj_sock_t              *accept_fd; 
     111    pj_sockaddr_t          *local_addr; 
     112    pj_sockaddr_t          *rmt_addr; 
     113    int                    *addrlen; 
     114}; 
     115#endif 
     116 
     117union operation_key 
     118{ 
     119    struct generic_operation generic; 
     120    struct read_operation    read; 
     121    struct write_operation   write; 
     122#if PJ_HAS_TCP 
     123    struct accept_operation  accept; 
     124#endif 
     125}; 
     126 
    63127/* 
    64128 * This describes each key. 
     
    66130struct pj_ioqueue_key_t 
    67131{ 
    68     PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t) 
     132    PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); 
     133    pj_ioqueue_t           *ioqueue; 
    69134    pj_sock_t               fd; 
    70     pj_ioqueue_operation_e  op; 
    71135    void                   *user_data; 
    72136    pj_ioqueue_callback     cb; 
    73  
    74     void                   *rd_buf; 
    75     unsigned                rd_flags; 
    76     pj_size_t               rd_buflen; 
    77     void                   *wr_buf; 
    78     pj_size_t               wr_buflen; 
    79  
    80     pj_sockaddr_t          *rmt_addr; 
    81     int                    *rmt_addrlen; 
    82  
    83     pj_sockaddr_t          *local_addr; 
    84     int                    *local_addrlen; 
    85  
    86     pj_sock_t              *accept_fd; 
     137    int                     connecting; 
     138    struct read_operation   read_list; 
     139    struct write_operation  write_list; 
     140#if PJ_HAS_TCP 
     141    struct accept_operation accept_list; 
     142#endif 
    87143}; 
    88144 
     
    95151    pj_bool_t           auto_delete_lock; 
    96152    unsigned            max, count; 
    97     pj_ioqueue_key_t    hlist; 
     153    pj_ioqueue_key_t    key_list; 
    98154    pj_fd_set_t         rfdset; 
    99155    pj_fd_set_t         wfdset; 
     
    110166PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,  
    111167                                       pj_size_t max_fd, 
    112                                        int max_threads, 
    113168                                       pj_ioqueue_t **p_ioqueue) 
    114169{ 
    115     pj_ioqueue_t *ioque; 
     170    pj_ioqueue_t *ioqueue; 
    116171    pj_status_t rc; 
    117172 
    118     PJ_UNUSED_ARG(max_threads); 
    119  
    120     if (max_fd > PJ_IOQUEUE_MAX_HANDLES) { 
    121         pj_assert(!"max_fd too large"); 
    122         return PJ_EINVAL; 
    123     } 
    124  
    125     ioque = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); 
    126     ioque->max = max_fd; 
    127     ioque->count = 0; 
    128     PJ_FD_ZERO(&ioque->rfdset); 
    129     PJ_FD_ZERO(&ioque->wfdset); 
     173    /* Check that arguments are valid. */ 
     174    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&  
     175                     max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,  
     176                     PJ_EINVAL); 
     177 
     178    /* Check that size of pj_ioqueue_op_key_t is sufficient */ 
     179    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= 
     180                     sizeof(union operation_key), PJ_EBUG); 
     181 
     182    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); 
     183    ioqueue->max = max_fd; 
     184    ioqueue->count = 0; 
     185    PJ_FD_ZERO(&ioqueue->rfdset); 
     186    PJ_FD_ZERO(&ioqueue->wfdset); 
    130187#if PJ_HAS_TCP 
    131     PJ_FD_ZERO(&ioque->xfdset); 
    132 #endif 
    133     pj_list_init(&ioque->hlist); 
    134  
    135     rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioque->lock); 
     188    PJ_FD_ZERO(&ioqueue->xfdset); 
     189#endif 
     190    pj_list_init(&ioqueue->key_list); 
     191 
     192    rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioqueue->lock); 
    136193    if (rc != PJ_SUCCESS) 
    137194        return rc; 
    138195 
    139     ioque->auto_delete_lock = PJ_TRUE; 
    140  
    141     PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioque)); 
    142  
    143     *p_ioqueue = ioque; 
     196    ioqueue->auto_delete_lock = PJ_TRUE; 
     197 
     198    PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue)); 
     199 
     200    *p_ioqueue = ioqueue; 
    144201    return PJ_SUCCESS; 
    145202} 
     
    150207 * Destroy ioqueue. 
    151208 */ 
    152 PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioque) 
     209PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) 
    153210{ 
    154211    pj_status_t rc = PJ_SUCCESS; 
    155212 
    156     PJ_ASSERT_RETURN(ioque, PJ_EINVAL); 
    157  
    158     if (ioque->auto_delete_lock) 
    159         rc = pj_lock_destroy(ioque->lock); 
     213    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 
     214 
     215    pj_lock_acquire(ioqueue->lock); 
     216 
     217    if (ioqueue->auto_delete_lock) 
     218        rc = pj_lock_destroy(ioqueue->lock); 
    160219 
    161220    return rc; 
     
    164223 
    165224/* 
    166  * pj_ioqueue_set_lock() 
    167  */ 
    168 PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque,  
    169                                          pj_lock_t *lock, 
    170                                          pj_bool_t auto_delete ) 
    171 { 
    172     PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL); 
    173  
    174     if (ioque->auto_delete_lock) { 
    175         pj_lock_destroy(ioque->lock); 
    176     } 
    177  
    178     ioque->lock = lock; 
    179     ioque->auto_delete_lock = auto_delete; 
    180  
    181     return PJ_SUCCESS; 
    182 } 
    183  
    184  
    185 /* 
    186225 * pj_ioqueue_register_sock() 
    187226 * 
     
    189228 */ 
    190229PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, 
    191                                               pj_ioqueue_t *ioque, 
     230                                              pj_ioqueue_t *ioqueue, 
    192231                                              pj_sock_t sock, 
    193232                                              void *user_data, 
     
    199238    pj_status_t rc = PJ_SUCCESS; 
    200239     
    201     PJ_ASSERT_RETURN(pool && ioque && sock != PJ_INVALID_SOCKET && 
     240    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET && 
    202241                     cb && p_key, PJ_EINVAL); 
    203242 
    204     pj_lock_acquire(ioque->lock); 
    205  
    206     if (ioque->count >= ioque->max) { 
     243    pj_lock_acquire(ioqueue->lock); 
     244 
     245    if (ioqueue->count >= ioqueue->max) { 
    207246        rc = PJ_ETOOMANY; 
    208247        goto on_return; 
     
    212251    value = 1; 
    213252#ifdef PJ_WIN32 
    214     if (ioctlsocket(sock, FIONBIO, (unsigned long*)&value)) { 
     253    if (ioctlsocket(sock, FIONBIO, (u_long*)&value)) { 
    215254#else 
    216255    if (ioctl(sock, FIONBIO, &value)) { 
     
    222261    /* Create key. */ 
    223262    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 
     263    key->ioqueue = ioqueue; 
    224264    key->fd = sock; 
    225265    key->user_data = user_data; 
     266    pj_list_init(&key->read_list); 
     267    pj_list_init(&key->write_list); 
     268#if PJ_HAS_TCP 
     269    pj_list_init(&key->accept_list); 
     270#endif 
    226271 
    227272    /* Save callback. */ 
     
    229274 
    230275    /* Register */ 
    231     pj_list_insert_before(&ioque->hlist, key); 
    232     ++ioque->count; 
     276    pj_list_insert_before(&ioqueue->key_list, key); 
     277    ++ioqueue->count; 
    233278 
    234279on_return: 
     280    /* On error, socket may be left in non-blocking mode. */ 
    235281    *p_key = key; 
    236     pj_lock_release(ioque->lock); 
     282    pj_lock_release(ioqueue->lock); 
    237283     
    238284    return rc; 
     
    244290 * Unregister handle from ioqueue. 
    245291 */ 
    246 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque, 
    247                                            pj_ioqueue_key_t *key) 
    248 { 
    249     PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL); 
    250  
    251     pj_lock_acquire(ioque->lock); 
    252  
    253     pj_assert(ioque->count > 0); 
    254     --ioque->count; 
     292PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) 
     293{ 
     294    pj_ioqueue_t *ioqueue; 
     295 
     296    PJ_ASSERT_RETURN(key, PJ_EINVAL); 
     297 
     298    ioqueue = key->ioqueue; 
     299 
     300    pj_lock_acquire(ioqueue->lock); 
     301 
     302    pj_assert(ioqueue->count > 0); 
     303    --ioqueue->count; 
    255304    pj_list_erase(key); 
    256     PJ_FD_CLR(key->fd, &ioque->rfdset); 
    257     PJ_FD_CLR(key->fd, &ioque->wfdset); 
     305    PJ_FD_CLR(key->fd, &ioqueue->rfdset); 
     306    PJ_FD_CLR(key->fd, &ioqueue->wfdset); 
    258307#if PJ_HAS_TCP 
    259     PJ_FD_CLR(key->fd, &ioque->xfdset); 
    260 #endif 
    261  
    262     pj_lock_release(ioque->lock); 
     308    PJ_FD_CLR(key->fd, &ioqueue->xfdset); 
     309#endif 
     310 
     311    pj_lock_release(ioqueue->lock); 
    263312    return PJ_SUCCESS; 
    264313} 
     
    276325 
    277326 
     327/* 
     328 * pj_ioqueue_set_user_data() 
     329 */ 
     330PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, 
     331                                              void *user_data, 
     332                                              void **old_data) 
     333{ 
     334    PJ_ASSERT_RETURN(key, PJ_EINVAL); 
     335 
     336    if (old_data) 
     337        *old_data = key->user_data; 
     338    key->user_data = user_data; 
     339 
     340    return PJ_SUCCESS; 
     341} 
     342 
     343 
    278344/* This supposed to check whether the fd_set values are consistent 
    279345 * with the operation currently set in each key. 
    280346 */ 
    281347#if VALIDATE_FD_SET 
    282 static void validate_sets(const pj_ioqueue_t *ioque, 
     348static void validate_sets(const pj_ioqueue_t *ioqueue, 
    283349                          const pj_fd_set_t *rfdset, 
    284350                          const pj_fd_set_t *wfdset, 
     
    287353    pj_ioqueue_key_t *key; 
    288354 
    289     key = ioque->hlist.next; 
    290     while (key != &ioque->hlist) { 
    291         if ((key->op & PJ_IOQUEUE_OP_READ)  
    292             || (key->op & PJ_IOQUEUE_OP_RECV) 
    293             || (key->op & PJ_IOQUEUE_OP_RECV_FROM) 
     355    key = ioqueue->key_list.next; 
     356    while (key != &ioqueue->key_list) { 
     357        if (!pj_list_empty(&key->read_list) 
    294358#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 
    295             || (key->op & PJ_IOQUEUE_OP_ACCEPT) 
     359            || !pj_list_empty(&key->accept_list) 
    296360#endif 
    297361            )  
     
    302366            pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0); 
    303367        } 
    304         if ((key->op & PJ_IOQUEUE_OP_WRITE) 
    305             || (key->op & PJ_IOQUEUE_OP_SEND) 
    306             || (key->op & PJ_IOQUEUE_OP_SEND_TO) 
     368        if (!pj_list_empty(&key->write_list) 
    307369#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 
    308             || (key->op & PJ_IOQUEUE_OP_CONNECT) 
     370            || key->connecting 
    309371#endif 
    310372           ) 
     
    316378        } 
    317379#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 
    318         if (key->op & PJ_IOQUEUE_OP_CONNECT) 
     380        if (key->connecting) 
    319381        { 
    320382            pj_assert(PJ_FD_ISSET(key->fd, xfdset)); 
     
    348410 *    work on fd_set copy of the ioqueue (not the original one). 
    349411 */ 
    350 PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout) 
     412PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) 
    351413{ 
    352414    pj_fd_set_t rfdset, wfdset, xfdset; 
     
    354416    pj_ioqueue_key_t *h; 
    355417 
     418    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 
     419 
    356420    /* Lock ioqueue before making fd_set copies */ 
    357     pj_lock_acquire(ioque->lock); 
    358  
    359     if (PJ_FD_COUNT(&ioque->rfdset)==0 && 
    360         PJ_FD_COUNT(&ioque->wfdset)==0 && 
    361         PJ_FD_COUNT(&ioque->xfdset)==0) 
     421    pj_lock_acquire(ioqueue->lock); 
     422 
     423    /* We will only do select() when there are sockets to be polled. 
     424     * Otherwise select() will return error. 
     425     */ 
     426    if (PJ_FD_COUNT(&ioqueue->rfdset)==0 && 
     427        PJ_FD_COUNT(&ioqueue->wfdset)==0 && 
     428        PJ_FD_COUNT(&ioqueue->xfdset)==0) 
    362429    { 
    363         pj_lock_release(ioque->lock); 
     430        pj_lock_release(ioqueue->lock); 
    364431        if (timeout) 
    365432            pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout)); 
     
    368435 
    369436    /* Copy ioqueue's pj_fd_set_t to local variables. */ 
    370     pj_memcpy(&rfdset, &ioque->rfdset, sizeof(pj_fd_set_t)); 
    371     pj_memcpy(&wfdset, &ioque->wfdset, sizeof(pj_fd_set_t)); 
     437    pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t)); 
     438    pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t)); 
    372439#if PJ_HAS_TCP 
    373     pj_memcpy(&xfdset, &ioque->xfdset, sizeof(pj_fd_set_t)); 
     440    pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t)); 
    374441#else 
    375442    PJ_FD_ZERO(&xfdset); 
     
    377444 
    378445#if VALIDATE_FD_SET 
    379     validate_sets(ioque, &rfdset, &wfdset, &xfdset); 
     446    validate_sets(ioqueue, &rfdset, &wfdset, &xfdset); 
    380447#endif 
    381448 
    382449    /* Unlock ioqueue before select(). */ 
    383     pj_lock_release(ioque->lock); 
     450    pj_lock_release(ioqueue->lock); 
    384451 
    385452    count = pj_sock_select(FD_SETSIZE, &rfdset, &wfdset, &xfdset, timeout); 
     
    388455        return count; 
    389456 
    390     /* Lock ioqueue again before scanning for signalled sockets. */ 
    391     pj_lock_acquire(ioque->lock); 
    392  
    393 #if PJ_HAS_TCP 
    394     /* Scan for exception socket */ 
    395     h = ioque->hlist.next; 
    396 do_except_scan: 
    397     for ( ; h!=&ioque->hlist; h = h->next) { 
    398         if ((h->op & PJ_IOQUEUE_OP_CONNECT) && PJ_FD_ISSET(h->fd, &xfdset)) 
    399             break; 
    400     } 
    401     if (h != &ioque->hlist) { 
    402         /* 'connect()' should be the only operation. */ 
    403         pj_assert((h->op == PJ_IOQUEUE_OP_CONNECT)); 
    404  
    405         /* Clear operation. */ 
    406         h->op &= ~(PJ_IOQUEUE_OP_CONNECT); 
    407         PJ_FD_CLR(h->fd, &ioque->wfdset); 
    408         PJ_FD_CLR(h->fd, &ioque->xfdset); 
    409         PJ_FD_CLR(h->fd, &wfdset); 
    410         PJ_FD_CLR(h->fd, &xfdset); 
    411  
    412         /* Call callback. */ 
    413         if (h->cb.on_connect_complete) 
    414             (*h->cb.on_connect_complete)(h, -1); 
    415  
    416         /* Re-scan exception list. */ 
    417         goto do_except_scan; 
    418     } 
    419 #endif  /* PJ_HAS_TCP */ 
    420  
    421     /* Scan for readable socket. */ 
    422     h = ioque->hlist.next; 
    423 do_readable_scan: 
    424     for ( ; h!=&ioque->hlist; h = h->next) { 
    425         if ((PJ_IOQUEUE_IS_READ_OP(h->op) || PJ_IOQUEUE_IS_ACCEPT_OP(h->op)) &&  
    426             PJ_FD_ISSET(h->fd, &rfdset)) 
     457    /* Lock ioqueue again before scanning for signalled sockets.  
     458     * We must strictly use recursive mutex since application may invoke 
     459     * the ioqueue again inside the callback. 
     460     */ 
     461    pj_lock_acquire(ioqueue->lock); 
     462 
     463    /* Scan for writable sockets first to handle piggy-back data 
     464     * coming with accept(). 
     465     */ 
     466    h = ioqueue->key_list.next; 
     467do_writable_scan: 
     468    for ( ; h!=&ioqueue->key_list; h = h->next) { 
     469        if ( (!pj_list_empty(&h->write_list) || h->connecting) 
     470             && PJ_FD_ISSET(h->fd, &wfdset)) 
    427471        { 
    428472            break; 
    429473        } 
    430474    } 
    431     if (h != &ioque->hlist) { 
    432         pj_status_t rc; 
    433  
    434         pj_assert(PJ_IOQUEUE_IS_READ_OP(h->op) || 
    435                   PJ_IOQUEUE_IS_ACCEPT_OP(h->op)); 
    436          
    437 #       if PJ_HAS_TCP 
    438         if ((h->op & PJ_IOQUEUE_OP_ACCEPT)) { 
    439             /* accept() must be the only operation specified on server socket */ 
    440             pj_assert(h->op == PJ_IOQUEUE_OP_ACCEPT); 
    441  
    442             rc=pj_sock_accept(h->fd, h->accept_fd, h->rmt_addr, h->rmt_addrlen); 
    443             if (rc==0 && h->local_addr) { 
    444                 rc = pj_sock_getsockname(*h->accept_fd, h->local_addr,  
    445                                          h->local_addrlen); 
    446             } 
    447  
    448             h->op &= ~(PJ_IOQUEUE_OP_ACCEPT); 
    449             PJ_FD_CLR(h->fd, &ioque->rfdset); 
    450  
    451             /* Call callback. */ 
    452             if (h->cb.on_accept_complete) 
    453                 (*h->cb.on_accept_complete)(h, *h->accept_fd, rc); 
    454  
    455             /* Re-scan readable sockets. */ 
    456             goto do_readable_scan; 
    457         }  
    458         else { 
    459 #       endif 
    460             pj_ssize_t bytes_read = h->rd_buflen; 
    461  
    462             if ((h->op & PJ_IOQUEUE_OP_RECV_FROM)) { 
    463                 rc = pj_sock_recvfrom(h->fd, h->rd_buf, &bytes_read, 0, 
    464                                       h->rmt_addr, h->rmt_addrlen); 
    465             } else if ((h->op & PJ_IOQUEUE_OP_RECV)) { 
    466                 rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0); 
    467             } else { 
    468                 /* 
    469                  * User has specified pj_ioqueue_read(). 
    470                  * On Win32, we should do ReadFile(). But because we got 
    471                  * here because of select() anyway, user must have put a 
    472                  * socket descriptor on h->fd, which in this case we can 
    473                  * just call pj_sock_recv() instead of ReadFile(). 
    474                  * On Unix, user may put a file in h->fd, so we'll have 
    475                  * to call read() here. 
    476                  * This may not compile on systems which doesn't have  
    477                  * read(). That's why we only specify PJ_LINUX here so 
    478                  * that error is easier to catch. 
    479                  */ 
    480 #               if defined(PJ_WIN32) && PJ_WIN32 != 0 
    481                 rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0); 
    482 #               elif (defined(PJ_LINUX) && PJ_LINUX != 0) || \ 
    483                      (defined(PJ_SUNOS) && PJ_SUNOS != 0) 
    484                 bytes_read = read(h->fd, h->rd_buf, bytes_read); 
    485                 rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); 
    486 #               elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0 
    487                 bytes_read = sys_read(h->fd, h->rd_buf, bytes_read); 
    488                 rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read; 
    489 #               else 
    490 #               error "Implement read() for this platform!" 
    491 #               endif 
    492             } 
    493              
    494             if (rc != PJ_SUCCESS) { 
    495 #               if defined(PJ_WIN32) && PJ_WIN32 != 0 
    496                 /* On Win32, for UDP, WSAECONNRESET on the receive side  
    497                  * indicates that previous sending has triggered ICMP Port  
    498                  * Unreachable message. 
    499                  * But we wouldn't know at this point which one of previous  
    500                  * key that has triggered the error, since UDP socket can 
    501                  * be shared! 
    502                  * So we'll just ignore it! 
    503                  */ 
    504  
    505                 if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) { 
    506                     PJ_LOG(4,(THIS_FILE,  
    507                               "Ignored ICMP port unreach. on key=%p", h)); 
    508                 } 
    509 #               endif 
    510  
    511                 /* In any case we would report this to caller. */ 
    512                 bytes_read = -rc; 
    513             } 
    514  
    515             h->op &= ~(PJ_IOQUEUE_OP_READ | PJ_IOQUEUE_OP_RECV |  
    516                        PJ_IOQUEUE_OP_RECV_FROM); 
    517             PJ_FD_CLR(h->fd, &ioque->rfdset); 
    518             PJ_FD_CLR(h->fd, &rfdset); 
    519  
    520             /* Call callback. */ 
    521             if (h->cb.on_read_complete) 
    522                 (*h->cb.on_read_complete)(h, bytes_read); 
    523  
    524             /* Re-scan readable sockets. */ 
    525             goto do_readable_scan; 
    526  
    527         } 
    528     } 
    529  
    530     /* Scan for writable socket  */ 
    531     h = ioque->hlist.next; 
    532 do_writable_scan: 
    533     for ( ; h!=&ioque->hlist; h = h->next) { 
    534         if ((PJ_IOQUEUE_IS_WRITE_OP(h->op) || PJ_IOQUEUE_IS_CONNECT_OP(h->op))  
    535             && PJ_FD_ISSET(h->fd, &wfdset)) 
    536         { 
    537             break; 
    538         } 
    539     } 
    540     if (h != &ioque->hlist) { 
    541         pj_assert(PJ_IOQUEUE_IS_WRITE_OP(h->op) ||  
    542                   PJ_IOQUEUE_IS_CONNECT_OP(h->op)); 
     475    if (h != &ioqueue->key_list) { 
     476        pj_assert(!pj_list_empty(&h->write_list) || h->connecting); 
    543477 
    544478#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 
    545         if ((h->op & PJ_IOQUEUE_OP_CONNECT)) { 
     479        if (h->connecting) { 
    546480            /* Completion of connect() operation */ 
    547481            pj_ssize_t bytes_transfered; 
    548482 
    549 #if (defined(PJ_LINUX) && PJ_LINUX!=0) || \ 
    550     (defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL!=0) 
     483#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) 
    551484            /* from connect(2):  
    552                 * On Linux, use getsockopt to read the SO_ERROR option at 
    553                 * level SOL_SOCKET to determine whether connect() completed 
    554                 * successfully (if SO_ERROR is zero). 
    555                 */ 
     485             * On Linux, use getsockopt to read the SO_ERROR option at 
     486             * level SOL_SOCKET to determine whether connect() completed 
     487             * successfully (if SO_ERROR is zero). 
     488             */ 
    556489            int value; 
    557490            socklen_t vallen = sizeof(value); 
     
    590523 
    591524            /* Clear operation. */ 
    592             h->op &= (~PJ_IOQUEUE_OP_CONNECT); 
    593             PJ_FD_CLR(h->fd, &ioque->wfdset); 
    594             PJ_FD_CLR(h->fd, &ioque->xfdset); 
     525            h->connecting = 0; 
     526            PJ_FD_CLR(h->fd, &ioqueue->wfdset); 
     527            PJ_FD_CLR(h->fd, &ioqueue->xfdset); 
    595528 
    596529            /* Call callback. */ 
     
    604537#endif /* PJ_HAS_TCP */ 
    605538        { 
    606             /* Completion of write(), send(), or sendto() operation. */ 
    607  
    608             /* Clear operation. */ 
    609             h->op &= ~(PJ_IOQUEUE_OP_WRITE | PJ_IOQUEUE_OP_SEND |  
    610                        PJ_IOQUEUE_OP_SEND_TO); 
    611             PJ_FD_CLR(h->fd, &ioque->wfdset); 
     539            /* Socket is writable. */ 
     540            struct write_operation *write_op; 
     541            pj_ssize_t sent; 
     542            pj_status_t send_rc; 
     543 
     544            /* Get the first in the queue. */ 
     545            write_op = h->write_list.next; 
     546 
     547            /* Send the data. */ 
     548            sent = write_op->size - write_op->written; 
     549            if (write_op->op == PJ_IOQUEUE_OP_SEND) { 
     550                send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written, 
     551                                       &sent, write_op->flags); 
     552            } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) { 
     553                send_rc = pj_sock_sendto(h->fd,  
     554                                         write_op->buf+write_op->written, 
     555                                         &sent, write_op->flags, 
     556                                         &write_op->rmt_addr,  
     557                                         write_op->rmt_addrlen); 
     558            } else { 
     559                pj_assert(!"Invalid operation type!"); 
     560                send_rc = PJ_EBUG; 
     561            } 
     562 
     563            if (send_rc == PJ_SUCCESS) { 
     564                write_op->written += sent; 
     565            } else { 
     566                pj_assert(send_rc > 0); 
     567                write_op->written = -send_rc; 
     568            } 
     569 
     570            /* In any case we don't need to process this descriptor again. */ 
    612571            PJ_FD_CLR(h->fd, &wfdset); 
    613572 
    614             /* Call callback. */ 
    615             /* All data must have been sent? */ 
    616             if (h->cb.on_write_complete) 
    617                 (*h->cb.on_write_complete)(h, h->wr_buflen); 
    618  
     573            /* Are we finished with this buffer? */ 
     574            if (send_rc!=PJ_SUCCESS ||  
     575                write_op->written == (pj_ssize_t)write_op->size)  
     576            { 
     577                pj_list_erase(write_op); 
     578 
     579                /* Clear operation if there's no more data to send. */ 
     580                if (pj_list_empty(&h->write_list)) 
     581                    PJ_FD_CLR(h->fd, &ioqueue->wfdset); 
     582 
     583                /* Call callback. */ 
     584                if (h->cb.on_write_complete) { 
     585                    (*h->cb.on_write_complete)(h,  
     586                                               (pj_ioqueue_op_key_t*)write_op, 
     587                                               write_op->written); 
     588                } 
     589            } 
     590             
    619591            /* Re-scan writable sockets. */ 
    620592            goto do_writable_scan; 
    621593        } 
    622594    } 
     595 
     596    /* Scan for readable socket. */ 
     597    h = ioqueue->key_list.next; 
     598do_readable_scan: 
     599    for ( ; h!=&ioqueue->key_list; h = h->next) { 
     600        if ((!pj_list_empty(&h->read_list)  
     601#if PJ_HAS_TCP 
     602             || !pj_list_empty(&h->accept_list) 
     603#endif 
     604            ) && PJ_FD_ISSET(h->fd, &rfdset)) 
     605        { 
     606            break; 
     607        } 
     608    } 
     609    if (h != &ioqueue->key_list) { 
     610        pj_status_t rc; 
     611 
     612#if PJ_HAS_TCP 
     613        pj_assert(!pj_list_empty(&h->read_list) ||  
     614                  !pj_list_empty(&h->accept_list)); 
     615#else 
     616        pj_assert(!pj_list_empty(&h->read_list)); 
     617#endif 
     618         
     619#       if PJ_HAS_TCP 
     620        if (!pj_list_empty(&h->accept_list)) { 
     621 
     622            struct accept_operation *accept_op; 
     623             
     624            /* Get one accept operation from the list. */ 
     625            accept_op = h->accept_list.next; 
     626            pj_list_erase(accept_op); 
     627 
     628            rc=pj_sock_accept(h->fd, accept_op->accept_fd,  
     629                              accept_op->rmt_addr, accept_op->addrlen); 
     630            if (rc==PJ_SUCCESS && accept_op->local_addr) { 
     631                rc = pj_sock_getsockname(*accept_op->accept_fd,  
     632                                         accept_op->local_addr, 
     633                                         accept_op->addrlen); 
     634            } 
     635 
     636            /* Clear bit in fdset if there is no more pending accept */ 
     637            if (pj_list_empty(&h->accept_list)) 
     638                PJ_FD_CLR(h->fd, &ioqueue->rfdset); 
     639 
     640            /* Call callback. */ 
     641            if (h->cb.on_accept_complete) 
     642                (*h->cb.on_accept_complete)(h, (pj_ioqueue_op_key_t*)accept_op, 
     643                                            *accept_op->accept_fd, rc); 
     644 
     645            /* Re-scan readable sockets. */ 
     646            goto do_readable_scan; 
     647        } 
     648        else { 
     649#       endif 
     650            struct read_operation *read_op; 
     651            pj_ssize_t bytes_read; 
     652 
     653            pj_assert(!pj_list_empty(&h->read_list)); 
     654 
     655            /* Get one pending read operation from the list. */ 
     656            read_op = h->read_list.next; 
     657            pj_list_erase(read_op); 
     658 
     659            bytes_read = read_op->size; 
     660 
     661            if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) { 
     662                rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0, 
     663                                      read_op->rmt_addr,  
     664                                      read_op->rmt_addrlen); 
     665            } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) { 
     666                rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); 
     667            } else { 
     668                pj_assert(read_op->op == PJ_IOQUEUE_OP_READ); 
     669                /* 
     670                 * User has specified pj_ioqueue_read(). 
     671                 * On Win32, we should do ReadFile(). But because we got 
     672                 * here because of select() anyway, user must have put a 
     673                 * socket descriptor on h->fd, which in this case we can 
     674                 * just call pj_sock_recv() instead of ReadFile(). 
     675                 * On Unix, user may put a file in h->fd, so we'll have 
     676                 * to call read() here. 
     677                 * This may not compile on systems which doesn't have  
     678                 * read(). That's why we only specify PJ_LINUX here so 
     679                 * that error is easier to catch. 
     680                 */ 
     681#               if defined(PJ_WIN32) && PJ_WIN32 != 0 
     682                rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); 
     683                //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size, 
     684                //              &bytes_read, NULL); 
     685#               elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0) 
     686                bytes_read = read(h->fd, h->rd_buf, bytes_read); 
     687                rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); 
     688#               elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0 
     689                bytes_read = sys_read(h->fd, h->rd_buf, bytes_read); 
     690                rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read; 
     691#               else 
     692#               error "Implement read() for this platform!" 
     693#               endif 
     694            } 
     695             
     696            if (rc != PJ_SUCCESS) { 
     697#               if defined(PJ_WIN32) && PJ_WIN32 != 0 
     698                /* On Win32, for UDP, WSAECONNRESET on the receive side  
     699                 * indicates that previous sending has triggered ICMP Port  
     700                 * Unreachable message. 
     701                 * But we wouldn't know at this point which one of previous  
     702                 * key that has triggered the error, since UDP socket can 
     703                 * be shared! 
     704                 * So we'll just ignore it! 
     705                 */ 
     706 
     707                if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) { 
     708                    //PJ_LOG(4,(THIS_FILE,  
     709                    //          "Ignored ICMP port unreach. on key=%p", h)); 
     710                } 
     711#               endif 
     712 
     713                /* In any case we would report this to caller. */ 
     714                bytes_read = -rc; 
     715            } 
     716 
     717            /* Clear fdset if there is no pending read. */ 
     718            if (pj_list_empty(&h->read_list)) 
     719                PJ_FD_CLR(h->fd, &ioqueue->rfdset); 
     720 
     721            /* In any case clear from temporary set. */ 
     722            PJ_FD_CLR(h->fd, &rfdset); 
     723 
     724            /* Call callback. */ 
     725            if (h->cb.on_read_complete) 
     726                (*h->cb.on_read_complete)(h, (pj_ioqueue_op_key_t*)read_op, 
     727                                          bytes_read); 
     728 
     729            /* Re-scan readable sockets. */ 
     730            goto do_readable_scan; 
     731 
     732        } 
     733    } 
     734 
     735#if PJ_HAS_TCP 
     736    /* Scan for exception socket for TCP connection error. */ 
     737    h = ioqueue->key_list.next; 
     738do_except_scan: 
     739    for ( ; h!=&ioqueue->key_list; h = h->next) { 
     740        if (h->connecting && PJ_FD_ISSET(h->fd, &xfdset)) 
     741            break; 
     742    } 
     743    if (h != &ioqueue->key_list) { 
     744 
     745        pj_assert(h->connecting); 
     746 
     747        /* Clear operation. */ 
     748        h->connecting = 0; 
     749        PJ_FD_CLR(h->fd, &ioqueue->wfdset); 
     750        PJ_FD_CLR(h->fd, &ioqueue->xfdset); 
     751        PJ_FD_CLR(h->fd, &wfdset); 
     752        PJ_FD_CLR(h->fd, &xfdset); 
     753 
     754        /* Call callback. */ 
     755        if (h->cb.on_connect_complete) 
     756            (*h->cb.on_connect_complete)(h, -1); 
     757 
     758        /* Re-scan exception list. */ 
     759        goto do_except_scan; 
     760    } 
     761#endif  /* PJ_HAS_TCP */ 
    623762 
    624763    /* Shouldn't happen. */ 
     
    629768    //count = 0; 
    630769 
    631     pj_lock_release(ioque->lock); 
     770    pj_lock_release(ioqueue->lock); 
    632771    return count; 
    633772} 
    634773 
    635774/* 
    636  * pj_ioqueue_read() 
    637  * 
    638  * Start asynchronous read from the descriptor. 
    639  */ 
    640 PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque, 
    641                                      pj_ioqueue_key_t *key, 
    642                                      void *buffer, 
    643                                      pj_size_t buflen) 
    644 { 
    645     PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); 
     775 * pj_ioqueue_recv() 
     776 * 
     777 * Start asynchronous recv() from the socket. 
     778 */ 
     779PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_key_t *key, 
     780                                      pj_ioqueue_op_key_t *op_key, 
     781                                      void *buffer, 
     782                                      pj_ssize_t *length, 
     783                                      unsigned flags ) 
     784{ 
     785    pj_status_t status; 
     786    pj_ssize_t size; 
     787    struct read_operation *read_op; 
     788    pj_ioqueue_t *ioqueue; 
     789 
     790    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); 
    646791    PJ_CHECK_STACK(); 
    647792 
    648     /* For consistency with other ioqueue implementation, we would reject  
    649      * if descriptor has already been submitted for reading before. 
    650      */ 
    651     PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && 
    652                       (key->op & PJ_IOQUEUE_OP_RECV) == 0 && 
    653                       (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), 
    654                      PJ_EBUSY); 
    655  
    656     pj_lock_acquire(ioque->lock); 
    657  
    658     key->op |= PJ_IOQUEUE_OP_READ; 
    659     key->rd_flags = 0; 
    660     key->rd_buf = buffer; 
    661     key->rd_buflen = buflen; 
    662     PJ_FD_SET(key->fd, &ioque->rfdset); 
    663  
    664     pj_lock_release(ioque->lock); 
     793    /* Try to see if there's data immediately available.  
     794     */ 
     795    size = *length; 
     796    status = pj_sock_recv(key->fd, buffer, &size, flags); 
     797    if (status == PJ_SUCCESS) { 
     798        /* Yes! Data is available! */ 
     799        *length = size; 
     800        return PJ_SUCCESS; 
     801    } else { 
     802        /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report 
     803         * the error to caller. 
     804         */ 
     805        if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) 
     806            return status; 
     807    } 
     808 
     809    /* 
     810     * No data is immediately available. 
     811     * Must schedule asynchronous operation to the ioqueue. 
     812     */ 
     813    ioqueue = key->ioqueue; 
     814    pj_lock_acquire(ioqueue->lock); 
     815 
     816    read_op = (struct read_operation*)op_key; 
     817 
     818    read_op->op = PJ_IOQUEUE_OP_RECV; 
     819    read_op->buf = buffer; 
     820    read_op->size = *length; 
     821    read_op->flags = flags; 
     822 
     823    pj_list_insert_before(&key->read_list, read_op); 
     824    PJ_FD_SET(key->fd, &ioqueue->rfdset); 
     825 
     826    pj_lock_release(ioqueue->lock); 
    665827    return PJ_EPENDING; 
    666828} 
    667829 
    668  
    669 /* 
    670  * pj_ioqueue_recv() 
    671  * 
    672  * Start asynchronous recv() from the socket. 
    673  */ 
    674 PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_t *ioque, 
    675                                       pj_ioqueue_key_t *key, 
    676                                       void *buffer, 
    677                                       pj_size_t buflen, 
    678                                       unsigned flags ) 
    679 { 
    680     PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); 
    681     PJ_CHECK_STACK(); 
    682  
    683     /* For consistency with other ioqueue implementation, we would reject  
    684      * if descriptor has already been submitted for reading before. 
    685      */ 
    686     PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && 
    687                       (key->op & PJ_IOQUEUE_OP_RECV) == 0 && 
    688                       (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), 
    689                      PJ_EBUSY); 
    690  
    691     pj_lock_acquire(ioque->lock); 
    692  
    693     key->op |= PJ_IOQUEUE_OP_RECV; 
    694     key->rd_buf = buffer; 
    695     key->rd_buflen = buflen; 
    696     key->rd_flags = flags; 
    697     PJ_FD_SET(key->fd, &ioque->rfdset); 
    698  
    699     pj_lock_release(ioque->lock); 
    700     return PJ_EPENDING; 
    701 } 
    702  
    703830/* 
    704831 * pj_ioqueue_recvfrom() 
     
    706833 * Start asynchronous recvfrom() from the socket. 
    707834 */ 
    708 PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque, 
    709                                          pj_ioqueue_key_t *key, 
     835PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, 
     836                                         pj_ioqueue_op_key_t *op_key, 
    710837                                         void *buffer, 
    711                                          pj_size_t buflen, 
     838                                         pj_ssize_t *length, 
    712839                                         unsigned flags, 
    713840                                         pj_sockaddr_t *addr, 
    714841                                         int *addrlen) 
    715842{ 
    716     PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); 
     843    pj_status_t status; 
     844    pj_ssize_t size; 
     845    struct read_operation *read_op; 
     846    pj_ioqueue_t *ioqueue; 
     847 
     848    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); 
    717849    PJ_CHECK_STACK(); 
    718850 
    719     /* For consistency with other ioqueue implementation, we would reject  
    720      * if descriptor has already been submitted for reading before. 
    721      */ 
    722     PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && 
    723                       (key->op & PJ_IOQUEUE_OP_RECV) == 0 && 
    724                       (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), 
    725                      PJ_EBUSY); 
    726  
    727     pj_lock_acquire(ioque->lock); 
    728  
    729     key->op |= PJ_IOQUEUE_OP_RECV_FROM; 
    730     key->rd_buf = buffer; 
    731     key->rd_buflen = buflen; 
    732     key->rd_flags = flags; 
    733     key->rmt_addr = addr; 
    734     key->rmt_addrlen = addrlen; 
    735     PJ_FD_SET(key->fd, &ioque->rfdset); 
    736  
    737     pj_lock_release(ioque->lock); 
     851    /* Try to see if there's data immediately available.  
     852     */ 
     853    size = *length; 
     854    status = pj_sock_recvfrom(key->fd, buffer, &size, flags, 
     855                              addr, addrlen); 
     856    if (status == PJ_SUCCESS) { 
     857        /* Yes! Data is available! */ 
     858        *length = size; 
     859        return PJ_SUCCESS; 
     860    } else { 
     861        /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report 
     862         * the error to caller. 
     863         */ 
     864        if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) 
     865            return status; 
     866    } 
     867 
     868    /* 
     869     * No data is immediately available. 
     870     * Must schedule asynchronous operation to the ioqueue. 
     871     */ 
     872    ioqueue = key->ioqueue; 
     873    pj_lock_acquire(ioqueue->lock); 
     874 
     875    read_op = (struct read_operation*)op_key; 
     876 
     877    read_op->op = PJ_IOQUEUE_OP_RECV_FROM; 
     878    read_op->buf = buffer; 
     879    read_op->size = *length; 
     880    read_op->flags = flags; 
     881    read_op->rmt_addr = addr; 
     882    read_op->rmt_addrlen = addrlen; 
     883 
     884    pj_list_insert_before(&key->read_list, read_op); 
     885    PJ_FD_SET(key->fd, &ioqueue->rfdset); 
     886 
     887    pj_lock_release(ioqueue->lock); 
    738888    return PJ_EPENDING; 
    739889} 
    740890 
    741891/* 
    742  * pj_ioqueue_write() 
     892 * pj_ioqueue_send() 
     893 * 
     894 * Start asynchronous send() to the descriptor. 
     895 */ 
     896PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, 
     897                                     pj_ioqueue_op_key_t *op_key, 
     898                                     const void *data, 
     899                                     pj_ssize_t *length, 
     900                                     unsigned flags) 
     901{ 
     902    pj_ioqueue_t *ioqueue; 
     903    struct write_operation *write_op; 
     904    pj_status_t status; 
     905    pj_ssize_t sent; 
     906 
     907    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); 
     908    PJ_CHECK_STACK(); 
     909 
     910    /* Fast track: 
     911     *   Try to send data immediately, only if there's no pending write! 
     912     * Note: 
     913     *  We are speculating that the list is empty here without properly 
     914     *  acquiring ioqueue's mutex first. This is intentional, to maximize 
     915     *  performance via parallelism. 
     916     * 
     917     *  This should be safe, because: 
     918     *      - by convention, we require caller to make sure that the 
     919     *        key is not unregistered while other threads are invoking 
     920     *        an operation on the same key. 
     921     *      - pj_list_empty() is safe to be invoked by multiple threads, 
     922     *        even when other threads are modifying the list. 
     923     */ 
     924    if (pj_list_empty(&key->write_list)) { 
     925        /* 
     926         * See if data can be sent immediately. 
     927         */ 
     928        sent = *length; 
     929        status = pj_sock_send(key->fd, data, &sent, flags); 
     930        if (status == PJ_SUCCESS) { 
     931            /* Success! */ 
     932            *length = sent; 
     933            return PJ_SUCCESS; 
     934        } else { 
     935            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report 
     936             * the error to caller. 
     937             */ 
     938            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { 
     939                return status; 
     940            } 
     941        } 
     942    } 
     943 
     944    /* 
     945     * Schedule asynchronous send. 
     946     */ 
     947    ioqueue = key->ioqueue; 
     948    pj_lock_acquire(ioqueue->lock); 
     949 
     950    write_op = (struct write_operation*)op_key; 
     951    write_op->op = PJ_IOQUEUE_OP_SEND; 
     952    write_op->buf = NULL; 
     953    write_op->size = *length; 
     954    write_op->written = 0; 
     955    write_op->flags = flags; 
     956     
     957    pj_list_insert_before(&key->write_list, write_op); 
     958    PJ_FD_SET(key->fd, &ioqueue->wfdset); 
     959 
     960    pj_lock_release(ioqueue->lock); 
     961 
     962    return PJ_EPENDING; 
     963} 
     964 
     965 
     966/* 
     967 * pj_ioqueue_sendto() 
    743968 * 
    744969 * Start asynchronous write() to the descriptor. 
    745970 */ 
    746 PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque, 
    747                                       pj_ioqueue_key_t *key, 
    748                                       const void *data, 
    749                                       pj_size_t datalen) 
    750 { 
    751     pj_status_t rc; 
    752     pj_ssize_t sent; 
    753  
    754     PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); 
    755     PJ_CHECK_STACK(); 
    756  
    757     /* For consistency with other ioqueue implementation, we would reject  
    758      * if descriptor has already been submitted for writing before. 
    759      */ 
    760     PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && 
    761                       (key->op & PJ_IOQUEUE_OP_SEND) == 0 && 
    762                       (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), 
    763                      PJ_EBUSY); 
    764  
    765     sent = datalen; 
    766     /* sent would be -1 after pj_sock_send() if it returns error. */ 
    767     rc = pj_sock_send(key->fd, data, &sent, 0); 
    768     if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { 
    769         return rc; 
    770     } 
    771  
    772     pj_lock_acquire(ioque->lock); 
    773  
    774     key->op |= PJ_IOQUEUE_OP_WRITE; 
    775     key->wr_buf = NULL; 
    776     key->wr_buflen = datalen; 
    777     PJ_FD_SET(key->fd, &ioque->wfdset); 
    778  
    779     pj_lock_release(ioque->lock); 
    780  
    781     return PJ_EPENDING; 
    782 } 
    783  
    784 /* 
    785  * pj_ioqueue_send() 
    786  * 
    787  * Start asynchronous send() to the descriptor. 
    788  */ 
    789 PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque, 
    790                                      pj_ioqueue_key_t *key, 
    791                                      const void *data, 
    792                                      pj_size_t datalen, 
    793                                      unsigned flags) 
    794 { 
    795     pj_status_t rc; 
    796     pj_ssize_t sent; 
    797  
    798     PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); 
    799     PJ_CHECK_STACK(); 
    800  
    801     /* For consistency with other ioqueue implementation, we would reject  
    802      * if descriptor has already been submitted for writing before. 
    803      */ 
    804     PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && 
    805                       (key->op & PJ_IOQUEUE_OP_SEND) == 0 && 
    806                       (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), 
    807                      PJ_EBUSY); 
    808  
    809     sent = datalen; 
    810     /* sent would be -1 after pj_sock_send() if it returns error. */ 
    811     rc = pj_sock_send(key->fd, data, &sent, flags); 
    812     if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { 
    813         return rc; 
    814     } 
    815  
    816     pj_lock_acquire(ioque->lock); 
    817  
    818     key->op |= PJ_IOQUEUE_OP_SEND; 
    819     key->wr_buf = NULL; 
    820     key->wr_buflen = datalen; 
    821     PJ_FD_SET(key->fd, &ioque->wfdset); 
    822  
    823     pj_lock_release(ioque->lock); 
    824  
    825     return PJ_EPENDING; 
    826 } 
    827  
    828  
    829 /* 
    830  * pj_ioqueue_sendto() 
    831  * 
    832  * Start asynchronous write() to the descriptor. 
    833  */ 
    834 PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque, 
    835                                        pj_ioqueue_key_t *key, 
     971PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, 
     972                                       pj_ioqueue_op_key_t *op_key, 
    836973                                       const void *data, 
    837                                        pj_size_t datalen, 
     974                                       pj_ssize_t *length, 
    838975                                       unsigned flags, 
    839976                                       const pj_sockaddr_t *addr, 
    840977                                       int addrlen) 
    841978{ 
    842     pj_status_t rc; 
     979    pj_ioqueue_t *ioqueue; 
     980    struct write_operation *write_op; 
     981    pj_status_t status; 
    843982    pj_ssize_t sent; 
    844983 
    845     PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); 
     984    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); 
    846985    PJ_CHECK_STACK(); 
    847986 
    848     /* For consistency with other ioqueue implementation, we would reject  
    849      * if descriptor has already been submitted for writing before. 
    850      */ 
    851     PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && 
    852                       (key->op & PJ_IOQUEUE_OP_SEND) == 0 && 
    853                       (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), 
    854                      PJ_EBUSY); 
    855  
    856     sent = datalen; 
    857     /* sent would be -1 after pj_sock_sendto() if it returns error. */ 
    858     rc = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen); 
    859     if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK))  { 
    860         return rc; 
    861     } 
    862  
    863     pj_lock_acquire(ioque->lock); 
    864  
    865     key->op |= PJ_IOQUEUE_OP_SEND_TO; 
    866     key->wr_buf = NULL; 
    867     key->wr_buflen = datalen; 
    868     PJ_FD_SET(key->fd, &ioque->wfdset); 
    869  
    870     pj_lock_release(ioque->lock); 
     987    /* Fast track: 
     988     *   Try to send data immediately, only if there's no pending write! 
     989     * Note: 
     990     *  We are speculating that the list is empty here without properly 
     991     *  acquiring ioqueue's mutex first. This is intentional, to maximize 
     992     *  performance via parallelism. 
     993     * 
     994     *  This should be safe, because: 
     995     *      - by convention, we require caller to make sure that the 
     996     *        key is not unregistered while other threads are invoking 
     997     *        an operation on the same key. 
     998     *      - pj_list_empty() is safe to be invoked by multiple threads, 
     999     *        even when other threads are modifying the list. 
     1000     */ 
     1001    if (pj_list_empty(&key->write_list)) { 
     1002        /* 
     1003         * See if data can be sent immediately. 
     1004         */ 
     1005        sent = *length; 
     1006        status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen); 
     1007        if (status == PJ_SUCCESS) { 
     1008            /* Success! */ 
     1009            *length = sent; 
     1010            return PJ_SUCCESS; 
     1011        } else { 
     1012            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report 
     1013             * the error to caller. 
     1014             */ 
     1015            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { 
     1016                return status; 
     1017            } 
     1018        } 
     1019    } 
     1020 
     1021    /* 
     1022     * Check that address storage can hold the address parameter. 
     1023     */ 
     1024    PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG); 
     1025 
     1026    /* 
     1027     * Schedule asynchronous send. 
     1028     */ 
     1029    ioqueue = key->ioqueue; 
     1030    pj_lock_acquire(ioqueue->lock); 
     1031 
     1032    write_op = (struct write_operation*)op_key; 
     1033    write_op->op = PJ_IOQUEUE_OP_SEND_TO; 
     1034    write_op->buf = NULL; 
     1035    write_op->size = *length; 
     1036    write_op->written = 0; 
     1037    write_op->flags = flags; 
     1038    pj_memcpy(&write_op->rmt_addr, addr, addrlen); 
     1039    write_op->rmt_addrlen = addrlen; 
     1040     
     1041    pj_list_insert_before(&key->write_list, write_op); 
     1042    PJ_FD_SET(key->fd, &ioqueue->wfdset); 
     1043 
     1044    pj_lock_release(ioqueue->lock); 
     1045 
    8711046    return PJ_EPENDING; 
    8721047} 
     
    8761051 * Initiate overlapped accept() operation. 
    8771052 */ 
    878 PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue, 
    879                                pj_ioqueue_key_t *key, 
    880                                pj_sock_t *new_sock, 
    881                                pj_sockaddr_t *local, 
    882                                pj_sockaddr_t *remote, 
    883                                int *addrlen) 
    884 { 
     1053PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, 
     1054                                       pj_ioqueue_op_key_t *op_key, 
     1055                                       pj_sock_t *new_sock, 
     1056                                       pj_sockaddr_t *local, 
     1057                                       pj_sockaddr_t *remote, 
     1058                                       int *addrlen) 
     1059{ 
     1060    pj_ioqueue_t *ioqueue; 
     1061    struct accept_operation *accept_op; 
     1062    pj_status_t status; 
     1063 
    8851064    /* check parameters. All must be specified! */ 
    886     pj_assert(ioqueue && key && new_sock); 
    887  
    888     /* Server socket must have no other operation! */ 
    889     pj_assert(key->op == 0); 
    890      
     1065    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); 
     1066 
     1067    /* Fast track: 
     1068     *  See if there's new connection available immediately. 
     1069     */ 
     1070    if (pj_list_empty(&key->accept_list)) { 
     1071        status = pj_sock_accept(key->fd, new_sock, remote, addrlen); 
     1072        if (status == PJ_SUCCESS) { 
     1073            /* Yes! New connection is available! */ 
     1074            if (local && addrlen) { 
     1075                status = pj_sock_getsockname(*new_sock, local, addrlen); 
     1076                if (status != PJ_SUCCESS) { 
     1077                    pj_sock_close(*new_sock); 
     1078                    *new_sock = PJ_INVALID_SOCKET; 
     1079                    return status; 
     1080                } 
     1081            } 
     1082            return PJ_SUCCESS; 
     1083        } else { 
     1084            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report 
     1085             * the error to caller. 
     1086             */ 
     1087            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { 
     1088                return status; 
     1089            } 
     1090        } 
     1091    } 
     1092 
     1093    /* 
     1094     * No connection is available immediately. 
     1095     * Schedule accept() operation to be completed when there is incoming 
     1096     * connection available. 
     1097     */ 
     1098    ioqueue = key->ioqueue; 
     1099    accept_op = (struct accept_operation*)op_key; 
     1100 
    8911101    pj_lock_acquire(ioqueue->lock); 
    8921102 
    893     key->op = PJ_IOQUEUE_OP_ACCEPT; 
    894     key->accept_fd = new_sock; 
    895     key->rmt_addr = remote; 
    896     key->rmt_addrlen = addrlen; 
    897     key->local_addr = local; 
    898     key->local_addrlen = addrlen;   /* use same addr. as rmt_addrlen */ 
    899  
     1103    accept_op->op = PJ_IOQUEUE_OP_ACCEPT; 
     1104    accept_op->accept_fd = new_sock; 
     1105    accept_op->rmt_addr = remote; 
     1106    accept_op->addrlen= addrlen; 
     1107    accept_op->local_addr = local; 
     1108 
     1109    pj_list_insert_before(&key->accept_list, accept_op); 
    9001110    PJ_FD_SET(key->fd, &ioqueue->rfdset); 
    9011111 
    9021112    pj_lock_release(ioqueue->lock); 
     1113 
    9031114    return PJ_EPENDING; 
    9041115} 
     
    9081119 * since there's no overlapped version of connect()). 
    9091120 */ 
    910 PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue, 
    911                                         pj_ioqueue_key_t *key, 
     1121PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, 
    9121122                                        const pj_sockaddr_t *addr, 
    9131123                                        int addrlen ) 
    9141124{ 
    915     pj_status_t rc; 
     1125    pj_ioqueue_t *ioqueue; 
     1126    pj_status_t status; 
    9161127     
    9171128    /* check parameters. All must be specified! */ 
    918     PJ_ASSERT_RETURN(ioqueue && key && addr && addrlen, PJ_EINVAL); 
    919  
    920     /* Connecting socket must have no other operation! */ 
    921     PJ_ASSERT_RETURN(key->op == 0, PJ_EBUSY); 
     1129    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); 
     1130 
     1131    /* Check if socket has not been marked for connecting */ 
     1132    if (key->connecting != 0) 
     1133        return PJ_EPENDING; 
    9221134     
    923     rc = pj_sock_connect(key->fd, addr, addrlen); 
    924     if (rc == PJ_SUCCESS) { 
     1135    status = pj_sock_connect(key->fd, addr, addrlen); 
     1136    if (status == PJ_SUCCESS) { 
    9251137        /* Connected! */ 
    9261138        return PJ_SUCCESS; 
    9271139    } else { 
    928         if (rc == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) ||  
    929             rc == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK))  
    930         { 
     1140        if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) { 
    9311141            /* Pending! */ 
     1142            ioqueue = key->ioqueue; 
    9321143            pj_lock_acquire(ioqueue->lock); 
    933             key->op = PJ_IOQUEUE_OP_CONNECT; 
     1144            key->connecting = PJ_TRUE; 
    9341145            PJ_FD_SET(key->fd, &ioqueue->wfdset); 
    9351146            PJ_FD_SET(key->fd, &ioqueue->xfdset); 
     
    9381149        } else { 
    9391150            /* Error! */ 
    940             return rc; 
     1151            return status; 
    9411152        } 
    9421153    } 
  • pjproject/main/pjlib/src/pj/ioqueue_winnt.c

    r6 r11  
    2424 
    2525 
    26 #define ACCEPT_ADDR_LEN     (sizeof(pj_sockaddr_in)+20) 
     26/* The address specified in AcceptEx() must be 16 more than the size of 
     27 * SOCKADDR (source: MSDN). 
     28 */ 
     29#define ACCEPT_ADDR_LEN     (sizeof(pj_sockaddr_in)+16) 
     30 
     31typedef struct generic_overlapped 
     32{ 
     33    WSAOVERLAPPED          overlapped; 
     34    pj_ioqueue_operation_e operation; 
     35} generic_overlapped; 
    2736 
    2837/* 
     
    3443    pj_ioqueue_operation_e operation; 
    3544    WSABUF                 wsabuf; 
     45    pj_sockaddr_in         dummy_addr; 
     46    int                    dummy_addrlen; 
    3647} ioqueue_overlapped; 
    3748 
     
    5465 
    5566/* 
     67 * Structure to hold pending operation key. 
     68 */ 
     69union operation_key 
     70{ 
     71    generic_overlapped      generic; 
     72    ioqueue_overlapped      overlapped; 
     73#if PJ_HAS_TCP 
     74    ioqueue_accept_rec      accept; 
     75#endif 
     76}; 
     77 
     78/* 
    5679 * Structure for individual socket. 
    5780 */ 
    5881struct pj_ioqueue_key_t 
    5982{ 
     83    pj_ioqueue_t       *ioqueue; 
    6084    HANDLE              hnd; 
    6185    void               *user_data; 
    62     ioqueue_overlapped  recv_overlapped; 
    63     ioqueue_overlapped  send_overlapped; 
    6486#if PJ_HAS_TCP 
    6587    int                 connecting; 
    66     ioqueue_accept_rec  accept_overlapped; 
    6788#endif 
    6889    pj_ioqueue_callback cb; 
     
    108129                          &remote, 
    109130                          &remotelen); 
    110     pj_memcpy(accept_overlapped->local, local, locallen); 
    111     pj_memcpy(accept_overlapped->remote, remote, locallen); 
     131    if (*accept_overlapped->addrlen > locallen) { 
     132        pj_memcpy(accept_overlapped->local, local, locallen); 
     133        pj_memcpy(accept_overlapped->remote, remote, locallen); 
     134    } else { 
     135        pj_memset(accept_overlapped->local, 0, *accept_overlapped->addrlen); 
     136        pj_memset(accept_overlapped->remote, 0, *accept_overlapped->addrlen); 
     137    } 
    112138    *accept_overlapped->addrlen = locallen; 
    113139    if (accept_overlapped->newsock_ptr) 
     
    121147    pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos]; 
    122148    HANDLE hEvent = ioqueue->connecting_handles[pos]; 
    123     unsigned long optval; 
    124149 
    125150    /* Remove key from array of connecting handles. */ 
     
    144169    } 
    145170 
    146     /* Set socket to blocking again. */ 
    147     optval = 0; 
    148     if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) { 
    149         DWORD dwStatus; 
    150         dwStatus = WSAGetLastError(); 
    151     } 
    152171} 
    153172 
     
    184203                                 ioqueue->connecting_handles[pos],  
    185204                                 &net_events); 
    186             *connect_err = net_events.iErrorCode[FD_CONNECT_BIT]; 
     205            *connect_err =  
     206                PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]); 
    187207 
    188208            /* Erase socket from pending connect. */ 
     
    195215#endif 
    196216 
    197  
     217/* 
     218 * pj_ioqueue_create() 
     219 */ 
    198220PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,  
    199221                                       pj_size_t max_fd, 
    200                                        int max_threads, 
    201                                        pj_ioqueue_t **ioqueue) 
    202 { 
    203     pj_ioqueue_t *ioq; 
     222                                       pj_ioqueue_t **p_ioqueue) 
     223{ 
     224    pj_ioqueue_t *ioqueue; 
    204225    pj_status_t rc; 
    205226 
    206227    PJ_UNUSED_ARG(max_fd); 
    207     PJ_ASSERT_RETURN(pool && ioqueue, PJ_EINVAL); 
    208  
    209     ioq = pj_pool_zalloc(pool, sizeof(*ioq)); 
    210     ioq->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, max_threads); 
    211     if (ioq->iocp == NULL) 
     228    PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL); 
     229 
     230    rc = sizeof(union operation_key); 
     231 
     232    /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */ 
     233    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=  
     234                     sizeof(union operation_key), PJ_EBUG); 
     235 
     236    ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue)); 
     237    ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); 
     238    if (ioqueue->iocp == NULL) 
    212239        return PJ_RETURN_OS_ERROR(GetLastError()); 
    213240 
    214     rc = pj_lock_create_simple_mutex(pool, NULL, &ioq->lock); 
     241    rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock); 
    215242    if (rc != PJ_SUCCESS) { 
    216         CloseHandle(ioq->iocp); 
     243        CloseHandle(ioqueue->iocp); 
    217244        return rc; 
    218245    } 
    219246 
    220     ioq->auto_delete_lock = PJ_TRUE; 
    221  
    222     *ioqueue = ioq; 
    223  
    224     PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioq)); 
     247    ioqueue->auto_delete_lock = PJ_TRUE; 
     248 
     249    *p_ioqueue = ioqueue; 
     250 
     251    PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue)); 
    225252    return PJ_SUCCESS; 
    226253} 
    227254 
    228 PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioque ) 
     255/* 
     256 * pj_ioqueue_destroy() 
     257 */ 
     258PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue ) 
    229259{ 
    230260    unsigned i; 
    231261 
    232262    PJ_CHECK_STACK(); 
    233     PJ_ASSERT_RETURN(ioque, PJ_EINVAL); 
     263    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 
    234264 
    235265    /* Destroy events in the pool */ 
    236     for (i=0; i<ioque->event_count; ++i) { 
    237         CloseHandle(ioque->event_pool[i]); 
    238     } 
    239     ioque->event_count = 0; 
    240  
    241     if (ioque->auto_delete_lock) 
    242         pj_lock_destroy(ioque->lock); 
    243  
    244     if (CloseHandle(ioque->iocp) == TRUE) 
    245         return PJ_SUCCESS; 
    246     else 
     266    for (i=0; i<ioqueue->event_count; ++i) { 
     267        CloseHandle(ioqueue->event_pool[i]); 
     268    } 
     269    ioqueue->event_count = 0; 
     270 
     271    if (CloseHandle(ioqueue->iocp) != TRUE) 
    247272        return PJ_RETURN_OS_ERROR(GetLastError()); 
    248 } 
    249  
    250 PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque,  
     273 
     274    if (ioqueue->auto_delete_lock) 
     275        pj_lock_destroy(ioqueue->lock); 
     276 
     277    return PJ_SUCCESS; 
     278} 
     279 
     280/* 
     281 * pj_ioqueue_set_lock() 
     282 */ 
     283PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,  
    251284                                         pj_lock_t *lock, 
    252285                                         pj_bool_t auto_delete ) 
    253286{ 
    254     PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL); 
    255  
    256     if (ioque->auto_delete_lock) { 
    257         pj_lock_destroy(ioque->lock); 
    258     } 
    259  
    260     ioque->lock = lock; 
    261     ioque->auto_delete_lock = auto_delete; 
     287    PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL); 
     288 
     289    if (ioqueue->auto_delete_lock) { 
     290        pj_lock_destroy(ioqueue->lock); 
     291    } 
     292 
     293    ioqueue->lock = lock; 
     294    ioqueue->auto_delete_lock = auto_delete; 
    262295 
    263296    return PJ_SUCCESS; 
    264297} 
    265298 
     299/* 
     300 * pj_ioqueue_register_sock() 
     301 */ 
    266302PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, 
    267                                               pj_ioqueue_t *ioque, 
    268                                               pj_sock_t hnd, 
     303                                              pj_ioqueue_t *ioqueue, 
     304                                              pj_sock_t sock, 
    269305                                              void *user_data, 
    270306                                              const pj_ioqueue_callback *cb, 
     
    273309    HANDLE hioq; 
    274310    pj_ioqueue_key_t *rec; 
    275  
    276     PJ_ASSERT_RETURN(pool && ioque && cb && key, PJ_EINVAL); 
    277  
     311    u_long value; 
     312    int rc; 
     313 
     314    PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL); 
     315 
     316    /* Build the key for this socket. */ 
    278317    rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 
    279     rec->hnd = (HANDLE)hnd; 
     318    rec->ioqueue = ioqueue; 
     319    rec->hnd = (HANDLE)sock; 
    280320    rec->user_data = user_data; 
    281321    pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback)); 
    282 #if PJ_HAS_TCP 
    283     rec->accept_overlapped.newsock = PJ_INVALID_SOCKET; 
    284 #endif 
    285     hioq = CreateIoCompletionPort((HANDLE)hnd, ioque->iocp, (DWORD)rec, 0); 
     322 
     323    /* Set socket to nonblocking. */ 
     324    value = 1; 
     325    rc = ioctlsocket(sock, FIONBIO, &value); 
     326    if (rc != 0) { 
     327        return PJ_RETURN_OS_ERROR(WSAGetLastError()); 
     328    } 
     329 
     330    /* Associate with IOCP */ 
     331    hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0); 
    286332    if (!hioq) { 
    287333        return PJ_RETURN_OS_ERROR(GetLastError()); 
     
    292338} 
    293339 
    294  
    295  
    296 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque, 
    297                                           pj_ioqueue_key_t *key ) 
    298 { 
    299     PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL); 
     340/* 
     341 * pj_ioqueue_unregister() 
     342 */ 
     343PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) 
     344{ 
     345    PJ_ASSERT_RETURN(key, PJ_EINVAL); 
    300346 
    301347#if PJ_HAS_TCP 
    302348    if (key->connecting) { 
    303349        unsigned pos; 
     350        pj_ioqueue_t *ioqueue; 
     351 
     352        ioqueue = key->ioqueue; 
    304353 
    305354        /* Erase from connecting_handles */ 
    306         pj_lock_acquire(ioque->lock); 
    307         for (pos=0; pos < ioque->connecting_count; ++pos) { 
    308             if (ioque->connecting_keys[pos] == key) { 
    309                 erase_connecting_socket(ioque, pos); 
    310                 if (key->accept_overlapped.newsock_ptr) { 
    311                     /* ??? shouldn't it be newsock instead of newsock_ptr??? */ 
    312                     closesocket(*key->accept_overlapped.newsock_ptr); 
    313                 } 
     355        pj_lock_acquire(ioqueue->lock); 
     356        for (pos=0; pos < ioqueue->connecting_count; ++pos) { 
     357            if (ioqueue->connecting_keys[pos] == key) { 
     358                erase_connecting_socket(ioqueue, pos); 
    314359                break; 
    315360            } 
    316361        } 
    317         pj_lock_release(ioque->lock); 
    318362        key->connecting = 0; 
     363        pj_lock_release(ioqueue->lock); 
    319364    } 
    320365#endif 
     
    322367} 
    323368 
     369/* 
     370 * pj_ioqueue_get_user_data() 
     371 */ 
    324372PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) 
    325373{ 
     
    329377 
    330378/* 
     379 * pj_ioqueue_set_user_data() 
     380 */ 
     381PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, 
     382                                              void *user_data, 
     383                                              void **old_data ) 
     384{ 
     385    PJ_ASSERT_RETURN(key, PJ_EINVAL); 
     386     
     387    if (old_data) 
     388        *old_data = key->user_data; 
     389 
     390    key->user_data = user_data; 
     391    return PJ_SUCCESS; 
     392} 
     393 
     394/* 
     395 * pj_ioqueue_poll() 
     396 * 
    331397 * Poll for events. 
    332398 */ 
    333 PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout) 
     399PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) 
    334400{ 
    335401    DWORD dwMsec, dwBytesTransfered, dwKey; 
    336     ioqueue_overlapped *ov; 
     402    generic_overlapped *pOv; 
    337403    pj_ioqueue_key_t *key; 
    338404    pj_ssize_t size_status; 
    339405    BOOL rc; 
    340406 
    341     PJ_ASSERT_RETURN(ioque, -PJ_EINVAL); 
     407    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); 
    342408 
    343409    /* Check the connecting array. */ 
    344410#if PJ_HAS_TCP 
    345     key = check_connecting(ioque, &size_status); 
     411    key = check_connecting(ioqueue, &size_status); 
    346412    if (key != NULL) { 
    347413        key->cb.on_connect_complete(key, (int)size_status); 
     
    354420 
    355421    /* Poll for completion status. */ 
    356     rc = GetQueuedCompletionStatus(ioque->iocp, &dwBytesTransfered, &dwKey, 
    357                                    (OVERLAPPED**)&ov, dwMsec); 
     422    rc = GetQueuedCompletionStatus(ioqueue->iocp, &dwBytesTransfered, &dwKey, 
     423                                   (OVERLAPPED**)&pOv, dwMsec); 
    358424 
    359425    /* The return value is: 
    360426     * - nonzero if event was dequeued. 
    361      * - zero and ov==NULL if no event was dequeued. 
    362      * - zero and ov!=NULL if event for failed I/O was dequeued. 
    363      */ 
    364     if (ov) { 
     427     * - zero and pOv==NULL if no event was dequeued. 
     428     * - zero and pOv!=NULL if event for failed I/O was dequeued. 
     429     */ 
     430    if (pOv) { 
    365431        /* Event was dequeued for either successfull or failed I/O */ 
    366432        key = (pj_ioqueue_key_t*)dwKey; 
    367433        size_status = dwBytesTransfered; 
    368         switch (ov->operation) { 
     434        switch (pOv->operation) { 
    369435        case PJ_IOQUEUE_OP_READ: 
    370436        case PJ_IOQUEUE_OP_RECV: 
    371437        case PJ_IOQUEUE_OP_RECV_FROM: 
    372             key->recv_overlapped.operation = 0; 
     438            pOv->operation = 0; 
    373439            if (key->cb.on_read_complete) 
    374                 key->cb.on_read_complete(key, size_status); 
     440                key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv,  
     441                                         size_status); 
    375442            break; 
    376443        case PJ_IOQUEUE_OP_WRITE: 
    377444        case PJ_IOQUEUE_OP_SEND: 
    378445        case PJ_IOQUEUE_OP_SEND_TO: 
    379             key->send_overlapped.operation = 0; 
     446            pOv->operation = 0; 
    380447            if (key->cb.on_write_complete) 
    381                 key->cb.on_write_complete(key, size_status); 
     448                key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv,  
     449                                                size_status); 
    382450            break; 
    383451#if PJ_HAS_TCP 
    384452        case PJ_IOQUEUE_OP_ACCEPT: 
    385453            /* special case for accept. */ 
    386             ioqueue_on_accept_complete((ioqueue_accept_rec*)ov); 
    387             if (key->cb.on_accept_complete) 
    388                 key->cb.on_accept_complete(key, key->accept_overlapped.newsock, 
    389                                            0); 
     454            ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv); 
     455            if (key->cb.on_accept_complete) { 
     456                ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv; 
     457                key->cb.on_accept_complete(key,  
     458                                           (pj_ioqueue_op_key_t*)pOv,  
     459                                           accept_rec->newsock, 
     460                                           PJ_SUCCESS); 
     461            } 
    390462            break; 
    391463        case PJ_IOQUEUE_OP_CONNECT: 
     
    399471 
    400472    if (GetLastError()==WAIT_TIMEOUT) { 
    401         /* Check the connecting array. */ 
    402 #if PJ_HAS_TCP 
    403         key = check_connecting(ioque, &size_status); 
     473        /* Check the connecting array (again). */ 
     474#if PJ_HAS_TCP 
     475        key = check_connecting(ioqueue, &size_status); 
    404476        if (key != NULL) { 
    405477            key->cb.on_connect_complete(key, (int)size_status); 
     
    413485 
    414486/* 
    415  * pj_ioqueue_read() 
    416  * 
    417  * Initiate overlapped ReadFile operation. 
    418  */ 
    419 PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque, 
    420                                      pj_ioqueue_key_t *key, 
    421                                      void *buffer, 
    422                                      pj_size_t buflen) 
    423 { 
    424     BOOL rc; 
    425     DWORD bytesRead; 
    426  
    427     PJ_CHECK_STACK(); 
    428     PJ_UNUSED_ARG(ioque); 
    429  
    430     if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) { 
    431         pj_assert(!"Operation already pending for this descriptor"); 
    432         return PJ_EBUSY; 
    433     } 
    434  
    435     pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped)); 
    436     key->recv_overlapped.operation = PJ_IOQUEUE_OP_READ; 
    437  
    438     rc = ReadFile(key->hnd, buffer, buflen, &bytesRead,  
    439                   &key->recv_overlapped.overlapped); 
    440     if (rc == FALSE) { 
    441         DWORD dwStatus = GetLastError(); 
    442         if (dwStatus==ERROR_IO_PENDING) 
    443             return PJ_EPENDING; 
    444         else 
    445             return PJ_STATUS_FROM_OS(dwStatus); 
    446     } else { 
    447         /* 
    448          * This is workaround to a probable bug in Win2000 (probably NT too). 
    449          * Even if 'rc' is TRUE, which indicates operation has completed, 
    450          * GetQueuedCompletionStatus still will return the key. 
    451          * So as work around, we always return PJ_EPENDING here. 
    452          */ 
    453         return PJ_EPENDING; 
    454     } 
    455 } 
    456  
    457 /* 
    458487 * pj_ioqueue_recv() 
    459488 * 
    460489 * Initiate overlapped WSARecv() operation. 
    461490 */ 
    462 PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_t *ioque, 
    463                                       pj_ioqueue_key_t *key, 
     491PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_key_t *key, 
     492                                      pj_ioqueue_op_key_t *op_key, 
    464493                                      void *buffer, 
    465                                       pj_size_t buflen, 
     494                                      pj_ssize_t *length, 
    466495                                      unsigned flags ) 
    467496{ 
     497    /* 
     498     * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and 
     499     * addrlen here. But unfortunately it generates EINVAL... :-( 
     500     *  -bennylp 
     501     */ 
    468502    int rc; 
    469503    DWORD bytesRead; 
    470504    DWORD dwFlags = 0; 
     505    union operation_key *op_key_rec; 
    471506 
    472507    PJ_CHECK_STACK(); 
    473     PJ_UNUSED_ARG(ioque); 
    474  
    475     if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) { 
    476         pj_assert(!"Operation already pending for this socket"); 
    477         return PJ_EBUSY; 
    478     } 
    479  
    480     pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped)); 
    481     key->recv_overlapped.operation = PJ_IOQUEUE_OP_READ; 
    482  
    483     key->recv_overlapped.wsabuf.buf = buffer; 
    484     key->recv_overlapped.wsabuf.len = buflen; 
     508    PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL); 
     509 
     510    op_key_rec = (union operation_key*)op_key->internal__; 
     511    op_key_rec->overlapped.wsabuf.buf = buffer; 
     512    op_key_rec->overlapped.wsabuf.len = *length; 
    485513 
    486514    dwFlags = flags; 
    487  
    488     rc = WSARecv((SOCKET)key->hnd, &key->recv_overlapped.wsabuf, 1,  
    489                  &bytesRead, &dwFlags, 
    490                  &key->recv_overlapped.overlapped, NULL); 
     515     
     516    /* Try non-overlapped received first to see if data is 
     517     * immediately available. 
     518     */ 
     519    rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, 
     520                 &bytesRead, &dwFlags, NULL, NULL); 
     521    if (rc == 0) { 
     522        *length = bytesRead; 
     523        return PJ_SUCCESS; 
     524    } else { 
     525        DWORD dwError = WSAGetLastError(); 
     526        if (dwError != WSAEWOULDBLOCK) { 
     527            *length = -1; 
     528            return PJ_RETURN_OS_ERROR(dwError); 
     529        } 
     530    } 
     531 
     532    /* 
     533     * No immediate data available. 
     534     * Register overlapped Recv() operation. 
     535     */ 
     536    pj_memset(&op_key_rec->overlapped.overlapped, 0, 
     537              sizeof(op_key_rec->overlapped.overlapped)); 
     538    op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV; 
     539 
     540    rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,  
     541                  &bytesRead, &dwFlags,  
     542                  &op_key_rec->overlapped.overlapped, NULL); 
    491543    if (rc == SOCKET_ERROR) { 
    492544        DWORD dwStatus = WSAGetLastError(); 
    493         if (dwStatus==WSA_IO_PENDING) 
    494             return PJ_EPENDING; 
    495         else 
     545        if (dwStatus!=WSA_IO_PENDING) { 
     546            *length = -1; 
    496547            return PJ_STATUS_FROM_OS(dwStatus); 
    497     } else { 
    498         /* Must always return pending status. 
    499          * See comments on pj_ioqueue_read 
    500          * return bytesRead; 
    501          */ 
    502         return PJ_EPENDING; 
    503     } 
     548        } 
     549    } 
     550 
     551    /* Pending operation has been scheduled. */ 
     552    return PJ_EPENDING; 
    504553} 
    505554 
     
    509558 * Initiate overlapped RecvFrom() operation. 
    510559 */ 
    511 PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque, 
    512                                          pj_ioqueue_key_t *key, 
     560PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, 
     561                                         pj_ioqueue_op_key_t *op_key, 
    513562                                         void *buffer, 
    514                                          pj_size_t buflen, 
     563                                         pj_ssize_t *length, 
    515564                                         unsigned flags, 
    516565                                         pj_sockaddr_t *addr, 
    517566                                         int *addrlen) 
    518567{ 
    519     BOOL rc; 
     568    int rc; 
    520569    DWORD bytesRead; 
    521     DWORD dwFlags; 
     570    DWORD dwFlags = 0; 
     571    union operation_key *op_key_rec; 
    522572 
    523573    PJ_CHECK_STACK(); 
    524     PJ_UNUSED_ARG(ioque); 
    525  
    526     if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) { 
    527         pj_assert(!"Operation already pending for this socket"); 
    528         return PJ_EBUSY; 
    529     } 
    530  
    531     pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped)); 
    532     key->recv_overlapped.operation = PJ_IOQUEUE_OP_RECV_FROM; 
    533     key->recv_overlapped.wsabuf.buf = buffer; 
    534     key->recv_overlapped.wsabuf.len = buflen; 
     574    PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL); 
     575 
     576    op_key_rec = (union operation_key*)op_key->internal__; 
     577    op_key_rec->overlapped.wsabuf.buf = buffer; 
     578    op_key_rec->overlapped.wsabuf.len = *length; 
     579 
    535580    dwFlags = flags; 
    536     rc = WSARecvFrom((SOCKET)key->hnd, &key->recv_overlapped.wsabuf, 1,  
    537                      &bytesRead, &dwFlags,  
    538                      addr, addrlen, 
    539                      &key->recv_overlapped.overlapped, NULL); 
     581     
     582    /* Try non-overlapped received first to see if data is 
     583     * immediately available. 
     584     */ 
     585    rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, 
     586                     &bytesRead, &dwFlags, addr, addrlen, NULL, NULL); 
     587    if (rc == 0) { 
     588        *length = bytesRead; 
     589        return PJ_SUCCESS; 
     590    } else { 
     591        DWORD dwError = WSAGetLastError(); 
     592        if (dwError != WSAEWOULDBLOCK) { 
     593            *length = -1; 
     594            return PJ_RETURN_OS_ERROR(dwError); 
     595        } 
     596    } 
     597 
     598    /* 
     599     * No immediate data available. 
     600     * Register overlapped Recv() operation. 
     601     */ 
     602    pj_memset(&op_key_rec->overlapped.overlapped, 0, 
     603              sizeof(op_key_rec->overlapped.overlapped)); 
     604    op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV; 
     605 
     606    rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,  
     607                     &bytesRead, &dwFlags, addr, addrlen, 
     608                     &op_key_rec->overlapped.overlapped, NULL); 
    540609    if (rc == SOCKET_ERROR) { 
    541610        DWORD dwStatus = WSAGetLastError(); 
    542         if (dwStatus==WSA_IO_PENDING) 
    543             return PJ_EPENDING; 
    544         else 
     611        if (dwStatus!=WSA_IO_PENDING) { 
     612            *length = -1; 
    545613            return PJ_STATUS_FROM_OS(dwStatus); 
    546     } else { 
    547         /* Must always return pending status. 
    548          * See comments on pj_ioqueue_read 
    549          * return bytesRead; 
    550          */ 
    551         return PJ_EPENDING; 
    552     } 
    553 } 
    554  
    555 /* 
    556  * pj_ioqueue_write() 
    557  * 
    558  * Initiate overlapped WriteFile() operation. 
    559  */ 
    560 PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque, 
    561                                       pj_ioqueue_key_t *key, 
    562                                       const void *data, 
    563                                       pj_size_t datalen) 
    564 { 
    565     BOOL rc; 
    566     DWORD bytesWritten; 
    567  
    568     PJ_CHECK_STACK(); 
    569     PJ_UNUSED_ARG(ioque); 
    570  
    571     if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) { 
    572         pj_assert(!"Operation already pending for this descriptor"); 
    573         return PJ_EBUSY; 
    574     } 
    575  
    576     pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped)); 
    577     key->send_overlapped.operation = PJ_IOQUEUE_OP_WRITE; 
    578     rc = WriteFile(key->hnd, data, datalen, &bytesWritten,  
    579                    &key->send_overlapped.overlapped); 
     614        } 
     615    }  
    580616     
    581     if (rc == FALSE) { 
    582         DWORD dwStatus = GetLastError(); 
    583         if (dwStatus==ERROR_IO_PENDING) 
    584             return PJ_EPENDING; 
    585         else 
    586             return PJ_STATUS_FROM_OS(dwStatus); 
    587     } else { 
    588         /* Must always return pending status. 
    589          * See comments on pj_ioqueue_read 
    590          * return bytesWritten; 
    591          */ 
    592         return PJ_EPENDING; 
    593     } 
    594 } 
    595  
     617    /* Pending operation has been scheduled. */ 
     618    return PJ_EPENDING; 
     619} 
    596620 
    597621/* 
     
    600624 * Initiate overlapped Send operation. 
    601625 */ 
    602 PJ_DEF(pj_status_t) pj_ioqueue_send(  pj_ioqueue_t *ioque, 
    603                                       pj_ioqueue_key_t *key, 
     626PJ_DEF(pj_status_t) pj_ioqueue_send(  pj_ioqueue_key_t *key, 
     627                                      pj_ioqueue_op_key_t *op_key, 
    604628                                      const void *data, 
    605                                       pj_size_t datalen, 
     629                                      pj_ssize_t *length, 
    606630                                      unsigned flags ) 
     631{ 
     632    return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0); 
     633} 
     634 
     635 
     636/* 
     637 * pj_ioqueue_sendto() 
     638 * 
     639 * Initiate overlapped SendTo operation. 
     640 */ 
     641PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, 
     642                                       pj_ioqueue_op_key_t *op_key, 
     643                                       const void *data, 
     644                                       pj_ssize_t *length, 
     645                                       unsigned flags, 
     646                                       const pj_sockaddr_t *addr, 
     647                                       int addrlen) 
    607648{ 
    608649    int rc; 
    609650    DWORD bytesWritten; 
    610651    DWORD dwFlags; 
     652    union operation_key *op_key_rec; 
    611653 
    612654    PJ_CHECK_STACK(); 
    613     PJ_UNUSED_ARG(ioque); 
    614  
    615     if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) { 
    616         pj_assert(!"Operation already pending for this socket"); 
    617         return PJ_EBUSY; 
    618     } 
    619  
    620     pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped)); 
    621     key->send_overlapped.operation = PJ_IOQUEUE_OP_WRITE; 
    622     key->send_overlapped.wsabuf.buf = (void*)data; 
    623     key->send_overlapped.wsabuf.len = datalen; 
     655    PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL); 
     656     
     657    op_key_rec = (union operation_key*)op_key->internal__; 
     658 
    624659    dwFlags = flags; 
    625     rc = WSASend((SOCKET)key->hnd, &key->send_overlapped.wsabuf, 1, 
    626                  &bytesWritten,  dwFlags, 
    627                  &key->send_overlapped.overlapped, NULL); 
     660 
     661    /* 
     662     * First try blocking write. 
     663     */ 
     664    op_key_rec->overlapped.wsabuf.buf = (void*)data; 
     665    op_key_rec->overlapped.wsabuf.len = *length; 
     666 
     667    rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, 
     668                   &bytesWritten, dwFlags, addr, addrlen, 
     669                   NULL, NULL); 
     670    if (rc == 0) { 
     671        *length = bytesWritten; 
     672        return PJ_SUCCESS; 
     673    } else { 
     674        DWORD dwStatus = WSAGetLastError(); 
     675        if (dwStatus != WSAEWOULDBLOCK) { 
     676            *length = -1; 
     677            return PJ_RETURN_OS_ERROR(dwStatus); 
     678        } 
     679    } 
     680 
     681    /* 
     682     * Data can't be sent immediately. 
     683     * Schedule asynchronous WSASend(). 
     684     */ 
     685    pj_memset(&op_key_rec->overlapped.overlapped, 0, 
     686              sizeof(op_key_rec->overlapped.overlapped)); 
     687    op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND; 
     688 
     689    rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, 
     690                   &bytesWritten,  dwFlags, addr, addrlen, 
     691                   &op_key_rec->overlapped.overlapped, NULL); 
    628692    if (rc == SOCKET_ERROR) { 
    629693        DWORD dwStatus = WSAGetLastError(); 
    630         if (dwStatus==WSA_IO_PENDING) 
    631             return PJ_EPENDING; 
    632         else 
     694        if (dwStatus!=WSA_IO_PENDING) 
    633695            return PJ_STATUS_FROM_OS(dwStatus); 
    634     } else { 
    635         /* Must always return pending status. 
    636          * See comments on pj_ioqueue_read 
    637          * return bytesRead; 
    638          */ 
    639         return PJ_EPENDING; 
    640     } 
    641 } 
    642  
    643  
    644 /* 
    645  * pj_ioqueue_sendto() 
    646  * 
    647  * Initiate overlapped SendTo operation. 
    648  */ 
    649 PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque, 
    650                                        pj_ioqueue_key_t *key, 
    651                                        const void *data, 
    652                                        pj_size_t datalen, 
    653                                        unsigned flags, 
    654                                        const pj_sockaddr_t *addr, 
    655                                        int addrlen) 
    656 { 
    657     BOOL rc; 
    658     DWORD bytesSent; 
    659     DWORD dwFlags; 
    660  
    661     PJ_CHECK_STACK(); 
    662     PJ_UNUSED_ARG(ioque); 
    663  
    664     if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) { 
    665         pj_assert(!"Operation already pending for this socket"); 
    666         return PJ_EBUSY; 
    667     } 
    668  
    669     pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped)); 
    670     key->send_overlapped.operation = PJ_IOQUEUE_OP_SEND_TO; 
    671     key->send_overlapped.wsabuf.buf = (char*)data; 
    672     key->send_overlapped.wsabuf.len = datalen; 
    673     dwFlags = flags; 
    674     rc = WSASendTo((SOCKET)key->hnd, &key->send_overlapped.wsabuf, 1,  
    675                    &bytesSent, dwFlags, addr,  
    676                    addrlen, &key->send_overlapped.overlapped, NULL); 
    677     if (rc == SOCKET_ERROR) { 
    678         DWORD dwStatus = WSAGetLastError(); 
    679         if (dwStatus==WSA_IO_PENDING) 
    680             return PJ_EPENDING; 
    681         else 
    682             return PJ_STATUS_FROM_OS(dwStatus); 
    683     } else { 
    684         // Must always return pending status. 
    685         // See comments on pj_ioqueue_read 
    686         // return bytesSent; 
    687         return PJ_EPENDING; 
    688     } 
     696    } 
     697 
     698    /* Asynchronous operation successfully submitted. */ 
     699    return PJ_EPENDING; 
    689700} 
    690701 
     
    696707 * Initiate overlapped accept() operation. 
    697708 */ 
    698 PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue, 
    699                                pj_ioqueue_key_t *key, 
    700                                pj_sock_t *new_sock, 
    701                                pj_sockaddr_t *local, 
    702                                pj_sockaddr_t *remote, 
    703                                int *addrlen) 
     709PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, 
     710                                       pj_ioqueue_op_key_t *op_key, 
     711                                       pj_sock_t *new_sock, 
     712                                       pj_sockaddr_t *local, 
     713                                       pj_sockaddr_t *remote, 
     714                                       int *addrlen) 
    704715{ 
    705716    BOOL rc; 
    706717    DWORD bytesReceived; 
    707718    pj_status_t status; 
     719    union operation_key *op_key_rec; 
     720    SOCKET sock; 
    708721 
    709722    PJ_CHECK_STACK(); 
    710     PJ_UNUSED_ARG(ioqueue); 
    711  
    712     if (key->accept_overlapped.operation != PJ_IOQUEUE_OP_NONE) { 
    713         pj_assert(!"Operation already pending for this socket"); 
    714         return PJ_EBUSY; 
    715     } 
    716  
    717     if (key->accept_overlapped.newsock == PJ_INVALID_SOCKET) { 
    718         pj_sock_t sock; 
    719         status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, &sock); 
    720         if (status != PJ_SUCCESS) 
    721             return status; 
    722  
    723         key->accept_overlapped.newsock = sock; 
    724     } 
    725     key->accept_overlapped.operation = PJ_IOQUEUE_OP_ACCEPT; 
    726     key->accept_overlapped.addrlen = addrlen; 
    727     key->accept_overlapped.local = local; 
    728     key->accept_overlapped.remote = remote; 
    729     key->accept_overlapped.newsock_ptr = new_sock; 
    730     pj_memset(&key->accept_overlapped.overlapped, 0,  
    731               sizeof(key->accept_overlapped.overlapped)); 
    732  
    733     rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)key->accept_overlapped.newsock, 
    734                    key->accept_overlapped.accept_buf, 
     723    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); 
     724 
     725    /* 
     726     * See if there is a new connection immediately available. 
     727     */ 
     728    sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0); 
     729    if (sock != INVALID_SOCKET) { 
     730        /* Yes! New socket is available! */ 
     731        int status; 
     732 
     733        status = getsockname(sock, local, addrlen); 
     734        if (status != 0) { 
     735            DWORD dwError = WSAGetLastError(); 
     736            closesocket(sock); 
     737            return PJ_RETURN_OS_ERROR(dwError); 
     738        } 
     739 
     740        *new_sock = sock; 
     741        return PJ_SUCCESS; 
     742 
     743    } else { 
     744        DWORD dwError = WSAGetLastError(); 
     745        if (dwError != WSAEWOULDBLOCK) { 
     746            return PJ_RETURN_OS_ERROR(dwError); 
     747        } 
     748    } 
     749 
     750    /* 
     751     * No connection is immediately available. 
     752     * Must schedule an asynchronous operation. 
     753     */ 
     754    op_key_rec = (union operation_key*)op_key->internal__; 
     755     
     756    status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0,  
     757                            &op_key_rec->accept.newsock); 
     758    if (status != PJ_SUCCESS) 
     759        return status; 
     760 
     761    /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket  
     762     * addresses can be obtained with getsockname() and getpeername(). 
     763     */ 
     764    status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET, 
     765                        SO_UPDATE_ACCEPT_CONTEXT,  
     766                        (char*)&key->hnd, sizeof(SOCKET)); 
     767    /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later. 
     768     * So ignore the error status. 
     769     */ 
     770 
     771    op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT; 
     772    op_key_rec->accept.addrlen = addrlen; 
     773    op_key_rec->accept.local = local; 
     774    op_key_rec->accept.remote = remote; 
     775    op_key_rec->accept.newsock_ptr = new_sock; 
     776    pj_memset(&op_key_rec->accept.overlapped, 0,  
     777              sizeof(op_key_rec->accept.overlapped)); 
     778 
     779    rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock, 
     780                   op_key_rec->accept.accept_buf, 
    735781                   0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN, 
    736782                   &bytesReceived, 
    737                    &key->accept_overlapped.overlapped); 
     783                   &op_key_rec->accept.overlapped ); 
    738784 
    739785    if (rc == TRUE) { 
    740         ioqueue_on_accept_complete(&key->accept_overlapped); 
    741         if (key->cb.on_accept_complete) 
    742             key->cb.on_accept_complete(key, key->accept_overlapped.newsock, 0); 
     786        ioqueue_on_accept_complete(&op_key_rec->accept); 
    743787        return PJ_SUCCESS; 
    744788    } else { 
    745789        DWORD dwStatus = WSAGetLastError(); 
    746         if (dwStatus==WSA_IO_PENDING) 
    747             return PJ_EPENDING; 
    748         else 
     790        if (dwStatus!=WSA_IO_PENDING) 
    749791            return PJ_STATUS_FROM_OS(dwStatus); 
    750792    } 
     793 
     794    /* Asynchronous Accept() has been submitted. */ 
     795    return PJ_EPENDING; 
    751796} 
    752797 
     
    758803 * since there's no overlapped version of connect()). 
    759804 */ 
    760 PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue, 
    761                                         pj_ioqueue_key_t *key, 
     805PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, 
    762806                                        const pj_sockaddr_t *addr, 
    763807                                        int addrlen ) 
    764808{ 
    765     unsigned long optval = 1; 
    766809    HANDLE hEvent; 
     810    pj_ioqueue_t *ioqueue; 
    767811 
    768812    PJ_CHECK_STACK(); 
    769  
    770     /* Set socket to non-blocking. */ 
    771     if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) { 
    772         return PJ_RETURN_OS_ERROR(WSAGetLastError()); 
    773     } 
     813    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); 
    774814 
    775815    /* Initiate connect() */ 
     
    777817        DWORD dwStatus; 
    778818        dwStatus = WSAGetLastError(); 
    779         if (dwStatus != WSAEWOULDBLOCK) { 
    780             /* Permanent error */ 
     819        if (dwStatus != WSAEWOULDBLOCK) { 
    781820            return PJ_RETURN_OS_ERROR(dwStatus); 
    782         } else { 
    783             /* Pending operation. This is what we're looking for. */ 
    784821        } 
    785822    } else { 
    786823        /* Connect has completed immediately! */ 
    787         /* Restore to blocking mode. */ 
    788         optval = 0; 
    789         if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) { 
    790             return PJ_RETURN_OS_ERROR(WSAGetLastError()); 
    791         } 
    792  
    793         key->cb.on_connect_complete(key, 0); 
    794824        return PJ_SUCCESS; 
    795825    } 
     826 
     827    ioqueue = key->ioqueue; 
    796828 
    797829    /* Add to the array of connecting socket to be polled */ 
  • pjproject/main/pjlib/src/pjlib-test/atomic.c

    r6 r11  
    5050 
    5151    /* increment. */ 
    52     if (pj_atomic_inc(atomic_var) != 112) 
     52    pj_atomic_inc(atomic_var); 
     53    if (pj_atomic_get(atomic_var) != 112) 
    5354        return -40; 
    5455 
    5556    /* decrement. */ 
    56     if (pj_atomic_dec(atomic_var) != 111) 
     57    pj_atomic_dec(atomic_var); 
     58    if (pj_atomic_get(atomic_var) != 111) 
    5759        return -50; 
    5860 
    5961    /* set */ 
    60     if (pj_atomic_set(atomic_var, 211) != 111) 
     62    pj_atomic_set(atomic_var, 211); 
     63    if (pj_atomic_get(atomic_var) != 211) 
     64        return -60; 
     65 
     66    /* add */ 
     67    pj_atomic_add(atomic_var, 10); 
     68    if (pj_atomic_get(atomic_var) != 221) 
    6169        return -60; 
    6270 
    6371    /* check the value again. */ 
    64     if (pj_atomic_get(atomic_var) != 211) 
     72    if (pj_atomic_get(atomic_var) != 221) 
    6573        return -70; 
    6674 
  • pjproject/main/pjlib/src/pjlib-test/ioq_perf.c

    r6 r11  
    3535typedef struct test_item 
    3636{ 
    37     pj_sock_t       server_fd,  
    38                     client_fd; 
    39     pj_ioqueue_t    *ioqueue; 
    40     pj_ioqueue_key_t *server_key, 
    41                    *client_key; 
    42     pj_size_t       buffer_size; 
    43     char           *outgoing_buffer; 
    44     char           *incoming_buffer; 
    45     pj_size_t       bytes_sent,  
    46                     bytes_recv; 
     37    pj_sock_t            server_fd,  
     38                         client_fd; 
     39    pj_ioqueue_t        *ioqueue; 
     40    pj_ioqueue_key_t    *server_key, 
     41                        *client_key; 
     42    pj_ioqueue_op_key_t  recv_op, 
     43                         send_op; 
     44    int                  has_pending_send; 
     45    pj_size_t            buffer_size; 
     46    char                *outgoing_buffer; 
     47    char                *incoming_buffer; 
     48    pj_size_t            bytes_sent,  
     49                         bytes_recv; 
    4750} test_item; 
    4851 
     
    5053 * Increment item->bytes_recv and ready to read the next data. 
    5154 */ 
    52 static void on_read_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_read) 
     55static void on_read_complete(pj_ioqueue_key_t *key,  
     56                             pj_ioqueue_op_key_t *op_key, 
     57                             pj_ssize_t bytes_read) 
    5358{ 
    5459    test_item *item = pj_ioqueue_get_user_data(key); 
    5560    pj_status_t rc; 
     61    int data_is_available = 1; 
    5662 
    5763    //TRACE_((THIS_FILE, "     read complete, bytes_read=%d", bytes_read)); 
    5864 
    59     if (thread_quit_flag) 
    60         return; 
    61  
    62     if (bytes_read < 0) { 
    63         pj_status_t rc = -bytes_read; 
    64         char errmsg[128]; 
    65  
    66         if (rc != last_error) { 
    67             last_error = rc; 
    68             pj_strerror(rc, errmsg, sizeof(errmsg)); 
    69             PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)",  
    70                       bytes_read, errmsg)); 
    71             PJ_LOG(3,(THIS_FILE,  
    72                       ".....additional info: total read=%u, total written=%u", 
    73                       item->bytes_recv, item->bytes_sent)); 
    74         } else { 
    75             last_error_counter++; 
    76         } 
    77         bytes_read = 0; 
    78  
    79     } else if (bytes_read == 0) { 
    80         PJ_LOG(3,(THIS_FILE, "...socket has closed!")); 
    81     } 
    82  
    83     item->bytes_recv += bytes_read; 
     65    do { 
     66        if (thread_quit_flag) 
     67            return; 
     68 
     69        if (bytes_read < 0) { 
     70            pj_status_t rc = -bytes_read; 
     71            char errmsg[128]; 
     72 
     73            if (rc != last_error) { 
     74                last_error = rc; 
     75                pj_strerror(rc, errmsg, sizeof(errmsg)); 
     76                PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)",  
     77                          bytes_read, errmsg)); 
     78                PJ_LOG(3,(THIS_FILE,  
     79                          ".....additional info: total read=%u, total written=%u", 
     80                          item->bytes_recv, item->bytes_sent)); 
     81            } else { 
     82                last_error_counter++; 
     83            } 
     84            bytes_read = 0; 
     85 
     86        } else if (bytes_read == 0) { 
     87            PJ_LOG(3,(THIS_FILE, "...socket has closed!")); 
     88        } 
     89 
     90        item->bytes_recv += bytes_read; 
    8491     
    85     /* To assure that the test quits, even if main thread 
    86      * doesn't have time to run. 
    87      */ 
    88     if (item->bytes_recv > item->buffer_size * 10000)  
    89         thread_quit_flag = 1; 
    90  
    91     rc = pj_ioqueue_recv( item->ioqueue, item->server_key, 
    92                           item->incoming_buffer, item->buffer_size, 0 ); 
    93  
    94     if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 
    95         if (rc != last_error) { 
    96             last_error = rc; 
    97             app_perror("...error: read error", rc); 
    98         } else { 
    99             last_error_counter++; 
    100         } 
    101     } 
     92        /* To assure that the test quits, even if main thread 
     93         * doesn't have time to run. 
     94         */ 
     95        if (item->bytes_recv > item->buffer_size * 10000)  
     96            thread_quit_flag = 1; 
     97 
     98        bytes_read = item->buffer_size; 
     99        rc = pj_ioqueue_recv( key, op_key, 
     100                              item->incoming_buffer, &bytes_read, 0 ); 
     101 
     102        if (rc == PJ_SUCCESS) { 
     103            data_is_available = 1; 
     104        } else if (rc == PJ_EPENDING) { 
     105            data_is_available = 0; 
     106        } else { 
     107            data_is_available = 0; 
     108            if (rc != last_error) { 
     109                last_error = rc; 
     110                app_perror("...error: read error", rc); 
     111            } else { 
     112                last_error_counter++; 
     113            } 
     114        } 
     115 
     116        if (!item->has_pending_send) { 
     117            pj_ssize_t sent = item->buffer_size; 
     118            rc = pj_ioqueue_send(item->client_key, &item->send_op, 
     119                                 item->outgoing_buffer, &sent, 0); 
     120            if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 
     121                app_perror("...error: write error", rc); 
     122            } 
     123 
     124            item->has_pending_send = (rc==PJ_EPENDING); 
     125        } 
     126 
     127    } while (data_is_available); 
    102128} 
    103129 
     
    105131 * Increment item->bytes_sent and write the next data. 
    106132 */ 
    107 static void on_write_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent) 
     133static void on_write_complete(pj_ioqueue_key_t *key,  
     134                              pj_ioqueue_op_key_t *op_key, 
     135                              pj_ssize_t bytes_sent) 
    108136{ 
    109137    test_item *item = pj_ioqueue_get_user_data(key); 
     
    114142        return; 
    115143 
     144    item->has_pending_send = 0; 
    116145    item->bytes_sent += bytes_sent; 
    117146 
     
    123152        pj_status_t rc; 
    124153 
    125         rc = pj_ioqueue_write(item->ioqueue, item->client_key,  
    126                               item->outgoing_buffer, item->buffer_size); 
     154        bytes_sent = item->buffer_size; 
     155        rc = pj_ioqueue_send( item->client_key, op_key, 
     156                              item->outgoing_buffer, &bytes_sent, 0); 
    127157        if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 
    128158            app_perror("...error: write error", rc); 
    129159        } 
     160 
     161        item->has_pending_send = (rc==PJ_EPENDING); 
    130162    } 
    131163} 
     
    192224 
    193225    TRACE_((THIS_FILE, "     creating ioqueue..")); 
    194     rc = pj_ioqueue_create(pool, sockpair_cnt*2, thread_cnt, &ioqueue); 
     226    rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue); 
    195227    if (rc != PJ_SUCCESS) { 
    196228        app_perror("...error: unable to create ioqueue", rc); 
     
    200232    /* Initialize each producer-consumer pair. */ 
    201233    for (i=0; i<sockpair_cnt; ++i) { 
     234        pj_ssize_t bytes; 
    202235 
    203236        items[i].ioqueue = ioqueue; 
     
    243276        /* Start reading. */ 
    244277        TRACE_((THIS_FILE, "      pj_ioqueue_recv..")); 
    245         rc = pj_ioqueue_recv(ioqueue, items[i].server_key, 
    246                              items[i].incoming_buffer, items[i].buffer_size, 
     278        bytes = items[i].buffer_size; 
     279        rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op, 
     280                             items[i].incoming_buffer, &bytes, 
    247281                             0); 
    248         if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 
     282        if (rc != PJ_EPENDING) { 
    249283            app_perror("...error: pj_ioqueue_recv", rc); 
    250284            return -73; 
     
    253287        /* Start writing. */ 
    254288        TRACE_((THIS_FILE, "      pj_ioqueue_write..")); 
    255         rc = pj_ioqueue_write(ioqueue, items[i].client_key, 
    256                               items[i].outgoing_buffer, items[i].buffer_size); 
     289        bytes = items[i].buffer_size; 
     290        rc = pj_ioqueue_send(items[i].client_key, &items[i].recv_op, 
     291                             items[i].outgoing_buffer, &bytes, 0); 
    257292        if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 
    258293            app_perror("...error: pj_ioqueue_write", rc); 
     
    260295        } 
    261296 
     297        items[i].has_pending_send = (rc==PJ_EPENDING); 
    262298    } 
    263299 
     
    325361    TRACE_((THIS_FILE, "     closing all sockets..")); 
    326362    for (i=0; i<sockpair_cnt; ++i) { 
    327         pj_ioqueue_unregister(ioqueue, items[i].server_key); 
    328         pj_ioqueue_unregister(ioqueue, items[i].client_key); 
     363        pj_ioqueue_unregister(items[i].server_key); 
     364        pj_ioqueue_unregister(items[i].client_key); 
    329365        pj_sock_close(items[i].server_fd); 
    330366        pj_sock_close(items[i].client_fd); 
  • pjproject/main/pjlib/src/pjlib-test/ioq_tcp.c

    r6 r11  
    3232#define POOL_SIZE           (2*BUF_MAX_SIZE + SOCK_INACTIVE_MAX*128 + 2048) 
    3333 
    34 static pj_ssize_t       callback_read_size, 
    35                         callback_write_size, 
    36                         callback_accept_status, 
    37                         callback_connect_status; 
    38 static pj_ioqueue_key_t*callback_read_key, 
    39                        *callback_write_key, 
    40                        *callback_accept_key, 
    41                        *callback_connect_key; 
    42  
    43 static void on_ioqueue_read(pj_ioqueue_key_t *key, pj_ssize_t bytes_read) 
     34static pj_ssize_t            callback_read_size, 
     35                             callback_write_size, 
     36                             callback_accept_status, 
     37                             callback_connect_status; 
     38static pj_ioqueue_key_t     *callback_read_key, 
     39                            *callback_write_key, 
     40                            *callback_accept_key, 
     41                            *callback_connect_key; 
     42static pj_ioqueue_op_key_t  *callback_read_op, 
     43                            *callback_write_op, 
     44                            *callback_accept_op; 
     45 
     46static void on_ioqueue_read(pj_ioqueue_key_t *key,  
     47                            pj_ioqueue_op_key_t *op_key, 
     48                            pj_ssize_t bytes_read) 
    4449{ 
    4550    callback_read_key = key; 
     51    callback_read_op = op_key; 
    4652    callback_read_size = bytes_read; 
    4753} 
    4854 
    49 static void on_ioqueue_write(pj_ioqueue_key_t *key, pj_ssize_t bytes_written) 
     55static void on_ioqueue_write(pj_ioqueue_key_t *key,  
     56                             pj_ioqueue_op_key_t *op_key, 
     57                             pj_ssize_t bytes_written) 
    5058{ 
    5159    callback_write_key = key; 
     60    callback_write_op = op_key; 
    5261    callback_write_size = bytes_written; 
    5362} 
    5463 
    55 static void on_ioqueue_accept(pj_ioqueue_key_t *key, pj_sock_t sock,  
     64static void on_ioqueue_accept(pj_ioqueue_key_t *key,  
     65                              pj_ioqueue_op_key_t *op_key, 
     66                              pj_sock_t sock,  
    5667                              int status) 
    5768{ 
     
    5970 
    6071    callback_accept_key = key; 
     72    callback_accept_op = op_key; 
    6173    callback_accept_status = status; 
    6274} 
     
    8496                          pj_timestamp *t_elapsed) 
    8597{ 
    86     int rc; 
     98    pj_status_t status; 
    8799    pj_ssize_t bytes; 
     100    pj_time_val timeout; 
    88101    pj_timestamp t1, t2; 
    89102    int pending_op = 0; 
     103    pj_ioqueue_op_key_t read_op, write_op; 
    90104 
    91105    // Start reading on the server side. 
    92     rc = pj_ioqueue_read(ioque, skey, recv_buf, bufsize); 
    93     if (rc != 0 && rc != PJ_EPENDING) { 
     106    bytes = bufsize; 
     107    status = pj_ioqueue_recv(skey, &read_op, recv_buf, &bytes, 0); 
     108    if (status != PJ_SUCCESS && status != PJ_EPENDING) { 
     109        app_perror("...pj_ioqueue_recv error", status); 
    94110        return -100; 
    95111    } 
    96112     
    97     ++pending_op; 
     113    if (status == PJ_EPENDING) 
     114        ++pending_op; 
     115    else { 
     116        /* Does not expect to return error or immediate data. */ 
     117        return -115; 
     118    } 
    98119 
    99120    // Randomize send buffer. 
     
    101122 
    102123    // Starts send on the client side. 
    103     bytes = pj_ioqueue_write(ioque, ckey, send_buf, bufsize); 
    104     if (bytes != bufsize && bytes != PJ_EPENDING) { 
     124    bytes = bufsize; 
     125    status = pj_ioqueue_send(ckey, &write_op, send_buf, &bytes, 0); 
     126    if (status != PJ_SUCCESS && bytes != PJ_EPENDING) { 
    105127        return -120; 
    106128    } 
    107     if (bytes == PJ_EPENDING) { 
     129    if (status == PJ_EPENDING) { 
    108130        ++pending_op; 
    109131    } 
     
    115137    callback_read_size = callback_write_size = 0; 
    116138    callback_read_key = callback_write_key = NULL; 
     139    callback_read_op = callback_write_op = NULL; 
    117140 
    118141    // Poll the queue until we've got completion event in the server side. 
    119     rc = 0; 
     142    status = 0; 
    120143    while (pending_op > 0) { 
    121         rc = pj_ioqueue_poll(ioque, NULL); 
    122         if (rc > 0) { 
     144        timeout.sec = 1; timeout.msec = 0; 
     145        status = pj_ioqueue_poll(ioque, &timeout); 
     146        if (status > 0) { 
    123147            if (callback_read_size) { 
    124                 if (callback_read_size != bufsize) { 
     148                if (callback_read_size != bufsize) 
    125149                    return -160; 
    126                 } 
    127150                if (callback_read_key != skey) 
    128151                    return -161; 
     152                if (callback_read_op != &read_op) 
     153                    return -162; 
    129154            } 
    130155            if (callback_write_size) { 
    131156                if (callback_write_key != ckey) 
    132                     return -162; 
     157                    return -163; 
     158                if (callback_write_op != &write_op) 
     159                    return -164; 
    133160            } 
    134             pending_op -= rc; 
     161            pending_op -= status; 
    135162        } 
    136         if (rc < 0) { 
     163        if (status == 0) { 
     164            PJ_LOG(3,("", "...error: timed out")); 
     165        } 
     166        if (status < 0) { 
    137167            return -170; 
    138168        } 
    139169    } 
     170 
     171    // Pending op is zero. 
     172    // Subsequent poll should yield zero too. 
     173    timeout.sec = timeout.msec = 0; 
     174    status = pj_ioqueue_poll(ioque, &timeout); 
     175    if (status != 0) 
     176        return -173; 
    140177 
    141178    // End time. 
     
    143180    t_elapsed->u32.lo += (t2.u32.lo - t1.u32.lo); 
    144181 
    145     if (rc < 0) { 
    146         return -150; 
     182    if (status < 0) { 
     183        return -176; 
    147184    } 
    148185 
     
    169206    pj_ioqueue_t *ioque = NULL; 
    170207    pj_ioqueue_key_t *skey, *ckey0, *ckey1; 
     208    pj_ioqueue_op_key_t accept_op; 
    171209    int bufsize = BUF_MIN_SIZE; 
    172210    pj_ssize_t status = -1; 
     
    206244 
    207245    // Create I/O Queue. 
    208     rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, 0, &ioque); 
     246    rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque); 
    209247    if (rc != PJ_SUCCESS) { 
    210248        app_perror("...ERROR in pj_ioqueue_create()", rc); 
     
    232270    // Server socket accept() 
    233271    client_addr_len = sizeof(pj_sockaddr_in); 
    234     status = pj_ioqueue_accept(ioque, skey, &csock0, &client_addr, &rmt_addr, &client_addr_len); 
     272    status = pj_ioqueue_accept(skey, &accept_op, &csock0,  
     273                               &client_addr, &rmt_addr, &client_addr_len); 
    235274    if (status != PJ_EPENDING) { 
    236275        app_perror("...ERROR in pj_ioqueue_accept()", rc); 
     
    248287 
    249288    // Client socket connect() 
    250     status = pj_ioqueue_connect(ioque, ckey1, &addr, sizeof(addr)); 
     289    status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr)); 
    251290    if (status!=PJ_SUCCESS && status != PJ_EPENDING) { 
    252291        app_perror("...ERROR in pj_ioqueue_connect()", rc); 
     
    263302    callback_read_key = callback_write_key =  
    264303        callback_accept_key = callback_connect_key = NULL; 
     304    callback_accept_op = callback_read_op = callback_write_op = NULL; 
    265305 
    266306    while (pending_op) { 
     
    274314                } 
    275315                if (callback_accept_key != skey) { 
    276                     status=-41; goto on_error; 
     316                    status=-42; goto on_error; 
    277317                } 
     318                if (callback_accept_op != &accept_op) { 
     319                    status=-43; goto on_error; 
     320                } 
     321                callback_accept_status = -2; 
    278322            } 
    279323 
     
    285329                    status=-51; goto on_error; 
    286330                } 
     331                callback_connect_status = -2; 
    287332            } 
    288333 
     
    293338            } 
    294339        } 
     340    } 
     341 
     342    // There's no pending operation. 
     343    // When we poll the ioqueue, there must not be events. 
     344    if (pending_op == 0) { 
     345        pj_time_val timeout = {1, 0}; 
     346        status = pj_ioqueue_poll(ioque, &timeout); 
     347        if (status != 0) { 
     348            status=-60; goto on_error; 
     349        } 
    295350    } 
    296351 
     
    313368    // Test send and receive. 
    314369    t_elapsed.u32.lo = 0; 
    315     status = send_recv_test(ioque, ckey0, ckey1, send_buf, recv_buf, bufsize, &t_elapsed); 
     370    status = send_recv_test(ioque, ckey0, ckey1, send_buf,  
     371                            recv_buf, bufsize, &t_elapsed); 
    316372    if (status != 0) { 
    317373        goto on_error; 
     
    355411 
    356412    // Create I/O Queue. 
    357     rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, 0, &ioque); 
     413    rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque); 
    358414    if (!ioque) { 
    359415        status=-20; goto on_error; 
     
    382438 
    383439    // Client socket connect() 
    384     status = pj_ioqueue_connect(ioque, ckey1, &addr, sizeof(addr)); 
     440    status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr)); 
    385441    if (status==PJ_SUCCESS) { 
    386442        // unexpectedly success! 
     
    418474    } 
    419475 
     476    // There's no pending operation. 
     477    // When we poll the ioqueue, there must not be events. 
     478    if (pending_op == 0) { 
     479        pj_time_val timeout = {1, 0}; 
     480        status = pj_ioqueue_poll(ioque, &timeout); 
     481        if (status != 0) { 
     482            status=-60; goto on_error; 
     483        } 
     484    } 
     485 
    420486    // Success 
    421487    status = 0; 
  • pjproject/main/pjlib/src/pjlib-test/ioq_udp.c

    r6 r11  
    3535#define TRACE_(msg)         PJ_LOG(3,(THIS_FILE,"....." msg)) 
    3636 
    37 static pj_ssize_t callback_read_size, 
    38                   callback_write_size, 
    39                   callback_accept_status, 
    40                   callback_connect_status; 
    41 static pj_ioqueue_key_t *callback_read_key, 
    42                         *callback_write_key, 
    43                         *callback_accept_key, 
    44                         *callback_connect_key; 
    45  
    46 static void on_ioqueue_read(pj_ioqueue_key_t *key, pj_ssize_t bytes_read) 
     37static pj_ssize_t            callback_read_size, 
     38                             callback_write_size, 
     39                             callback_accept_status, 
     40                             callback_connect_status; 
     41static pj_ioqueue_key_t     *callback_read_key, 
     42                            *callback_write_key, 
     43                            *callback_accept_key, 
     44                            *callback_connect_key; 
     45static pj_ioqueue_op_key_t  *callback_read_op, 
     46                            *callback_write_op, 
     47                            *callback_accept_op; 
     48 
     49static void on_ioqueue_read(pj_ioqueue_key_t *key,  
     50                            pj_ioqueue_op_key_t *op_key, 
     51                            pj_ssize_t bytes_read) 
    4752{ 
    4853    callback_read_key = key; 
     54    callback_read_op = op_key; 
    4955    callback_read_size = bytes_read; 
    5056} 
    5157 
    52 static void on_ioqueue_write(pj_ioqueue_key_t *key, pj_ssize_t bytes_written) 
     58static void on_ioqueue_write(pj_ioqueue_key_t *key,  
     59                             pj_ioqueue_op_key_t *op_key, 
     60                             pj_ssize_t bytes_written) 
    5361{ 
    5462    callback_write_key = key; 
     63    callback_write_op = op_key; 
    5564    callback_write_size = bytes_written; 
    5665} 
    5766 
    58 static void on_ioqueue_accept(pj_ioqueue_key_t *key, pj_sock_t sock, int status) 
     67static void on_ioqueue_accept(pj_ioqueue_key_t *key,  
     68                              pj_ioqueue_op_key_t *op_key, 
     69                              pj_sock_t sock, int status) 
    5970{ 
    6071    PJ_UNUSED_ARG(sock); 
    6172    callback_accept_key = key; 
     73    callback_accept_op = op_key; 
    6274    callback_accept_status = status; 
    6375} 
     
    8294#  define S_ADDR s_addr 
    8395#endif 
    84  
    85 /* 
    86  * native_format_test() 
    87  * This is just a simple test to verify that various structures in sock.h 
    88  * are really compatible with operating system's definitions. 
    89  */ 
    90 static int native_format_test(void) 
    91 { 
    92     pj_status_t rc; 
    93  
    94     // Test that PJ_INVALID_SOCKET is working. 
    95     { 
    96         pj_sock_t sock; 
    97         rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, -1, &sock); 
    98         if (rc == PJ_SUCCESS) 
    99             return -1020; 
    100     } 
    101  
    102     // Previous func will set errno var. 
    103     pj_set_os_error(PJ_SUCCESS); 
    104  
    105     return 0; 
    106 } 
    10796 
    10897/* 
     
    120109    pj_ioqueue_t *ioque = NULL; 
    121110    pj_ioqueue_key_t *skey, *ckey; 
     111    pj_ioqueue_op_key_t read_op, write_op; 
    122112    int bufsize = BUF_MIN_SIZE; 
    123113    pj_ssize_t bytes, status = -1; 
     
    158148    // Create I/O Queue. 
    159149    TRACE_("create ioqueue..."); 
    160     rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES,  
    161                            PJ_IOQUEUE_DEFAULT_THREADS, &ioque); 
     150    rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque); 
    162151    if (rc != PJ_SUCCESS) { 
    163152        status=-20; goto on_error; 
     
    196185    TRACE_("start recvfrom..."); 
    197186    addrlen = sizeof(addr); 
    198     bytes = pj_ioqueue_recvfrom(ioque, skey, recv_buf, bufsize, 0, 
    199                                 &addr, &addrlen); 
    200     if (bytes < 0 && bytes != PJ_EPENDING) { 
     187    bytes = bufsize; 
     188    rc = pj_ioqueue_recvfrom(skey, &read_op, recv_buf, &bytes, 0, 
     189                             &addr, &addrlen); 
     190    if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 
     191        app_perror("...error: pj_ioqueue_recvfrom", rc); 
    201192        status=-28; goto on_error; 
    202     } else if (bytes == PJ_EPENDING) { 
     193    } else if (rc == PJ_EPENDING) { 
    203194        recv_pending = 1; 
    204195        PJ_LOG(3, (THIS_FILE,  
     
    212203    // Write must return the number of bytes. 
    213204    TRACE_("start sendto..."); 
    214     bytes = pj_ioqueue_sendto(ioque, ckey, send_buf, bufsize, 0, &addr,  
    215                               sizeof(addr)); 
    216     if (bytes != bufsize && bytes != PJ_EPENDING) { 
    217         PJ_LOG(1,(THIS_FILE,  
    218                   "......error: sendto returned %d", bytes)); 
     205    bytes = bufsize; 
     206    rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0, &addr,  
     207                           sizeof(addr)); 
     208    if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 
     209        app_perror("...error: pj_ioqueue_sendto", rc); 
    219210        status=-30; goto on_error; 
    220     } else if (bytes == PJ_EPENDING) { 
     211    } else if (rc == PJ_EPENDING) { 
    221212        send_pending = 1; 
    222213        PJ_LOG(3, (THIS_FILE,  
     
    233224    callback_read_key = callback_write_key =  
    234225        callback_accept_key = callback_connect_key = NULL; 
     226    callback_read_op = callback_write_op = NULL; 
    235227 
    236228    // Poll if pending. 
    237     while (send_pending && recv_pending) { 
     229    while (send_pending || recv_pending) { 
    238230        int rc; 
    239231        pj_time_val timeout = { 5, 0 }; 
     
    254246                status=-61; goto on_error; 
    255247            } 
    256  
    257248            if (callback_read_key != skey) { 
    258249                status=-65; goto on_error; 
     250            } 
     251            if (callback_read_op != &read_op) { 
     252                status=-66; goto on_error; 
    259253            } 
    260254 
     
    271265                status=-73; goto on_error; 
    272266            } 
    273  
    274267            if (callback_write_key != ckey) { 
    275268                status=-75; goto on_error; 
     269            } 
     270            if (callback_write_op != &write_op) { 
     271                status=-76; goto on_error; 
    276272            } 
    277273 
     
    327323     
    328324    /* Create IOQueue */ 
    329     rc = pj_ioqueue_create(pool, MAX, 
    330                            PJ_IOQUEUE_DEFAULT_THREADS, 
    331                            &ioqueue); 
     325    rc = pj_ioqueue_create(pool, MAX, &ioqueue); 
    332326    if (rc != PJ_SUCCESS || ioqueue == NULL) { 
    333327        app_perror("...error in pj_ioqueue_create", rc); 
     
    359353 
    360354    for (i=0; i<count; ++i) { 
    361         rc = pj_ioqueue_unregister(ioqueue, key[i]); 
     355        rc = pj_ioqueue_unregister(key[i]); 
    362356        if (rc != PJ_SUCCESS) { 
    363357            app_perror("...error in pj_ioqueue_unregister", rc); 
     
    394388    pj_pool_t *pool = NULL; 
    395389    pj_sock_t *inactive_sock=NULL; 
     390    pj_ioqueue_op_key_t *inactive_read_op; 
    396391    char *send_buf, *recv_buf; 
    397392    pj_ioqueue_t *ioque = NULL; 
     
    430425 
    431426    // Create I/O Queue. 
    432     rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES,  
    433                            PJ_IOQUEUE_DEFAULT_THREADS, &ioque); 
     427    rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque); 
    434428    if (rc != PJ_SUCCESS) { 
    435429        app_perror("...error: pj_ioqueue_create()", rc); 
     
    441435    inactive_sock = (pj_sock_t*)pj_pool_alloc(pool,  
    442436                                    inactive_sock_count*sizeof(pj_sock_t)); 
     437    inactive_read_op = (pj_ioqueue_op_key_t*)pj_pool_alloc(pool, 
     438                              inactive_sock_count*sizeof(pj_ioqueue_op_key_t)); 
    443439    memset(&addr, 0, sizeof(addr)); 
    444440    addr.sin_family = PJ_AF_INET; 
    445441    for (i=0; i<inactive_sock_count; ++i) { 
     442        pj_ssize_t bytes; 
     443 
    446444        rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &inactive_sock[i]); 
    447445        if (rc != PJ_SUCCESS || inactive_sock[i] < 0) { 
     
    464462            goto on_error; 
    465463        } 
    466         rc = pj_ioqueue_read(ioque, key, recv_buf, bufsize); 
     464        bytes = bufsize; 
     465        rc = pj_ioqueue_recv(key, &inactive_read_op[i], recv_buf, &bytes, 0); 
    467466        if ( rc < 0 && rc != PJ_EPENDING) { 
    468467            pj_sock_close(inactive_sock[i]); 
     
    497496    for (i=0; i<LOOP; ++i) { 
    498497        pj_ssize_t bytes; 
     498        pj_ioqueue_op_key_t read_op, write_op; 
    499499 
    500500        // Randomize send buffer. 
     
    502502 
    503503        // Start reading on the server side. 
    504         rc = pj_ioqueue_read(ioque, skey, recv_buf, bufsize); 
     504        bytes = bufsize; 
     505        rc = pj_ioqueue_recv(skey, &read_op, recv_buf, &bytes, 0); 
    505506        if (rc < 0 && rc != PJ_EPENDING) { 
    506507            app_perror("...error: pj_ioqueue_read()", rc); 
     
    509510 
    510511        // Starts send on the client side. 
    511         bytes = pj_ioqueue_sendto(ioque, ckey, send_buf, bufsize, 0, 
    512                                         &addr, sizeof(addr)); 
    513         if (bytes != bufsize && bytes != PJ_EPENDING) { 
     512        bytes = bufsize; 
     513        rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0, 
     514                               &addr, sizeof(addr)); 
     515        if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 
    514516            app_perror("...error: pj_ioqueue_write()", bytes); 
    515517            rc = -1; 
     
    600602    int bufsize, sock_count; 
    601603 
    602     PJ_LOG(3, (THIS_FILE, "...format test")); 
    603     if ((status = native_format_test()) != 0) 
    604         return status; 
    605     PJ_LOG(3, (THIS_FILE, "....native format test ok")); 
    606  
    607604    PJ_LOG(3, (THIS_FILE, "...compliance test")); 
    608605    if ((status=compliance_test()) != 0) { 
  • pjproject/main/pjlib/src/pjlib-test/main.c

    r6 r11  
    1212 
    1313 
    14 #if defined(PJ_WIN32) && PJ_WIN32!=0 
     14//#if defined(PJ_WIN32) && PJ_WIN32!=0 
     15#if 0 
    1516#include <windows.h> 
    1617static void boost(void) 
  • pjproject/main/pjlib/src/pjlib-test/test.h

    r6 r11  
    99#define GROUP_OS                    0 
    1010#define GROUP_DATA_STRUCTURE        0 
    11 #define GROUP_NETWORK               0 
     11#define GROUP_NETWORK               1 
    1212#define GROUP_EXTRA                 0 
    1313 
     
    3535#define INCLUDE_XML_TEST            GROUP_EXTRA 
    3636 
     37#define INCLUDE_ECHO_SERVER         0 
     38#define INCLUDE_ECHO_CLIENT         0 
    3739 
    38 #define INCLUDE_ECHO_SERVER         0 
    39 #define INCLUDE_ECHO_CLIENT         1 
    4040 
    4141#define ECHO_SERVER_MAX_THREADS     4 
     
    7474extern int echo_client(int sock_type, const char *server, int port); 
    7575 
     76extern int echo_srv_sync(void); 
     77extern int udp_echo_srv_ioqueue(void); 
     78extern int echo_srv_common_loop(pj_atomic_t *bytes_counter); 
     79 
    7680extern pj_pool_factory *mem; 
    7781 
  • pjproject/main/pjlib/src/pjlib-test/udp_echo_srv_sync.c

    r10 r11  
    99{ 
    1010    pj_sock_t    sock = (pj_sock_t)arg; 
    11     char         buf[1516]; 
     11    char         buf[512]; 
    1212    pj_status_t  last_recv_err = PJ_SUCCESS, last_write_err = PJ_SUCCESS; 
    1313 
     
    4949    pj_thread_t *thread[ECHO_SERVER_MAX_THREADS]; 
    5050    pj_status_t rc; 
    51     pj_highprec_t last_received, avg_bw, highest_bw; 
    52     pj_time_val last_print; 
    53     unsigned count; 
    5451    int i; 
    5552 
     
    8481    PJ_LOG(3,("", "...Press Ctrl-C to abort")); 
    8582 
     83    echo_srv_common_loop(total_bytes); 
     84    return 0; 
     85} 
     86 
     87 
     88int echo_srv_common_loop(pj_atomic_t *bytes_counter) 
     89{ 
     90    pj_highprec_t last_received, avg_bw, highest_bw; 
     91    pj_time_val last_print; 
     92    unsigned count; 
     93 
    8694    last_received = 0; 
    8795    pj_gettimeofday(&last_print); 
     
    96104        pj_thread_sleep(1000); 
    97105 
    98         received = cur_received = pj_atomic_get(total_bytes); 
     106        received = cur_received = pj_atomic_get(bytes_counter); 
    99107        cur_received = cur_received - last_received; 
    100108 
Note: See TracChangeset for help on using the changeset viewer.