Index: /pjproject/main/pjlib/build/pjlib.dsp =================================================================== --- /pjproject/main/pjlib/build/pjlib.dsp (revision 10) +++ /pjproject/main/pjlib/build/pjlib.dsp (revision 11) @@ -392,4 +392,8 @@ # Begin Source File +SOURCE=..\include\pj\compat\os_sunos.h +# End Source File +# Begin Source File + SOURCE=..\include\pj\compat\os_win32.h # End Source File Index: /pjproject/main/pjlib/build/pjlib_test.dsp =================================================================== --- /pjproject/main/pjlib/build/pjlib_test.dsp (revision 10) +++ /pjproject/main/pjlib/build/pjlib_test.dsp (revision 11) @@ -197,4 +197,8 @@ # Begin Source File +SOURCE="..\src\pjlib-test\udp_echo_srv_ioqueue.c" +# End Source File +# Begin Source File + SOURCE="..\src\pjlib-test\udp_echo_srv_sync.c" # End Source File Index: /pjproject/main/pjlib/include/pj/compat/os_linux.h =================================================================== --- /pjproject/main/pjlib/include/pj/compat/os_linux.h (revision 10) +++ /pjproject/main/pjlib/include/pj/compat/os_linux.h (revision 11) @@ -57,4 +57,19 @@ #define PJ_SOCK_HAS_INET_ATON 1 +/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return + * the status of non-blocking connect() operation. + */ +#define PJ_HAS_SO_ERROR 1 + +/* This value specifies the value set in errno by the OS when a non-blocking + * socket recv() can not return immediate data. + */ +#define PJ_BLOCKING_ERROR_VAL EAGAIN + +/* This value specifies the value set in errno by the OS when a non-blocking + * socket connect() can not get connected immediately. + */ +#define PJ_BLOCKING_CONNECT_ERROR_VAL EINPROGRESS + /* Default threading is enabled, unless it's overridden. 
*/ #ifndef PJ_HAS_THREADS Index: /pjproject/main/pjlib/include/pj/compat/os_linux_kernel.h =================================================================== --- /pjproject/main/pjlib/include/pj/compat/os_linux_kernel.h (revision 10) +++ /pjproject/main/pjlib/include/pj/compat/os_linux_kernel.h (revision 11) @@ -54,4 +54,19 @@ #define PJ_SOCK_HAS_INET_ATON 0 +/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return + * the status of non-blocking connect() operation. + */ +#define PJ_HAS_SO_ERROR 1 + +/* This value specifies the value set in errno by the OS when a non-blocking + * socket recv() can not return immediate data. + */ +#define PJ_BLOCKING_ERROR_VAL EAGAIN + +/* This value specifies the value set in errno by the OS when a non-blocking + * socket connect() can not get connected immediately. + */ +#define PJ_BLOCKING_CONNECT_ERROR_VAL EINPROGRESS + #ifndef PJ_HAS_THREADS # define PJ_HAS_THREADS (1) Index: /pjproject/main/pjlib/include/pj/compat/os_palmos.h =================================================================== --- /pjproject/main/pjlib/include/pj/compat/os_palmos.h (revision 10) +++ /pjproject/main/pjlib/include/pj/compat/os_palmos.h (revision 11) @@ -46,4 +46,19 @@ #define PJ_SOCK_HAS_INET_ATON 0 +/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return + * the status of non-blocking connect() operation. + */ +#define PJ_HAS_SO_ERROR 0 + +/* This value specifies the value set in errno by the OS when a non-blocking + * socket recv() can not return immediate data. + */ +#define PJ_BLOCKING_ERROR_VAL xxx + +/* This value specifies the value set in errno by the OS when a non-blocking + * socket connect() can not get connected immediately. + */ +#define PJ_BLOCKING_CONNECT_ERROR_VAL xxx + /* Default threading is enabled, unless it's overridden. 
*/ #ifndef PJ_HAS_THREADS Index: /pjproject/main/pjlib/include/pj/compat/os_sunos.h =================================================================== --- /pjproject/main/pjlib/include/pj/compat/os_sunos.h (revision 10) +++ /pjproject/main/pjlib/include/pj/compat/os_sunos.h (revision 11) @@ -41,4 +41,19 @@ #define PJ_SOCK_HAS_INET_ATON 0 +/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return + * the status of non-blocking connect() operation. + */ +#define PJ_HAS_SO_ERROR 0 + +/* This value specifies the value set in errno by the OS when a non-blocking + * socket recv() can not return immediate data. + */ +#define PJ_BLOCKING_ERROR_VAL EWOULDBLOCK + +/* This value specifies the value set in errno by the OS when a non-blocking + * socket connect() can not get connected immediately. + */ +#define PJ_BLOCKING_CONNECT_ERROR_VAL EINPROGRESS + /* Default threading is enabled, unless it's overridden. */ #ifndef PJ_HAS_THREADS Index: /pjproject/main/pjlib/include/pj/compat/os_win32.h =================================================================== --- /pjproject/main/pjlib/include/pj/compat/os_win32.h (revision 10) +++ /pjproject/main/pjlib/include/pj/compat/os_win32.h (revision 11) @@ -61,4 +61,19 @@ #define PJ_SOCK_HAS_INET_ATON 0 +/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return + * the status of non-blocking connect() operation. + */ +#define PJ_HAS_SO_ERROR 0 + +/* This value specifies the value set in errno by the OS when a non-blocking + * socket recv() or send() can not return immediately. + */ +#define PJ_BLOCKING_ERROR_VAL WSAEWOULDBLOCK + +/* This value specifies the value set in errno by the OS when a non-blocking + * socket connect() can not get connected immediately. + */ +#define PJ_BLOCKING_CONNECT_ERROR_VAL WSAEWOULDBLOCK + /* Default threading is enabled, unless it's overridden. 
*/ #ifndef PJ_HAS_THREADS Index: /pjproject/main/pjlib/include/pj/doxygen.h =================================================================== --- /pjproject/main/pjlib/include/pj/doxygen.h (revision 10) +++ /pjproject/main/pjlib/include/pj/doxygen.h (revision 11) @@ -89,6 +89,9 @@ * PJLIB Page Documentation on navigation pane of your PDF reader. * - * - * + * - How to Submit Code to PJLIB Project + *\n + * Please read \ref pjlib_coding_convention_page before submitting + * your code. Send your code as patch against current Subversion tree + * to the appropriate mailing list. * * @@ -395,4 +398,50 @@ + +/*////////////////////////////////////////////////////////////////////////// */ +/* + CODING CONVENTION + */ + +/** + * @page pjlib_coding_convention_page Coding Convention + * + * Before you submit your code/patches to be included with PJLIB, you must + * make sure that your code is compliant with PJLIB coding convention. + * This is very important! Otherwise we would not accept your code. + * + * @section coding_conv_editor_sec Editor Settings + * + * The single most important thing in the whole coding convention is editor + * settings. It's more important than the correctness of your code (bugs will + * only crash the system, but incorrect tab size is mental!). + * + * Kindly set your editor as follows: + * - tab size to \b 8. + * - indentation to \b 4. + * + * With \c vi, you can do it with: + *
+ * :se ts=8 + * :se sts=4 + *+ * + * You should replace tab with eight spaces. + * + * @section coding_conv_detail_sec Coding Style + * + * Coding style MUST strictly follow K&R style. The rest of coding style + * must follow current style. You SHOULD be able to observe the style + * currently used by PJLIB from PJLIB sources, and apply the style to your + * code. If you're not able to do simple thing like to observe PJLIB + * coding style from the sources, then logic dictates that your ability to + * observe more difficult area in PJLIB such as memory allocation strategy, + * concurrency, etc is questionable. + * + * @section coding_conv_comment_sec Commenting Your Code + * + * Public API (e.g. in header files) MUST have doxygen compliant comments. + * + */ Index: /pjproject/main/pjlib/include/pj/ioqueue.h =================================================================== --- /pjproject/main/pjlib/include/pj/ioqueue.h (revision 10) +++ /pjproject/main/pjlib/include/pj/ioqueue.h (revision 11) @@ -1,4 +1,3 @@ /* $Id$ - * */ @@ -49,15 +48,52 @@ * @{ * - * This file provides abstraction for various event dispatching mechanisms. - * The interfaces for event dispatching vary alot, even in a single - * operating system. The abstraction here hopefully is suitable for most of - * the event dispatching available. - * - * Currently, the I/O Queue supports: - * - select(), as the common denominator, but the least efficient. - * - I/O Completion ports in Windows NT/2000/XP, which is the most efficient - * way to dispatch events in Windows NT based OSes, and most importantly, - * it doesn't have the limit on how many handles to monitor. And it works - * with files (not only sockets) as well. + * I/O Queue provides API for performing asynchronous I/O operations. It + * conforms to proactor pattern, which allows application to submit an + * asynchronous operation and to be notified later when the operation has + * completed. 
+ * + * The framework works natively in platforms where asynchronous operation API + * exists, such as in Windows NT with IoCompletionPort/IOCP. In other + * platforms, the I/O queue abstracts the operating system's event poll API + * to provide semantics similar to IoCompletionPort with minimal penalties + * (i.e. per ioqueue and per handle mutex protection). + * + * The I/O queue provides more than just unified abstraction. It also: + * - makes sure that the operation uses the most effective way to utilize + * the underlying mechanism, to provide the maximum theoretical + * throughput possible on a given platform. + * - chooses the most efficient mechanism for event polling on a given + * platform. + * + * Currently, the I/O Queue is implemented using: + * - select(), as the common denominator, but the least + * efficient. Also the number of descriptors is limited to + * \c PJ_IOQUEUE_MAX_HANDLES (which by default is 64). + * - /dev/epoll on Linux (user mode and kernel mode), + * a much faster replacement for select() on Linux (and more importantly + * doesn't have limitation on number of descriptors). + * - I/O Completion ports on Windows NT/2000/XP, which is the most + * efficient way to dispatch events in Windows NT based OSes, and most + * importantly, it doesn't have the limit on how many handles to monitor. + * And it works with files (not only sockets) as well. + * + * + * \section pj_ioqueue_concurrency_sec Concurrency Rules + * + * The items below describe rules that must be obeyed when using the I/O + * queue, with regard to concurrency: + * - in general, the I/O queue is thread safe (assuming the lock strategy + * is not changed to disable mutex protection). All operations, except + * unregistration which is described below, can be safely invoked + * simultaneously by multiple threads. + * - however, care must be taken when unregistering a key from the + * ioqueue. 
Application must take care that when one thread is issuing + * an unregistration, other thread is not simultaneously invoking an + * operation to the same key. + *\n + * This happens because the ioqueue functions are working with a pointer + * to the key, and there is a possible race condition where the pointer + * has been rendered invalid by other threads before the ioqueue has a + * chance to acquire mutex on it. * * \section pj_ioqeuue_examples_sec Examples @@ -70,18 +106,44 @@ */ - /** - * This structure describes the callbacks to be called when I/O operation - * completes. - */ + + +/** + * This structure describes operation specific key to be submitted to + * I/O Queue when performing the asynchronous operation. This key will + * be returned to the application when completion callback is called. + * + * Application normally wants to attach it's specific data in the + * \c user_data field so that it can keep track of which operation has + * completed when the callback is called. Alternatively, application can + * also extend this struct to include its data, because the pointer that + * is returned in the completion callback will be exactly the same as + * the pointer supplied when the asynchronous function is called. + */ +typedef struct pj_ioqueue_op_key_t +{ + void *internal__[32]; /**< Internal I/O Queue data. */ + void *user_data; /**< Application data. */ +} pj_ioqueue_op_key_t; + +/** + * This structure describes the callbacks to be called when I/O operation + * completes. + */ typedef struct pj_ioqueue_callback { /** - * This callback is called when #pj_ioqueue_read or #pj_ioqueue_recvfrom + * This callback is called when #pj_ioqueue_recv or #pj_ioqueue_recvfrom * completes. * * @param key The key. - * @param bytes_read The size of data that has just been read. + * @param op_key Operation key. + * @param bytes_read >= 0 to indicate the amount of data read, + * otherwise negative value containing the error + * code. 
To obtain the pj_status_t error code, use + * (pj_status_t code = -bytes_read). */ - void (*on_read_complete)(pj_ioqueue_key_t *key, pj_ssize_t bytes_read); + void (*on_read_complete)(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read); /** @@ -90,7 +152,13 @@ * * @param key The key. - * @param bytes_read The size of data that has just been read. + * @param op_key Operation key. + * @param bytes_sent >= 0 to indicate the amount of data written, + * otherwise negative value containing the error + * code. To obtain the pj_status_t error code, use + * (pj_status_t code = -bytes_sent). */ - void (*on_write_complete)(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent); + void (*on_write_complete)(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_sent); /** @@ -98,9 +166,12 @@ * * @param key The key. + * @param op_key Operation key. * @param sock Newly connected socket. * @param status Zero if the operation completes successfully. */ - void (*on_accept_complete)(pj_ioqueue_key_t *key, pj_sock_t sock, - int status); + void (*on_accept_complete)(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_sock_t sock, + pj_status_t status); /** @@ -108,12 +179,14 @@ * * @param key The key. - * @param status Zero if the operation completes successfully. + * @param status PJ_SUCCESS if the operation completes successfully. */ - void (*on_connect_complete)(pj_ioqueue_key_t *key, int status); + void (*on_connect_complete)(pj_ioqueue_key_t *key, + pj_status_t status); } pj_ioqueue_callback; /** - * Types of I/O Queue operation. + * Types of pending I/O Queue operation. This enumeration is only used + * internally within the ioqueue. */ typedef enum pj_ioqueue_operation_e @@ -139,5 +212,4 @@ #define PJ_IOQUEUE_DEFAULT_THREADS 0 - /** * Create a new I/O Queue framework. @@ -146,8 +218,4 @@ * @param max_fd The maximum number of handles to be supported, which * should not exceed PJ_IOQUEUE_MAX_HANDLES. 
- * @param max_threads The maximum number of threads that are allowed to - * operate on a single descriptor simultaneously. If - * the value is zero, the framework will set it - * to a reasonable value. * @param ioqueue Pointer to hold the newly created I/O Queue. * @@ -156,5 +224,4 @@ PJ_DECL(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_size_t max_fd, - int max_threads, pj_ioqueue_t **ioqueue); @@ -201,5 +268,7 @@ * retrieved later. * @param cb Callback to be called when I/O operation completes. - * @param key Pointer to receive the returned key. + * @param key Pointer to receive the key to be associated with this + * socket. Subsequent I/O queue operation will need this + * key. * * @return PJ_SUCCESS on success, or the error code. @@ -210,46 +279,63 @@ void *user_data, const pj_ioqueue_callback *cb, - pj_ioqueue_key_t **key); - -/** - * Unregister a handle from the I/O Queue framework. - * - * @param ioque The I/O Queue. - * @param key The key that uniquely identifies the handle, which is - * returned from the function #pj_ioqueue_register_sock() - * or other registration functions. + pj_ioqueue_key_t **key ); + +/** + * Unregister from the I/O Queue framework. Caller must make sure that + * the key doesn't have any pending operation before calling this function, + * or otherwise the behaviour is undefined (either callback will be called + * later when the data is sent/received, or the callback will not be called, + * or even something else). + * + * @param key The key that was previously obtained from registration. * * @return PJ_SUCCESS on success or the error code. */ -PJ_DECL(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key ); - - -/** - * Get user data associated with the I/O Queue key. - * - * @param key The key previously associated with the socket/handle with - * #pj_ioqueue_register_sock() (or other registration - * functions). 
- * - * @return The user data associated with the key, or NULL on error - * of if no data is associated with the key during +PJ_DECL(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ); + + +/** + * Get user data associated with an ioqueue key. + * + * @param key The key that was previously obtained from registration. + * + * @return The user data associated with the descriptor, or NULL + * on error or if no data is associated with the key during * registration. */ PJ_DECL(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ); +/** + * Set or change the user data to be associated with the file descriptor or + * handle or socket descriptor. + * + * @param key The key that was previously obtained from registration. + * @param user_data User data to be associated with the descriptor. + * @param old_data Optional parameter to retrieve the old user data. + * + * @return PJ_SUCCESS on success or the error code. + */ +PJ_DECL(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, + void *user_data, + void **old_data); + #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 /** - * Instruct I/O Queue to wait for incoming connections on the specified - * listening socket. This function will return - * immediately (i.e. non-blocking) regardless whether some data has been - * transfered. If the function can't complete immediately, and the caller will - * be notified about the completion when it calls pj_ioqueue_poll(). - * - * @param ioqueue The I/O Queue + * Instruct I/O Queue to accept incoming connection on the specified + * listening socket. This function will return immediately (i.e. non-blocking) + * regardless whether a connection is immediately available. If the function + * can't complete immediately, the caller will be notified about the incoming + * connection when it calls pj_ioqueue_poll(). If a new connection is + * immediately available, the function returns PJ_SUCCESS with the new + * connection; in this case, the callback WILL NOT be called. 
+ * * @param key The key which registered to the server socket. - * @param sock Argument which contain pointer to receive - * the socket for the incoming connection. + * @param op_key An operation specific key to be associated with the + * pending operation, so that application can keep track of + * which operation has been completed when the callback is + * called. + * @param new_sock Argument which contain pointer to receive the new socket + * for the incoming connection. * @param local Optional argument which contain pointer to variable to * receive local address. @@ -260,12 +346,14 @@ * address. This argument is optional. * @return - * - PJ_SUCCESS If there's a connection available immediately, which - * in this case the callback should have been called before - * the function returns. - * - PJ_EPENDING If accept is queued, or + * - PJ_SUCCESS When connection is available immediately, and the + * parameters will be updated to contain information about + * the new connection. In this case, a completion callback + * WILL NOT be called. + * - PJ_EPENDING If no connection is available immediately. When a new + * connection arrives, the callback will be called. * - non-zero which indicates the appropriate error code. */ -PJ_DECL(pj_status_t) pj_ioqueue_accept( pj_ioqueue_t *ioqueue, - pj_ioqueue_key_t *key, +PJ_DECL(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, pj_sock_t *sock, pj_sockaddr_t *local, @@ -275,7 +363,9 @@ /** * Initiate non-blocking socket connect. If the socket can NOT be connected - * immediately, the result will be reported during poll. - * - * @param ioqueue The ioqueue + * immediately, asynchronous connect() will be scheduled and caller will be + * notified via completion callback when it calls pj_ioqueue_poll(). If + * socket is connected immediately, the function returns PJ_SUCCESS and + * completion callback WILL NOT be called. 
+ * * @param key The key associated with TCP socket * @param addr The remote address. @@ -283,11 +373,10 @@ * * @return - * - PJ_SUCCESS If socket is connected immediately, which in this case - * the callback should have been called. + * - PJ_SUCCESS If socket is connected immediately. In this case, the + * completion callback WILL NOT be called. * - PJ_EPENDING If operation is queued, or * - non-zero Indicates the error code. */ -PJ_DECL(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue, - pj_ioqueue_key_t *key, +PJ_DECL(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, const pj_sockaddr_t *addr, int addrlen ); @@ -310,16 +399,73 @@ const pj_time_val *timeout); + /** * Instruct the I/O Queue to read from the specified handle. This function * returns immediately (i.e. non-blocking) regardless whether some data has * been transfered. If the operation can't complete immediately, caller will - * be notified about the completion when it calls pj_ioqueue_poll(). - * - * @param ioque The I/O Queue. + * be notified about the completion when it calls pj_ioqueue_poll(). If data + * is immediately available, the function will return PJ_SUCCESS and the + * callback WILL NOT be called. + * * @param key The key that uniquely identifies the handle. + * @param op_key An operation specific key to be associated with the + * pending operation, so that application can keep track of + * which operation has been completed when the callback is + * called. Caller must make sure that this key remains + * valid until the function completes. * @param buffer The buffer to hold the read data. The caller MUST make sure * that this buffer remain valid until the framework completes * reading the handle. - * @param buflen The maximum size to be read. + * @param length On input, it specifies the size of the buffer. If data is + * available to be read immediately, the function returns + * PJ_SUCCESS and this argument will be filled with the + * amount of data read. 
If the function is pending, caller + * will be notified about the amount of data read in the + * callback. This parameter can point to local variable in + * caller's stack and doesn't have to remain valid for the + * duration of pending operation. + * @param flags Recv flag. + * + * @return + * - PJ_SUCCESS If immediate data has been received in the buffer. In this + * case, the callback WILL NOT be called. + * - PJ_EPENDING If the operation has been queued, and the callback will be + * called when data has been received. + * - non-zero The return value indicates the error code. + */ +PJ_DECL(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + void *buffer, + pj_ssize_t *length, + unsigned flags ); + +/** + * This function behaves similarly as #pj_ioqueue_recv(), except that it is + * normally called for socket, and the remote address will also be returned + * along with the data. Caller MUST make sure that both buffer and addr + * remain valid until the framework completes reading the data. + * + * @param key The key that uniquely identifies the handle. + * @param op_key An operation specific key to be associated with the + * pending operation, so that application can keep track of + * which operation has been completed when the callback is + * called. + * @param buffer The buffer to hold the read data. The caller MUST make sure + * that this buffer remain valid until the framework completes + * reading the handle. + * @param length On input, it specifies the size of the buffer. If data is + * available to be read immediately, the function returns + * PJ_SUCCESS and this argument will be filled with the + * amount of data read. If the function is pending, caller + * will be notified about the amount of data read in the + * callback. This parameter can point to local variable in + * caller's stack and doesn't have to remain valid for the + * duration of pending operation. + * @param flags Recv flag. 
+ * @param addr Optional Pointer to buffer to receive the address. + * @param addrlen On input, specifies the length of the address buffer. + * On output, it will be filled with the actual length of + * the address. This argument can be NULL if \c addr is not + * specified. * * @return @@ -330,79 +476,75 @@ * - non-zero The return value indicates the error code. */ -PJ_DECL(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - void *buffer, - pj_size_t buflen); - - -/** - * This function behaves similarly as #pj_ioqueue_read(), except that it is - * normally called for socket. - * - * @param ioque The I/O Queue. - * @param key The key that uniquely identifies the handle. - * @param buffer The buffer to hold the read data. The caller MUST make sure - * that this buffer remain valid until the framework completes - * reading the handle. - * @param buflen The maximum size to be read. - * @param flags Recv flag. - * - * @return - * - PJ_SUCCESS If immediate data has been received. In this case, the - * callback must have been called before this function - * returns, and no pending operation is scheduled. - * - PJ_EPENDING If the operation has been queued. - * - non-zero The return value indicates the error code. - */ -PJ_DECL(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - void *buffer, - pj_size_t buflen, - unsigned flags ); - -/** - * This function behaves similarly as #pj_ioqueue_read(), except that it is - * normally called for socket, and the remote address will also be returned - * along with the data. Caller MUST make sure that both buffer and addr - * remain valid until the framework completes reading the data. - * - * @param ioque The I/O Queue. - * @param key The key that uniquely identifies the handle. - * @param buffer The buffer to hold the read data. The caller MUST make sure - * that this buffer remain valid until the framework completes - * reading the handle. 
- * @param buflen The maximum size to be read. - * @param flags Recv flag. - * @param addr Pointer to buffer to receive the address, or NULL. - * @param addrlen On input, specifies the length of the address buffer. - * On output, it will be filled with the actual length of - * the address. - * - * @return - * - PJ_SUCCESS If immediate data has been received. In this case, the - * callback must have been called before this function - * returns, and no pending operation is scheduled. - * - PJ_EPENDING If the operation has been queued. - * - non-zero The return value indicates the error code. - */ -PJ_DECL(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, +PJ_DECL(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, void *buffer, - pj_size_t buflen, + pj_ssize_t *length, unsigned flags, pj_sockaddr_t *addr, int *addrlen); + /** * Instruct the I/O Queue to write to the handle. This function will return * immediately (i.e. non-blocking) regardless whether some data has been - * transfered. If the function can't complete immediately, and the caller will - * be notified about the completion when it calls pj_ioqueue_poll(). - * - * @param ioque the I/O Queue. + * transfered. If the function can't complete immediately, the caller will + * be notified about the completion when it calls pj_ioqueue_poll(). If + * operation completes immediately and data has been transfered, the function + * returns PJ_SUCCESS and the callback will NOT be called. + * + * @param key The key that identifies the handle. + * @param op_key An operation specific key to be associated with the + * pending operation, so that application can keep track of + * which operation has been completed when the callback is + * called. + * @param data The data to send. Caller MUST make sure that this buffer + * remains valid until the write operation completes. + * @param length On input, it specifies the length of data to send. 
When + * data was sent immediately, this function returns PJ_SUCCESS + * and this parameter contains the length of data sent. If + * data can not be sent immediately, an asynchronous operation + * is scheduled and caller will be notified via callback the + * number of bytes sent. This parameter can point to local + * variable on caller's stack and doesn't have to remain + * valid until the operation has completed. + * @param flags Send flags. + * + * @return + * - PJ_SUCCESS If data was immediately transferred. In this case, no + * pending operation has been scheduled and the callback + * WILL NOT be called. + * - PJ_EPENDING If the operation has been queued. Once data has been + * transferred, the callback will be called. + * - non-zero The return value indicates the error code. + */ +PJ_DECL(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + const void *data, + pj_ssize_t *length, + unsigned flags ); + + +/** + * This function behaves similarly as #pj_ioqueue_send(), except that + * pj_sock_sendto() (or equivalent) will be called to send the data. + * * @param key the key that identifies the handle. + * @param op_key An operation specific key to be associated with the + * pending operation, so that application can keep track of + * which operation has been completed when the callback is + * called. * @param data the data to send. Caller MUST make sure that this buffer * remains valid until the write operation completes. - * @param datalen the length of the data. + * @param length On input, it specifies the length of data to send. When + * data was sent immediately, this function returns PJ_SUCCESS + * and this parameter contains the length of data sent. If + * data can not be sent immediately, an asynchronous operation + * is scheduled and caller will be notified via callback the + * number of bytes sent. 
This parameter can point to local + * variable on caller's stack and doesn't have to remain + * valid until the operation has completed. + * @param flags send flags. + * @param addr Optional remote address. + * @param addrlen Remote address length, if \c addr is specified. * * @return @@ -411,54 +553,8 @@ * - non-zero The return value indicates the error code. */ -PJ_DECL(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - const void *data, - pj_size_t datalen); - -/** - * This function behaves similarly as #pj_ioqueue_write(), except that - * pj_sock_send() (or equivalent) will be called to send the data. - * - * @param ioque the I/O Queue. - * @param key the key that identifies the handle. - * @param data the data to send. Caller MUST make sure that this buffer - * remains valid until the write operation completes. - * @param datalen the length of the data. - * @param flags send flags. - * - * @return - * - PJ_SUCCESS If data was immediately written. - * - PJ_EPENDING If the operation has been queued. - * - non-zero The return value indicates the error code. - */ -PJ_DECL(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - const void *data, - pj_size_t datalen, - unsigned flags ); - - -/** - * This function behaves similarly as #pj_ioqueue_write(), except that - * pj_sock_sendto() (or equivalent) will be called to send the data. - * - * @param ioque the I/O Queue. - * @param key the key that identifies the handle. - * @param data the data to send. Caller MUST make sure that this buffer - * remains valid until the write operation completes. - * @param datalen the length of the data. - * @param flags send flags. - * @param addr remote address. - * @param addrlen remote address length. - * - * @return - * - PJ_SUCCESS If data was immediately written. - * - PJ_EPENDING If the operation has been queued. - * - non-zero The return value indicates the error code. 
- */ -PJ_DECL(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, +PJ_DECL(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, const void *data, - pj_size_t datalen, + pj_ssize_t *length, unsigned flags, const pj_sockaddr_t *addr, Index: /pjproject/main/pjlib/include/pj/list.h =================================================================== --- /pjproject/main/pjlib/include/pj/list.h (revision 10) +++ /pjproject/main/pjlib/include/pj/list.h (revision 11) @@ -47,5 +47,5 @@ */ #define PJ_DECL_LIST_MEMBER(type) type *prev; /** List @a prev. */ \ - type *next; /** List @a next. */ + type *next /** List @a next. */ @@ -57,5 +57,5 @@ struct pj_list { - PJ_DECL_LIST_MEMBER(void) + PJ_DECL_LIST_MEMBER(void); }; Index: /pjproject/main/pjlib/include/pj/pool.h =================================================================== --- /pjproject/main/pjlib/include/pj/pool.h (revision 10) +++ /pjproject/main/pjlib/include/pj/pool.h (revision 11) @@ -114,5 +114,5 @@ typedef struct pj_pool_block { - PJ_DECL_LIST_MEMBER(struct pj_pool_block) /**< List's prev and next. */ + PJ_DECL_LIST_MEMBER(struct pj_pool_block); /**< List's prev and next. */ unsigned char *buf; /**< Start of buffer. */ unsigned char *cur; /**< Current alloc ptr. */ @@ -127,5 +127,5 @@ struct pj_pool_t { - PJ_DECL_LIST_MEMBER(struct pj_pool_t) + PJ_DECL_LIST_MEMBER(struct pj_pool_t); /** Pool name */ Index: /pjproject/main/pjlib/include/pj/sock_select.h =================================================================== --- /pjproject/main/pjlib/include/pj/sock_select.h (revision 10) +++ /pjproject/main/pjlib/include/pj/sock_select.h (revision 11) @@ -98,14 +98,4 @@ /** - * Get the number of descriptors in the set. - * - * @param fdsetp The descriptor set. - * - * @return Number of descriptors in the set. - */ -PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp); - - -/** * This function wait for a number of file descriptors to change status. 
* The behaviour is the same as select() function call which appear in Index: /pjproject/main/pjlib/include/pj/xml.h =================================================================== --- /pjproject/main/pjlib/include/pj/xml.h (revision 10) +++ /pjproject/main/pjlib/include/pj/xml.h (revision 11) @@ -31,5 +31,5 @@ struct pj_xml_attr { - PJ_DECL_LIST_MEMBER(pj_xml_attr) + PJ_DECL_LIST_MEMBER(pj_xml_attr); pj_str_t name; /**< Attribute name. */ pj_str_t value; /**< Attribute value. */ @@ -40,5 +40,5 @@ typedef struct pj_xml_node_head { - PJ_DECL_LIST_MEMBER(pj_xml_node) + PJ_DECL_LIST_MEMBER(pj_xml_node); } pj_xml_node_head; @@ -46,5 +46,5 @@ struct pj_xml_node { - PJ_DECL_LIST_MEMBER(pj_xml_node) /** List @a prev and @a next member */ + PJ_DECL_LIST_MEMBER(pj_xml_node); /** List @a prev and @a next member */ pj_str_t name; /** Node name. */ pj_xml_attr attr_head; /** Attribute list. */ Index: /pjproject/main/pjlib/src/pj/config.c =================================================================== --- /pjproject/main/pjlib/src/pj/config.c (revision 10) +++ /pjproject/main/pjlib/src/pj/config.c (revision 11) @@ -5,5 +5,5 @@ static const char *id = "config.c"; -const char *PJ_VERSION = "0.3.0-pre1"; +const char *PJ_VERSION = "0.3.0-pre4"; PJ_DEF(void) pj_dump_config(void) Index: /pjproject/main/pjlib/src/pj/ioqueue_select.c =================================================================== --- /pjproject/main/pjlib/src/pj/ioqueue_select.c (revision 10) +++ /pjproject/main/pjlib/src/pj/ioqueue_select.c (revision 11) @@ -35,19 +35,29 @@ #define THIS_FILE "ioq_select" -#define PJ_IOQUEUE_IS_READ_OP(op) ((op & PJ_IOQUEUE_OP_READ) || \ - (op & PJ_IOQUEUE_OP_RECV) || \ - (op & PJ_IOQUEUE_OP_RECV_FROM)) -#define PJ_IOQUEUE_IS_WRITE_OP(op) ((op & PJ_IOQUEUE_OP_WRITE) || \ - (op & PJ_IOQUEUE_OP_SEND) || \ - (op & PJ_IOQUEUE_OP_SEND_TO)) - - -#if PJ_HAS_TCP -# define PJ_IOQUEUE_IS_ACCEPT_OP(op) (op & PJ_IOQUEUE_OP_ACCEPT) -# define PJ_IOQUEUE_IS_CONNECT_OP(op) (op & 
PJ_IOQUEUE_OP_CONNECT) -#else -# define PJ_IOQUEUE_IS_ACCEPT_OP(op) 0 -# define PJ_IOQUEUE_IS_CONNECT_OP(op) 0 -#endif +/* + * The select ioqueue relies on socket functions (pj_sock_xxx()) to return + * the correct error code. + */ +#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100) +# error "Error reporting must be enabled for this function to work!" +#endif + +/** + * Get the number of descriptors in the set. This is defined in sock_select.c. + * This function will only return the number of sockets set from PJ_FD_SET + * operation. When the set is modified by other means (such as by select()), + * the count will not be reflected here. + * + * That's why we don't export this function in the header file, to avoid + * misunderstanding. + * + * @param fdsetp The descriptor set. + * + * @return Number of descriptors in the set. + */ +PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp); + + + /* @@ -61,4 +71,58 @@ #endif +struct generic_operation +{ + PJ_DECL_LIST_MEMBER(struct generic_operation); + pj_ioqueue_operation_e op; +}; + +struct read_operation +{ + PJ_DECL_LIST_MEMBER(struct read_operation); + pj_ioqueue_operation_e op; + + void *buf; + pj_size_t size; + unsigned flags; + pj_sockaddr_t *rmt_addr; + int *rmt_addrlen; +}; + +struct write_operation +{ + PJ_DECL_LIST_MEMBER(struct write_operation); + pj_ioqueue_operation_e op; + + char *buf; + pj_size_t size; + pj_ssize_t written; + unsigned flags; + pj_sockaddr_in rmt_addr; + int rmt_addrlen; +}; + +#if PJ_HAS_TCP +struct accept_operation +{ + PJ_DECL_LIST_MEMBER(struct accept_operation); + pj_ioqueue_operation_e op; + + pj_sock_t *accept_fd; + pj_sockaddr_t *local_addr; + pj_sockaddr_t *rmt_addr; + int *addrlen; +}; +#endif + +union operation_key +{ + struct generic_operation generic; + struct read_operation read; + struct write_operation write; +#if PJ_HAS_TCP + struct accept_operation accept; +#endif +}; + /* * This describes each key. 
@@ -66,23 +130,15 @@ struct pj_ioqueue_key_t { - PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t) + PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); + pj_ioqueue_t *ioqueue; pj_sock_t fd; - pj_ioqueue_operation_e op; void *user_data; pj_ioqueue_callback cb; - - void *rd_buf; - unsigned rd_flags; - pj_size_t rd_buflen; - void *wr_buf; - pj_size_t wr_buflen; - - pj_sockaddr_t *rmt_addr; - int *rmt_addrlen; - - pj_sockaddr_t *local_addr; - int *local_addrlen; - - pj_sock_t *accept_fd; + int connecting; + struct read_operation read_list; + struct write_operation write_list; +#if PJ_HAS_TCP + struct accept_operation accept_list; +#endif }; @@ -95,5 +151,5 @@ pj_bool_t auto_delete_lock; unsigned max, count; - pj_ioqueue_key_t hlist; + pj_ioqueue_key_t key_list; pj_fd_set_t rfdset; pj_fd_set_t wfdset; @@ -110,36 +166,37 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_size_t max_fd, - int max_threads, pj_ioqueue_t **p_ioqueue) { - pj_ioqueue_t *ioque; + pj_ioqueue_t *ioqueue; pj_status_t rc; - PJ_UNUSED_ARG(max_threads); - - if (max_fd > PJ_IOQUEUE_MAX_HANDLES) { - pj_assert(!"max_fd too large"); - return PJ_EINVAL; - } - - ioque = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); - ioque->max = max_fd; - ioque->count = 0; - PJ_FD_ZERO(&ioque->rfdset); - PJ_FD_ZERO(&ioque->wfdset); + /* Check that arguments are valid. 
*/ + PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && + max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES, + PJ_EINVAL); + + /* Check that size of pj_ioqueue_op_key_t is sufficient */ + PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= + sizeof(union operation_key), PJ_EBUG); + + ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); + ioqueue->max = max_fd; + ioqueue->count = 0; + PJ_FD_ZERO(&ioqueue->rfdset); + PJ_FD_ZERO(&ioqueue->wfdset); #if PJ_HAS_TCP - PJ_FD_ZERO(&ioque->xfdset); -#endif - pj_list_init(&ioque->hlist); - - rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioque->lock); + PJ_FD_ZERO(&ioqueue->xfdset); +#endif + pj_list_init(&ioqueue->key_list); + + rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioqueue->lock); if (rc != PJ_SUCCESS) return rc; - ioque->auto_delete_lock = PJ_TRUE; - - PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioque)); - - *p_ioqueue = ioque; + ioqueue->auto_delete_lock = PJ_TRUE; + + PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue)); + + *p_ioqueue = ioqueue; return PJ_SUCCESS; } @@ -150,12 +207,14 @@ * Destroy ioqueue. 
*/ -PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioque) +PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) { pj_status_t rc = PJ_SUCCESS; - PJ_ASSERT_RETURN(ioque, PJ_EINVAL); - - if (ioque->auto_delete_lock) - rc = pj_lock_destroy(ioque->lock); + PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); + + pj_lock_acquire(ioqueue->lock); + + if (ioqueue->auto_delete_lock) + rc = pj_lock_destroy(ioqueue->lock); return rc; @@ -164,24 +223,4 @@ /* - * pj_ioqueue_set_lock() - */ -PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque, - pj_lock_t *lock, - pj_bool_t auto_delete ) -{ - PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL); - - if (ioque->auto_delete_lock) { - pj_lock_destroy(ioque->lock); - } - - ioque->lock = lock; - ioque->auto_delete_lock = auto_delete; - - return PJ_SUCCESS; -} - - -/* * pj_ioqueue_register_sock() * @@ -189,5 +228,5 @@ */ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, - pj_ioqueue_t *ioque, + pj_ioqueue_t *ioqueue, pj_sock_t sock, void *user_data, @@ -199,10 +238,10 @@ pj_status_t rc = PJ_SUCCESS; - PJ_ASSERT_RETURN(pool && ioque && sock != PJ_INVALID_SOCKET && + PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET && cb && p_key, PJ_EINVAL); - pj_lock_acquire(ioque->lock); - - if (ioque->count >= ioque->max) { + pj_lock_acquire(ioqueue->lock); + + if (ioqueue->count >= ioqueue->max) { rc = PJ_ETOOMANY; goto on_return; @@ -212,5 +251,5 @@ value = 1; #ifdef PJ_WIN32 - if (ioctlsocket(sock, FIONBIO, (unsigned long*)&value)) { + if (ioctlsocket(sock, FIONBIO, (u_long*)&value)) { #else if (ioctl(sock, FIONBIO, &value)) { @@ -222,6 +261,12 @@ /* Create key. */ key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); + key->ioqueue = ioqueue; key->fd = sock; key->user_data = user_data; + pj_list_init(&key->read_list); + pj_list_init(&key->write_list); +#if PJ_HAS_TCP + pj_list_init(&key->accept_list); +#endif /* Save callback. 
*/ @@ -229,10 +274,11 @@ /* Register */ - pj_list_insert_before(&ioque->hlist, key); - ++ioque->count; + pj_list_insert_before(&ioqueue->key_list, key); + ++ioqueue->count; on_return: + /* On error, socket may be left in non-blocking mode. */ *p_key = key; - pj_lock_release(ioque->lock); + pj_lock_release(ioqueue->lock); return rc; @@ -244,21 +290,24 @@ * Unregister handle from ioqueue. */ -PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key) -{ - PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL); - - pj_lock_acquire(ioque->lock); - - pj_assert(ioque->count > 0); - --ioque->count; +PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) +{ + pj_ioqueue_t *ioqueue; + + PJ_ASSERT_RETURN(key, PJ_EINVAL); + + ioqueue = key->ioqueue; + + pj_lock_acquire(ioqueue->lock); + + pj_assert(ioqueue->count > 0); + --ioqueue->count; pj_list_erase(key); - PJ_FD_CLR(key->fd, &ioque->rfdset); - PJ_FD_CLR(key->fd, &ioque->wfdset); + PJ_FD_CLR(key->fd, &ioqueue->rfdset); + PJ_FD_CLR(key->fd, &ioqueue->wfdset); #if PJ_HAS_TCP - PJ_FD_CLR(key->fd, &ioque->xfdset); -#endif - - pj_lock_release(ioque->lock); + PJ_FD_CLR(key->fd, &ioqueue->xfdset); +#endif + + pj_lock_release(ioqueue->lock); return PJ_SUCCESS; } @@ -276,9 +325,26 @@ +/* + * pj_ioqueue_set_user_data() + */ +PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, + void *user_data, + void **old_data) +{ + PJ_ASSERT_RETURN(key, PJ_EINVAL); + + if (old_data) + *old_data = key->user_data; + key->user_data = user_data; + + return PJ_SUCCESS; +} + + /* This supposed to check whether the fd_set values are consistent * with the operation currently set in each key. 
*/ #if VALIDATE_FD_SET -static void validate_sets(const pj_ioqueue_t *ioque, +static void validate_sets(const pj_ioqueue_t *ioqueue, const pj_fd_set_t *rfdset, const pj_fd_set_t *wfdset, @@ -287,11 +353,9 @@ pj_ioqueue_key_t *key; - key = ioque->hlist.next; - while (key != &ioque->hlist) { - if ((key->op & PJ_IOQUEUE_OP_READ) - || (key->op & PJ_IOQUEUE_OP_RECV) - || (key->op & PJ_IOQUEUE_OP_RECV_FROM) + key = ioqueue->key_list.next; + while (key != &ioqueue->key_list) { + if (!pj_list_empty(&key->read_list) #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 - || (key->op & PJ_IOQUEUE_OP_ACCEPT) + || !pj_list_empty(&key->accept_list) #endif ) @@ -302,9 +366,7 @@ pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0); } - if ((key->op & PJ_IOQUEUE_OP_WRITE) - || (key->op & PJ_IOQUEUE_OP_SEND) - || (key->op & PJ_IOQUEUE_OP_SEND_TO) + if (!pj_list_empty(&key->write_list) #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 - || (key->op & PJ_IOQUEUE_OP_CONNECT) + || key->connecting #endif ) @@ -316,5 +378,5 @@ } #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 - if (key->op & PJ_IOQUEUE_OP_CONNECT) + if (key->connecting) { pj_assert(PJ_FD_ISSET(key->fd, xfdset)); @@ -348,5 +410,5 @@ * work on fd_set copy of the ioqueue (not the original one). */ -PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout) +PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) { pj_fd_set_t rfdset, wfdset, xfdset; @@ -354,12 +416,17 @@ pj_ioqueue_key_t *h; + PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); + /* Lock ioqueue before making fd_set copies */ - pj_lock_acquire(ioque->lock); - - if (PJ_FD_COUNT(&ioque->rfdset)==0 && - PJ_FD_COUNT(&ioque->wfdset)==0 && - PJ_FD_COUNT(&ioque->xfdset)==0) + pj_lock_acquire(ioqueue->lock); + + /* We will only do select() when there are sockets to be polled. + * Otherwise select() will return error. 
+ */ + if (PJ_FD_COUNT(&ioqueue->rfdset)==0 && + PJ_FD_COUNT(&ioqueue->wfdset)==0 && + PJ_FD_COUNT(&ioqueue->xfdset)==0) { - pj_lock_release(ioque->lock); + pj_lock_release(ioqueue->lock); if (timeout) pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout)); @@ -368,8 +435,8 @@ /* Copy ioqueue's pj_fd_set_t to local variables. */ - pj_memcpy(&rfdset, &ioque->rfdset, sizeof(pj_fd_set_t)); - pj_memcpy(&wfdset, &ioque->wfdset, sizeof(pj_fd_set_t)); + pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t)); + pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t)); #if PJ_HAS_TCP - pj_memcpy(&xfdset, &ioque->xfdset, sizeof(pj_fd_set_t)); + pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t)); #else PJ_FD_ZERO(&xfdset); @@ -377,9 +444,9 @@ #if VALIDATE_FD_SET - validate_sets(ioque, &rfdset, &wfdset, &xfdset); + validate_sets(ioqueue, &rfdset, &wfdset, &xfdset); #endif /* Unlock ioqueue before select(). */ - pj_lock_release(ioque->lock); + pj_lock_release(ioqueue->lock); count = pj_sock_select(FD_SETSIZE, &rfdset, &wfdset, &xfdset, timeout); @@ -388,170 +455,36 @@ return count; - /* Lock ioqueue again before scanning for signalled sockets. */ - pj_lock_acquire(ioque->lock); - -#if PJ_HAS_TCP - /* Scan for exception socket */ - h = ioque->hlist.next; -do_except_scan: - for ( ; h!=&ioque->hlist; h = h->next) { - if ((h->op & PJ_IOQUEUE_OP_CONNECT) && PJ_FD_ISSET(h->fd, &xfdset)) - break; - } - if (h != &ioque->hlist) { - /* 'connect()' should be the only operation. */ - pj_assert((h->op == PJ_IOQUEUE_OP_CONNECT)); - - /* Clear operation. */ - h->op &= ~(PJ_IOQUEUE_OP_CONNECT); - PJ_FD_CLR(h->fd, &ioque->wfdset); - PJ_FD_CLR(h->fd, &ioque->xfdset); - PJ_FD_CLR(h->fd, &wfdset); - PJ_FD_CLR(h->fd, &xfdset); - - /* Call callback. */ - if (h->cb.on_connect_complete) - (*h->cb.on_connect_complete)(h, -1); - - /* Re-scan exception list. */ - goto do_except_scan; - } -#endif /* PJ_HAS_TCP */ - - /* Scan for readable socket. 
*/ - h = ioque->hlist.next; -do_readable_scan: - for ( ; h!=&ioque->hlist; h = h->next) { - if ((PJ_IOQUEUE_IS_READ_OP(h->op) || PJ_IOQUEUE_IS_ACCEPT_OP(h->op)) && - PJ_FD_ISSET(h->fd, &rfdset)) + /* Lock ioqueue again before scanning for signalled sockets. + * We must strictly use recursive mutex since application may invoke + * the ioqueue again inside the callback. + */ + pj_lock_acquire(ioqueue->lock); + + /* Scan for writable sockets first to handle piggy-back data + * coming with accept(). + */ + h = ioqueue->key_list.next; +do_writable_scan: + for ( ; h!=&ioqueue->key_list; h = h->next) { + if ( (!pj_list_empty(&h->write_list) || h->connecting) + && PJ_FD_ISSET(h->fd, &wfdset)) { break; } } - if (h != &ioque->hlist) { - pj_status_t rc; - - pj_assert(PJ_IOQUEUE_IS_READ_OP(h->op) || - PJ_IOQUEUE_IS_ACCEPT_OP(h->op)); - -# if PJ_HAS_TCP - if ((h->op & PJ_IOQUEUE_OP_ACCEPT)) { - /* accept() must be the only operation specified on server socket */ - pj_assert(h->op == PJ_IOQUEUE_OP_ACCEPT); - - rc=pj_sock_accept(h->fd, h->accept_fd, h->rmt_addr, h->rmt_addrlen); - if (rc==0 && h->local_addr) { - rc = pj_sock_getsockname(*h->accept_fd, h->local_addr, - h->local_addrlen); - } - - h->op &= ~(PJ_IOQUEUE_OP_ACCEPT); - PJ_FD_CLR(h->fd, &ioque->rfdset); - - /* Call callback. */ - if (h->cb.on_accept_complete) - (*h->cb.on_accept_complete)(h, *h->accept_fd, rc); - - /* Re-scan readable sockets. */ - goto do_readable_scan; - } - else { -# endif - pj_ssize_t bytes_read = h->rd_buflen; - - if ((h->op & PJ_IOQUEUE_OP_RECV_FROM)) { - rc = pj_sock_recvfrom(h->fd, h->rd_buf, &bytes_read, 0, - h->rmt_addr, h->rmt_addrlen); - } else if ((h->op & PJ_IOQUEUE_OP_RECV)) { - rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0); - } else { - /* - * User has specified pj_ioqueue_read(). - * On Win32, we should do ReadFile(). 
But because we got - * here because of select() anyway, user must have put a - * socket descriptor on h->fd, which in this case we can - * just call pj_sock_recv() instead of ReadFile(). - * On Unix, user may put a file in h->fd, so we'll have - * to call read() here. - * This may not compile on systems which doesn't have - * read(). That's why we only specify PJ_LINUX here so - * that error is easier to catch. - */ -# if defined(PJ_WIN32) && PJ_WIN32 != 0 - rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0); -# elif (defined(PJ_LINUX) && PJ_LINUX != 0) || \ - (defined(PJ_SUNOS) && PJ_SUNOS != 0) - bytes_read = read(h->fd, h->rd_buf, bytes_read); - rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); -# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0 - bytes_read = sys_read(h->fd, h->rd_buf, bytes_read); - rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read; -# else -# error "Implement read() for this platform!" -# endif - } - - if (rc != PJ_SUCCESS) { -# if defined(PJ_WIN32) && PJ_WIN32 != 0 - /* On Win32, for UDP, WSAECONNRESET on the receive side - * indicates that previous sending has triggered ICMP Port - * Unreachable message. - * But we wouldn't know at this point which one of previous - * key that has triggered the error, since UDP socket can - * be shared! - * So we'll just ignore it! - */ - - if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) { - PJ_LOG(4,(THIS_FILE, - "Ignored ICMP port unreach. on key=%p", h)); - } -# endif - - /* In any case we would report this to caller. */ - bytes_read = -rc; - } - - h->op &= ~(PJ_IOQUEUE_OP_READ | PJ_IOQUEUE_OP_RECV | - PJ_IOQUEUE_OP_RECV_FROM); - PJ_FD_CLR(h->fd, &ioque->rfdset); - PJ_FD_CLR(h->fd, &rfdset); - - /* Call callback. */ - if (h->cb.on_read_complete) - (*h->cb.on_read_complete)(h, bytes_read); - - /* Re-scan readable sockets. 
*/ - goto do_readable_scan; - - } - } - - /* Scan for writable socket */ - h = ioque->hlist.next; -do_writable_scan: - for ( ; h!=&ioque->hlist; h = h->next) { - if ((PJ_IOQUEUE_IS_WRITE_OP(h->op) || PJ_IOQUEUE_IS_CONNECT_OP(h->op)) - && PJ_FD_ISSET(h->fd, &wfdset)) - { - break; - } - } - if (h != &ioque->hlist) { - pj_assert(PJ_IOQUEUE_IS_WRITE_OP(h->op) || - PJ_IOQUEUE_IS_CONNECT_OP(h->op)); + if (h != &ioqueue->key_list) { + pj_assert(!pj_list_empty(&h->write_list) || h->connecting); #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 - if ((h->op & PJ_IOQUEUE_OP_CONNECT)) { + if (h->connecting) { /* Completion of connect() operation */ pj_ssize_t bytes_transfered; -#if (defined(PJ_LINUX) && PJ_LINUX!=0) || \ - (defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL!=0) +#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) /* from connect(2): - * On Linux, use getsockopt to read the SO_ERROR option at - * level SOL_SOCKET to determine whether connect() completed - * successfully (if SO_ERROR is zero). - */ + * On Linux, use getsockopt to read the SO_ERROR option at + * level SOL_SOCKET to determine whether connect() completed + * successfully (if SO_ERROR is zero). + */ int value; socklen_t vallen = sizeof(value); @@ -590,7 +523,7 @@ /* Clear operation. */ - h->op &= (~PJ_IOQUEUE_OP_CONNECT); - PJ_FD_CLR(h->fd, &ioque->wfdset); - PJ_FD_CLR(h->fd, &ioque->xfdset); + h->connecting = 0; + PJ_FD_CLR(h->fd, &ioqueue->wfdset); + PJ_FD_CLR(h->fd, &ioqueue->xfdset); /* Call callback. */ @@ -604,21 +537,227 @@ #endif /* PJ_HAS_TCP */ { - /* Completion of write(), send(), or sendto() operation. */ - - /* Clear operation. */ - h->op &= ~(PJ_IOQUEUE_OP_WRITE | PJ_IOQUEUE_OP_SEND | - PJ_IOQUEUE_OP_SEND_TO); - PJ_FD_CLR(h->fd, &ioque->wfdset); + /* Socket is writable. */ + struct write_operation *write_op; + pj_ssize_t sent; + pj_status_t send_rc; + + /* Get the first in the queue. */ + write_op = h->write_list.next; + + /* Send the data. 
*/ + sent = write_op->size - write_op->written; + if (write_op->op == PJ_IOQUEUE_OP_SEND) { + send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written, + &sent, write_op->flags); + } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) { + send_rc = pj_sock_sendto(h->fd, + write_op->buf+write_op->written, + &sent, write_op->flags, + &write_op->rmt_addr, + write_op->rmt_addrlen); + } else { + pj_assert(!"Invalid operation type!"); + send_rc = PJ_EBUG; + } + + if (send_rc == PJ_SUCCESS) { + write_op->written += sent; + } else { + pj_assert(send_rc > 0); + write_op->written = -send_rc; + } + + /* In any case we don't need to process this descriptor again. */ PJ_FD_CLR(h->fd, &wfdset); - /* Call callback. */ - /* All data must have been sent? */ - if (h->cb.on_write_complete) - (*h->cb.on_write_complete)(h, h->wr_buflen); - + /* Are we finished with this buffer? */ + if (send_rc!=PJ_SUCCESS || + write_op->written == (pj_ssize_t)write_op->size) + { + pj_list_erase(write_op); + + /* Clear operation if there's no more data to send. */ + if (pj_list_empty(&h->write_list)) + PJ_FD_CLR(h->fd, &ioqueue->wfdset); + + /* Call callback. */ + if (h->cb.on_write_complete) { + (*h->cb.on_write_complete)(h, + (pj_ioqueue_op_key_t*)write_op, + write_op->written); + } + } + /* Re-scan writable sockets. */ goto do_writable_scan; } } + + /* Scan for readable socket. */ + h = ioqueue->key_list.next; +do_readable_scan: + for ( ; h!=&ioqueue->key_list; h = h->next) { + if ((!pj_list_empty(&h->read_list) +#if PJ_HAS_TCP + || !pj_list_empty(&h->accept_list) +#endif + ) && PJ_FD_ISSET(h->fd, &rfdset)) + { + break; + } + } + if (h != &ioqueue->key_list) { + pj_status_t rc; + +#if PJ_HAS_TCP + pj_assert(!pj_list_empty(&h->read_list) || + !pj_list_empty(&h->accept_list)); +#else + pj_assert(!pj_list_empty(&h->read_list)); +#endif + +# if PJ_HAS_TCP + if (!pj_list_empty(&h->accept_list)) { + + struct accept_operation *accept_op; + + /* Get one accept operation from the list. 
*/ + accept_op = h->accept_list.next; + pj_list_erase(accept_op); + + rc=pj_sock_accept(h->fd, accept_op->accept_fd, + accept_op->rmt_addr, accept_op->addrlen); + if (rc==PJ_SUCCESS && accept_op->local_addr) { + rc = pj_sock_getsockname(*accept_op->accept_fd, + accept_op->local_addr, + accept_op->addrlen); + } + + /* Clear bit in fdset if there is no more pending accept */ + if (pj_list_empty(&h->accept_list)) + PJ_FD_CLR(h->fd, &ioqueue->rfdset); + + /* Call callback. */ + if (h->cb.on_accept_complete) + (*h->cb.on_accept_complete)(h, (pj_ioqueue_op_key_t*)accept_op, + *accept_op->accept_fd, rc); + + /* Re-scan readable sockets. */ + goto do_readable_scan; + } + else { +# endif + struct read_operation *read_op; + pj_ssize_t bytes_read; + + pj_assert(!pj_list_empty(&h->read_list)); + + /* Get one pending read operation from the list. */ + read_op = h->read_list.next; + pj_list_erase(read_op); + + bytes_read = read_op->size; + + if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) { + rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0, + read_op->rmt_addr, + read_op->rmt_addrlen); + } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) { + rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); + } else { + pj_assert(read_op->op == PJ_IOQUEUE_OP_READ); + /* + * User has specified pj_ioqueue_read(). + * On Win32, we should do ReadFile(). But because we got + * here because of select() anyway, user must have put a + * socket descriptor on h->fd, which in this case we can + * just call pj_sock_recv() instead of ReadFile(). + * On Unix, user may put a file in h->fd, so we'll have + * to call read() here. + * This may not compile on systems which doesn't have + * read(). That's why we only specify PJ_LINUX here so + * that error is easier to catch. 
+ */ +# if defined(PJ_WIN32) && PJ_WIN32 != 0 + rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); + //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size, + // &bytes_read, NULL); +# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0) + bytes_read = read(h->fd, h->rd_buf, bytes_read); + rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); +# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0 + bytes_read = sys_read(h->fd, h->rd_buf, bytes_read); + rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read; +# else +# error "Implement read() for this platform!" +# endif + } + + if (rc != PJ_SUCCESS) { +# if defined(PJ_WIN32) && PJ_WIN32 != 0 + /* On Win32, for UDP, WSAECONNRESET on the receive side + * indicates that previous sending has triggered ICMP Port + * Unreachable message. + * But we wouldn't know at this point which one of previous + * key that has triggered the error, since UDP socket can + * be shared! + * So we'll just ignore it! + */ + + if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) { + //PJ_LOG(4,(THIS_FILE, + // "Ignored ICMP port unreach. on key=%p", h)); + } +# endif + + /* In any case we would report this to caller. */ + bytes_read = -rc; + } + + /* Clear fdset if there is no pending read. */ + if (pj_list_empty(&h->read_list)) + PJ_FD_CLR(h->fd, &ioqueue->rfdset); + + /* In any case clear from temporary set. */ + PJ_FD_CLR(h->fd, &rfdset); + + /* Call callback. */ + if (h->cb.on_read_complete) + (*h->cb.on_read_complete)(h, (pj_ioqueue_op_key_t*)read_op, + bytes_read); + + /* Re-scan readable sockets. */ + goto do_readable_scan; + + } + } + +#if PJ_HAS_TCP + /* Scan for exception socket for TCP connection error. */ + h = ioqueue->key_list.next; +do_except_scan: + for ( ; h!=&ioqueue->key_list; h = h->next) { + if (h->connecting && PJ_FD_ISSET(h->fd, &xfdset)) + break; + } + if (h != &ioqueue->key_list) { + + pj_assert(h->connecting); + + /* Clear operation. 
*/ + h->connecting = 0; + PJ_FD_CLR(h->fd, &ioqueue->wfdset); + PJ_FD_CLR(h->fd, &ioqueue->xfdset); + PJ_FD_CLR(h->fd, &wfdset); + PJ_FD_CLR(h->fd, &xfdset); + + /* Call callback. */ + if (h->cb.on_connect_complete) + (*h->cb.on_connect_complete)(h, -1); + + /* Re-scan exception list. */ + goto do_except_scan; + } +#endif /* PJ_HAS_TCP */ /* Shouldn't happen. */ @@ -629,76 +768,64 @@ //count = 0; - pj_lock_release(ioque->lock); + pj_lock_release(ioqueue->lock); return count; } /* - * pj_ioqueue_read() - * - * Start asynchronous read from the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - void *buffer, - pj_size_t buflen) -{ - PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); + * pj_ioqueue_recv() + * + * Start asynchronous recv() from the socket. + */ +PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + void *buffer, + pj_ssize_t *length, + unsigned flags ) +{ + pj_status_t status; + pj_ssize_t size; + struct read_operation *read_op; + pj_ioqueue_t *ioqueue; + + PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); PJ_CHECK_STACK(); - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for reading before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), - PJ_EBUSY); - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_READ; - key->rd_flags = 0; - key->rd_buf = buffer; - key->rd_buflen = buflen; - PJ_FD_SET(key->fd, &ioque->rfdset); - - pj_lock_release(ioque->lock); + /* Try to see if there's data immediately available. + */ + size = *length; + status = pj_sock_recv(key->fd, buffer, &size, flags); + if (status == PJ_SUCCESS) { + /* Yes! Data is available! 
*/ + *length = size; + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) + return status; + } + + /* + * No data is immediately available. + * Must schedule asynchronous operation to the ioqueue. + */ + ioqueue = key->ioqueue; + pj_lock_acquire(ioqueue->lock); + + read_op = (struct read_operation*)op_key; + + read_op->op = PJ_IOQUEUE_OP_RECV; + read_op->buf = buffer; + read_op->size = *length; + read_op->flags = flags; + + pj_list_insert_before(&key->read_list, read_op); + PJ_FD_SET(key->fd, &ioqueue->rfdset); + + pj_lock_release(ioqueue->lock); return PJ_EPENDING; } - -/* - * pj_ioqueue_recv() - * - * Start asynchronous recv() from the socket. - */ -PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - void *buffer, - pj_size_t buflen, - unsigned flags ) -{ - PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for reading before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), - PJ_EBUSY); - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_RECV; - key->rd_buf = buffer; - key->rd_buflen = buflen; - key->rd_flags = flags; - PJ_FD_SET(key->fd, &ioque->rfdset); - - pj_lock_release(ioque->lock); - return PJ_EPENDING; -} - /* * pj_ioqueue_recvfrom() @@ -706,167 +833,215 @@ * Start asynchronous recvfrom() from the socket. 
*/ -PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, +PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, void *buffer, - pj_size_t buflen, + pj_ssize_t *length, unsigned flags, pj_sockaddr_t *addr, int *addrlen) { - PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); + pj_status_t status; + pj_ssize_t size; + struct read_operation *read_op; + pj_ioqueue_t *ioqueue; + + PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); PJ_CHECK_STACK(); - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for reading before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), - PJ_EBUSY); - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_RECV_FROM; - key->rd_buf = buffer; - key->rd_buflen = buflen; - key->rd_flags = flags; - key->rmt_addr = addr; - key->rmt_addrlen = addrlen; - PJ_FD_SET(key->fd, &ioque->rfdset); - - pj_lock_release(ioque->lock); + /* Try to see if there's data immediately available. + */ + size = *length; + status = pj_sock_recvfrom(key->fd, buffer, &size, flags, + addr, addrlen); + if (status == PJ_SUCCESS) { + /* Yes! Data is available! */ + *length = size; + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) + return status; + } + + /* + * No data is immediately available. + * Must schedule asynchronous operation to the ioqueue. 
+ */ + ioqueue = key->ioqueue; + pj_lock_acquire(ioqueue->lock); + + read_op = (struct read_operation*)op_key; + + read_op->op = PJ_IOQUEUE_OP_RECV_FROM; + read_op->buf = buffer; + read_op->size = *length; + read_op->flags = flags; + read_op->rmt_addr = addr; + read_op->rmt_addrlen = addrlen; + + pj_list_insert_before(&key->read_list, read_op); + PJ_FD_SET(key->fd, &ioqueue->rfdset); + + pj_lock_release(ioqueue->lock); return PJ_EPENDING; } /* - * pj_ioqueue_write() + * pj_ioqueue_send() + * + * Start asynchronous send() to the descriptor. + */ +PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + const void *data, + pj_ssize_t *length, + unsigned flags) +{ + pj_ioqueue_t *ioqueue; + struct write_operation *write_op; + pj_status_t status; + pj_ssize_t sent; + + PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); + PJ_CHECK_STACK(); + + /* Fast track: + * Try to send data immediately, only if there's no pending write! + * Note: + * We are speculating that the list is empty here without properly + * acquiring ioqueue's mutex first. This is intentional, to maximize + * performance via parallelism. + * + * This should be safe, because: + * - by convention, we require caller to make sure that the + * key is not unregistered while other threads are invoking + * an operation on the same key. + * - pj_list_empty() is safe to be invoked by multiple threads, + * even when other threads are modifying the list. + */ + if (pj_list_empty(&key->write_list)) { + /* + * See if data can be sent immediately. + */ + sent = *length; + status = pj_sock_send(key->fd, data, &sent, flags); + if (status == PJ_SUCCESS) { + /* Success! */ + *length = sent; + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { + return status; + } + } + } + + /* + * Schedule asynchronous send. 
+ */ + ioqueue = key->ioqueue; + pj_lock_acquire(ioqueue->lock); + + write_op = (struct write_operation*)op_key; + write_op->op = PJ_IOQUEUE_OP_SEND; + write_op->buf = NULL; + write_op->size = *length; + write_op->written = 0; + write_op->flags = flags; + + pj_list_insert_before(&key->write_list, write_op); + PJ_FD_SET(key->fd, &ioqueue->wfdset); + + pj_lock_release(ioqueue->lock); + + return PJ_EPENDING; +} + + +/* + * pj_ioqueue_sendto() * * Start asynchronous write() to the descriptor. */ -PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - const void *data, - pj_size_t datalen) -{ - pj_status_t rc; - pj_ssize_t sent; - - PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for writing before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), - PJ_EBUSY); - - sent = datalen; - /* sent would be -1 after pj_sock_send() if it returns error. */ - rc = pj_sock_send(key->fd, data, &sent, 0); - if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { - return rc; - } - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_WRITE; - key->wr_buf = NULL; - key->wr_buflen = datalen; - PJ_FD_SET(key->fd, &ioque->wfdset); - - pj_lock_release(ioque->lock); - - return PJ_EPENDING; -} - -/* - * pj_ioqueue_send() - * - * Start asynchronous send() to the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - const void *data, - pj_size_t datalen, - unsigned flags) -{ - pj_status_t rc; - pj_ssize_t sent; - - PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for writing before. 
- */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), - PJ_EBUSY); - - sent = datalen; - /* sent would be -1 after pj_sock_send() if it returns error. */ - rc = pj_sock_send(key->fd, data, &sent, flags); - if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { - return rc; - } - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_SEND; - key->wr_buf = NULL; - key->wr_buflen = datalen; - PJ_FD_SET(key->fd, &ioque->wfdset); - - pj_lock_release(ioque->lock); - - return PJ_EPENDING; -} - - -/* - * pj_ioqueue_sendto() - * - * Start asynchronous write() to the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, +PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, const void *data, - pj_size_t datalen, + pj_ssize_t *length, unsigned flags, const pj_sockaddr_t *addr, int addrlen) { - pj_status_t rc; + pj_ioqueue_t *ioqueue; + struct write_operation *write_op; + pj_status_t status; pj_ssize_t sent; - PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); + PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); PJ_CHECK_STACK(); - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for writing before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), - PJ_EBUSY); - - sent = datalen; - /* sent would be -1 after pj_sock_sendto() if it returns error. 
*/ - rc = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen); - if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { - return rc; - } - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_SEND_TO; - key->wr_buf = NULL; - key->wr_buflen = datalen; - PJ_FD_SET(key->fd, &ioque->wfdset); - - pj_lock_release(ioque->lock); + /* Fast track: + * Try to send data immediately, only if there's no pending write! + * Note: + * We are speculating that the list is empty here without properly + * acquiring ioqueue's mutex first. This is intentional, to maximize + * performance via parallelism. + * + * This should be safe, because: + * - by convention, we require caller to make sure that the + * key is not unregistered while other threads are invoking + * an operation on the same key. + * - pj_list_empty() is safe to be invoked by multiple threads, + * even when other threads are modifying the list. + */ + if (pj_list_empty(&key->write_list)) { + /* + * See if data can be sent immediately. + */ + sent = *length; + status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen); + if (status == PJ_SUCCESS) { + /* Success! */ + *length = sent; + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { + return status; + } + } + } + + /* + * Check that address storage can hold the address parameter. + */ + PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG); + + /* + * Schedule asynchronous send. 
+ */ + ioqueue = key->ioqueue; + pj_lock_acquire(ioqueue->lock); + + write_op = (struct write_operation*)op_key; + write_op->op = PJ_IOQUEUE_OP_SEND_TO; + write_op->buf = NULL; + write_op->size = *length; + write_op->written = 0; + write_op->flags = flags; + pj_memcpy(&write_op->rmt_addr, addr, addrlen); + write_op->rmt_addrlen = addrlen; + + pj_list_insert_before(&key->write_list, write_op); + PJ_FD_SET(key->fd, &ioqueue->wfdset); + + pj_lock_release(ioqueue->lock); + return PJ_EPENDING; } @@ -876,29 +1051,65 @@ * Initiate overlapped accept() operation. */ -PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue, - pj_ioqueue_key_t *key, - pj_sock_t *new_sock, - pj_sockaddr_t *local, - pj_sockaddr_t *remote, - int *addrlen) -{ +PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_sock_t *new_sock, + pj_sockaddr_t *local, + pj_sockaddr_t *remote, + int *addrlen) +{ + pj_ioqueue_t *ioqueue; + struct accept_operation *accept_op; + pj_status_t status; + /* check parameters. All must be specified! */ - pj_assert(ioqueue && key && new_sock); - - /* Server socket must have no other operation! */ - pj_assert(key->op == 0); - + PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); + + /* Fast track: + * See if there's new connection available immediately. + */ + if (pj_list_empty(&key->accept_list)) { + status = pj_sock_accept(key->fd, new_sock, remote, addrlen); + if (status == PJ_SUCCESS) { + /* Yes! New connection is available! */ + if (local && addrlen) { + status = pj_sock_getsockname(*new_sock, local, addrlen); + if (status != PJ_SUCCESS) { + pj_sock_close(*new_sock); + *new_sock = PJ_INVALID_SOCKET; + return status; + } + } + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { + return status; + } + } + } + + /* + * No connection is available immediately. 
+ * Schedule accept() operation to be completed when there is incoming + * connection available. + */ + ioqueue = key->ioqueue; + accept_op = (struct accept_operation*)op_key; + pj_lock_acquire(ioqueue->lock); - key->op = PJ_IOQUEUE_OP_ACCEPT; - key->accept_fd = new_sock; - key->rmt_addr = remote; - key->rmt_addrlen = addrlen; - key->local_addr = local; - key->local_addrlen = addrlen; /* use same addr. as rmt_addrlen */ - + accept_op->op = PJ_IOQUEUE_OP_ACCEPT; + accept_op->accept_fd = new_sock; + accept_op->rmt_addr = remote; + accept_op->addrlen= addrlen; + accept_op->local_addr = local; + + pj_list_insert_before(&key->accept_list, accept_op); PJ_FD_SET(key->fd, &ioqueue->rfdset); pj_lock_release(ioqueue->lock); + return PJ_EPENDING; } @@ -908,28 +1119,28 @@ * since there's no overlapped version of connect()). */ -PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue, - pj_ioqueue_key_t *key, +PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, const pj_sockaddr_t *addr, int addrlen ) { - pj_status_t rc; + pj_ioqueue_t *ioqueue; + pj_status_t status; /* check parameters. All must be specified! */ - PJ_ASSERT_RETURN(ioqueue && key && addr && addrlen, PJ_EINVAL); - - /* Connecting socket must have no other operation! */ - PJ_ASSERT_RETURN(key->op == 0, PJ_EBUSY); + PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); + + /* Check if socket has not been marked for connecting */ + if (key->connecting != 0) + return PJ_EPENDING; - rc = pj_sock_connect(key->fd, addr, addrlen); - if (rc == PJ_SUCCESS) { + status = pj_sock_connect(key->fd, addr, addrlen); + if (status == PJ_SUCCESS) { /* Connected! */ return PJ_SUCCESS; } else { - if (rc == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) || - rc == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) - { + if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) { /* Pending! 
*/ + ioqueue = key->ioqueue; pj_lock_acquire(ioqueue->lock); - key->op = PJ_IOQUEUE_OP_CONNECT; + key->connecting = PJ_TRUE; PJ_FD_SET(key->fd, &ioqueue->wfdset); PJ_FD_SET(key->fd, &ioqueue->xfdset); @@ -938,5 +1149,5 @@ } else { /* Error! */ - return rc; + return status; } } Index: /pjproject/main/pjlib/src/pj/ioqueue_winnt.c =================================================================== --- /pjproject/main/pjlib/src/pj/ioqueue_winnt.c (revision 10) +++ /pjproject/main/pjlib/src/pj/ioqueue_winnt.c (revision 11) @@ -24,5 +24,14 @@ -#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+20) +/* The address specified in AcceptEx() must be 16 more than the size of + * SOCKADDR (source: MSDN). + */ +#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+16) + +typedef struct generic_overlapped +{ + WSAOVERLAPPED overlapped; + pj_ioqueue_operation_e operation; +} generic_overlapped; /* @@ -34,4 +43,6 @@ pj_ioqueue_operation_e operation; WSABUF wsabuf; + pj_sockaddr_in dummy_addr; + int dummy_addrlen; } ioqueue_overlapped; @@ -54,15 +65,25 @@ /* + * Structure to hold pending operation key. + */ +union operation_key +{ + generic_overlapped generic; + ioqueue_overlapped overlapped; +#if PJ_HAS_TCP + ioqueue_accept_rec accept; +#endif +}; + +/* * Structure for individual socket. 
*/ struct pj_ioqueue_key_t { + pj_ioqueue_t *ioqueue; HANDLE hnd; void *user_data; - ioqueue_overlapped recv_overlapped; - ioqueue_overlapped send_overlapped; #if PJ_HAS_TCP int connecting; - ioqueue_accept_rec accept_overlapped; #endif pj_ioqueue_callback cb; @@ -108,6 +129,11 @@ &remote, &remotelen); - pj_memcpy(accept_overlapped->local, local, locallen); - pj_memcpy(accept_overlapped->remote, remote, locallen); + if (*accept_overlapped->addrlen > locallen) { + pj_memcpy(accept_overlapped->local, local, locallen); + pj_memcpy(accept_overlapped->remote, remote, locallen); + } else { + pj_memset(accept_overlapped->local, 0, *accept_overlapped->addrlen); + pj_memset(accept_overlapped->remote, 0, *accept_overlapped->addrlen); + } *accept_overlapped->addrlen = locallen; if (accept_overlapped->newsock_ptr) @@ -121,5 +147,4 @@ pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos]; HANDLE hEvent = ioqueue->connecting_handles[pos]; - unsigned long optval; /* Remove key from array of connecting handles. */ @@ -144,10 +169,4 @@ } - /* Set socket to blocking again. */ - optval = 0; - if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) { - DWORD dwStatus; - dwStatus = WSAGetLastError(); - } } @@ -184,5 +203,6 @@ ioqueue->connecting_handles[pos], &net_events); - *connect_err = net_events.iErrorCode[FD_CONNECT_BIT]; + *connect_err = + PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]); /* Erase socket from pending connect. 
*/ @@ -195,76 +215,92 @@ #endif - +/* + * pj_ioqueue_create() + */ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_size_t max_fd, - int max_threads, - pj_ioqueue_t **ioqueue) -{ - pj_ioqueue_t *ioq; + pj_ioqueue_t **p_ioqueue) +{ + pj_ioqueue_t *ioqueue; pj_status_t rc; PJ_UNUSED_ARG(max_fd); - PJ_ASSERT_RETURN(pool && ioqueue, PJ_EINVAL); - - ioq = pj_pool_zalloc(pool, sizeof(*ioq)); - ioq->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, max_threads); - if (ioq->iocp == NULL) + PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL); + + rc = sizeof(union operation_key); + + /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */ + PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= + sizeof(union operation_key), PJ_EBUG); + + ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue)); + ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); + if (ioqueue->iocp == NULL) return PJ_RETURN_OS_ERROR(GetLastError()); - rc = pj_lock_create_simple_mutex(pool, NULL, &ioq->lock); + rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock); if (rc != PJ_SUCCESS) { - CloseHandle(ioq->iocp); + CloseHandle(ioqueue->iocp); return rc; } - ioq->auto_delete_lock = PJ_TRUE; - - *ioqueue = ioq; - - PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioq)); + ioqueue->auto_delete_lock = PJ_TRUE; + + *p_ioqueue = ioqueue; + + PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue)); return PJ_SUCCESS; } -PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioque ) +/* + * pj_ioqueue_destroy() + */ +PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue ) { unsigned i; PJ_CHECK_STACK(); - PJ_ASSERT_RETURN(ioque, PJ_EINVAL); + PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); /* Destroy events in the pool */ - for (i=0; i