Changeset 11 for pjproject/main
- Timestamp:
- Nov 6, 2005 9:37:47 AM (19 years ago)
- Location:
- pjproject/main/pjlib
- Files:
-
- 1 added
- 23 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/main/pjlib/build/pjlib.dsp
r1 r11 392 392 # Begin Source File 393 393 394 SOURCE=..\include\pj\compat\os_sunos.h 395 # End Source File 396 # Begin Source File 397 394 398 SOURCE=..\include\pj\compat\os_win32.h 395 399 # End Source File -
pjproject/main/pjlib/build/pjlib_test.dsp
r1 r11 197 197 # Begin Source File 198 198 199 SOURCE="..\src\pjlib-test\udp_echo_srv_ioqueue.c" 200 # End Source File 201 # Begin Source File 202 199 203 SOURCE="..\src\pjlib-test\udp_echo_srv_sync.c" 200 204 # End Source File -
pjproject/main/pjlib/include/pj/compat/os_linux.h
r4 r11 57 57 #define PJ_SOCK_HAS_INET_ATON 1 58 58 59 /* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return 60 * the status of non-blocking connect() operation. 61 */ 62 #define PJ_HAS_SO_ERROR 1 63 64 /* This value specifies the value set in errno by the OS when a non-blocking 65 * socket recv() can not return immediate daata. 66 */ 67 #define PJ_BLOCKING_ERROR_VAL EAGAIN 68 69 /* This value specifies the value set in errno by the OS when a non-blocking 70 * socket connect() can not get connected immediately. 71 */ 72 #define PJ_BLOCKING_CONNECT_ERROR_VAL EINPROGRESS 73 59 74 /* Default threading is enabled, unless it's overridden. */ 60 75 #ifndef PJ_HAS_THREADS -
pjproject/main/pjlib/include/pj/compat/os_linux_kernel.h
r4 r11 54 54 #define PJ_SOCK_HAS_INET_ATON 0 55 55 56 /* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return 57 * the status of non-blocking connect() operation. 58 */ 59 #define PJ_HAS_SO_ERROR 1 60 61 /* This value specifies the value set in errno by the OS when a non-blocking 62 * socket recv() can not return immediate daata. 63 */ 64 #define PJ_BLOCKING_ERROR_VAL EAGAIN 65 66 /* This value specifies the value set in errno by the OS when a non-blocking 67 * socket connect() can not get connected immediately. 68 */ 69 #define PJ_BLOCKING_CONNECT_ERROR_VAL EINPROGRESS 70 56 71 #ifndef PJ_HAS_THREADS 57 72 # define PJ_HAS_THREADS (1) -
pjproject/main/pjlib/include/pj/compat/os_palmos.h
r4 r11 46 46 #define PJ_SOCK_HAS_INET_ATON 0 47 47 48 /* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return 49 * the status of non-blocking connect() operation. 50 */ 51 #define PJ_HAS_SO_ERROR 0 52 53 /* This value specifies the value set in errno by the OS when a non-blocking 54 * socket recv() can not return immediate daata. 55 */ 56 #define PJ_BLOCKING_ERROR_VAL xxx 57 58 /* This value specifies the value set in errno by the OS when a non-blocking 59 * socket connect() can not get connected immediately. 60 */ 61 #define PJ_BLOCKING_CONNECT_ERROR_VAL xxx 62 48 63 /* Default threading is enabled, unless it's overridden. */ 49 64 #ifndef PJ_HAS_THREADS -
pjproject/main/pjlib/include/pj/compat/os_sunos.h
r4 r11 41 41 #define PJ_SOCK_HAS_INET_ATON 0 42 42 43 /* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return 44 * the status of non-blocking connect() operation. 45 */ 46 #define PJ_HAS_SO_ERROR 0 47 48 /* This value specifies the value set in errno by the OS when a non-blocking 49 * socket recv() can not return immediate daata. 50 */ 51 #define PJ_BLOCKING_ERROR_VAL EWOULDBLOCK 52 53 /* This value specifies the value set in errno by the OS when a non-blocking 54 * socket connect() can not get connected immediately. 55 */ 56 #define PJ_BLOCKING_CONNECT_ERROR_VAL EINPROGRESS 57 43 58 /* Default threading is enabled, unless it's overridden. */ 44 59 #ifndef PJ_HAS_THREADS -
pjproject/main/pjlib/include/pj/compat/os_win32.h
r4 r11 61 61 #define PJ_SOCK_HAS_INET_ATON 0 62 62 63 /* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return 64 * the status of non-blocking connect() operation. 65 */ 66 #define PJ_HAS_SO_ERROR 0 67 68 /* This value specifies the value set in errno by the OS when a non-blocking 69 * socket recv() or send() can not return immediately. 70 */ 71 #define PJ_BLOCKING_ERROR_VAL WSAEWOULDBLOCK 72 73 /* This value specifies the value set in errno by the OS when a non-blocking 74 * socket connect() can not get connected immediately. 75 */ 76 #define PJ_BLOCKING_CONNECT_ERROR_VAL WSAEWOULDBLOCK 77 63 78 /* Default threading is enabled, unless it's overridden. */ 64 79 #ifndef PJ_HAS_THREADS -
pjproject/main/pjlib/include/pj/doxygen.h
r8 r11 89 89 * <b>PJLIB Page Documentation</b> on navigation pane of your PDF reader. 90 90 * 91 * 92 * 91 * - <b>How to Submit Code to PJLIB Project</b> 92 *\n 93 * Please read \ref pjlib_coding_convention_page before submitting 94 * your code. Send your code as patch against current Subversion tree 95 * to the appropriate mailing list. 93 96 * 94 97 * … … 395 398 396 399 400 401 /*////////////////////////////////////////////////////////////////////////// */ 402 /* 403 CODING CONVENTION 404 */ 405 406 /** 407 * @page pjlib_coding_convention_page Coding Convention 408 * 409 * Before you submit your code/patches to be included with PJLIB, you must 410 * make sure that your code is compliant with PJLIB coding convention. 411 * <b>This is very important!</b> Otherwise we would not accept your code. 412 * 413 * @section coding_conv_editor_sec Editor Settings 414 * 415 * The single most important thing in the whole coding convention is editor 416 * settings. It's more important than the correctness of your code (bugs will 417 * only crash the system, but incorrect tab size is mental!). 418 * 419 * Kindly set your editor as follows: 420 * - tab size to \b 8. 421 * - indentation to \b 4. 422 * 423 * With \c vi, you can do it with: 424 * <pre> 425 * :se ts=8 426 * :se sts=4 427 * </pre> 428 * 429 * You should replace tab with eight spaces. 430 * 431 * @section coding_conv_detail_sec Coding Style 432 * 433 * Coding style MUST strictly follow K&R style. The rest of coding style 434 * must follow current style. You SHOULD be able to observe the style 435 * currently used by PJLIB from PJLIB sources, and apply the style to your 436 * code. If you're not able to do simple thing like to observe PJLIB 437 * coding style from the sources, then logic dictates that your ability to 438 * observe more difficult area in PJLIB such as memory allocation strategy, 439 * concurrency, etc is questionable. 440 * 441 * @section coding_conv_comment_sec Commenting Your Code 442 * 443 * Public API (e.g. in header files) MUST have doxygen compliant comments. 444 * 445 */ 397 446 398 447 -
pjproject/main/pjlib/include/pj/ioqueue.h
r4 r11 1 1 /* $Id$ 2 *3 2 */ 4 3 … … 49 48 * @{ 50 49 * 51 * This file provides abstraction for various event dispatching mechanisms. 52 * The interfaces for event dispatching vary alot, even in a single 53 * operating system. The abstraction here hopefully is suitable for most of 54 * the event dispatching available. 55 * 56 * Currently, the I/O Queue supports: 57 * - select(), as the common denominator, but the least efficient. 58 * - I/O Completion ports in Windows NT/2000/XP, which is the most efficient 59 * way to dispatch events in Windows NT based OSes, and most importantly, 60 * it doesn't have the limit on how many handles to monitor. And it works 61 * with files (not only sockets) as well. 50 * I/O Queue provides API for performing asynchronous I/O operations. It 51 * conforms to proactor pattern, which allows application to submit an 52 * asynchronous operation and to be notified later when the operation has 53 * completed. 54 * 55 * The framework works natively in platforms where asynchronous operation API 56 * exists, such as in Windows NT with IoCompletionPort/IOCP. In other 57 * platforms, the I/O queue abstracts the operating system's event poll API 58 * to provide semantics similar to IoCompletionPort with minimal penalties 59 * (i.e. per ioqueue and per handle mutex protection). 60 * 61 * The I/O queue provides more than just unified abstraction. It also: 62 * - makes sure that the operation uses the most effective way to utilize 63 * the underlying mechanism, to provide the maximum theoritical 64 * throughput possible on a given platform. 65 * - choose the most efficient mechanism for event polling on a given 66 * platform. 67 * 68 * Currently, the I/O Queue is implemented using: 69 * - <tt><b>select()</b></tt>, as the common denominator, but the least 70 * efficient. Also the number of descriptor is limited to 71 * \c PJ_IOQUEUE_MAX_HANDLES (which by default is 64). 72 * - <tt><b>/dev/epoll</b></tt> on Linux (user mode and kernel mode), 73 * a much faster replacement for select() on Linux (and more importantly 74 * doesn't have limitation on number of descriptors). 75 * - <b>I/O Completion ports</b> on Windows NT/2000/XP, which is the most 76 * efficient way to dispatch events in Windows NT based OSes, and most 77 * importantly, it doesn't have the limit on how many handles to monitor. 78 * And it works with files (not only sockets) as well. 79 * 80 * 81 * \section pj_ioqueue_concurrency_sec Concurrency Rules 82 * 83 * The items below describe rules that must be obeyed when using the I/O 84 * queue, with regard to concurrency: 85 * - in general, the I/O queue is thread safe (assuming the lock strategy 86 * is not changed to disable mutex protection). All operations, except 87 * unregistration which is described below, can be safely invoked 88 * simultaneously by multiple threads. 89 * - however, <b>care must be taken when unregistering a key</b> from the 90 * ioqueue. Application must take care that when one thread is issuing 91 * an unregistration, other thread is not simultaneously invoking an 92 * operation <b>to the same key</b>. 93 *\n 94 * This happens because the ioqueue functions are working with a pointer 95 * to the key, and there is a possible race condition where the pointer 96 * has been rendered invalid by other threads before the ioqueue has a 97 * chance to acquire mutex on it. 62 98 * 63 99 * \section pj_ioqeuue_examples_sec Examples … … 70 106 */ 71 107 72 /** 73 * This structure describes the callbacks to be called when I/O operation 74 * completes. 75 */ 108 109 110 /** 111 * This structure describes operation specific key to be submitted to 112 * I/O Queue when performing the asynchronous operation. This key will 113 * be returned to the application when completion callback is called. 114 * 115 * Application normally wants to attach it's specific data in the 116 * \c user_data field so that it can keep track of which operation has 117 * completed when the callback is called. Alternatively, application can 118 * also extend this struct to include its data, because the pointer that 119 * is returned in the completion callback will be exactly the same as 120 * the pointer supplied when the asynchronous function is called. 121 */ 122 typedef struct pj_ioqueue_op_key_t 123 { 124 void *internal__[32]; /**< Internal I/O Queue data. */ 125 void *user_data; /**< Application data. */ 126 } pj_ioqueue_op_key_t; 127 128 /** 129 * This structure describes the callbacks to be called when I/O operation 130 * completes. 131 */ 76 132 typedef struct pj_ioqueue_callback 77 133 { 78 134 /** 79 * This callback is called when #pj_ioqueue_re ador #pj_ioqueue_recvfrom135 * This callback is called when #pj_ioqueue_recv or #pj_ioqueue_recvfrom 80 136 * completes. 81 137 * 82 138 * @param key The key. 83 * @param bytes_read The size of data that has just been read. 139 * @param op_key Operation key. 140 * @param bytes_read >= 0 to indicate the amount of data read, 141 * otherwise negative value containing the error 142 * code. To obtain the pj_status_t error code, use 143 * (pj_status_t code = -bytes_read). 84 144 */ 85 void (*on_read_complete)(pj_ioqueue_key_t *key, pj_ssize_t bytes_read); 145 void (*on_read_complete)(pj_ioqueue_key_t *key, 146 pj_ioqueue_op_key_t *op_key, 147 pj_ssize_t bytes_read); 86 148 87 149 /** … … 90 152 * 91 153 * @param key The key. 92 * @param bytes_read The size of data that has just been read. 154 * @param op_key Operation key. 155 * @param bytes_sent >= 0 to indicate the amount of data written, 156 * otherwise negative value containing the error 157 * code. To obtain the pj_status_t error code, use 158 * (pj_status_t code = -bytes_sent). 93 159 */ 94 void (*on_write_complete)(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent); 160 void (*on_write_complete)(pj_ioqueue_key_t *key, 161 pj_ioqueue_op_key_t *op_key, 162 pj_ssize_t bytes_sent); 95 163 96 164 /** … … 98 166 * 99 167 * @param key The key. 168 * @param op_key Operation key. 100 169 * @param sock Newly connected socket. 101 170 * @param status Zero if the operation completes successfully. 102 171 */ 103 void (*on_accept_complete)(pj_ioqueue_key_t *key, pj_sock_t sock, 104 int status); 172 void (*on_accept_complete)(pj_ioqueue_key_t *key, 173 pj_ioqueue_op_key_t *op_key, 174 pj_sock_t sock, 175 pj_status_t status); 105 176 106 177 /** … … 108 179 * 109 180 * @param key The key. 110 * @param status Zeroif the operation completes successfully.181 * @param status PJ_SUCCESS if the operation completes successfully. 111 182 */ 112 void (*on_connect_complete)(pj_ioqueue_key_t *key, int status); 183 void (*on_connect_complete)(pj_ioqueue_key_t *key, 184 pj_status_t status); 113 185 } pj_ioqueue_callback; 114 186 115 187 116 188 /** 117 * Types of I/O Queue operation. 189 * Types of pending I/O Queue operation. This enumeration is only used 190 * internally within the ioqueue. 118 191 */ 119 192 typedef enum pj_ioqueue_operation_e … … 139 212 #define PJ_IOQUEUE_DEFAULT_THREADS 0 140 213 141 142 214 /** 143 215 * Create a new I/O Queue framework. … … 146 218 * @param max_fd The maximum number of handles to be supported, which 147 219 * should not exceed PJ_IOQUEUE_MAX_HANDLES. 148 * @param max_threads The maximum number of threads that are allowed to149 * operate on a single descriptor simultaneously. If150 * the value is zero, the framework will set it151 * to a reasonable value.152 220 * @param ioqueue Pointer to hold the newly created I/O Queue. 153 221 * … … 156 224 PJ_DECL(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, 157 225 pj_size_t max_fd, 158 int max_threads,159 226 pj_ioqueue_t **ioqueue); 160 227 … … 201 268 * retrieved later. 202 269 * @param cb Callback to be called when I/O operation completes. 203 * @param key Pointer to receive the returned key. 270 * @param key Pointer to receive the key to be associated with this 271 * socket. Subsequent I/O queue operation will need this 272 * key. 204 273 * 205 274 * @return PJ_SUCCESS on success, or the error code. … … 210 279 void *user_data, 211 280 const pj_ioqueue_callback *cb, 212 pj_ioqueue_key_t **key); 213 214 /** 215 * Unregister a handle from the I/O Queue framework. 216 * 217 * @param ioque The I/O Queue. 218 * @param key The key that uniquely identifies the handle, which is 219 * returned from the function #pj_ioqueue_register_sock() 220 * or other registration functions. 281 pj_ioqueue_key_t **key ); 282 283 /** 284 * Unregister from the I/O Queue framework. Caller must make sure that 285 * the key doesn't have any pending operation before calling this function, 286 * or otherwise the behaviour is undefined (either callback will be called 287 * later when the data is sent/received, or the callback will not be called, 288 * or even something else). 289 * 290 * @param key The key that was previously obtained from registration. 221 291 * 222 292 * @return PJ_SUCCESS on success or the error code. 223 293 */ 224 PJ_DECL(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque, 225 pj_ioqueue_key_t *key ); 226 227 228 /** 229 * Get user data associated with the I/O Queue key. 230 * 231 * @param key The key previously associated with the socket/handle with 232 * #pj_ioqueue_register_sock() (or other registration 233 * functions). 234 * 235 * @return The user data associated with the key, or NULL on error 236 * of if no data is associated with the key during 294 PJ_DECL(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ); 295 296 297 /** 298 * Get user data associated with an ioqueue key. 299 * 300 * @param key The key that was previously obtained from registration. 301 * 302 * @return The user data associated with the descriptor, or NULL 303 * on error or if no data is associated with the key during 237 304 * registration. 238 305 */ 239 306 PJ_DECL(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ); 240 307 308 /** 309 * Set or change the user data to be associated with the file descriptor or 310 * handle or socket descriptor. 311 * 312 * @param key The key that was previously obtained from registration. 313 * @param user_data User data to be associated with the descriptor. 314 * @param old_data Optional parameter to retrieve the old user data. 315 * 316 * @return PJ_SUCCESS on success or the error code. 317 */ 318 PJ_DECL(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, 319 void *user_data, 320 void **old_data); 321 241 322 242 323 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 243 324 /** 244 * Instruct I/O Queue to wait for incoming connections on the specified 245 * listening socket. This function will return 246 * immediately (i.e. non-blocking) regardless whether some data has been 247 * transfered. If the function can't complete immediately, and the caller will 248 * be notified about the completion when it calls pj_ioqueue_poll(). 249 * 250 * @param ioqueue The I/O Queue 325 * Instruct I/O Queue to accept incoming connection on the specified 326 * listening socket. This function will return immediately (i.e. non-blocking) 327 * regardless whether a connection is immediately available. If the function 328 * can't complete immediately, the caller will be notified about the incoming 329 * connection when it calls pj_ioqueue_poll(). If a new connection is 330 * immediately available, the function returns PJ_SUCCESS with the new 331 * connection; in this case, the callback WILL NOT be called. 332 * 251 333 * @param key The key which registered to the server socket. 252 * @param sock Argument which contain pointer to receive 253 * the socket for the incoming connection. 334 * @param op_key An operation specific key to be associated with the 335 * pending operation, so that application can keep track of 336 * which operation has been completed when the callback is 337 * called. 338 * @param new_sock Argument which contain pointer to receive the new socket 339 * for the incoming connection. 254 340 * @param local Optional argument which contain pointer to variable to 255 341 * receive local address. … … 260 346 * address. This argument is optional. 261 347 * @return 262 * - PJ_SUCCESS If there's a connection available immediately, which 263 * in this case the callback should have been called before 264 * the function returns. 265 * - PJ_EPENDING If accept is queued, or 348 * - PJ_SUCCESS When connection is available immediately, and the 349 * parameters will be updated to contain information about 350 * the new connection. In this case, a completion callback 351 * WILL NOT be called. 352 * - PJ_EPENDING If no connection is available immediately. When a new 353 * connection arrives, the callback will be called. 266 354 * - non-zero which indicates the appropriate error code. 267 355 */ 268 PJ_DECL(pj_status_t) pj_ioqueue_accept( pj_ioqueue_ t *ioqueue,269 pj_ioqueue_key_t *key,356 PJ_DECL(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, 357 pj_ioqueue_op_key_t *op_key, 270 358 pj_sock_t *sock, 271 359 pj_sockaddr_t *local, … … 275 363 /** 276 364 * Initiate non-blocking socket connect. If the socket can NOT be connected 277 * immediately, the result will be reported during poll. 278 * 279 * @param ioqueue The ioqueue 365 * immediately, asynchronous connect() will be scheduled and caller will be 366 * notified via completion callback when it calls pj_ioqueue_poll(). If 367 * socket is connected immediately, the function returns PJ_SUCCESS and 368 * completion callback WILL NOT be called. 369 * 280 370 * @param key The key associated with TCP socket 281 371 * @param addr The remote address. … … 283 373 * 284 374 * @return 285 * - PJ_SUCCESS If socket is connected immediately , which in this case286 * the callback should have beencalled.375 * - PJ_SUCCESS If socket is connected immediately. In this case, the 376 * completion callback WILL NOT be called. 287 377 * - PJ_EPENDING If operation is queued, or 288 378 * - non-zero Indicates the error code. 289 379 */ 290 PJ_DECL(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue, 291 pj_ioqueue_key_t *key, 380 PJ_DECL(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, 292 381 const pj_sockaddr_t *addr, 293 382 int addrlen ); … … 310 399 const pj_time_val *timeout); 311 400 401 312 402 /** 313 403 * Instruct the I/O Queue to read from the specified handle. This function 314 404 * returns immediately (i.e. non-blocking) regardless whether some data has 315 405 * been transfered. If the operation can't complete immediately, caller will 316 * be notified about the completion when it calls pj_ioqueue_poll(). 317 * 318 * @param ioque The I/O Queue. 406 * be notified about the completion when it calls pj_ioqueue_poll(). If data 407 * is immediately available, the function will return PJ_SUCCESS and the 408 * callback WILL NOT be called. 409 * 319 410 * @param key The key that uniquely identifies the handle. 411 * @param op_key An operation specific key to be associated with the 412 * pending operation, so that application can keep track of 413 * which operation has been completed when the callback is 414 * called. Caller must make sure that this key remains 415 * valid until the function completes. 320 416 * @param buffer The buffer to hold the read data. The caller MUST make sure 321 417 * that this buffer remain valid until the framework completes 322 418 * reading the handle. 323 * @param buflen The maximum size to be read. 419 * @param length On input, it specifies the size of the buffer. If data is 420 * available to be read immediately, the function returns 421 * PJ_SUCCESS and this argument will be filled with the 422 * amount of data read. If the function is pending, caller 423 * will be notified about the amount of data read in the 424 * callback. This parameter can point to local variable in 425 * caller's stack and doesn't have to remain valid for the 426 * duration of pending operation. 427 * @param flags Recv flag. 428 * 429 * @return 430 * - PJ_SUCCESS If immediate data has been received in the buffer. In this 431 * case, the callback WILL NOT be called. 432 * - PJ_EPENDING If the operation has been queued, and the callback will be 433 * called when data has been received. 434 * - non-zero The return value indicates the error code. 435 */ 436 PJ_DECL(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, 437 pj_ioqueue_op_key_t *op_key, 438 void *buffer, 439 pj_ssize_t *length, 440 unsigned flags ); 441 442 /** 443 * This function behaves similarly as #pj_ioqueue_recv(), except that it is 444 * normally called for socket, and the remote address will also be returned 445 * along with the data. Caller MUST make sure that both buffer and addr 446 * remain valid until the framework completes reading the data. 447 * 448 * @param key The key that uniquely identifies the handle. 449 * @param op_key An operation specific key to be associated with the 450 * pending operation, so that application can keep track of 451 * which operation has been completed when the callback is 452 * called. 453 * @param buffer The buffer to hold the read data. The caller MUST make sure 454 * that this buffer remain valid until the framework completes 455 * reading the handle. 456 * @param length On input, it specifies the size of the buffer. If data is 457 * available to be read immediately, the function returns 458 * PJ_SUCCESS and this argument will be filled with the 459 * amount of data read. If the function is pending, caller 460 * will be notified about the amount of data read in the 461 * callback. This parameter can point to local variable in 462 * caller's stack and doesn't have to remain valid for the 463 * duration of pending operation. 464 * @param flags Recv flag. 465 * @param addr Optional Pointer to buffer to receive the address. 466 * @param addrlen On input, specifies the length of the address buffer. 467 * On output, it will be filled with the actual length of 468 * the address. This argument can be NULL if \c addr is not 469 * specified. 324 470 * 325 471 * @return … … 330 476 * - non-zero The return value indicates the error code. 331 477 */ 332 PJ_DECL(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque, 333 pj_ioqueue_key_t *key, 334 void *buffer, 335 pj_size_t buflen); 336 337 338 /** 339 * This function behaves similarly as #pj_ioqueue_read(), except that it is 340 * normally called for socket. 341 * 342 * @param ioque The I/O Queue. 343 * @param key The key that uniquely identifies the handle. 344 * @param buffer The buffer to hold the read data. The caller MUST make sure 345 * that this buffer remain valid until the framework completes 346 * reading the handle. 347 * @param buflen The maximum size to be read. 348 * @param flags Recv flag. 349 * 350 * @return 351 * - PJ_SUCCESS If immediate data has been received. In this case, the 352 * callback must have been called before this function 353 * returns, and no pending operation is scheduled. 354 * - PJ_EPENDING If the operation has been queued. 355 * - non-zero The return value indicates the error code. 356 */ 357 PJ_DECL(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque, 358 pj_ioqueue_key_t *key, 359 void *buffer, 360 pj_size_t buflen, 361 unsigned flags ); 362 363 /** 364 * This function behaves similarly as #pj_ioqueue_read(), except that it is 365 * normally called for socket, and the remote address will also be returned 366 * along with the data. Caller MUST make sure that both buffer and addr 367 * remain valid until the framework completes reading the data. 368 * 369 * @param ioque The I/O Queue. 370 * @param key The key that uniquely identifies the handle. 371 * @param buffer The buffer to hold the read data. The caller MUST make sure 372 * that this buffer remain valid until the framework completes 373 * reading the handle. 374 * @param buflen The maximum size to be read. 375 * @param flags Recv flag. 376 * @param addr Pointer to buffer to receive the address, or NULL. 377 * @param addrlen On input, specifies the length of the address buffer. 378 * On output, it will be filled with the actual length of 379 * the address. 380 * 381 * @return 382 * - PJ_SUCCESS If immediate data has been received. In this case, the 383 * callback must have been called before this function 384 * returns, and no pending operation is scheduled. 385 * - PJ_EPENDING If the operation has been queued. 386 * - non-zero The return value indicates the error code. 387 */ 388 PJ_DECL(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque, 389 pj_ioqueue_key_t *key, 478 PJ_DECL(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, 479 pj_ioqueue_op_key_t *op_key, 390 480 void *buffer, 391 pj_s ize_t buflen,481 pj_ssize_t *length, 392 482 unsigned flags, 393 483 pj_sockaddr_t *addr, 394 484 int *addrlen); 395 485 486 396 487 /** 397 488 * Instruct the I/O Queue to write to the handle. This function will return 398 489 * immediately (i.e. non-blocking) regardless whether some data has been 399 * transfered. If the function can't complete immediately, and the caller will 400 * be notified about the completion when it calls pj_ioqueue_poll(). 401 * 402 * @param ioque the I/O Queue. 490 * transfered. If the function can't complete immediately, the caller will 491 * be notified about the completion when it calls pj_ioqueue_poll(). If 492 * operation completes immediately and data has been transfered, the function 493 * returns PJ_SUCCESS and the callback will NOT be called. 494 * 495 * @param key The key that identifies the handle. 496 * @param op_key An operation specific key to be associated with the 497 * pending operation, so that application can keep track of 498 * which operation has been completed when the callback is 499 * called. 500 * @param data The data to send. Caller MUST make sure that this buffer 501 * remains valid until the write operation completes. 502 * @param length On input, it specifies the length of data to send. When 503 * data was sent immediately, this function returns PJ_SUCCESS 504 * and this parameter contains the length of data sent. If 505 * data can not be sent immediately, an asynchronous operation 506 * is scheduled and caller will be notified via callback the 507 * number of bytes sent. This parameter can point to local 508 * variable on caller's stack and doesn't have to remain 509 * valid until the operation has completed. 510 * @param flags Send flags. 511 * 512 * @return 513 * - PJ_SUCCESS If data was immediately transfered. In this case, no 514 * pending operation has been scheduled and the callback 515 * WILL NOT be called. 516 * - PJ_EPENDING If the operation has been queued. Once data base been 517 * transfered, the callback will be called. 518 * - non-zero The return value indicates the error code. 519 */ 520 PJ_DECL(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, 521 pj_ioqueue_op_key_t *op_key, 522 const void *data, 523 pj_ssize_t *length, 524 unsigned flags ); 525 526 527 /** 528 * This function behaves similarly as #pj_ioqueue_write(), except that 529 * pj_sock_sendto() (or equivalent) will be called to send the data. 530 * 403 531 * @param key the key that identifies the handle. 532 * @param op_key An operation specific key to be associated with the 533 * pending operation, so that application can keep track of 534 * which operation has been completed when the callback is 535 * called. 404 536 * @param data the data to send. Caller MUST make sure that this buffer 405 537 * remains valid until the write operation completes. 406 * @param datalen the length of the data. 538 * @param length On input, it specifies the length of data to send. When 539 * data was sent immediately, this function returns PJ_SUCCESS 540 * and this parameter contains the length of data sent. If 541 * data can not be sent immediately, an asynchronous operation 542 * is scheduled and caller will be notified via callback the 543 * number of bytes sent. This parameter can point to local 544 * variable on caller's stack and doesn't have to remain 545 * valid until the operation has completed. 546 * @param flags send flags. 547 * @param addr Optional remote address. 548 * @param addrlen Remote address length, \c addr is specified. 407 549 * 408 550 * @return … … 411 553 * - non-zero The return value indicates the error code. 412 554 */ 413 PJ_DECL(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque, 414 pj_ioqueue_key_t *key, 415 const void *data, 416 pj_size_t datalen); 417 418 /** 419 * This function behaves similarly as #pj_ioqueue_write(), except that 420 * pj_sock_send() (or equivalent) will be called to send the data. 421 * 422 * @param ioque the I/O Queue. 423 * @param key the key that identifies the handle. 424 * @param data the data to send. Caller MUST make sure that this buffer 425 * remains valid until the write operation completes. 426 * @param datalen the length of the data. 427 * @param flags send flags. 428 * 429 * @return 430 * - PJ_SUCCESS If data was immediately written. 431 * - PJ_EPENDING If the operation has been queued. 432 * - non-zero The return value indicates the error code. 433 */ 434 PJ_DECL(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque, 435 pj_ioqueue_key_t *key, 436 const void *data, 437 pj_size_t datalen, 438 unsigned flags ); 439 440 441 /** 442 * This function behaves similarly as #pj_ioqueue_write(), except that 443 * pj_sock_sendto() (or equivalent) will be called to send the data. 444 * 445 * @param ioque the I/O Queue. 446 * @param key the key that identifies the handle. 447 * @param data the data to send. Caller MUST make sure that this buffer 448 * remains valid until the write operation completes. 449 * @param datalen the length of the data. 450 * @param flags send flags. 451 * @param addr remote address. 452 * @param addrlen remote address length. 453 * 454 * @return 455 * - PJ_SUCCESS If data was immediately written. 456 * - PJ_EPENDING If the operation has been queued. 457 * - non-zero The return value indicates the error code. 458 */ 459 PJ_DECL(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque, 460 pj_ioqueue_key_t *key, 555 PJ_DECL(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, 556 pj_ioqueue_op_key_t *op_key, 461 557 const void *data, 462 pj_s ize_t datalen,558 pj_ssize_t *length, 463 559 unsigned flags, 464 560 const pj_sockaddr_t *addr, -
pjproject/main/pjlib/include/pj/list.h
r4 r11 47 47 */ 48 48 #define PJ_DECL_LIST_MEMBER(type) type *prev; /** List @a prev. */ \ 49 type *next ;/** List @a next. */49 type *next /** List @a next. */ 50 50 51 51 … … 57 57 struct pj_list 58 58 { 59 PJ_DECL_LIST_MEMBER(void) 59 PJ_DECL_LIST_MEMBER(void); 60 60 }; 61 61 -
pjproject/main/pjlib/include/pj/pool.h
r4 r11 114 114 typedef struct pj_pool_block 115 115 { 116 PJ_DECL_LIST_MEMBER(struct pj_pool_block) 116 PJ_DECL_LIST_MEMBER(struct pj_pool_block); /**< List's prev and next. */ 117 117 unsigned char *buf; /**< Start of buffer. */ 118 118 unsigned char *cur; /**< Current alloc ptr. */ … … 127 127 struct pj_pool_t 128 128 { 129 PJ_DECL_LIST_MEMBER(struct pj_pool_t) 129 PJ_DECL_LIST_MEMBER(struct pj_pool_t); 130 130 131 131 /** Pool name */ -
pjproject/main/pjlib/include/pj/sock_select.h
r4 r11 98 98 99 99 /** 100 * Get the number of descriptors in the set.101 *102 * @param fdsetp The descriptor set.103 *104 * @return Number of descriptors in the set.105 */106 PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);107 108 109 /**110 100 * This function wait for a number of file descriptors to change status. 111 101 * The behaviour is the same as select() function call which appear in -
pjproject/main/pjlib/include/pj/xml.h
r4 r11 31 31 struct pj_xml_attr 32 32 { 33 PJ_DECL_LIST_MEMBER(pj_xml_attr) 33 PJ_DECL_LIST_MEMBER(pj_xml_attr); 34 34 pj_str_t name; /**< Attribute name. */ 35 35 pj_str_t value; /**< Attribute value. */ … … 40 40 typedef struct pj_xml_node_head 41 41 { 42 PJ_DECL_LIST_MEMBER(pj_xml_node) 42 PJ_DECL_LIST_MEMBER(pj_xml_node); 43 43 } pj_xml_node_head; 44 44 … … 46 46 struct pj_xml_node 47 47 { 48 PJ_DECL_LIST_MEMBER(pj_xml_node) 48 PJ_DECL_LIST_MEMBER(pj_xml_node); /** List @a prev and @a next member */ 49 49 pj_str_t name; /** Node name. */ 50 50 pj_xml_attr attr_head; /** Attribute list. */ -
pjproject/main/pjlib/src/pj/config.c
r6 r11 5 5 6 6 static const char *id = "config.c"; 7 const char *PJ_VERSION = "0.3.0-pre 1";7 const char *PJ_VERSION = "0.3.0-pre4"; 8 8 9 9 PJ_DEF(void) pj_dump_config(void) -
pjproject/main/pjlib/src/pj/ioqueue_select.c
r6 r11 35 35 #define THIS_FILE "ioq_select" 36 36 37 #define PJ_IOQUEUE_IS_READ_OP(op) ((op & PJ_IOQUEUE_OP_READ) || \ 38 (op & PJ_IOQUEUE_OP_RECV) || \ 39 (op & PJ_IOQUEUE_OP_RECV_FROM)) 40 #define PJ_IOQUEUE_IS_WRITE_OP(op) ((op & PJ_IOQUEUE_OP_WRITE) || \ 41 (op & PJ_IOQUEUE_OP_SEND) || \ 42 (op & PJ_IOQUEUE_OP_SEND_TO)) 43 44 45 #if PJ_HAS_TCP 46 # define PJ_IOQUEUE_IS_ACCEPT_OP(op) (op & PJ_IOQUEUE_OP_ACCEPT) 47 # define PJ_IOQUEUE_IS_CONNECT_OP(op) (op & PJ_IOQUEUE_OP_CONNECT) 48 #else 49 # define PJ_IOQUEUE_IS_ACCEPT_OP(op) 0 50 # define PJ_IOQUEUE_IS_CONNECT_OP(op) 0 51 #endif 37 /* 38 * The select ioqueue relies on socket functions (pj_sock_xxx()) to return 39 * the correct error code. 40 */ 41 #if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100) 42 # error "Error reporting must be enabled for this function to work!" 43 #endif 44 45 /** 46 * Get the number of descriptors in the set. This is defined in sock_select.c 47 * This function will only return the number of sockets set from PJ_FD_SET 48 * operation. When the set is modified by other means (such as by select()), 49 * the count will not be reflected here. 50 * 51 * That's why don't export this function in the header file, to avoid 52 * misunderstanding. 53 * 54 * @param fdsetp The descriptor set. 55 * 56 * @return Number of descriptors in the set. 57 */ 58 PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp); 59 60 61 52 62 53 63 /* … … 61 71 #endif 62 72 73 struct generic_operation 74 { 75 PJ_DECL_LIST_MEMBER(struct generic_operation); 76 pj_ioqueue_operation_e op; 77 }; 78 79 struct read_operation 80 { 81 PJ_DECL_LIST_MEMBER(struct read_operation); 82 pj_ioqueue_operation_e op; 83 84 void *buf; 85 pj_size_t size; 86 unsigned flags; 87 pj_sockaddr_t *rmt_addr; 88 int *rmt_addrlen; 89 }; 90 91 struct write_operation 92 { 93 PJ_DECL_LIST_MEMBER(struct write_operation); 94 pj_ioqueue_operation_e op; 95 96 char *buf; 97 pj_size_t size; 98 pj_ssize_t written; 99 unsigned flags; 100 pj_sockaddr_in rmt_addr; 101 int rmt_addrlen; 102 }; 103 104 #if PJ_HAS_TCP 105 struct accept_operation 106 { 107 PJ_DECL_LIST_MEMBER(struct accept_operation); 108 pj_ioqueue_operation_e op; 109 110 pj_sock_t *accept_fd; 111 pj_sockaddr_t *local_addr; 112 pj_sockaddr_t *rmt_addr; 113 int *addrlen; 114 }; 115 #endif 116 117 union operation_key 118 { 119 struct generic_operation generic; 120 struct read_operation read; 121 struct write_operation write; 122 #if PJ_HAS_TCP 123 struct accept_operation accept; 124 #endif 125 }; 126 63 127 /* 64 128 * This describes each key. … … 66 130 struct pj_ioqueue_key_t 67 131 { 68 PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t) 132 PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); 133 pj_ioqueue_t *ioqueue; 69 134 pj_sock_t fd; 70 pj_ioqueue_operation_e op;71 135 void *user_data; 72 136 pj_ioqueue_callback cb; 73 74 void *rd_buf; 75 unsigned rd_flags; 76 pj_size_t rd_buflen; 77 void *wr_buf; 78 pj_size_t wr_buflen; 79 80 pj_sockaddr_t *rmt_addr; 81 int *rmt_addrlen; 82 83 pj_sockaddr_t *local_addr; 84 int *local_addrlen; 85 86 pj_sock_t *accept_fd; 137 int connecting; 138 struct read_operation read_list; 139 struct write_operation write_list; 140 #if PJ_HAS_TCP 141 struct accept_operation accept_list; 142 #endif 87 143 }; 88 144 … … 95 151 pj_bool_t auto_delete_lock; 96 152 unsigned max, count; 97 pj_ioqueue_key_t hlist;153 pj_ioqueue_key_t key_list; 98 154 pj_fd_set_t rfdset; 99 155 pj_fd_set_t wfdset; … … 110 166 PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, 111 167 pj_size_t max_fd, 112 int max_threads,113 168 pj_ioqueue_t **p_ioqueue) 114 169 { 115 pj_ioqueue_t *ioque ;170 pj_ioqueue_t *ioqueue; 116 171 pj_status_t rc; 117 172 118 PJ_UNUSED_ARG(max_threads); 119 120 if (max_fd > PJ_IOQUEUE_MAX_HANDLES) { 121 pj_assert(!"max_fd too large"); 122 return PJ_EINVAL; 123 } 124 125 ioque = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); 126 ioque->max = max_fd; 127 ioque->count = 0; 128 PJ_FD_ZERO(&ioque->rfdset); 129 PJ_FD_ZERO(&ioque->wfdset); 173 /* Check that arguments are valid. */ 174 PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && 175 max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES, 176 PJ_EINVAL); 177 178 /* Check that size of pj_ioqueue_op_key_t is sufficient */ 179 PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= 180 sizeof(union operation_key), PJ_EBUG); 181 182 ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); 183 ioqueue->max = max_fd; 184 ioqueue->count = 0; 185 PJ_FD_ZERO(&ioqueue->rfdset); 186 PJ_FD_ZERO(&ioqueue->wfdset); 130 187 #if PJ_HAS_TCP 131 PJ_FD_ZERO(&ioque ->xfdset);132 #endif 133 pj_list_init(&ioque ->hlist);134 135 rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioque ->lock);188 PJ_FD_ZERO(&ioqueue->xfdset); 189 #endif 190 pj_list_init(&ioqueue->key_list); 191 192 rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioqueue->lock); 136 193 if (rc != PJ_SUCCESS) 137 194 return rc; 138 195 139 ioque ->auto_delete_lock = PJ_TRUE;140 141 PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioque ));142 143 *p_ioqueue = ioque ;196 ioqueue->auto_delete_lock = PJ_TRUE; 197 198 PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue)); 199 200 *p_ioqueue = ioqueue; 144 201 return PJ_SUCCESS; 145 202 } … … 150 207 * Destroy ioqueue. 151 208 */ 152 PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioque )209 PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) 153 210 { 154 211 pj_status_t rc = PJ_SUCCESS; 155 212 156 PJ_ASSERT_RETURN(ioque, PJ_EINVAL); 157 158 if (ioque->auto_delete_lock) 159 rc = pj_lock_destroy(ioque->lock); 213 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 214 215 pj_lock_acquire(ioqueue->lock); 216 217 if (ioqueue->auto_delete_lock) 218 rc = pj_lock_destroy(ioqueue->lock); 160 219 161 220 return rc; … … 164 223 165 224 /* 166 * pj_ioqueue_set_lock()167 */168 PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque,169 pj_lock_t *lock,170 pj_bool_t auto_delete )171 {172 PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL);173 174 if (ioque->auto_delete_lock) {175 pj_lock_destroy(ioque->lock);176 }177 178 ioque->lock = lock;179 ioque->auto_delete_lock = auto_delete;180 181 return PJ_SUCCESS;182 }183 184 185 /*186 225 * pj_ioqueue_register_sock() 187 226 * … … 189 228 */ 190 229 PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, 191 pj_ioqueue_t *ioque ,230 pj_ioqueue_t *ioqueue, 192 231 pj_sock_t sock, 193 232 void *user_data, … … 199 238 pj_status_t rc = PJ_SUCCESS; 200 239 201 PJ_ASSERT_RETURN(pool && ioque && sock != PJ_INVALID_SOCKET &&240 PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET && 202 241 cb && p_key, PJ_EINVAL); 203 242 204 pj_lock_acquire(ioque ->lock);205 206 if (ioque ->count >= ioque->max) {243 pj_lock_acquire(ioqueue->lock); 244 245 if (ioqueue->count >= ioqueue->max) { 207 246 rc = PJ_ETOOMANY; 208 247 goto on_return; … … 212 251 value = 1; 213 252 #ifdef PJ_WIN32 214 if (ioctlsocket(sock, FIONBIO, (u nsignedlong*)&value)) {253 if (ioctlsocket(sock, FIONBIO, (u_long*)&value)) { 215 254 #else 216 255 if (ioctl(sock, FIONBIO, &value)) { … … 222 261 /* Create key. */ 223 262 key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 263 key->ioqueue = ioqueue; 224 264 key->fd = sock; 225 265 key->user_data = user_data; 266 pj_list_init(&key->read_list); 267 pj_list_init(&key->write_list); 268 #if PJ_HAS_TCP 269 pj_list_init(&key->accept_list); 270 #endif 226 271 227 272 /* Save callback. */ … … 229 274 230 275 /* Register */ 231 pj_list_insert_before(&ioque ->hlist, key);232 ++ioque ->count;276 pj_list_insert_before(&ioqueue->key_list, key); 277 ++ioqueue->count; 233 278 234 279 on_return: 280 /* On error, socket may be left in non-blocking mode. */ 235 281 *p_key = key; 236 pj_lock_release(ioque ->lock);282 pj_lock_release(ioqueue->lock); 237 283 238 284 return rc; … … 244 290 * Unregister handle from ioqueue. 245 291 */ 246 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque, 247 pj_ioqueue_key_t *key) 248 { 249 PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL); 250 251 pj_lock_acquire(ioque->lock); 252 253 pj_assert(ioque->count > 0); 254 --ioque->count; 292 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) 293 { 294 pj_ioqueue_t *ioqueue; 295 296 PJ_ASSERT_RETURN(key, PJ_EINVAL); 297 298 ioqueue = key->ioqueue; 299 300 pj_lock_acquire(ioqueue->lock); 301 302 pj_assert(ioqueue->count > 0); 303 --ioqueue->count; 255 304 pj_list_erase(key); 256 PJ_FD_CLR(key->fd, &ioque ->rfdset);257 PJ_FD_CLR(key->fd, &ioque ->wfdset);305 PJ_FD_CLR(key->fd, &ioqueue->rfdset); 306 PJ_FD_CLR(key->fd, &ioqueue->wfdset); 258 307 #if PJ_HAS_TCP 259 PJ_FD_CLR(key->fd, &ioque ->xfdset);260 #endif 261 262 pj_lock_release(ioque ->lock);308 PJ_FD_CLR(key->fd, &ioqueue->xfdset); 309 #endif 310 311 pj_lock_release(ioqueue->lock); 263 312 return PJ_SUCCESS; 264 313 } … … 276 325 277 326 327 /* 328 * pj_ioqueue_set_user_data() 329 */ 330 PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, 331 void *user_data, 332 void **old_data) 333 { 334 PJ_ASSERT_RETURN(key, PJ_EINVAL); 335 336 if (old_data) 337 *old_data = key->user_data; 338 key->user_data = user_data; 339 340 return PJ_SUCCESS; 341 } 342 343 278 344 /* This supposed to check whether the fd_set values are consistent 279 345 * with the operation currently set in each key. 280 346 */ 281 347 #if VALIDATE_FD_SET 282 static void validate_sets(const pj_ioqueue_t *ioque ,348 static void validate_sets(const pj_ioqueue_t *ioqueue, 283 349 const pj_fd_set_t *rfdset, 284 350 const pj_fd_set_t *wfdset, … … 287 353 pj_ioqueue_key_t *key; 288 354 289 key = ioque->hlist.next; 290 while (key != &ioque->hlist) { 291 if ((key->op & PJ_IOQUEUE_OP_READ) 292 || (key->op & PJ_IOQUEUE_OP_RECV) 293 || (key->op & PJ_IOQUEUE_OP_RECV_FROM) 355 key = ioqueue->key_list.next; 356 while (key != &ioqueue->key_list) { 357 if (!pj_list_empty(&key->read_list) 294 358 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 295 || (key->op & PJ_IOQUEUE_OP_ACCEPT)359 || !pj_list_empty(&key->accept_list) 296 360 #endif 297 361 ) … … 302 366 pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0); 303 367 } 304 if ((key->op & PJ_IOQUEUE_OP_WRITE) 305 || (key->op & PJ_IOQUEUE_OP_SEND) 306 || (key->op & PJ_IOQUEUE_OP_SEND_TO) 368 if (!pj_list_empty(&key->write_list) 307 369 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 308 || (key->op & PJ_IOQUEUE_OP_CONNECT)370 || key->connecting 309 371 #endif 310 372 ) … … 316 378 } 317 379 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 318 if (key-> op & PJ_IOQUEUE_OP_CONNECT)380 if (key->connecting) 319 381 { 320 382 pj_assert(PJ_FD_ISSET(key->fd, xfdset)); … … 348 410 * work on fd_set copy of the ioqueue (not the original one). 349 411 */ 350 PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque , const pj_time_val *timeout)412 PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) 351 413 { 352 414 pj_fd_set_t rfdset, wfdset, xfdset; … … 354 416 pj_ioqueue_key_t *h; 355 417 418 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 419 356 420 /* Lock ioqueue before making fd_set copies */ 357 pj_lock_acquire(ioque->lock); 358 359 if (PJ_FD_COUNT(&ioque->rfdset)==0 && 360 PJ_FD_COUNT(&ioque->wfdset)==0 && 361 PJ_FD_COUNT(&ioque->xfdset)==0) 421 pj_lock_acquire(ioqueue->lock); 422 423 /* We will only do select() when there are sockets to be polled. 424 * Otherwise select() will return error. 425 */ 426 if (PJ_FD_COUNT(&ioqueue->rfdset)==0 && 427 PJ_FD_COUNT(&ioqueue->wfdset)==0 && 428 PJ_FD_COUNT(&ioqueue->xfdset)==0) 362 429 { 363 pj_lock_release(ioque ->lock);430 pj_lock_release(ioqueue->lock); 364 431 if (timeout) 365 432 pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout)); … … 368 435 369 436 /* Copy ioqueue's pj_fd_set_t to local variables. */ 370 pj_memcpy(&rfdset, &ioque ->rfdset, sizeof(pj_fd_set_t));371 pj_memcpy(&wfdset, &ioque ->wfdset, sizeof(pj_fd_set_t));437 pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t)); 438 pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t)); 372 439 #if PJ_HAS_TCP 373 pj_memcpy(&xfdset, &ioque ->xfdset, sizeof(pj_fd_set_t));440 pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t)); 374 441 #else 375 442 PJ_FD_ZERO(&xfdset); … … 377 444 378 445 #if VALIDATE_FD_SET 379 validate_sets(ioque , &rfdset, &wfdset, &xfdset);446 validate_sets(ioqueue, &rfdset, &wfdset, &xfdset); 380 447 #endif 381 448 382 449 /* Unlock ioqueue before select(). */ 383 pj_lock_release(ioque ->lock);450 pj_lock_release(ioqueue->lock); 384 451 385 452 count = pj_sock_select(FD_SETSIZE, &rfdset, &wfdset, &xfdset, timeout); … … 388 455 return count; 389 456 390 /* Lock ioqueue again before scanning for signalled sockets. */ 391 pj_lock_acquire(ioque->lock); 392 393 #if PJ_HAS_TCP 394 /* Scan for exception socket */ 395 h = ioque->hlist.next; 396 do_except_scan: 397 for ( ; h!=&ioque->hlist; h = h->next) { 398 if ((h->op & PJ_IOQUEUE_OP_CONNECT) && PJ_FD_ISSET(h->fd, &xfdset)) 399 break; 400 } 401 if (h != &ioque->hlist) { 402 /* 'connect()' should be the only operation. */ 403 pj_assert((h->op == PJ_IOQUEUE_OP_CONNECT)); 404 405 /* Clear operation. */ 406 h->op &= ~(PJ_IOQUEUE_OP_CONNECT); 407 PJ_FD_CLR(h->fd, &ioque->wfdset); 408 PJ_FD_CLR(h->fd, &ioque->xfdset); 409 PJ_FD_CLR(h->fd, &wfdset); 410 PJ_FD_CLR(h->fd, &xfdset); 411 412 /* Call callback. */ 413 if (h->cb.on_connect_complete) 414 (*h->cb.on_connect_complete)(h, -1); 415 416 /* Re-scan exception list. */ 417 goto do_except_scan; 418 } 419 #endif /* PJ_HAS_TCP */ 420 421 /* Scan for readable socket. */ 422 h = ioque->hlist.next; 423 do_readable_scan: 424 for ( ; h!=&ioque->hlist; h = h->next) { 425 if ((PJ_IOQUEUE_IS_READ_OP(h->op) || PJ_IOQUEUE_IS_ACCEPT_OP(h->op)) && 426 PJ_FD_ISSET(h->fd, &rfdset)) 457 /* Lock ioqueue again before scanning for signalled sockets. 458 * We must strictly use recursive mutex since application may invoke 459 * the ioqueue again inside the callback. 460 */ 461 pj_lock_acquire(ioqueue->lock); 462 463 /* Scan for writable sockets first to handle piggy-back data 464 * coming with accept(). 465 */ 466 h = ioqueue->key_list.next; 467 do_writable_scan: 468 for ( ; h!=&ioqueue->key_list; h = h->next) { 469 if ( (!pj_list_empty(&h->write_list) || h->connecting) 470 && PJ_FD_ISSET(h->fd, &wfdset)) 427 471 { 428 472 break; 429 473 } 430 474 } 431 if (h != &ioque->hlist) { 432 pj_status_t rc; 433 434 pj_assert(PJ_IOQUEUE_IS_READ_OP(h->op) || 435 PJ_IOQUEUE_IS_ACCEPT_OP(h->op)); 436 437 # if PJ_HAS_TCP 438 if ((h->op & PJ_IOQUEUE_OP_ACCEPT)) { 439 /* accept() must be the only operation specified on server socket */ 440 pj_assert(h->op == PJ_IOQUEUE_OP_ACCEPT); 441 442 rc=pj_sock_accept(h->fd, h->accept_fd, h->rmt_addr, h->rmt_addrlen); 443 if (rc==0 && h->local_addr) { 444 rc = pj_sock_getsockname(*h->accept_fd, h->local_addr, 445 h->local_addrlen); 446 } 447 448 h->op &= ~(PJ_IOQUEUE_OP_ACCEPT); 449 PJ_FD_CLR(h->fd, &ioque->rfdset); 450 451 /* Call callback. */ 452 if (h->cb.on_accept_complete) 453 (*h->cb.on_accept_complete)(h, *h->accept_fd, rc); 454 455 /* Re-scan readable sockets. */ 456 goto do_readable_scan; 457 } 458 else { 459 # endif 460 pj_ssize_t bytes_read = h->rd_buflen; 461 462 if ((h->op & PJ_IOQUEUE_OP_RECV_FROM)) { 463 rc = pj_sock_recvfrom(h->fd, h->rd_buf, &bytes_read, 0, 464 h->rmt_addr, h->rmt_addrlen); 465 } else if ((h->op & PJ_IOQUEUE_OP_RECV)) { 466 rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0); 467 } else { 468 /* 469 * User has specified pj_ioqueue_read(). 470 * On Win32, we should do ReadFile(). But because we got 471 * here because of select() anyway, user must have put a 472 * socket descriptor on h->fd, which in this case we can 473 * just call pj_sock_recv() instead of ReadFile(). 474 * On Unix, user may put a file in h->fd, so we'll have 475 * to call read() here. 476 * This may not compile on systems which doesn't have 477 * read(). That's why we only specify PJ_LINUX here so 478 * that error is easier to catch. 479 */ 480 # if defined(PJ_WIN32) && PJ_WIN32 != 0 481 rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0); 482 # elif (defined(PJ_LINUX) && PJ_LINUX != 0) || \ 483 (defined(PJ_SUNOS) && PJ_SUNOS != 0) 484 bytes_read = read(h->fd, h->rd_buf, bytes_read); 485 rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); 486 # elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0 487 bytes_read = sys_read(h->fd, h->rd_buf, bytes_read); 488 rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read; 489 # else 490 # error "Implement read() for this platform!" 491 # endif 492 } 493 494 if (rc != PJ_SUCCESS) { 495 # if defined(PJ_WIN32) && PJ_WIN32 != 0 496 /* On Win32, for UDP, WSAECONNRESET on the receive side 497 * indicates that previous sending has triggered ICMP Port 498 * Unreachable message. 499 * But we wouldn't know at this point which one of previous 500 * key that has triggered the error, since UDP socket can 501 * be shared! 502 * So we'll just ignore it! 503 */ 504 505 if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) { 506 PJ_LOG(4,(THIS_FILE, 507 "Ignored ICMP port unreach. on key=%p", h)); 508 } 509 # endif 510 511 /* In any case we would report this to caller. */ 512 bytes_read = -rc; 513 } 514 515 h->op &= ~(PJ_IOQUEUE_OP_READ | PJ_IOQUEUE_OP_RECV | 516 PJ_IOQUEUE_OP_RECV_FROM); 517 PJ_FD_CLR(h->fd, &ioque->rfdset); 518 PJ_FD_CLR(h->fd, &rfdset); 519 520 /* Call callback. */ 521 if (h->cb.on_read_complete) 522 (*h->cb.on_read_complete)(h, bytes_read); 523 524 /* Re-scan readable sockets. */ 525 goto do_readable_scan; 526 527 } 528 } 529 530 /* Scan for writable socket */ 531 h = ioque->hlist.next; 532 do_writable_scan: 533 for ( ; h!=&ioque->hlist; h = h->next) { 534 if ((PJ_IOQUEUE_IS_WRITE_OP(h->op) || PJ_IOQUEUE_IS_CONNECT_OP(h->op)) 535 && PJ_FD_ISSET(h->fd, &wfdset)) 536 { 537 break; 538 } 539 } 540 if (h != &ioque->hlist) { 541 pj_assert(PJ_IOQUEUE_IS_WRITE_OP(h->op) || 542 PJ_IOQUEUE_IS_CONNECT_OP(h->op)); 475 if (h != &ioqueue->key_list) { 476 pj_assert(!pj_list_empty(&h->write_list) || h->connecting); 543 477 544 478 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 545 if ( (h->op & PJ_IOQUEUE_OP_CONNECT)) {479 if (h->connecting) { 546 480 /* Completion of connect() operation */ 547 481 pj_ssize_t bytes_transfered; 548 482 549 #if (defined(PJ_LINUX) && PJ_LINUX!=0) || \ 550 (defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL!=0) 483 #if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) 551 484 /* from connect(2): 552 553 554 555 485 * On Linux, use getsockopt to read the SO_ERROR option at 486 * level SOL_SOCKET to determine whether connect() completed 487 * successfully (if SO_ERROR is zero). 488 */ 556 489 int value; 557 490 socklen_t vallen = sizeof(value); … … 590 523 591 524 /* Clear operation. */ 592 h-> op &= (~PJ_IOQUEUE_OP_CONNECT);593 PJ_FD_CLR(h->fd, &ioque ->wfdset);594 PJ_FD_CLR(h->fd, &ioque ->xfdset);525 h->connecting = 0; 526 PJ_FD_CLR(h->fd, &ioqueue->wfdset); 527 PJ_FD_CLR(h->fd, &ioqueue->xfdset); 595 528 596 529 /* Call callback. */ … … 604 537 #endif /* PJ_HAS_TCP */ 605 538 { 606 /* Completion of write(), send(), or sendto() operation. */ 607 608 /* Clear operation. */ 609 h->op &= ~(PJ_IOQUEUE_OP_WRITE | PJ_IOQUEUE_OP_SEND | 610 PJ_IOQUEUE_OP_SEND_TO); 611 PJ_FD_CLR(h->fd, &ioque->wfdset); 539 /* Socket is writable. */ 540 struct write_operation *write_op; 541 pj_ssize_t sent; 542 pj_status_t send_rc; 543 544 /* Get the first in the queue. */ 545 write_op = h->write_list.next; 546 547 /* Send the data. */ 548 sent = write_op->size - write_op->written; 549 if (write_op->op == PJ_IOQUEUE_OP_SEND) { 550 send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written, 551 &sent, write_op->flags); 552 } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) { 553 send_rc = pj_sock_sendto(h->fd, 554 write_op->buf+write_op->written, 555 &sent, write_op->flags, 556 &write_op->rmt_addr, 557 write_op->rmt_addrlen); 558 } else { 559 pj_assert(!"Invalid operation type!"); 560 send_rc = PJ_EBUG; 561 } 562 563 if (send_rc == PJ_SUCCESS) { 564 write_op->written += sent; 565 } else { 566 pj_assert(send_rc > 0); 567 write_op->written = -send_rc; 568 } 569 570 /* In any case we don't need to process this descriptor again. */ 612 571 PJ_FD_CLR(h->fd, &wfdset); 613 572 614 /* Call callback. */ 615 /* All data must have been sent? */ 616 if (h->cb.on_write_complete) 617 (*h->cb.on_write_complete)(h, h->wr_buflen); 618 573 /* Are we finished with this buffer? */ 574 if (send_rc!=PJ_SUCCESS || 575 write_op->written == (pj_ssize_t)write_op->size) 576 { 577 pj_list_erase(write_op); 578 579 /* Clear operation if there's no more data to send. */ 580 if (pj_list_empty(&h->write_list)) 581 PJ_FD_CLR(h->fd, &ioqueue->wfdset); 582 583 /* Call callback. */ 584 if (h->cb.on_write_complete) { 585 (*h->cb.on_write_complete)(h, 586 (pj_ioqueue_op_key_t*)write_op, 587 write_op->written); 588 } 589 } 590 619 591 /* Re-scan writable sockets. */ 620 592 goto do_writable_scan; 621 593 } 622 594 } 595 596 /* Scan for readable socket. */ 597 h = ioqueue->key_list.next; 598 do_readable_scan: 599 for ( ; h!=&ioqueue->key_list; h = h->next) { 600 if ((!pj_list_empty(&h->read_list) 601 #if PJ_HAS_TCP 602 || !pj_list_empty(&h->accept_list) 603 #endif 604 ) && PJ_FD_ISSET(h->fd, &rfdset)) 605 { 606 break; 607 } 608 } 609 if (h != &ioqueue->key_list) { 610 pj_status_t rc; 611 612 #if PJ_HAS_TCP 613 pj_assert(!pj_list_empty(&h->read_list) || 614 !pj_list_empty(&h->accept_list)); 615 #else 616 pj_assert(!pj_list_empty(&h->read_list)); 617 #endif 618 619 # if PJ_HAS_TCP 620 if (!pj_list_empty(&h->accept_list)) { 621 622 struct accept_operation *accept_op; 623 624 /* Get one accept operation from the list. */ 625 accept_op = h->accept_list.next; 626 pj_list_erase(accept_op); 627 628 rc=pj_sock_accept(h->fd, accept_op->accept_fd, 629 accept_op->rmt_addr, accept_op->addrlen); 630 if (rc==PJ_SUCCESS && accept_op->local_addr) { 631 rc = pj_sock_getsockname(*accept_op->accept_fd, 632 accept_op->local_addr, 633 accept_op->addrlen); 634 } 635 636 /* Clear bit in fdset if there is no more pending accept */ 637 if (pj_list_empty(&h->accept_list)) 638 PJ_FD_CLR(h->fd, &ioqueue->rfdset); 639 640 /* Call callback. */ 641 if (h->cb.on_accept_complete) 642 (*h->cb.on_accept_complete)(h, (pj_ioqueue_op_key_t*)accept_op, 643 *accept_op->accept_fd, rc); 644 645 /* Re-scan readable sockets. */ 646 goto do_readable_scan; 647 } 648 else { 649 # endif 650 struct read_operation *read_op; 651 pj_ssize_t bytes_read; 652 653 pj_assert(!pj_list_empty(&h->read_list)); 654 655 /* Get one pending read operation from the list. */ 656 read_op = h->read_list.next; 657 pj_list_erase(read_op); 658 659 bytes_read = read_op->size; 660 661 if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) { 662 rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0, 663 read_op->rmt_addr, 664 read_op->rmt_addrlen); 665 } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) { 666 rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); 667 } else { 668 pj_assert(read_op->op == PJ_IOQUEUE_OP_READ); 669 /* 670 * User has specified pj_ioqueue_read(). 671 * On Win32, we should do ReadFile(). But because we got 672 * here because of select() anyway, user must have put a 673 * socket descriptor on h->fd, which in this case we can 674 * just call pj_sock_recv() instead of ReadFile(). 675 * On Unix, user may put a file in h->fd, so we'll have 676 * to call read() here. 677 * This may not compile on systems which doesn't have 678 * read(). That's why we only specify PJ_LINUX here so 679 * that error is easier to catch. 680 */ 681 # if defined(PJ_WIN32) && PJ_WIN32 != 0 682 rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); 683 //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size, 684 // &bytes_read, NULL); 685 # elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0) 686 bytes_read = read(h->fd, h->rd_buf, bytes_read); 687 rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); 688 # elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0 689 bytes_read = sys_read(h->fd, h->rd_buf, bytes_read); 690 rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read; 691 # else 692 # error "Implement read() for this platform!" 693 # endif 694 } 695 696 if (rc != PJ_SUCCESS) { 697 # if defined(PJ_WIN32) && PJ_WIN32 != 0 698 /* On Win32, for UDP, WSAECONNRESET on the receive side 699 * indicates that previous sending has triggered ICMP Port 700 * Unreachable message. 701 * But we wouldn't know at this point which one of previous 702 * key that has triggered the error, since UDP socket can 703 * be shared! 704 * So we'll just ignore it! 705 */ 706 707 if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) { 708 //PJ_LOG(4,(THIS_FILE, 709 // "Ignored ICMP port unreach. on key=%p", h)); 710 } 711 # endif 712 713 /* In any case we would report this to caller. */ 714 bytes_read = -rc; 715 } 716 717 /* Clear fdset if there is no pending read. */ 718 if (pj_list_empty(&h->read_list)) 719 PJ_FD_CLR(h->fd, &ioqueue->rfdset); 720 721 /* In any case clear from temporary set. */ 722 PJ_FD_CLR(h->fd, &rfdset); 723 724 /* Call callback. */ 725 if (h->cb.on_read_complete) 726 (*h->cb.on_read_complete)(h, (pj_ioqueue_op_key_t*)read_op, 727 bytes_read); 728 729 /* Re-scan readable sockets. */ 730 goto do_readable_scan; 731 732 } 733 } 734 735 #if PJ_HAS_TCP 736 /* Scan for exception socket for TCP connection error. */ 737 h = ioqueue->key_list.next; 738 do_except_scan: 739 for ( ; h!=&ioqueue->key_list; h = h->next) { 740 if (h->connecting && PJ_FD_ISSET(h->fd, &xfdset)) 741 break; 742 } 743 if (h != &ioqueue->key_list) { 744 745 pj_assert(h->connecting); 746 747 /* Clear operation. */ 748 h->connecting = 0; 749 PJ_FD_CLR(h->fd, &ioqueue->wfdset); 750 PJ_FD_CLR(h->fd, &ioqueue->xfdset); 751 PJ_FD_CLR(h->fd, &wfdset); 752 PJ_FD_CLR(h->fd, &xfdset); 753 754 /* Call callback. */ 755 if (h->cb.on_connect_complete) 756 (*h->cb.on_connect_complete)(h, -1); 757 758 /* Re-scan exception list. */ 759 goto do_except_scan; 760 } 761 #endif /* PJ_HAS_TCP */ 623 762 624 763 /* Shouldn't happen. */ … … 629 768 //count = 0; 630 769 631 pj_lock_release(ioque ->lock);770 pj_lock_release(ioqueue->lock); 632 771 return count; 633 772 } 634 773 635 774 /* 636 * pj_ioqueue_read() 637 * 638 * Start asynchronous read from the descriptor. 639 */ 640 PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque, 641 pj_ioqueue_key_t *key, 642 void *buffer, 643 pj_size_t buflen) 644 { 645 PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); 775 * pj_ioqueue_recv() 776 * 777 * Start asynchronous recv() from the socket. 778 */ 779 PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, 780 pj_ioqueue_op_key_t *op_key, 781 void *buffer, 782 pj_ssize_t *length, 783 unsigned flags ) 784 { 785 pj_status_t status; 786 pj_ssize_t size; 787 struct read_operation *read_op; 788 pj_ioqueue_t *ioqueue; 789 790 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); 646 791 PJ_CHECK_STACK(); 647 792 648 /* For consistency with other ioqueue implementation, we would reject 649 * if descriptor has already been submitted for reading before. 650 */ 651 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && 652 (key->op & PJ_IOQUEUE_OP_RECV) == 0 && 653 (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), 654 PJ_EBUSY); 655 656 pj_lock_acquire(ioque->lock); 657 658 key->op |= PJ_IOQUEUE_OP_READ; 659 key->rd_flags = 0; 660 key->rd_buf = buffer; 661 key->rd_buflen = buflen; 662 PJ_FD_SET(key->fd, &ioque->rfdset); 663 664 pj_lock_release(ioque->lock); 793 /* Try to see if there's data immediately available. 794 */ 795 size = *length; 796 status = pj_sock_recv(key->fd, buffer, &size, flags); 797 if (status == PJ_SUCCESS) { 798 /* Yes! Data is available! */ 799 *length = size; 800 return PJ_SUCCESS; 801 } else { 802 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report 803 * the error to caller. 804 */ 805 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) 806 return status; 807 } 808 809 /* 810 * No data is immediately available. 811 * Must schedule asynchronous operation to the ioqueue. 812 */ 813 ioqueue = key->ioqueue; 814 pj_lock_acquire(ioqueue->lock); 815 816 read_op = (struct read_operation*)op_key; 817 818 read_op->op = PJ_IOQUEUE_OP_RECV; 819 read_op->buf = buffer; 820 read_op->size = *length; 821 read_op->flags = flags; 822 823 pj_list_insert_before(&key->read_list, read_op); 824 PJ_FD_SET(key->fd, &ioqueue->rfdset); 825 826 pj_lock_release(ioqueue->lock); 665 827 return PJ_EPENDING; 666 828 } 667 829 668 669 /*670 * pj_ioqueue_recv()671 *672 * Start asynchronous recv() from the socket.673 */674 PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque,675 pj_ioqueue_key_t *key,676 void *buffer,677 pj_size_t buflen,678 unsigned flags )679 {680 PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);681 PJ_CHECK_STACK();682 683 /* For consistency with other ioqueue implementation, we would reject684 * if descriptor has already been submitted for reading before.685 */686 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&687 (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&688 (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),689 PJ_EBUSY);690 691 pj_lock_acquire(ioque->lock);692 693 key->op |= PJ_IOQUEUE_OP_RECV;694 key->rd_buf = buffer;695 key->rd_buflen = buflen;696 key->rd_flags = flags;697 PJ_FD_SET(key->fd, &ioque->rfdset);698 699 pj_lock_release(ioque->lock);700 return PJ_EPENDING;701 }702 703 830 /* 704 831 * pj_ioqueue_recvfrom() … … 706 833 * Start asynchronous recvfrom() from the socket. 707 834 */ 708 PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_ t *ioque,709 pj_ioqueue_key_t *key,835 PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, 836 pj_ioqueue_op_key_t *op_key, 710 837 void *buffer, 711 pj_s ize_t buflen,838 pj_ssize_t *length, 712 839 unsigned flags, 713 840 pj_sockaddr_t *addr, 714 841 int *addrlen) 715 842 { 716 PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); 843 pj_status_t status; 844 pj_ssize_t size; 845 struct read_operation *read_op; 846 pj_ioqueue_t *ioqueue; 847 848 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); 717 849 PJ_CHECK_STACK(); 718 850 719 /* For consistency with other ioqueue implementation, we would reject 720 * if descriptor has already been submitted for reading before. 721 */ 722 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && 723 (key->op & PJ_IOQUEUE_OP_RECV) == 0 && 724 (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), 725 PJ_EBUSY); 726 727 pj_lock_acquire(ioque->lock); 728 729 key->op |= PJ_IOQUEUE_OP_RECV_FROM; 730 key->rd_buf = buffer; 731 key->rd_buflen = buflen; 732 key->rd_flags = flags; 733 key->rmt_addr = addr; 734 key->rmt_addrlen = addrlen; 735 PJ_FD_SET(key->fd, &ioque->rfdset); 736 737 pj_lock_release(ioque->lock); 851 /* Try to see if there's data immediately available. 852 */ 853 size = *length; 854 status = pj_sock_recvfrom(key->fd, buffer, &size, flags, 855 addr, addrlen); 856 if (status == PJ_SUCCESS) { 857 /* Yes! Data is available! */ 858 *length = size; 859 return PJ_SUCCESS; 860 } else { 861 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report 862 * the error to caller. 863 */ 864 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) 865 return status; 866 } 867 868 /* 869 * No data is immediately available. 870 * Must schedule asynchronous operation to the ioqueue. 871 */ 872 ioqueue = key->ioqueue; 873 pj_lock_acquire(ioqueue->lock); 874 875 read_op = (struct read_operation*)op_key; 876 877 read_op->op = PJ_IOQUEUE_OP_RECV_FROM; 878 read_op->buf = buffer; 879 read_op->size = *length; 880 read_op->flags = flags; 881 read_op->rmt_addr = addr; 882 read_op->rmt_addrlen = addrlen; 883 884 pj_list_insert_before(&key->read_list, read_op); 885 PJ_FD_SET(key->fd, &ioqueue->rfdset); 886 887 pj_lock_release(ioqueue->lock); 738 888 return PJ_EPENDING; 739 889 } 740 890 741 891 /* 742 * pj_ioqueue_write() 892 * pj_ioqueue_send() 893 * 894 * Start asynchronous send() to the descriptor. 895 */ 896 PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, 897 pj_ioqueue_op_key_t *op_key, 898 const void *data, 899 pj_ssize_t *length, 900 unsigned flags) 901 { 902 pj_ioqueue_t *ioqueue; 903 struct write_operation *write_op; 904 pj_status_t status; 905 pj_ssize_t sent; 906 907 PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); 908 PJ_CHECK_STACK(); 909 910 /* Fast track: 911 * Try to send data immediately, only if there's no pending write! 912 * Note: 913 * We are speculating that the list is empty here without properly 914 * acquiring ioqueue's mutex first. This is intentional, to maximize 915 * performance via parallelism. 916 * 917 * This should be safe, because: 918 * - by convention, we require caller to make sure that the 919 * key is not unregistered while other threads are invoking 920 * an operation on the same key. 921 * - pj_list_empty() is safe to be invoked by multiple threads, 922 * even when other threads are modifying the list. 923 */ 924 if (pj_list_empty(&key->write_list)) { 925 /* 926 * See if data can be sent immediately. 927 */ 928 sent = *length; 929 status = pj_sock_send(key->fd, data, &sent, flags); 930 if (status == PJ_SUCCESS) { 931 /* Success! */ 932 *length = sent; 933 return PJ_SUCCESS; 934 } else { 935 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report 936 * the error to caller. 937 */ 938 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { 939 return status; 940 } 941 } 942 } 943 944 /* 945 * Schedule asynchronous send. 946 */ 947 ioqueue = key->ioqueue; 948 pj_lock_acquire(ioqueue->lock); 949 950 write_op = (struct write_operation*)op_key; 951 write_op->op = PJ_IOQUEUE_OP_SEND; 952 write_op->buf = NULL; 953 write_op->size = *length; 954 write_op->written = 0; 955 write_op->flags = flags; 956 957 pj_list_insert_before(&key->write_list, write_op); 958 PJ_FD_SET(key->fd, &ioqueue->wfdset); 959 960 pj_lock_release(ioqueue->lock); 961 962 return PJ_EPENDING; 963 } 964 965 966 /* 967 * pj_ioqueue_sendto() 743 968 * 744 969 * Start asynchronous write() to the descriptor. 745 970 */ 746 PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque, 747 pj_ioqueue_key_t *key, 748 const void *data, 749 pj_size_t datalen) 750 { 751 pj_status_t rc; 752 pj_ssize_t sent; 753 754 PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); 755 PJ_CHECK_STACK(); 756 757 /* For consistency with other ioqueue implementation, we would reject 758 * if descriptor has already been submitted for writing before. 759 */ 760 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && 761 (key->op & PJ_IOQUEUE_OP_SEND) == 0 && 762 (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), 763 PJ_EBUSY); 764 765 sent = datalen; 766 /* sent would be -1 after pj_sock_send() if it returns error. */ 767 rc = pj_sock_send(key->fd, data, &sent, 0); 768 if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { 769 return rc; 770 } 771 772 pj_lock_acquire(ioque->lock); 773 774 key->op |= PJ_IOQUEUE_OP_WRITE; 775 key->wr_buf = NULL; 776 key->wr_buflen = datalen; 777 PJ_FD_SET(key->fd, &ioque->wfdset); 778 779 pj_lock_release(ioque->lock); 780 781 return PJ_EPENDING; 782 } 783 784 /* 785 * pj_ioqueue_send() 786 * 787 * Start asynchronous send() to the descriptor. 788 */ 789 PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque, 790 pj_ioqueue_key_t *key, 791 const void *data, 792 pj_size_t datalen, 793 unsigned flags) 794 { 795 pj_status_t rc; 796 pj_ssize_t sent; 797 798 PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); 799 PJ_CHECK_STACK(); 800 801 /* For consistency with other ioqueue implementation, we would reject 802 * if descriptor has already been submitted for writing before. 803 */ 804 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && 805 (key->op & PJ_IOQUEUE_OP_SEND) == 0 && 806 (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), 807 PJ_EBUSY); 808 809 sent = datalen; 810 /* sent would be -1 after pj_sock_send() if it returns error. */ 811 rc = pj_sock_send(key->fd, data, &sent, flags); 812 if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { 813 return rc; 814 } 815 816 pj_lock_acquire(ioque->lock); 817 818 key->op |= PJ_IOQUEUE_OP_SEND; 819 key->wr_buf = NULL; 820 key->wr_buflen = datalen; 821 PJ_FD_SET(key->fd, &ioque->wfdset); 822 823 pj_lock_release(ioque->lock); 824 825 return PJ_EPENDING; 826 } 827 828 829 /* 830 * pj_ioqueue_sendto() 831 * 832 * Start asynchronous write() to the descriptor. 833 */ 834 PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque, 835 pj_ioqueue_key_t *key, 971 PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, 972 pj_ioqueue_op_key_t *op_key, 836 973 const void *data, 837 pj_s ize_t datalen,974 pj_ssize_t *length, 838 975 unsigned flags, 839 976 const pj_sockaddr_t *addr, 840 977 int addrlen) 841 978 { 842 pj_status_t rc; 979 pj_ioqueue_t *ioqueue; 980 struct write_operation *write_op; 981 pj_status_t status; 843 982 pj_ssize_t sent; 844 983 845 PJ_ASSERT_RETURN( ioque && key && data, PJ_EINVAL);984 PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); 846 985 PJ_CHECK_STACK(); 847 986 848 /* For consistency with other ioqueue implementation, we would reject 849 * if descriptor has already been submitted for writing before. 850 */ 851 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && 852 (key->op & PJ_IOQUEUE_OP_SEND) == 0 && 853 (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), 854 PJ_EBUSY); 855 856 sent = datalen; 857 /* sent would be -1 after pj_sock_sendto() if it returns error. */ 858 rc = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen); 859 if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { 860 return rc; 861 } 862 863 pj_lock_acquire(ioque->lock); 864 865 key->op |= PJ_IOQUEUE_OP_SEND_TO; 866 key->wr_buf = NULL; 867 key->wr_buflen = datalen; 868 PJ_FD_SET(key->fd, &ioque->wfdset); 869 870 pj_lock_release(ioque->lock); 987 /* Fast track: 988 * Try to send data immediately, only if there's no pending write! 989 * Note: 990 * We are speculating that the list is empty here without properly 991 * acquiring ioqueue's mutex first. This is intentional, to maximize 992 * performance via parallelism. 993 * 994 * This should be safe, because: 995 * - by convention, we require caller to make sure that the 996 * key is not unregistered while other threads are invoking 997 * an operation on the same key. 998 * - pj_list_empty() is safe to be invoked by multiple threads, 999 * even when other threads are modifying the list. 1000 */ 1001 if (pj_list_empty(&key->write_list)) { 1002 /* 1003 * See if data can be sent immediately. 1004 */ 1005 sent = *length; 1006 status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen); 1007 if (status == PJ_SUCCESS) { 1008 /* Success! */ 1009 *length = sent; 1010 return PJ_SUCCESS; 1011 } else { 1012 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report 1013 * the error to caller. 1014 */ 1015 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { 1016 return status; 1017 } 1018 } 1019 } 1020 1021 /* 1022 * Check that address storage can hold the address parameter. 1023 */ 1024 PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG); 1025 1026 /* 1027 * Schedule asynchronous send. 1028 */ 1029 ioqueue = key->ioqueue; 1030 pj_lock_acquire(ioqueue->lock); 1031 1032 write_op = (struct write_operation*)op_key; 1033 write_op->op = PJ_IOQUEUE_OP_SEND_TO; 1034 write_op->buf = NULL; 1035 write_op->size = *length; 1036 write_op->written = 0; 1037 write_op->flags = flags; 1038 pj_memcpy(&write_op->rmt_addr, addr, addrlen); 1039 write_op->rmt_addrlen = addrlen; 1040 1041 pj_list_insert_before(&key->write_list, write_op); 1042 PJ_FD_SET(key->fd, &ioqueue->wfdset); 1043 1044 pj_lock_release(ioqueue->lock); 1045 871 1046 return PJ_EPENDING; 872 1047 } … … 876 1051 * Initiate overlapped accept() operation. 877 1052 */ 878 PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue, 879 pj_ioqueue_key_t *key, 880 pj_sock_t *new_sock, 881 pj_sockaddr_t *local, 882 pj_sockaddr_t *remote, 883 int *addrlen) 884 { 1053 PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, 1054 pj_ioqueue_op_key_t *op_key, 1055 pj_sock_t *new_sock, 1056 pj_sockaddr_t *local, 1057 pj_sockaddr_t *remote, 1058 int *addrlen) 1059 { 1060 pj_ioqueue_t *ioqueue; 1061 struct accept_operation *accept_op; 1062 pj_status_t status; 1063 885 1064 /* check parameters. All must be specified! */ 886 pj_assert(ioqueue && key && new_sock); 887 888 /* Server socket must have no other operation! */ 889 pj_assert(key->op == 0); 890 1065 PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); 1066 1067 /* Fast track: 1068 * See if there's new connection available immediately. 1069 */ 1070 if (pj_list_empty(&key->accept_list)) { 1071 status = pj_sock_accept(key->fd, new_sock, remote, addrlen); 1072 if (status == PJ_SUCCESS) { 1073 /* Yes! New connection is available! */ 1074 if (local && addrlen) { 1075 status = pj_sock_getsockname(*new_sock, local, addrlen); 1076 if (status != PJ_SUCCESS) { 1077 pj_sock_close(*new_sock); 1078 *new_sock = PJ_INVALID_SOCKET; 1079 return status; 1080 } 1081 } 1082 return PJ_SUCCESS; 1083 } else { 1084 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report 1085 * the error to caller. 1086 */ 1087 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { 1088 return status; 1089 } 1090 } 1091 } 1092 1093 /* 1094 * No connection is available immediately. 1095 * Schedule accept() operation to be completed when there is incoming 1096 * connection available. 1097 */ 1098 ioqueue = key->ioqueue; 1099 accept_op = (struct accept_operation*)op_key; 1100 891 1101 pj_lock_acquire(ioqueue->lock); 892 1102 893 key->op = PJ_IOQUEUE_OP_ACCEPT;894 key->accept_fd = new_sock;895 key->rmt_addr = remote;896 key->rmt_addrlen= addrlen;897 key->local_addr = local;898 key->local_addrlen = addrlen; /* use same addr. as rmt_addrlen */ 899 1103 accept_op->op = PJ_IOQUEUE_OP_ACCEPT; 1104 accept_op->accept_fd = new_sock; 1105 accept_op->rmt_addr = remote; 1106 accept_op->addrlen= addrlen; 1107 accept_op->local_addr = local; 1108 1109 pj_list_insert_before(&key->accept_list, accept_op); 900 1110 PJ_FD_SET(key->fd, &ioqueue->rfdset); 901 1111 902 1112 pj_lock_release(ioqueue->lock); 1113 903 1114 return PJ_EPENDING; 904 1115 } … … 908 1119 * since there's no overlapped version of connect()). 909 1120 */ 910 PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue, 911 pj_ioqueue_key_t *key, 1121 PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, 912 1122 const pj_sockaddr_t *addr, 913 1123 int addrlen ) 914 1124 { 915 pj_status_t rc; 1125 pj_ioqueue_t *ioqueue; 1126 pj_status_t status; 916 1127 917 1128 /* check parameters. All must be specified! */ 918 PJ_ASSERT_RETURN(ioqueue && key && addr && addrlen, PJ_EINVAL); 919 920 /* Connecting socket must have no other operation! */ 921 PJ_ASSERT_RETURN(key->op == 0, PJ_EBUSY); 1129 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); 1130 1131 /* Check if socket has not been marked for connecting */ 1132 if (key->connecting != 0) 1133 return PJ_EPENDING; 922 1134 923 rc= pj_sock_connect(key->fd, addr, addrlen);924 if ( rc== PJ_SUCCESS) {1135 status = pj_sock_connect(key->fd, addr, addrlen); 1136 if (status == PJ_SUCCESS) { 925 1137 /* Connected! */ 926 1138 return PJ_SUCCESS; 927 1139 } else { 928 if (rc == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) || 929 rc == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) 930 { 1140 if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) { 931 1141 /* Pending! */ 1142 ioqueue = key->ioqueue; 932 1143 pj_lock_acquire(ioqueue->lock); 933 key-> op = PJ_IOQUEUE_OP_CONNECT;1144 key->connecting = PJ_TRUE; 934 1145 PJ_FD_SET(key->fd, &ioqueue->wfdset); 935 1146 PJ_FD_SET(key->fd, &ioqueue->xfdset); … … 938 1149 } else { 939 1150 /* Error! */ 940 return rc;1151 return status; 941 1152 } 942 1153 } -
pjproject/main/pjlib/src/pj/ioqueue_winnt.c
r6 r11 24 24 25 25 26 #define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+20) 26 /* The address specified in AcceptEx() must be 16 more than the size of 27 * SOCKADDR (source: MSDN). 28 */ 29 #define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+16) 30 31 typedef struct generic_overlapped 32 { 33 WSAOVERLAPPED overlapped; 34 pj_ioqueue_operation_e operation; 35 } generic_overlapped; 27 36 28 37 /* … … 34 43 pj_ioqueue_operation_e operation; 35 44 WSABUF wsabuf; 45 pj_sockaddr_in dummy_addr; 46 int dummy_addrlen; 36 47 } ioqueue_overlapped; 37 48 … … 54 65 55 66 /* 67 * Structure to hold pending operation key. 68 */ 69 union operation_key 70 { 71 generic_overlapped generic; 72 ioqueue_overlapped overlapped; 73 #if PJ_HAS_TCP 74 ioqueue_accept_rec accept; 75 #endif 76 }; 77 78 /* 56 79 * Structure for individual socket. 57 80 */ 58 81 struct pj_ioqueue_key_t 59 82 { 83 pj_ioqueue_t *ioqueue; 60 84 HANDLE hnd; 61 85 void *user_data; 62 ioqueue_overlapped recv_overlapped;63 ioqueue_overlapped send_overlapped;64 86 #if PJ_HAS_TCP 65 87 int connecting; 66 ioqueue_accept_rec accept_overlapped;67 88 #endif 68 89 pj_ioqueue_callback cb; … … 108 129 &remote, 109 130 &remotelen); 110 pj_memcpy(accept_overlapped->local, local, locallen); 111 pj_memcpy(accept_overlapped->remote, remote, locallen); 131 if (*accept_overlapped->addrlen > locallen) { 132 pj_memcpy(accept_overlapped->local, local, locallen); 133 pj_memcpy(accept_overlapped->remote, remote, locallen); 134 } else { 135 pj_memset(accept_overlapped->local, 0, *accept_overlapped->addrlen); 136 pj_memset(accept_overlapped->remote, 0, *accept_overlapped->addrlen); 137 } 112 138 *accept_overlapped->addrlen = locallen; 113 139 if (accept_overlapped->newsock_ptr) … … 121 147 pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos]; 122 148 HANDLE hEvent = ioqueue->connecting_handles[pos]; 123 unsigned long optval;124 149 125 150 /* Remove key from array of connecting handles. */ … … 144 169 } 145 170 146 /* Set socket to blocking again. */147 optval = 0;148 if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) {149 DWORD dwStatus;150 dwStatus = WSAGetLastError();151 }152 171 } 153 172 … … 184 203 ioqueue->connecting_handles[pos], 185 204 &net_events); 186 *connect_err = net_events.iErrorCode[FD_CONNECT_BIT]; 205 *connect_err = 206 PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]); 187 207 188 208 /* Erase socket from pending connect. */ … … 195 215 #endif 196 216 197 217 /* 218 * pj_ioqueue_create() 219 */ 198 220 PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, 199 221 pj_size_t max_fd, 200 int max_threads, 201 pj_ioqueue_t **ioqueue) 202 { 203 pj_ioqueue_t *ioq; 222 pj_ioqueue_t **p_ioqueue) 223 { 224 pj_ioqueue_t *ioqueue; 204 225 pj_status_t rc; 205 226 206 227 PJ_UNUSED_ARG(max_fd); 207 PJ_ASSERT_RETURN(pool && ioqueue, PJ_EINVAL); 208 209 ioq = pj_pool_zalloc(pool, sizeof(*ioq)); 210 ioq->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, max_threads); 211 if (ioq->iocp == NULL) 228 PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL); 229 230 rc = sizeof(union operation_key); 231 232 /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */ 233 PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= 234 sizeof(union operation_key), PJ_EBUG); 235 236 ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue)); 237 ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); 238 if (ioqueue->iocp == NULL) 212 239 return PJ_RETURN_OS_ERROR(GetLastError()); 213 240 214 rc = pj_lock_create_simple_mutex(pool, NULL, &ioq ->lock);241 rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock); 215 242 if (rc != PJ_SUCCESS) { 216 CloseHandle(ioq ->iocp);243 CloseHandle(ioqueue->iocp); 217 244 return rc; 218 245 } 219 246 220 ioq ->auto_delete_lock = PJ_TRUE;221 222 * ioqueue = ioq;223 224 PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioq ));247 ioqueue->auto_delete_lock = PJ_TRUE; 248 249 *p_ioqueue = ioqueue; 250 251 PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue)); 225 252 return PJ_SUCCESS; 226 253 } 227 254 228 PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioque ) 255 /* 256 * pj_ioqueue_destroy() 257 */ 258 PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue ) 229 259 { 230 260 unsigned i; 231 261 232 262 PJ_CHECK_STACK(); 233 PJ_ASSERT_RETURN(ioque , PJ_EINVAL);263 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 234 264 235 265 /* Destroy events in the pool */ 236 for (i=0; i<ioque->event_count; ++i) { 237 CloseHandle(ioque->event_pool[i]); 238 } 239 ioque->event_count = 0; 240 241 if (ioque->auto_delete_lock) 242 pj_lock_destroy(ioque->lock); 243 244 if (CloseHandle(ioque->iocp) == TRUE) 245 return PJ_SUCCESS; 246 else 266 for (i=0; i<ioqueue->event_count; ++i) { 267 CloseHandle(ioqueue->event_pool[i]); 268 } 269 ioqueue->event_count = 0; 270 271 if (CloseHandle(ioqueue->iocp) != TRUE) 247 272 return PJ_RETURN_OS_ERROR(GetLastError()); 248 } 249 250 PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque, 273 274 if (ioqueue->auto_delete_lock) 275 pj_lock_destroy(ioqueue->lock); 276 277 return PJ_SUCCESS; 278 } 279 280 /* 281 * pj_ioqueue_set_lock() 282 */ 283 PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue, 251 284 pj_lock_t *lock, 252 285 pj_bool_t auto_delete ) 253 286 { 254 PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL);255 256 if (ioque ->auto_delete_lock) {257 pj_lock_destroy(ioque ->lock);258 } 259 260 ioque ->lock = lock;261 ioque ->auto_delete_lock = auto_delete;287 PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL); 288 289 if (ioqueue->auto_delete_lock) { 290 pj_lock_destroy(ioqueue->lock); 291 } 292 293 ioqueue->lock = lock; 294 ioqueue->auto_delete_lock = auto_delete; 262 295 263 296 return PJ_SUCCESS; 264 297 } 265 298 299 /* 300 * pj_ioqueue_register_sock() 301 */ 266 302 PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, 267 pj_ioqueue_t *ioque ,268 pj_sock_t hnd,303 pj_ioqueue_t *ioqueue, 304 pj_sock_t sock, 269 305 void *user_data, 270 306 const pj_ioqueue_callback *cb, … … 273 309 HANDLE hioq; 274 310 pj_ioqueue_key_t *rec; 275 276 PJ_ASSERT_RETURN(pool && ioque && cb && key, PJ_EINVAL); 277 311 u_long value; 312 int rc; 313 314 PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL); 315 316 /* Build the key for this socket. */ 278 317 rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 279 rec->hnd = (HANDLE)hnd; 318 rec->ioqueue = ioqueue; 319 rec->hnd = (HANDLE)sock; 280 320 rec->user_data = user_data; 281 321 pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback)); 282 #if PJ_HAS_TCP 283 rec->accept_overlapped.newsock = PJ_INVALID_SOCKET; 284 #endif 285 hioq = CreateIoCompletionPort((HANDLE)hnd, ioque->iocp, (DWORD)rec, 0); 322 323 /* Set socket to nonblocking. */ 324 value = 1; 325 rc = ioctlsocket(sock, FIONBIO, &value); 326 if (rc != 0) { 327 return PJ_RETURN_OS_ERROR(WSAGetLastError()); 328 } 329 330 /* Associate with IOCP */ 331 hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0); 286 332 if (!hioq) { 287 333 return PJ_RETURN_OS_ERROR(GetLastError()); … … 292 338 } 293 339 294 295 296 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque, 297 298 { 299 PJ_ASSERT_RETURN( ioque &&key, PJ_EINVAL);340 /* 341 * pj_ioqueue_unregister() 342 */ 343 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) 344 { 345 PJ_ASSERT_RETURN(key, PJ_EINVAL); 300 346 301 347 #if PJ_HAS_TCP 302 348 if (key->connecting) { 303 349 unsigned pos; 350 pj_ioqueue_t *ioqueue; 351 352 ioqueue = key->ioqueue; 304 353 305 354 /* Erase from connecting_handles */ 306 pj_lock_acquire(ioque->lock); 307 for (pos=0; pos < ioque->connecting_count; ++pos) { 308 if (ioque->connecting_keys[pos] == key) { 309 erase_connecting_socket(ioque, pos); 310 if (key->accept_overlapped.newsock_ptr) { 311 /* ??? shouldn't it be newsock instead of newsock_ptr??? */ 312 closesocket(*key->accept_overlapped.newsock_ptr); 313 } 355 pj_lock_acquire(ioqueue->lock); 356 for (pos=0; pos < ioqueue->connecting_count; ++pos) { 357 if (ioqueue->connecting_keys[pos] == key) { 358 erase_connecting_socket(ioqueue, pos); 314 359 break; 315 360 } 316 361 } 317 pj_lock_release(ioque->lock);318 362 key->connecting = 0; 363 pj_lock_release(ioqueue->lock); 319 364 } 320 365 #endif … … 322 367 } 323 368 369 /* 370 * pj_ioqueue_get_user_data() 371 */ 324 372 PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) 325 373 { … … 329 377 330 378 /* 379 * pj_ioqueue_set_user_data() 380 */ 381 PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, 382 void *user_data, 383 void **old_data ) 384 { 385 PJ_ASSERT_RETURN(key, PJ_EINVAL); 386 387 if (old_data) 388 *old_data = key->user_data; 389 390 key->user_data = user_data; 391 return PJ_SUCCESS; 392 } 393 394 /* 395 * pj_ioqueue_poll() 396 * 331 397 * Poll for events. 332 398 */ 333 PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque , const pj_time_val *timeout)399 PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) 334 400 { 335 401 DWORD dwMsec, dwBytesTransfered, dwKey; 336 ioqueue_overlapped *ov;402 generic_overlapped *pOv; 337 403 pj_ioqueue_key_t *key; 338 404 pj_ssize_t size_status; 339 405 BOOL rc; 340 406 341 PJ_ASSERT_RETURN(ioque , -PJ_EINVAL);407 PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); 342 408 343 409 /* Check the connecting array. */ 344 410 #if PJ_HAS_TCP 345 key = check_connecting(ioque , &size_status);411 key = check_connecting(ioqueue, &size_status); 346 412 if (key != NULL) { 347 413 key->cb.on_connect_complete(key, (int)size_status); … … 354 420 355 421 /* Poll for completion status. */ 356 rc = GetQueuedCompletionStatus(ioque ->iocp, &dwBytesTransfered, &dwKey,357 (OVERLAPPED**)& ov, dwMsec);422 rc = GetQueuedCompletionStatus(ioqueue->iocp, &dwBytesTransfered, &dwKey, 423 (OVERLAPPED**)&pOv, dwMsec); 358 424 359 425 /* The return value is: 360 426 * - nonzero if event was dequeued. 361 * - zero and ov==NULL if no event was dequeued.362 * - zero and ov!=NULL if event for failed I/O was dequeued.363 */ 364 if ( ov) {427 * - zero and pOv==NULL if no event was dequeued. 428 * - zero and pOv!=NULL if event for failed I/O was dequeued. 429 */ 430 if (pOv) { 365 431 /* Event was dequeued for either successfull or failed I/O */ 366 432 key = (pj_ioqueue_key_t*)dwKey; 367 433 size_status = dwBytesTransfered; 368 switch ( ov->operation) {434 switch (pOv->operation) { 369 435 case PJ_IOQUEUE_OP_READ: 370 436 case PJ_IOQUEUE_OP_RECV: 371 437 case PJ_IOQUEUE_OP_RECV_FROM: 372 key->recv_overlapped.operation = 0;438 pOv->operation = 0; 373 439 if (key->cb.on_read_complete) 374 key->cb.on_read_complete(key, size_status); 440 key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv, 441 size_status); 375 442 break; 376 443 case PJ_IOQUEUE_OP_WRITE: 377 444 case PJ_IOQUEUE_OP_SEND: 378 445 case PJ_IOQUEUE_OP_SEND_TO: 379 key->send_overlapped.operation = 0;446 pOv->operation = 0; 380 447 if (key->cb.on_write_complete) 381 key->cb.on_write_complete(key, size_status); 448 key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv, 449 size_status); 382 450 break; 383 451 #if PJ_HAS_TCP 384 452 case PJ_IOQUEUE_OP_ACCEPT: 385 453 /* special case for accept. */ 386 ioqueue_on_accept_complete((ioqueue_accept_rec*)ov); 387 if (key->cb.on_accept_complete) 388 key->cb.on_accept_complete(key, key->accept_overlapped.newsock, 389 0); 454 ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv); 455 if (key->cb.on_accept_complete) { 456 ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv; 457 key->cb.on_accept_complete(key, 458 (pj_ioqueue_op_key_t*)pOv, 459 accept_rec->newsock, 460 PJ_SUCCESS); 461 } 390 462 break; 391 463 case PJ_IOQUEUE_OP_CONNECT: … … 399 471 400 472 if (GetLastError()==WAIT_TIMEOUT) { 401 /* Check the connecting array . */402 #if PJ_HAS_TCP 403 key = check_connecting(ioque , &size_status);473 /* Check the connecting array (again). */ 474 #if PJ_HAS_TCP 475 key = check_connecting(ioqueue, &size_status); 404 476 if (key != NULL) { 405 477 key->cb.on_connect_complete(key, (int)size_status); … … 413 485 414 486 /* 415 * pj_ioqueue_read()416 *417 * Initiate overlapped ReadFile operation.418 */419 PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque,420 pj_ioqueue_key_t *key,421 void *buffer,422 pj_size_t buflen)423 {424 BOOL rc;425 DWORD bytesRead;426 427 PJ_CHECK_STACK();428 PJ_UNUSED_ARG(ioque);429 430 if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) {431 pj_assert(!"Operation already pending for this descriptor");432 return PJ_EBUSY;433 }434 435 pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped));436 key->recv_overlapped.operation = PJ_IOQUEUE_OP_READ;437 438 rc = ReadFile(key->hnd, buffer, buflen, &bytesRead,439 &key->recv_overlapped.overlapped);440 if (rc == FALSE) {441 DWORD dwStatus = GetLastError();442 if (dwStatus==ERROR_IO_PENDING)443 return PJ_EPENDING;444 else445 return PJ_STATUS_FROM_OS(dwStatus);446 } else {447 /*448 * This is workaround to a probable bug in Win2000 (probably NT too).449 * Even if 'rc' is TRUE, which indicates operation has completed,450 * GetQueuedCompletionStatus still will return the key.451 * So as work around, we always return PJ_EPENDING here.452 */453 return PJ_EPENDING;454 }455 }456 457 /*458 487 * pj_ioqueue_recv() 459 488 * 460 489 * Initiate overlapped WSARecv() operation. 461 490 */ 462 PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_ t *ioque,463 pj_ioqueue_key_t *key,491 PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, 492 pj_ioqueue_op_key_t *op_key, 464 493 void *buffer, 465 pj_s ize_t buflen,494 pj_ssize_t *length, 466 495 unsigned flags ) 467 496 { 497 /* 498 * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and 499 * addrlen here. But unfortunately it generates EINVAL... :-( 500 * -bennylp 501 */ 468 502 int rc; 469 503 DWORD bytesRead; 470 504 DWORD dwFlags = 0; 505 union operation_key *op_key_rec; 471 506 472 507 PJ_CHECK_STACK(); 473 PJ_UNUSED_ARG(ioque); 474 475 if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) { 476 pj_assert(!"Operation already pending for this socket"); 477 return PJ_EBUSY; 478 } 479 480 pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped)); 481 key->recv_overlapped.operation = PJ_IOQUEUE_OP_READ; 482 483 key->recv_overlapped.wsabuf.buf = buffer; 484 key->recv_overlapped.wsabuf.len = buflen; 508 PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL); 509 510 op_key_rec = (union operation_key*)op_key->internal__; 511 op_key_rec->overlapped.wsabuf.buf = buffer; 512 op_key_rec->overlapped.wsabuf.len = *length; 485 513 486 514 dwFlags = flags; 487 488 rc = WSARecv((SOCKET)key->hnd, &key->recv_overlapped.wsabuf, 1, 489 &bytesRead, &dwFlags, 490 &key->recv_overlapped.overlapped, NULL); 515 516 /* Try non-overlapped received first to see if data is 517 * immediately available. 518 */ 519 rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, 520 &bytesRead, &dwFlags, NULL, NULL); 521 if (rc == 0) { 522 *length = bytesRead; 523 return PJ_SUCCESS; 524 } else { 525 DWORD dwError = WSAGetLastError(); 526 if (dwError != WSAEWOULDBLOCK) { 527 *length = -1; 528 return PJ_RETURN_OS_ERROR(dwError); 529 } 530 } 531 532 /* 533 * No immediate data available. 534 * Register overlapped Recv() operation. 535 */ 536 pj_memset(&op_key_rec->overlapped.overlapped, 0, 537 sizeof(op_key_rec->overlapped.overlapped)); 538 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV; 539 540 rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, 541 &bytesRead, &dwFlags, 542 &op_key_rec->overlapped.overlapped, NULL); 491 543 if (rc == SOCKET_ERROR) { 492 544 DWORD dwStatus = WSAGetLastError(); 493 if (dwStatus==WSA_IO_PENDING) 494 return PJ_EPENDING; 495 else 545 if (dwStatus!=WSA_IO_PENDING) { 546 *length = -1; 496 547 return PJ_STATUS_FROM_OS(dwStatus); 497 } else { 498 /* Must always return pending status. 499 * See comments on pj_ioqueue_read 500 * return bytesRead; 501 */ 502 return PJ_EPENDING; 503 } 548 } 549 } 550 551 /* Pending operation has been scheduled. */ 552 return PJ_EPENDING; 504 553 } 505 554 … … 509 558 * Initiate overlapped RecvFrom() operation. 510 559 */ 511 PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_ t *ioque,512 pj_ioqueue_key_t *key,560 PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, 561 pj_ioqueue_op_key_t *op_key, 513 562 void *buffer, 514 pj_s ize_t buflen,563 pj_ssize_t *length, 515 564 unsigned flags, 516 565 pj_sockaddr_t *addr, 517 566 int *addrlen) 518 567 { 519 BOOLrc;568 int rc; 520 569 DWORD bytesRead; 521 DWORD dwFlags; 570 DWORD dwFlags = 0; 571 union operation_key *op_key_rec; 522 572 523 573 PJ_CHECK_STACK(); 524 PJ_UNUSED_ARG(ioque); 525 526 if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) { 527 pj_assert(!"Operation already pending for this socket"); 528 return PJ_EBUSY; 529 } 530 531 pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped)); 532 key->recv_overlapped.operation = PJ_IOQUEUE_OP_RECV_FROM; 533 key->recv_overlapped.wsabuf.buf = buffer; 534 key->recv_overlapped.wsabuf.len = buflen; 574 PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL); 575 576 op_key_rec = (union operation_key*)op_key->internal__; 577 op_key_rec->overlapped.wsabuf.buf = buffer; 578 op_key_rec->overlapped.wsabuf.len = *length; 579 535 580 dwFlags = flags; 536 rc = WSARecvFrom((SOCKET)key->hnd, &key->recv_overlapped.wsabuf, 1, 537 &bytesRead, &dwFlags, 538 addr, addrlen, 539 &key->recv_overlapped.overlapped, NULL); 581 582 /* Try non-overlapped received first to see if data is 583 * immediately available. 584 */ 585 rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, 586 &bytesRead, &dwFlags, addr, addrlen, NULL, NULL); 587 if (rc == 0) { 588 *length = bytesRead; 589 return PJ_SUCCESS; 590 } else { 591 DWORD dwError = WSAGetLastError(); 592 if (dwError != WSAEWOULDBLOCK) { 593 *length = -1; 594 return PJ_RETURN_OS_ERROR(dwError); 595 } 596 } 597 598 /* 599 * No immediate data available. 600 * Register overlapped Recv() operation. 601 */ 602 pj_memset(&op_key_rec->overlapped.overlapped, 0, 603 sizeof(op_key_rec->overlapped.overlapped)); 604 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV; 605 606 rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, 607 &bytesRead, &dwFlags, addr, addrlen, 608 &op_key_rec->overlapped.overlapped, NULL); 540 609 if (rc == SOCKET_ERROR) { 541 610 DWORD dwStatus = WSAGetLastError(); 542 if (dwStatus==WSA_IO_PENDING) 543 return PJ_EPENDING; 544 else 611 if (dwStatus!=WSA_IO_PENDING) { 612 *length = -1; 545 613 return PJ_STATUS_FROM_OS(dwStatus); 546 } else { 547 /* Must always return pending status. 548 * See comments on pj_ioqueue_read 549 * return bytesRead; 550 */ 551 return PJ_EPENDING; 552 } 553 } 554 555 /* 556 * pj_ioqueue_write() 557 * 558 * Initiate overlapped WriteFile() operation. 559 */ 560 PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque, 561 pj_ioqueue_key_t *key, 562 const void *data, 563 pj_size_t datalen) 564 { 565 BOOL rc; 566 DWORD bytesWritten; 567 568 PJ_CHECK_STACK(); 569 PJ_UNUSED_ARG(ioque); 570 571 if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) { 572 pj_assert(!"Operation already pending for this descriptor"); 573 return PJ_EBUSY; 574 } 575 576 pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped)); 577 key->send_overlapped.operation = PJ_IOQUEUE_OP_WRITE; 578 rc = WriteFile(key->hnd, data, datalen, &bytesWritten, 579 &key->send_overlapped.overlapped); 614 } 615 } 580 616 581 if (rc == FALSE) { 582 DWORD dwStatus = GetLastError(); 583 if (dwStatus==ERROR_IO_PENDING) 584 return PJ_EPENDING; 585 else 586 return PJ_STATUS_FROM_OS(dwStatus); 587 } else { 588 /* Must always return pending status. 589 * See comments on pj_ioqueue_read 590 * return bytesWritten; 591 */ 592 return PJ_EPENDING; 593 } 594 } 595 617 /* Pending operation has been scheduled. */ 618 return PJ_EPENDING; 619 } 596 620 597 621 /* … … 600 624 * Initiate overlapped Send operation. 601 625 */ 602 PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_ t *ioque,603 pj_ioqueue_key_t *key,626 PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, 627 pj_ioqueue_op_key_t *op_key, 604 628 const void *data, 605 pj_s ize_t datalen,629 pj_ssize_t *length, 606 630 unsigned flags ) 631 { 632 return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0); 633 } 634 635 636 /* 637 * pj_ioqueue_sendto() 638 * 639 * Initiate overlapped SendTo operation. 640 */ 641 PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, 642 pj_ioqueue_op_key_t *op_key, 643 const void *data, 644 pj_ssize_t *length, 645 unsigned flags, 646 const pj_sockaddr_t *addr, 647 int addrlen) 607 648 { 608 649 int rc; 609 650 DWORD bytesWritten; 610 651 DWORD dwFlags; 652 union operation_key *op_key_rec; 611 653 612 654 PJ_CHECK_STACK(); 613 PJ_UNUSED_ARG(ioque); 614 615 if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) { 616 pj_assert(!"Operation already pending for this socket"); 617 return PJ_EBUSY; 618 } 619 620 pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped)); 621 key->send_overlapped.operation = PJ_IOQUEUE_OP_WRITE; 622 key->send_overlapped.wsabuf.buf = (void*)data; 623 key->send_overlapped.wsabuf.len = datalen; 655 PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL); 656 657 op_key_rec = (union operation_key*)op_key->internal__; 658 624 659 dwFlags = flags; 625 rc = WSASend((SOCKET)key->hnd, &key->send_overlapped.wsabuf, 1, 626 &bytesWritten, dwFlags, 627 &key->send_overlapped.overlapped, NULL); 660 661 /* 662 * First try blocking write. 663 */ 664 op_key_rec->overlapped.wsabuf.buf = (void*)data; 665 op_key_rec->overlapped.wsabuf.len = *length; 666 667 rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, 668 &bytesWritten, dwFlags, addr, addrlen, 669 NULL, NULL); 670 if (rc == 0) { 671 *length = bytesWritten; 672 return PJ_SUCCESS; 673 } else { 674 DWORD dwStatus = WSAGetLastError(); 675 if (dwStatus != WSAEWOULDBLOCK) { 676 *length = -1; 677 return PJ_RETURN_OS_ERROR(dwStatus); 678 } 679 } 680 681 /* 682 * Data can't be sent immediately. 683 * Schedule asynchronous WSASend(). 684 */ 685 pj_memset(&op_key_rec->overlapped.overlapped, 0, 686 sizeof(op_key_rec->overlapped.overlapped)); 687 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND; 688 689 rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, 690 &bytesWritten, dwFlags, addr, addrlen, 691 &op_key_rec->overlapped.overlapped, NULL); 628 692 if (rc == SOCKET_ERROR) { 629 693 DWORD dwStatus = WSAGetLastError(); 630 if (dwStatus==WSA_IO_PENDING) 631 return PJ_EPENDING; 632 else 694 if (dwStatus!=WSA_IO_PENDING) 633 695 return PJ_STATUS_FROM_OS(dwStatus); 634 } else { 635 /* Must always return pending status. 636 * See comments on pj_ioqueue_read 637 * return bytesRead; 638 */ 639 return PJ_EPENDING; 640 } 641 } 642 643 644 /* 645 * pj_ioqueue_sendto() 646 * 647 * Initiate overlapped SendTo operation. 648 */ 649 PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque, 650 pj_ioqueue_key_t *key, 651 const void *data, 652 pj_size_t datalen, 653 unsigned flags, 654 const pj_sockaddr_t *addr, 655 int addrlen) 656 { 657 BOOL rc; 658 DWORD bytesSent; 659 DWORD dwFlags; 660 661 PJ_CHECK_STACK(); 662 PJ_UNUSED_ARG(ioque); 663 664 if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) { 665 pj_assert(!"Operation already pending for this socket"); 666 return PJ_EBUSY; 667 } 668 669 pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped)); 670 key->send_overlapped.operation = PJ_IOQUEUE_OP_SEND_TO; 671 key->send_overlapped.wsabuf.buf = (char*)data; 672 key->send_overlapped.wsabuf.len = datalen; 673 dwFlags = flags; 674 rc = WSASendTo((SOCKET)key->hnd, &key->send_overlapped.wsabuf, 1, 675 &bytesSent, dwFlags, addr, 676 addrlen, &key->send_overlapped.overlapped, NULL); 677 if (rc == SOCKET_ERROR) { 678 DWORD dwStatus = WSAGetLastError(); 679 if (dwStatus==WSA_IO_PENDING) 680 return PJ_EPENDING; 681 else 682 return PJ_STATUS_FROM_OS(dwStatus); 683 } else { 684 // Must always return pending status. 685 // See comments on pj_ioqueue_read 686 // return bytesSent; 687 return PJ_EPENDING; 688 } 696 } 697 698 /* Asynchronous operation successfully submitted. */ 699 return PJ_EPENDING; 689 700 } 690 701 … … 696 707 * Initiate overlapped accept() operation. 697 708 */ 698 PJ_DEF( int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue,699 pj_ioqueue_key_t *key,700 pj_sock_t *new_sock,701 pj_sockaddr_t *local,702 pj_sockaddr_t *remote,703 int *addrlen)709 PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, 710 pj_ioqueue_op_key_t *op_key, 711 pj_sock_t *new_sock, 712 pj_sockaddr_t *local, 713 pj_sockaddr_t *remote, 714 int *addrlen) 704 715 { 705 716 BOOL rc; 706 717 DWORD bytesReceived; 707 718 pj_status_t status; 719 union operation_key *op_key_rec; 720 SOCKET sock; 708 721 709 722 PJ_CHECK_STACK(); 710 PJ_UNUSED_ARG(ioqueue); 711 712 if (key->accept_overlapped.operation != PJ_IOQUEUE_OP_NONE) { 713 pj_assert(!"Operation already pending for this socket"); 714 return PJ_EBUSY; 715 } 716 717 if (key->accept_overlapped.newsock == PJ_INVALID_SOCKET) { 718 pj_sock_t sock; 719 status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, &sock); 720 if (status != PJ_SUCCESS) 721 return status; 722 723 key->accept_overlapped.newsock = sock; 724 } 725 key->accept_overlapped.operation = PJ_IOQUEUE_OP_ACCEPT; 726 key->accept_overlapped.addrlen = addrlen; 727 key->accept_overlapped.local = local; 728 key->accept_overlapped.remote = remote; 729 key->accept_overlapped.newsock_ptr = new_sock; 730 pj_memset(&key->accept_overlapped.overlapped, 0, 731 sizeof(key->accept_overlapped.overlapped)); 732 733 rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)key->accept_overlapped.newsock, 734 key->accept_overlapped.accept_buf, 723 PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); 724 725 /* 726 * See if there is a new connection immediately available. 727 */ 728 sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0); 729 if (sock != INVALID_SOCKET) { 730 /* Yes! New socket is available! */ 731 int status; 732 733 status = getsockname(sock, local, addrlen); 734 if (status != 0) { 735 DWORD dwError = WSAGetLastError(); 736 closesocket(sock); 737 return PJ_RETURN_OS_ERROR(dwError); 738 } 739 740 *new_sock = sock; 741 return PJ_SUCCESS; 742 743 } else { 744 DWORD dwError = WSAGetLastError(); 745 if (dwError != WSAEWOULDBLOCK) { 746 return PJ_RETURN_OS_ERROR(dwError); 747 } 748 } 749 750 /* 751 * No connection is immediately available. 752 * Must schedule an asynchronous operation. 753 */ 754 op_key_rec = (union operation_key*)op_key->internal__; 755 756 status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, 757 &op_key_rec->accept.newsock); 758 if (status != PJ_SUCCESS) 759 return status; 760 761 /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket 762 * addresses can be obtained with getsockname() and getpeername(). 763 */ 764 status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET, 765 SO_UPDATE_ACCEPT_CONTEXT, 766 (char*)&key->hnd, sizeof(SOCKET)); 767 /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later. 768 * So ignore the error status. 769 */ 770 771 op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT; 772 op_key_rec->accept.addrlen = addrlen; 773 op_key_rec->accept.local = local; 774 op_key_rec->accept.remote = remote; 775 op_key_rec->accept.newsock_ptr = new_sock; 776 pj_memset(&op_key_rec->accept.overlapped, 0, 777 sizeof(op_key_rec->accept.overlapped)); 778 779 rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock, 780 op_key_rec->accept.accept_buf, 735 781 0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN, 736 782 &bytesReceived, 737 & key->accept_overlapped.overlapped);783 &op_key_rec->accept.overlapped ); 738 784 739 785 if (rc == TRUE) { 740 ioqueue_on_accept_complete(&key->accept_overlapped); 741 if (key->cb.on_accept_complete) 742 key->cb.on_accept_complete(key, key->accept_overlapped.newsock, 0); 786 ioqueue_on_accept_complete(&op_key_rec->accept); 743 787 return PJ_SUCCESS; 744 788 } else { 745 789 DWORD dwStatus = WSAGetLastError(); 746 if (dwStatus==WSA_IO_PENDING) 747 return PJ_EPENDING; 748 else 790 if (dwStatus!=WSA_IO_PENDING) 749 791 return PJ_STATUS_FROM_OS(dwStatus); 750 792 } 793 794 /* Asynchronous Accept() has been submitted. */ 795 return PJ_EPENDING; 751 796 } 752 797 … … 758 803 * since there's no overlapped version of connect()). 759 804 */ 760 PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue, 761 pj_ioqueue_key_t *key, 805 PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, 762 806 const pj_sockaddr_t *addr, 763 807 int addrlen ) 764 808 { 765 unsigned long optval = 1;766 809 HANDLE hEvent; 810 pj_ioqueue_t *ioqueue; 767 811 768 812 PJ_CHECK_STACK(); 769 770 /* Set socket to non-blocking. */ 771 if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) { 772 return PJ_RETURN_OS_ERROR(WSAGetLastError()); 773 } 813 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); 774 814 775 815 /* Initiate connect() */ … … 777 817 DWORD dwStatus; 778 818 dwStatus = WSAGetLastError(); 779 if (dwStatus != WSAEWOULDBLOCK) { 780 /* Permanent error */ 819 if (dwStatus != WSAEWOULDBLOCK) { 781 820 return PJ_RETURN_OS_ERROR(dwStatus); 782 } else {783 /* Pending operation. This is what we're looking for. */784 821 } 785 822 } else { 786 823 /* Connect has completed immediately! */ 787 /* Restore to blocking mode. */788 optval = 0;789 if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) {790 return PJ_RETURN_OS_ERROR(WSAGetLastError());791 }792 793 key->cb.on_connect_complete(key, 0);794 824 return PJ_SUCCESS; 795 825 } 826 827 ioqueue = key->ioqueue; 796 828 797 829 /* Add to the array of connecting socket to be polled */ -
pjproject/main/pjlib/src/pjlib-test/atomic.c
r6 r11 50 50 51 51 /* increment. */ 52 if (pj_atomic_inc(atomic_var) != 112) 52 pj_atomic_inc(atomic_var); 53 if (pj_atomic_get(atomic_var) != 112) 53 54 return -40; 54 55 55 56 /* decrement. */ 56 if (pj_atomic_dec(atomic_var) != 111) 57 pj_atomic_dec(atomic_var); 58 if (pj_atomic_get(atomic_var) != 111) 57 59 return -50; 58 60 59 61 /* set */ 60 if (pj_atomic_set(atomic_var, 211) != 111) 62 pj_atomic_set(atomic_var, 211); 63 if (pj_atomic_get(atomic_var) != 211) 64 return -60; 65 66 /* add */ 67 pj_atomic_add(atomic_var, 10); 68 if (pj_atomic_get(atomic_var) != 221) 61 69 return -60; 62 70 63 71 /* check the value again. */ 64 if (pj_atomic_get(atomic_var) != 2 11)72 if (pj_atomic_get(atomic_var) != 221) 65 73 return -70; 66 74 -
pjproject/main/pjlib/src/pjlib-test/ioq_perf.c
r6 r11 35 35 typedef struct test_item 36 36 { 37 pj_sock_t server_fd, 38 client_fd; 39 pj_ioqueue_t *ioqueue; 40 pj_ioqueue_key_t *server_key, 41 *client_key; 42 pj_size_t buffer_size; 43 char *outgoing_buffer; 44 char *incoming_buffer; 45 pj_size_t bytes_sent, 46 bytes_recv; 37 pj_sock_t server_fd, 38 client_fd; 39 pj_ioqueue_t *ioqueue; 40 pj_ioqueue_key_t *server_key, 41 *client_key; 42 pj_ioqueue_op_key_t recv_op, 43 send_op; 44 int has_pending_send; 45 pj_size_t buffer_size; 46 char *outgoing_buffer; 47 char *incoming_buffer; 48 pj_size_t bytes_sent, 49 bytes_recv; 47 50 } test_item; 48 51 … … 50 53 * Increment item->bytes_recv and ready to read the next data. 51 54 */ 52 static void on_read_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_read) 55 static void on_read_complete(pj_ioqueue_key_t *key, 56 pj_ioqueue_op_key_t *op_key, 57 pj_ssize_t bytes_read) 53 58 { 54 59 test_item *item = pj_ioqueue_get_user_data(key); 55 60 pj_status_t rc; 61 int data_is_available = 1; 56 62 57 63 //TRACE_((THIS_FILE, " read complete, bytes_read=%d", bytes_read)); 58 64 59 if (thread_quit_flag) 60 return; 61 62 if (bytes_read < 0) { 63 pj_status_t rc = -bytes_read; 64 char errmsg[128]; 65 66 if (rc != last_error) { 67 last_error = rc; 68 pj_strerror(rc, errmsg, sizeof(errmsg)); 69 PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)", 70 bytes_read, errmsg)); 71 PJ_LOG(3,(THIS_FILE, 72 ".....additional info: total read=%u, total written=%u", 73 item->bytes_recv, item->bytes_sent)); 74 } else { 75 last_error_counter++; 76 } 77 bytes_read = 0; 78 79 } else if (bytes_read == 0) { 80 PJ_LOG(3,(THIS_FILE, "...socket has closed!")); 81 } 82 83 item->bytes_recv += bytes_read; 65 do { 66 if (thread_quit_flag) 67 return; 68 69 if (bytes_read < 0) { 70 pj_status_t rc = -bytes_read; 71 char errmsg[128]; 72 73 if (rc != last_error) { 74 last_error = rc; 75 pj_strerror(rc, errmsg, sizeof(errmsg)); 76 PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)", 77 bytes_read, errmsg)); 78 PJ_LOG(3,(THIS_FILE, 79 ".....additional info: total read=%u, total written=%u", 80 item->bytes_recv, item->bytes_sent)); 81 } else { 82 last_error_counter++; 83 } 84 bytes_read = 0; 85 86 } else if (bytes_read == 0) { 87 PJ_LOG(3,(THIS_FILE, "...socket has closed!")); 88 } 89 90 item->bytes_recv += bytes_read; 84 91 85 /* To assure that the test quits, even if main thread 86 * doesn't have time to run. 87 */ 88 if (item->bytes_recv > item->buffer_size * 10000) 89 thread_quit_flag = 1; 90 91 rc = pj_ioqueue_recv( item->ioqueue, item->server_key, 92 item->incoming_buffer, item->buffer_size, 0 ); 93 94 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 95 if (rc != last_error) { 96 last_error = rc; 97 app_perror("...error: read error", rc); 98 } else { 99 last_error_counter++; 100 } 101 } 92 /* To assure that the test quits, even if main thread 93 * doesn't have time to run. 94 */ 95 if (item->bytes_recv > item->buffer_size * 10000) 96 thread_quit_flag = 1; 97 98 bytes_read = item->buffer_size; 99 rc = pj_ioqueue_recv( key, op_key, 100 item->incoming_buffer, &bytes_read, 0 ); 101 102 if (rc == PJ_SUCCESS) { 103 data_is_available = 1; 104 } else if (rc == PJ_EPENDING) { 105 data_is_available = 0; 106 } else { 107 data_is_available = 0; 108 if (rc != last_error) { 109 last_error = rc; 110 app_perror("...error: read error", rc); 111 } else { 112 last_error_counter++; 113 } 114 } 115 116 if (!item->has_pending_send) { 117 pj_ssize_t sent = item->buffer_size; 118 rc = pj_ioqueue_send(item->client_key, &item->send_op, 119 item->outgoing_buffer, &sent, 0); 120 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 121 app_perror("...error: write error", rc); 122 } 123 124 item->has_pending_send = (rc==PJ_EPENDING); 125 } 126 127 } while (data_is_available); 102 128 } 103 129 … … 105 131 * Increment item->bytes_sent and write the next data. 106 132 */ 107 static void on_write_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent) 133 static void on_write_complete(pj_ioqueue_key_t *key, 134 pj_ioqueue_op_key_t *op_key, 135 pj_ssize_t bytes_sent) 108 136 { 109 137 test_item *item = pj_ioqueue_get_user_data(key); … … 114 142 return; 115 143 144 item->has_pending_send = 0; 116 145 item->bytes_sent += bytes_sent; 117 146 … … 123 152 pj_status_t rc; 124 153 125 rc = pj_ioqueue_write(item->ioqueue, item->client_key, 126 item->outgoing_buffer, item->buffer_size); 154 bytes_sent = item->buffer_size; 155 rc = pj_ioqueue_send( item->client_key, op_key, 156 item->outgoing_buffer, &bytes_sent, 0); 127 157 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 128 158 app_perror("...error: write error", rc); 129 159 } 160 161 item->has_pending_send = (rc==PJ_EPENDING); 130 162 } 131 163 } … … 192 224 193 225 TRACE_((THIS_FILE, " creating ioqueue..")); 194 rc = pj_ioqueue_create(pool, sockpair_cnt*2, thread_cnt,&ioqueue);226 rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue); 195 227 if (rc != PJ_SUCCESS) { 196 228 app_perror("...error: unable to create ioqueue", rc); … … 200 232 /* Initialize each producer-consumer pair. */ 201 233 for (i=0; i<sockpair_cnt; ++i) { 234 pj_ssize_t bytes; 202 235 203 236 items[i].ioqueue = ioqueue; … … 243 276 /* Start reading. */ 244 277 TRACE_((THIS_FILE, " pj_ioqueue_recv..")); 245 rc = pj_ioqueue_recv(ioqueue, items[i].server_key, 246 items[i].incoming_buffer, items[i].buffer_size, 278 bytes = items[i].buffer_size; 279 rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op, 280 items[i].incoming_buffer, &bytes, 247 281 0); 248 if (rc != PJ_ SUCCESS && rc != PJ_EPENDING) {282 if (rc != PJ_EPENDING) { 249 283 app_perror("...error: pj_ioqueue_recv", rc); 250 284 return -73; … … 253 287 /* Start writing. */ 254 288 TRACE_((THIS_FILE, " pj_ioqueue_write..")); 255 rc = pj_ioqueue_write(ioqueue, items[i].client_key, 256 items[i].outgoing_buffer, items[i].buffer_size); 289 bytes = items[i].buffer_size; 290 rc = pj_ioqueue_send(items[i].client_key, &items[i].recv_op, 291 items[i].outgoing_buffer, &bytes, 0); 257 292 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 258 293 app_perror("...error: pj_ioqueue_write", rc); … … 260 295 } 261 296 297 items[i].has_pending_send = (rc==PJ_EPENDING); 262 298 } 263 299 … … 325 361 TRACE_((THIS_FILE, " closing all sockets..")); 326 362 for (i=0; i<sockpair_cnt; ++i) { 327 pj_ioqueue_unregister(i oqueue, items[i].server_key);328 pj_ioqueue_unregister(i oqueue, items[i].client_key);363 pj_ioqueue_unregister(items[i].server_key); 364 pj_ioqueue_unregister(items[i].client_key); 329 365 pj_sock_close(items[i].server_fd); 330 366 pj_sock_close(items[i].client_fd); -
pjproject/main/pjlib/src/pjlib-test/ioq_tcp.c
r6 r11 32 32 #define POOL_SIZE (2*BUF_MAX_SIZE + SOCK_INACTIVE_MAX*128 + 2048) 33 33 34 static pj_ssize_t callback_read_size, 35 callback_write_size, 36 callback_accept_status, 37 callback_connect_status; 38 static pj_ioqueue_key_t*callback_read_key, 39 *callback_write_key, 40 *callback_accept_key, 41 *callback_connect_key; 42 43 static void on_ioqueue_read(pj_ioqueue_key_t *key, pj_ssize_t bytes_read) 34 static pj_ssize_t callback_read_size, 35 callback_write_size, 36 callback_accept_status, 37 callback_connect_status; 38 static pj_ioqueue_key_t *callback_read_key, 39 *callback_write_key, 40 *callback_accept_key, 41 *callback_connect_key; 42 static pj_ioqueue_op_key_t *callback_read_op, 43 *callback_write_op, 44 *callback_accept_op; 45 46 static void on_ioqueue_read(pj_ioqueue_key_t *key, 47 pj_ioqueue_op_key_t *op_key, 48 pj_ssize_t bytes_read) 44 49 { 45 50 callback_read_key = key; 51 callback_read_op = op_key; 46 52 callback_read_size = bytes_read; 47 53 } 48 54 49 static void on_ioqueue_write(pj_ioqueue_key_t *key, pj_ssize_t bytes_written) 55 static void on_ioqueue_write(pj_ioqueue_key_t *key, 56 pj_ioqueue_op_key_t *op_key, 57 pj_ssize_t bytes_written) 50 58 { 51 59 callback_write_key = key; 60 callback_write_op = op_key; 52 61 callback_write_size = bytes_written; 53 62 } 54 63 55 static void on_ioqueue_accept(pj_ioqueue_key_t *key, pj_sock_t sock, 64 static void on_ioqueue_accept(pj_ioqueue_key_t *key, 65 pj_ioqueue_op_key_t *op_key, 66 pj_sock_t sock, 56 67 int status) 57 68 { … … 59 70 60 71 callback_accept_key = key; 72 callback_accept_op = op_key; 61 73 callback_accept_status = status; 62 74 } … … 84 96 pj_timestamp *t_elapsed) 85 97 { 86 int rc;98 pj_status_t status; 87 99 pj_ssize_t bytes; 100 pj_time_val timeout; 88 101 pj_timestamp t1, t2; 89 102 int pending_op = 0; 103 pj_ioqueue_op_key_t read_op, write_op; 90 104 91 105 // Start reading on the server side. 92 rc = pj_ioqueue_read(ioque, skey, recv_buf, bufsize); 93 if (rc != 0 && rc != PJ_EPENDING) { 106 bytes = bufsize; 107 status = pj_ioqueue_recv(skey, &read_op, recv_buf, &bytes, 0); 108 if (status != PJ_SUCCESS && status != PJ_EPENDING) { 109 app_perror("...pj_ioqueue_recv error", status); 94 110 return -100; 95 111 } 96 112 97 ++pending_op; 113 if (status == PJ_EPENDING) 114 ++pending_op; 115 else { 116 /* Does not expect to return error or immediate data. */ 117 return -115; 118 } 98 119 99 120 // Randomize send buffer. … … 101 122 102 123 // Starts send on the client side. 103 bytes = pj_ioqueue_write(ioque, ckey, send_buf, bufsize); 104 if (bytes != bufsize && bytes != PJ_EPENDING) { 124 bytes = bufsize; 125 status = pj_ioqueue_send(ckey, &write_op, send_buf, &bytes, 0); 126 if (status != PJ_SUCCESS && bytes != PJ_EPENDING) { 105 127 return -120; 106 128 } 107 if ( bytes == PJ_EPENDING) {129 if (status == PJ_EPENDING) { 108 130 ++pending_op; 109 131 } … … 115 137 callback_read_size = callback_write_size = 0; 116 138 callback_read_key = callback_write_key = NULL; 139 callback_read_op = callback_write_op = NULL; 117 140 118 141 // Poll the queue until we've got completion event in the server side. 119 rc= 0;142 status = 0; 120 143 while (pending_op > 0) { 121 rc = pj_ioqueue_poll(ioque, NULL); 122 if (rc > 0) { 144 timeout.sec = 1; timeout.msec = 0; 145 status = pj_ioqueue_poll(ioque, &timeout); 146 if (status > 0) { 123 147 if (callback_read_size) { 124 if (callback_read_size != bufsize) {148 if (callback_read_size != bufsize) 125 149 return -160; 126 }127 150 if (callback_read_key != skey) 128 151 return -161; 152 if (callback_read_op != &read_op) 153 return -162; 129 154 } 130 155 if (callback_write_size) { 131 156 if (callback_write_key != ckey) 132 return -162; 157 return -163; 158 if (callback_write_op != &write_op) 159 return -164; 133 160 } 134 pending_op -= rc;161 pending_op -= status; 135 162 } 136 if (rc < 0) { 163 if (status == 0) { 164 PJ_LOG(3,("", "...error: timed out")); 165 } 166 if (status < 0) { 137 167 return -170; 138 168 } 139 169 } 170 171 // Pending op is zero. 172 // Subsequent poll should yield zero too. 173 timeout.sec = timeout.msec = 0; 174 status = pj_ioqueue_poll(ioque, &timeout); 175 if (status != 0) 176 return -173; 140 177 141 178 // End time. … … 143 180 t_elapsed->u32.lo += (t2.u32.lo - t1.u32.lo); 144 181 145 if ( rc< 0) {146 return -1 50;182 if (status < 0) { 183 return -176; 147 184 } 148 185 … … 169 206 pj_ioqueue_t *ioque = NULL; 170 207 pj_ioqueue_key_t *skey, *ckey0, *ckey1; 208 pj_ioqueue_op_key_t accept_op; 171 209 int bufsize = BUF_MIN_SIZE; 172 210 pj_ssize_t status = -1; … … 206 244 207 245 // Create I/O Queue. 208 rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, 0,&ioque);246 rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque); 209 247 if (rc != PJ_SUCCESS) { 210 248 app_perror("...ERROR in pj_ioqueue_create()", rc); … … 232 270 // Server socket accept() 233 271 client_addr_len = sizeof(pj_sockaddr_in); 234 status = pj_ioqueue_accept(ioque, skey, &csock0, &client_addr, &rmt_addr, &client_addr_len); 272 status = pj_ioqueue_accept(skey, &accept_op, &csock0, 273 &client_addr, &rmt_addr, &client_addr_len); 235 274 if (status != PJ_EPENDING) { 236 275 app_perror("...ERROR in pj_ioqueue_accept()", rc); … … 248 287 249 288 // Client socket connect() 250 status = pj_ioqueue_connect( ioque,ckey1, &addr, sizeof(addr));289 status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr)); 251 290 if (status!=PJ_SUCCESS && status != PJ_EPENDING) { 252 291 app_perror("...ERROR in pj_ioqueue_connect()", rc); … … 263 302 callback_read_key = callback_write_key = 264 303 callback_accept_key = callback_connect_key = NULL; 304 callback_accept_op = callback_read_op = callback_write_op = NULL; 265 305 266 306 while (pending_op) { … … 274 314 } 275 315 if (callback_accept_key != skey) { 276 status=-4 1; goto on_error;316 status=-42; goto on_error; 277 317 } 318 if (callback_accept_op != &accept_op) { 319 status=-43; goto on_error; 320 } 321 callback_accept_status = -2; 278 322 } 279 323 … … 285 329 status=-51; goto on_error; 286 330 } 331 callback_connect_status = -2; 287 332 } 288 333 … … 293 338 } 294 339 } 340 } 341 342 // There's no pending operation. 343 // When we poll the ioqueue, there must not be events. 344 if (pending_op == 0) { 345 pj_time_val timeout = {1, 0}; 346 status = pj_ioqueue_poll(ioque, &timeout); 347 if (status != 0) { 348 status=-60; goto on_error; 349 } 295 350 } 296 351 … … 313 368 // Test send and receive. 314 369 t_elapsed.u32.lo = 0; 315 status = send_recv_test(ioque, ckey0, ckey1, send_buf, recv_buf, bufsize, &t_elapsed); 370 status = send_recv_test(ioque, ckey0, ckey1, send_buf, 371 recv_buf, bufsize, &t_elapsed); 316 372 if (status != 0) { 317 373 goto on_error; … … 355 411 356 412 // Create I/O Queue. 357 rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, 0,&ioque);413 rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque); 358 414 if (!ioque) { 359 415 status=-20; goto on_error; … … 382 438 383 439 // Client socket connect() 384 status = pj_ioqueue_connect( ioque,ckey1, &addr, sizeof(addr));440 status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr)); 385 441 if (status==PJ_SUCCESS) { 386 442 // unexpectedly success! … … 418 474 } 419 475 476 // There's no pending operation. 477 // When we poll the ioqueue, there must not be events. 478 if (pending_op == 0) { 479 pj_time_val timeout = {1, 0}; 480 status = pj_ioqueue_poll(ioque, &timeout); 481 if (status != 0) { 482 status=-60; goto on_error; 483 } 484 } 485 420 486 // Success 421 487 status = 0; -
pjproject/main/pjlib/src/pjlib-test/ioq_udp.c
r6 r11 35 35 #define TRACE_(msg) PJ_LOG(3,(THIS_FILE,"....." msg)) 36 36 37 static pj_ssize_t callback_read_size, 38 callback_write_size, 39 callback_accept_status, 40 callback_connect_status; 41 static pj_ioqueue_key_t *callback_read_key, 42 *callback_write_key, 43 *callback_accept_key, 44 *callback_connect_key; 45 46 static void on_ioqueue_read(pj_ioqueue_key_t *key, pj_ssize_t bytes_read) 37 static pj_ssize_t callback_read_size, 38 callback_write_size, 39 callback_accept_status, 40 callback_connect_status; 41 static pj_ioqueue_key_t *callback_read_key, 42 *callback_write_key, 43 *callback_accept_key, 44 *callback_connect_key; 45 static pj_ioqueue_op_key_t *callback_read_op, 46 *callback_write_op, 47 *callback_accept_op; 48 49 static void on_ioqueue_read(pj_ioqueue_key_t *key, 50 pj_ioqueue_op_key_t *op_key, 51 pj_ssize_t bytes_read) 47 52 { 48 53 callback_read_key = key; 54 callback_read_op = op_key; 49 55 callback_read_size = bytes_read; 50 56 } 51 57 52 static void on_ioqueue_write(pj_ioqueue_key_t *key, pj_ssize_t bytes_written) 58 static void on_ioqueue_write(pj_ioqueue_key_t *key, 59 pj_ioqueue_op_key_t *op_key, 60 pj_ssize_t bytes_written) 53 61 { 54 62 callback_write_key = key; 63 callback_write_op = op_key; 55 64 callback_write_size = bytes_written; 56 65 } 57 66 58 static void on_ioqueue_accept(pj_ioqueue_key_t *key, pj_sock_t sock, int status) 67 static void on_ioqueue_accept(pj_ioqueue_key_t *key, 68 pj_ioqueue_op_key_t *op_key, 69 pj_sock_t sock, int status) 59 70 { 60 71 PJ_UNUSED_ARG(sock); 61 72 callback_accept_key = key; 73 callback_accept_op = op_key; 62 74 callback_accept_status = status; 63 75 } … … 82 94 # define S_ADDR s_addr 83 95 #endif 84 85 /*86 * native_format_test()87 * This is just a simple test to verify that various structures in sock.h88 * are really compatible with operating system's definitions.89 */90 static int native_format_test(void)91 {92 pj_status_t rc;93 94 // Test that PJ_INVALID_SOCKET is working.95 {96 pj_sock_t sock;97 rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, -1, &sock);98 if (rc == PJ_SUCCESS)99 return -1020;100 }101 102 // Previous func will set errno var.103 pj_set_os_error(PJ_SUCCESS);104 105 return 0;106 }107 96 108 97 /* … … 120 109 pj_ioqueue_t *ioque = NULL; 121 110 pj_ioqueue_key_t *skey, *ckey; 111 pj_ioqueue_op_key_t read_op, write_op; 122 112 int bufsize = BUF_MIN_SIZE; 123 113 pj_ssize_t bytes, status = -1; … … 158 148 // Create I/O Queue. 159 149 TRACE_("create ioqueue..."); 160 rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, 161 PJ_IOQUEUE_DEFAULT_THREADS, &ioque); 150 rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque); 162 151 if (rc != PJ_SUCCESS) { 163 152 status=-20; goto on_error; … … 196 185 TRACE_("start recvfrom..."); 197 186 addrlen = sizeof(addr); 198 bytes = pj_ioqueue_recvfrom(ioque, skey, recv_buf, bufsize, 0, 199 &addr, &addrlen); 200 if (bytes < 0 && bytes != PJ_EPENDING) { 187 bytes = bufsize; 188 rc = pj_ioqueue_recvfrom(skey, &read_op, recv_buf, &bytes, 0, 189 &addr, &addrlen); 190 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 191 app_perror("...error: pj_ioqueue_recvfrom", rc); 201 192 status=-28; goto on_error; 202 } else if ( bytes== PJ_EPENDING) {193 } else if (rc == PJ_EPENDING) { 203 194 recv_pending = 1; 204 195 PJ_LOG(3, (THIS_FILE, … … 212 203 // Write must return the number of bytes. 213 204 TRACE_("start sendto..."); 214 bytes = pj_ioqueue_sendto(ioque, ckey, send_buf, bufsize, 0, &addr,215 sizeof(addr)); 216 if (bytes != bufsize && bytes != PJ_EPENDING) { 217 PJ_LOG(1,(THIS_FILE, 218 "......error: sendto returned %d", bytes));205 bytes = bufsize; 206 rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0, &addr, 207 sizeof(addr)); 208 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 209 app_perror("...error: pj_ioqueue_sendto", rc); 219 210 status=-30; goto on_error; 220 } else if ( bytes== PJ_EPENDING) {211 } else if (rc == PJ_EPENDING) { 221 212 send_pending = 1; 222 213 PJ_LOG(3, (THIS_FILE, … … 233 224 callback_read_key = callback_write_key = 234 225 callback_accept_key = callback_connect_key = NULL; 226 callback_read_op = callback_write_op = NULL; 235 227 236 228 // Poll if pending. 237 while (send_pending &&recv_pending) {229 while (send_pending || recv_pending) { 238 230 int rc; 239 231 pj_time_val timeout = { 5, 0 }; … … 254 246 status=-61; goto on_error; 255 247 } 256 257 248 if (callback_read_key != skey) { 258 249 status=-65; goto on_error; 250 } 251 if (callback_read_op != &read_op) { 252 status=-66; goto on_error; 259 253 } 260 254 … … 271 265 status=-73; goto on_error; 272 266 } 273 274 267 if (callback_write_key != ckey) { 275 268 status=-75; goto on_error; 269 } 270 if (callback_write_op != &write_op) { 271 status=-76; goto on_error; 276 272 } 277 273 … … 327 323 328 324 /* Create IOQueue */ 329 rc = pj_ioqueue_create(pool, MAX, 330 PJ_IOQUEUE_DEFAULT_THREADS, 331 &ioqueue); 325 rc = pj_ioqueue_create(pool, MAX, &ioqueue); 332 326 if (rc != PJ_SUCCESS || ioqueue == NULL) { 333 327 app_perror("...error in pj_ioqueue_create", rc); … … 359 353 360 354 for (i=0; i<count; ++i) { 361 rc = pj_ioqueue_unregister( ioqueue,key[i]);355 rc = pj_ioqueue_unregister(key[i]); 362 356 if (rc != PJ_SUCCESS) { 363 357 app_perror("...error in pj_ioqueue_unregister", rc); … … 394 388 pj_pool_t *pool = NULL; 395 389 pj_sock_t *inactive_sock=NULL; 390 pj_ioqueue_op_key_t *inactive_read_op; 396 391 char *send_buf, *recv_buf; 397 392 pj_ioqueue_t *ioque = NULL; … … 430 425 431 426 // Create I/O Queue. 432 rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, 433 PJ_IOQUEUE_DEFAULT_THREADS, &ioque); 427 rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque); 434 428 if (rc != PJ_SUCCESS) { 435 429 app_perror("...error: pj_ioqueue_create()", rc); … … 441 435 inactive_sock = (pj_sock_t*)pj_pool_alloc(pool, 442 436 inactive_sock_count*sizeof(pj_sock_t)); 437 inactive_read_op = (pj_ioqueue_op_key_t*)pj_pool_alloc(pool, 438 inactive_sock_count*sizeof(pj_ioqueue_op_key_t)); 443 439 memset(&addr, 0, sizeof(addr)); 444 440 addr.sin_family = PJ_AF_INET; 445 441 for (i=0; i<inactive_sock_count; ++i) { 442 pj_ssize_t bytes; 443 446 444 rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &inactive_sock[i]); 447 445 if (rc != PJ_SUCCESS || inactive_sock[i] < 0) { … … 464 462 goto on_error; 465 463 } 466 rc = pj_ioqueue_read(ioque, key, recv_buf, bufsize); 464 bytes = bufsize; 465 rc = pj_ioqueue_recv(key, &inactive_read_op[i], recv_buf, &bytes, 0); 467 466 if ( rc < 0 && rc != PJ_EPENDING) { 468 467 pj_sock_close(inactive_sock[i]); … … 497 496 for (i=0; i<LOOP; ++i) { 498 497 pj_ssize_t bytes; 498 pj_ioqueue_op_key_t read_op, write_op; 499 499 500 500 // Randomize send buffer. … … 502 502 503 503 // Start reading on the server side. 504 rc = pj_ioqueue_read(ioque, skey, recv_buf, bufsize); 504 bytes = bufsize; 505 rc = pj_ioqueue_recv(skey, &read_op, recv_buf, &bytes, 0); 505 506 if (rc < 0 && rc != PJ_EPENDING) { 506 507 app_perror("...error: pj_ioqueue_read()", rc); … … 509 510 510 511 // Starts send on the client side. 511 bytes = pj_ioqueue_sendto(ioque, ckey, send_buf, bufsize, 0, 512 &addr, sizeof(addr)); 513 if (bytes != bufsize && bytes != PJ_EPENDING) { 512 bytes = bufsize; 513 rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0, 514 &addr, sizeof(addr)); 515 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 514 516 app_perror("...error: pj_ioqueue_write()", bytes); 515 517 rc = -1; … … 600 602 int bufsize, sock_count; 601 603 602 PJ_LOG(3, (THIS_FILE, "...format test"));603 if ((status = native_format_test()) != 0)604 return status;605 PJ_LOG(3, (THIS_FILE, "....native format test ok"));606 607 604 PJ_LOG(3, (THIS_FILE, "...compliance test")); 608 605 if ((status=compliance_test()) != 0) { -
pjproject/main/pjlib/src/pjlib-test/main.c
r6 r11 12 12 13 13 14 #if defined(PJ_WIN32) && PJ_WIN32!=0 14 //#if defined(PJ_WIN32) && PJ_WIN32!=0 15 #if 0 15 16 #include <windows.h> 16 17 static void boost(void) -
pjproject/main/pjlib/src/pjlib-test/test.h
r6 r11 9 9 #define GROUP_OS 0 10 10 #define GROUP_DATA_STRUCTURE 0 11 #define GROUP_NETWORK 011 #define GROUP_NETWORK 1 12 12 #define GROUP_EXTRA 0 13 13 … … 35 35 #define INCLUDE_XML_TEST GROUP_EXTRA 36 36 37 #define INCLUDE_ECHO_SERVER 0 38 #define INCLUDE_ECHO_CLIENT 0 37 39 38 #define INCLUDE_ECHO_SERVER 039 #define INCLUDE_ECHO_CLIENT 140 40 41 41 #define ECHO_SERVER_MAX_THREADS 4 … … 74 74 extern int echo_client(int sock_type, const char *server, int port); 75 75 76 extern int echo_srv_sync(void); 77 extern int udp_echo_srv_ioqueue(void); 78 extern int echo_srv_common_loop(pj_atomic_t *bytes_counter); 79 76 80 extern pj_pool_factory *mem; 77 81 -
pjproject/main/pjlib/src/pjlib-test/udp_echo_srv_sync.c
r10 r11 9 9 { 10 10 pj_sock_t sock = (pj_sock_t)arg; 11 char buf[ 1516];11 char buf[512]; 12 12 pj_status_t last_recv_err = PJ_SUCCESS, last_write_err = PJ_SUCCESS; 13 13 … … 49 49 pj_thread_t *thread[ECHO_SERVER_MAX_THREADS]; 50 50 pj_status_t rc; 51 pj_highprec_t last_received, avg_bw, highest_bw;52 pj_time_val last_print;53 unsigned count;54 51 int i; 55 52 … … 84 81 PJ_LOG(3,("", "...Press Ctrl-C to abort")); 85 82 83 echo_srv_common_loop(total_bytes); 84 return 0; 85 } 86 87 88 int echo_srv_common_loop(pj_atomic_t *bytes_counter) 89 { 90 pj_highprec_t last_received, avg_bw, highest_bw; 91 pj_time_val last_print; 92 unsigned count; 93 86 94 last_received = 0; 87 95 pj_gettimeofday(&last_print); … … 96 104 pj_thread_sleep(1000); 97 105 98 received = cur_received = pj_atomic_get( total_bytes);106 received = cur_received = pj_atomic_get(bytes_counter); 99 107 cur_received = cur_received - last_received; 100 108
Note: See TracChangeset
for help on using the changeset viewer.