Changeset 11 for pjproject/main/pjlib/src/pj/ioqueue_select.c
- Timestamp:
- Nov 6, 2005 9:37:47 AM (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/main/pjlib/src/pj/ioqueue_select.c
r6 r11 35 35 #define THIS_FILE "ioq_select" 36 36 37 #define PJ_IOQUEUE_IS_READ_OP(op) ((op & PJ_IOQUEUE_OP_READ) || \ 38 (op & PJ_IOQUEUE_OP_RECV) || \ 39 (op & PJ_IOQUEUE_OP_RECV_FROM)) 40 #define PJ_IOQUEUE_IS_WRITE_OP(op) ((op & PJ_IOQUEUE_OP_WRITE) || \ 41 (op & PJ_IOQUEUE_OP_SEND) || \ 42 (op & PJ_IOQUEUE_OP_SEND_TO)) 43 44 45 #if PJ_HAS_TCP 46 # define PJ_IOQUEUE_IS_ACCEPT_OP(op) (op & PJ_IOQUEUE_OP_ACCEPT) 47 # define PJ_IOQUEUE_IS_CONNECT_OP(op) (op & PJ_IOQUEUE_OP_CONNECT) 48 #else 49 # define PJ_IOQUEUE_IS_ACCEPT_OP(op) 0 50 # define PJ_IOQUEUE_IS_CONNECT_OP(op) 0 51 #endif 37 /* 38 * The select ioqueue relies on socket functions (pj_sock_xxx()) to return 39 * the correct error code. 40 */ 41 #if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100) 42 # error "Error reporting must be enabled for this function to work!" 43 #endif 44 45 /** 46 * Get the number of descriptors in the set. This is defined in sock_select.c 47 * This function will only return the number of sockets set from PJ_FD_SET 48 * operation. When the set is modified by other means (such as by select()), 49 * the count will not be reflected here. 50 * 51 * That's why don't export this function in the header file, to avoid 52 * misunderstanding. 53 * 54 * @param fdsetp The descriptor set. 55 * 56 * @return Number of descriptors in the set. 57 */ 58 PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp); 59 60 61 52 62 53 63 /* … … 61 71 #endif 62 72 73 struct generic_operation 74 { 75 PJ_DECL_LIST_MEMBER(struct generic_operation); 76 pj_ioqueue_operation_e op; 77 }; 78 79 struct read_operation 80 { 81 PJ_DECL_LIST_MEMBER(struct read_operation); 82 pj_ioqueue_operation_e op; 83 84 void *buf; 85 pj_size_t size; 86 unsigned flags; 87 pj_sockaddr_t *rmt_addr; 88 int *rmt_addrlen; 89 }; 90 91 struct write_operation 92 { 93 PJ_DECL_LIST_MEMBER(struct write_operation); 94 pj_ioqueue_operation_e op; 95 96 char *buf; 97 pj_size_t size; 98 pj_ssize_t written; 99 unsigned flags; 100 pj_sockaddr_in rmt_addr; 101 int rmt_addrlen; 102 }; 103 104 #if PJ_HAS_TCP 105 struct accept_operation 106 { 107 PJ_DECL_LIST_MEMBER(struct accept_operation); 108 pj_ioqueue_operation_e op; 109 110 pj_sock_t *accept_fd; 111 pj_sockaddr_t *local_addr; 112 pj_sockaddr_t *rmt_addr; 113 int *addrlen; 114 }; 115 #endif 116 117 union operation_key 118 { 119 struct generic_operation generic; 120 struct read_operation read; 121 struct write_operation write; 122 #if PJ_HAS_TCP 123 struct accept_operation accept; 124 #endif 125 }; 126 63 127 /* 64 128 * This describes each key. … … 66 130 struct pj_ioqueue_key_t 67 131 { 68 PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t) 132 PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); 133 pj_ioqueue_t *ioqueue; 69 134 pj_sock_t fd; 70 pj_ioqueue_operation_e op;71 135 void *user_data; 72 136 pj_ioqueue_callback cb; 73 74 void *rd_buf; 75 unsigned rd_flags; 76 pj_size_t rd_buflen; 77 void *wr_buf; 78 pj_size_t wr_buflen; 79 80 pj_sockaddr_t *rmt_addr; 81 int *rmt_addrlen; 82 83 pj_sockaddr_t *local_addr; 84 int *local_addrlen; 85 86 pj_sock_t *accept_fd; 137 int connecting; 138 struct read_operation read_list; 139 struct write_operation write_list; 140 #if PJ_HAS_TCP 141 struct accept_operation accept_list; 142 #endif 87 143 }; 88 144 … … 95 151 pj_bool_t auto_delete_lock; 96 152 unsigned max, count; 97 pj_ioqueue_key_t hlist;153 pj_ioqueue_key_t key_list; 98 154 pj_fd_set_t rfdset; 99 155 pj_fd_set_t wfdset; … … 110 166 PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, 111 167 pj_size_t max_fd, 112 int max_threads,113 168 pj_ioqueue_t **p_ioqueue) 114 169 { 115 pj_ioqueue_t *ioque ;170 pj_ioqueue_t *ioqueue; 116 171 pj_status_t rc; 117 172 118 PJ_UNUSED_ARG(max_threads); 119 120 if (max_fd > PJ_IOQUEUE_MAX_HANDLES) { 121 pj_assert(!"max_fd too large"); 122 return PJ_EINVAL; 123 } 124 125 ioque = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); 126 ioque->max = max_fd; 127 ioque->count = 0; 128 PJ_FD_ZERO(&ioque->rfdset); 129 PJ_FD_ZERO(&ioque->wfdset); 173 /* Check that arguments are valid. */ 174 PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && 175 max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES, 176 PJ_EINVAL); 177 178 /* Check that size of pj_ioqueue_op_key_t is sufficient */ 179 PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= 180 sizeof(union operation_key), PJ_EBUG); 181 182 ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); 183 ioqueue->max = max_fd; 184 ioqueue->count = 0; 185 PJ_FD_ZERO(&ioqueue->rfdset); 186 PJ_FD_ZERO(&ioqueue->wfdset); 130 187 #if PJ_HAS_TCP 131 PJ_FD_ZERO(&ioque ->xfdset);132 #endif 133 pj_list_init(&ioque ->hlist);134 135 rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioque ->lock);188 PJ_FD_ZERO(&ioqueue->xfdset); 189 #endif 190 pj_list_init(&ioqueue->key_list); 191 192 rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioqueue->lock); 136 193 if (rc != PJ_SUCCESS) 137 194 return rc; 138 195 139 ioque ->auto_delete_lock = PJ_TRUE;140 141 PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioque ));142 143 *p_ioqueue = ioque ;196 ioqueue->auto_delete_lock = PJ_TRUE; 197 198 PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue)); 199 200 *p_ioqueue = ioqueue; 144 201 return PJ_SUCCESS; 145 202 } … … 150 207 * Destroy ioqueue. 151 208 */ 152 PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioque )209 PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) 153 210 { 154 211 pj_status_t rc = PJ_SUCCESS; 155 212 156 PJ_ASSERT_RETURN(ioque, PJ_EINVAL); 157 158 if (ioque->auto_delete_lock) 159 rc = pj_lock_destroy(ioque->lock); 213 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 214 215 pj_lock_acquire(ioqueue->lock); 216 217 if (ioqueue->auto_delete_lock) 218 rc = pj_lock_destroy(ioqueue->lock); 160 219 161 220 return rc; … … 164 223 165 224 /* 166 * pj_ioqueue_set_lock()167 */168 PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque,169 pj_lock_t *lock,170 pj_bool_t auto_delete )171 {172 PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL);173 174 if (ioque->auto_delete_lock) {175 pj_lock_destroy(ioque->lock);176 }177 178 ioque->lock = lock;179 ioque->auto_delete_lock = auto_delete;180 181 return PJ_SUCCESS;182 }183 184 185 /*186 225 * pj_ioqueue_register_sock() 187 226 * … … 189 228 */ 190 229 PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, 191 pj_ioqueue_t *ioque ,230 pj_ioqueue_t *ioqueue, 192 231 pj_sock_t sock, 193 232 void *user_data, … … 199 238 pj_status_t rc = PJ_SUCCESS; 200 239 201 PJ_ASSERT_RETURN(pool && ioque && sock != PJ_INVALID_SOCKET &&240 PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET && 202 241 cb && p_key, PJ_EINVAL); 203 242 204 pj_lock_acquire(ioque ->lock);205 206 if (ioque ->count >= ioque->max) {243 pj_lock_acquire(ioqueue->lock); 244 245 if (ioqueue->count >= ioqueue->max) { 207 246 rc = PJ_ETOOMANY; 208 247 goto on_return; … … 212 251 value = 1; 213 252 #ifdef PJ_WIN32 214 if (ioctlsocket(sock, FIONBIO, (u nsignedlong*)&value)) {253 if (ioctlsocket(sock, FIONBIO, (u_long*)&value)) { 215 254 #else 216 255 if (ioctl(sock, FIONBIO, &value)) { … … 222 261 /* Create key. */ 223 262 key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 263 key->ioqueue = ioqueue; 224 264 key->fd = sock; 225 265 key->user_data = user_data; 266 pj_list_init(&key->read_list); 267 pj_list_init(&key->write_list); 268 #if PJ_HAS_TCP 269 pj_list_init(&key->accept_list); 270 #endif 226 271 227 272 /* Save callback. */ … … 229 274 230 275 /* Register */ 231 pj_list_insert_before(&ioque ->hlist, key);232 ++ioque ->count;276 pj_list_insert_before(&ioqueue->key_list, key); 277 ++ioqueue->count; 233 278 234 279 on_return: 280 /* On error, socket may be left in non-blocking mode. */ 235 281 *p_key = key; 236 pj_lock_release(ioque ->lock);282 pj_lock_release(ioqueue->lock); 237 283 238 284 return rc; … … 244 290 * Unregister handle from ioqueue. 245 291 */ 246 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque, 247 pj_ioqueue_key_t *key) 248 { 249 PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL); 250 251 pj_lock_acquire(ioque->lock); 252 253 pj_assert(ioque->count > 0); 254 --ioque->count; 292 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) 293 { 294 pj_ioqueue_t *ioqueue; 295 296 PJ_ASSERT_RETURN(key, PJ_EINVAL); 297 298 ioqueue = key->ioqueue; 299 300 pj_lock_acquire(ioqueue->lock); 301 302 pj_assert(ioqueue->count > 0); 303 --ioqueue->count; 255 304 pj_list_erase(key); 256 PJ_FD_CLR(key->fd, &ioque ->rfdset);257 PJ_FD_CLR(key->fd, &ioque ->wfdset);305 PJ_FD_CLR(key->fd, &ioqueue->rfdset); 306 PJ_FD_CLR(key->fd, &ioqueue->wfdset); 258 307 #if PJ_HAS_TCP 259 PJ_FD_CLR(key->fd, &ioque ->xfdset);260 #endif 261 262 pj_lock_release(ioque ->lock);308 PJ_FD_CLR(key->fd, &ioqueue->xfdset); 309 #endif 310 311 pj_lock_release(ioqueue->lock); 263 312 return PJ_SUCCESS; 264 313 } … … 276 325 277 326 327 /* 328 * pj_ioqueue_set_user_data() 329 */ 330 PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, 331 void *user_data, 332 void **old_data) 333 { 334 PJ_ASSERT_RETURN(key, PJ_EINVAL); 335 336 if (old_data) 337 *old_data = key->user_data; 338 key->user_data = user_data; 339 340 return PJ_SUCCESS; 341 } 342 343 278 344 /* This supposed to check whether the fd_set values are consistent 279 345 * with the operation currently set in each key. 280 346 */ 281 347 #if VALIDATE_FD_SET 282 static void validate_sets(const pj_ioqueue_t *ioque ,348 static void validate_sets(const pj_ioqueue_t *ioqueue, 283 349 const pj_fd_set_t *rfdset, 284 350 const pj_fd_set_t *wfdset, … … 287 353 pj_ioqueue_key_t *key; 288 354 289 key = ioque->hlist.next; 290 while (key != &ioque->hlist) { 291 if ((key->op & PJ_IOQUEUE_OP_READ) 292 || (key->op & PJ_IOQUEUE_OP_RECV) 293 || (key->op & PJ_IOQUEUE_OP_RECV_FROM) 355 key = ioqueue->key_list.next; 356 while (key != &ioqueue->key_list) { 357 if (!pj_list_empty(&key->read_list) 294 358 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 295 || (key->op & PJ_IOQUEUE_OP_ACCEPT)359 || !pj_list_empty(&key->accept_list) 296 360 #endif 297 361 ) … … 302 366 pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0); 303 367 } 304 if ((key->op & PJ_IOQUEUE_OP_WRITE) 305 || (key->op & PJ_IOQUEUE_OP_SEND) 306 || (key->op & PJ_IOQUEUE_OP_SEND_TO) 368 if (!pj_list_empty(&key->write_list) 307 369 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 308 || (key->op & PJ_IOQUEUE_OP_CONNECT)370 || key->connecting 309 371 #endif 310 372 ) … … 316 378 } 317 379 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 318 if (key-> op & PJ_IOQUEUE_OP_CONNECT)380 if (key->connecting) 319 381 { 320 382 pj_assert(PJ_FD_ISSET(key->fd, xfdset)); … … 348 410 * work on fd_set copy of the ioqueue (not the original one). 349 411 */ 350 PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque , const pj_time_val *timeout)412 PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) 351 413 { 352 414 pj_fd_set_t rfdset, wfdset, xfdset; … … 354 416 pj_ioqueue_key_t *h; 355 417 418 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 419 356 420 /* Lock ioqueue before making fd_set copies */ 357 pj_lock_acquire(ioque->lock); 358 359 if (PJ_FD_COUNT(&ioque->rfdset)==0 && 360 PJ_FD_COUNT(&ioque->wfdset)==0 && 361 PJ_FD_COUNT(&ioque->xfdset)==0) 421 pj_lock_acquire(ioqueue->lock); 422 423 /* We will only do select() when there are sockets to be polled. 424 * Otherwise select() will return error. 425 */ 426 if (PJ_FD_COUNT(&ioqueue->rfdset)==0 && 427 PJ_FD_COUNT(&ioqueue->wfdset)==0 && 428 PJ_FD_COUNT(&ioqueue->xfdset)==0) 362 429 { 363 pj_lock_release(ioque ->lock);430 pj_lock_release(ioqueue->lock); 364 431 if (timeout) 365 432 pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout)); … … 368 435 369 436 /* Copy ioqueue's pj_fd_set_t to local variables. */ 370 pj_memcpy(&rfdset, &ioque ->rfdset, sizeof(pj_fd_set_t));371 pj_memcpy(&wfdset, &ioque ->wfdset, sizeof(pj_fd_set_t));437 pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t)); 438 pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t)); 372 439 #if PJ_HAS_TCP 373 pj_memcpy(&xfdset, &ioque ->xfdset, sizeof(pj_fd_set_t));440 pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t)); 374 441 #else 375 442 PJ_FD_ZERO(&xfdset); … … 377 444 378 445 #if VALIDATE_FD_SET 379 validate_sets(ioque , &rfdset, &wfdset, &xfdset);446 validate_sets(ioqueue, &rfdset, &wfdset, &xfdset); 380 447 #endif 381 448 382 449 /* Unlock ioqueue before select(). */ 383 pj_lock_release(ioque ->lock);450 pj_lock_release(ioqueue->lock); 384 451 385 452 count = pj_sock_select(FD_SETSIZE, &rfdset, &wfdset, &xfdset, timeout); … … 388 455 return count; 389 456 390 /* Lock ioqueue again before scanning for signalled sockets. */ 391 pj_lock_acquire(ioque->lock); 392 393 #if PJ_HAS_TCP 394 /* Scan for exception socket */ 395 h = ioque->hlist.next; 396 do_except_scan: 397 for ( ; h!=&ioque->hlist; h = h->next) { 398 if ((h->op & PJ_IOQUEUE_OP_CONNECT) && PJ_FD_ISSET(h->fd, &xfdset)) 399 break; 400 } 401 if (h != &ioque->hlist) { 402 /* 'connect()' should be the only operation. */ 403 pj_assert((h->op == PJ_IOQUEUE_OP_CONNECT)); 404 405 /* Clear operation. */ 406 h->op &= ~(PJ_IOQUEUE_OP_CONNECT); 407 PJ_FD_CLR(h->fd, &ioque->wfdset); 408 PJ_FD_CLR(h->fd, &ioque->xfdset); 409 PJ_FD_CLR(h->fd, &wfdset); 410 PJ_FD_CLR(h->fd, &xfdset); 411 412 /* Call callback. */ 413 if (h->cb.on_connect_complete) 414 (*h->cb.on_connect_complete)(h, -1); 415 416 /* Re-scan exception list. */ 417 goto do_except_scan; 418 } 419 #endif /* PJ_HAS_TCP */ 420 421 /* Scan for readable socket. */ 422 h = ioque->hlist.next; 423 do_readable_scan: 424 for ( ; h!=&ioque->hlist; h = h->next) { 425 if ((PJ_IOQUEUE_IS_READ_OP(h->op) || PJ_IOQUEUE_IS_ACCEPT_OP(h->op)) && 426 PJ_FD_ISSET(h->fd, &rfdset)) 457 /* Lock ioqueue again before scanning for signalled sockets. 458 * We must strictly use recursive mutex since application may invoke 459 * the ioqueue again inside the callback. 460 */ 461 pj_lock_acquire(ioqueue->lock); 462 463 /* Scan for writable sockets first to handle piggy-back data 464 * coming with accept(). 465 */ 466 h = ioqueue->key_list.next; 467 do_writable_scan: 468 for ( ; h!=&ioqueue->key_list; h = h->next) { 469 if ( (!pj_list_empty(&h->write_list) || h->connecting) 470 && PJ_FD_ISSET(h->fd, &wfdset)) 427 471 { 428 472 break; 429 473 } 430 474 } 431 if (h != &ioque->hlist) { 432 pj_status_t rc; 433 434 pj_assert(PJ_IOQUEUE_IS_READ_OP(h->op) || 435 PJ_IOQUEUE_IS_ACCEPT_OP(h->op)); 436 437 # if PJ_HAS_TCP 438 if ((h->op & PJ_IOQUEUE_OP_ACCEPT)) { 439 /* accept() must be the only operation specified on server socket */ 440 pj_assert(h->op == PJ_IOQUEUE_OP_ACCEPT); 441 442 rc=pj_sock_accept(h->fd, h->accept_fd, h->rmt_addr, h->rmt_addrlen); 443 if (rc==0 && h->local_addr) { 444 rc = pj_sock_getsockname(*h->accept_fd, h->local_addr, 445 h->local_addrlen); 446 } 447 448 h->op &= ~(PJ_IOQUEUE_OP_ACCEPT); 449 PJ_FD_CLR(h->fd, &ioque->rfdset); 450 451 /* Call callback. */ 452 if (h->cb.on_accept_complete) 453 (*h->cb.on_accept_complete)(h, *h->accept_fd, rc); 454 455 /* Re-scan readable sockets. */ 456 goto do_readable_scan; 457 } 458 else { 459 # endif 460 pj_ssize_t bytes_read = h->rd_buflen; 461 462 if ((h->op & PJ_IOQUEUE_OP_RECV_FROM)) { 463 rc = pj_sock_recvfrom(h->fd, h->rd_buf, &bytes_read, 0, 464 h->rmt_addr, h->rmt_addrlen); 465 } else if ((h->op & PJ_IOQUEUE_OP_RECV)) { 466 rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0); 467 } else { 468 /* 469 * User has specified pj_ioqueue_read(). 470 * On Win32, we should do ReadFile(). But because we got 471 * here because of select() anyway, user must have put a 472 * socket descriptor on h->fd, which in this case we can 473 * just call pj_sock_recv() instead of ReadFile(). 474 * On Unix, user may put a file in h->fd, so we'll have 475 * to call read() here. 476 * This may not compile on systems which doesn't have 477 * read(). That's why we only specify PJ_LINUX here so 478 * that error is easier to catch. 479 */ 480 # if defined(PJ_WIN32) && PJ_WIN32 != 0 481 rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0); 482 # elif (defined(PJ_LINUX) && PJ_LINUX != 0) || \ 483 (defined(PJ_SUNOS) && PJ_SUNOS != 0) 484 bytes_read = read(h->fd, h->rd_buf, bytes_read); 485 rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); 486 # elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0 487 bytes_read = sys_read(h->fd, h->rd_buf, bytes_read); 488 rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read; 489 # else 490 # error "Implement read() for this platform!" 491 # endif 492 } 493 494 if (rc != PJ_SUCCESS) { 495 # if defined(PJ_WIN32) && PJ_WIN32 != 0 496 /* On Win32, for UDP, WSAECONNRESET on the receive side 497 * indicates that previous sending has triggered ICMP Port 498 * Unreachable message. 499 * But we wouldn't know at this point which one of previous 500 * key that has triggered the error, since UDP socket can 501 * be shared! 502 * So we'll just ignore it! 503 */ 504 505 if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) { 506 PJ_LOG(4,(THIS_FILE, 507 "Ignored ICMP port unreach. on key=%p", h)); 508 } 509 # endif 510 511 /* In any case we would report this to caller. */ 512 bytes_read = -rc; 513 } 514 515 h->op &= ~(PJ_IOQUEUE_OP_READ | PJ_IOQUEUE_OP_RECV | 516 PJ_IOQUEUE_OP_RECV_FROM); 517 PJ_FD_CLR(h->fd, &ioque->rfdset); 518 PJ_FD_CLR(h->fd, &rfdset); 519 520 /* Call callback. */ 521 if (h->cb.on_read_complete) 522 (*h->cb.on_read_complete)(h, bytes_read); 523 524 /* Re-scan readable sockets. */ 525 goto do_readable_scan; 526 527 } 528 } 529 530 /* Scan for writable socket */ 531 h = ioque->hlist.next; 532 do_writable_scan: 533 for ( ; h!=&ioque->hlist; h = h->next) { 534 if ((PJ_IOQUEUE_IS_WRITE_OP(h->op) || PJ_IOQUEUE_IS_CONNECT_OP(h->op)) 535 && PJ_FD_ISSET(h->fd, &wfdset)) 536 { 537 break; 538 } 539 } 540 if (h != &ioque->hlist) { 541 pj_assert(PJ_IOQUEUE_IS_WRITE_OP(h->op) || 542 PJ_IOQUEUE_IS_CONNECT_OP(h->op)); 475 if (h != &ioqueue->key_list) { 476 pj_assert(!pj_list_empty(&h->write_list) || h->connecting); 543 477 544 478 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 545 if ( (h->op & PJ_IOQUEUE_OP_CONNECT)) {479 if (h->connecting) { 546 480 /* Completion of connect() operation */ 547 481 pj_ssize_t bytes_transfered; 548 482 549 #if (defined(PJ_LINUX) && PJ_LINUX!=0) || \ 550 (defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL!=0) 483 #if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) 551 484 /* from connect(2): 552 553 554 555 485 * On Linux, use getsockopt to read the SO_ERROR option at 486 * level SOL_SOCKET to determine whether connect() completed 487 * successfully (if SO_ERROR is zero). 488 */ 556 489 int value; 557 490 socklen_t vallen = sizeof(value); … … 590 523 591 524 /* Clear operation. */ 592 h-> op &= (~PJ_IOQUEUE_OP_CONNECT);593 PJ_FD_CLR(h->fd, &ioque ->wfdset);594 PJ_FD_CLR(h->fd, &ioque ->xfdset);525 h->connecting = 0; 526 PJ_FD_CLR(h->fd, &ioqueue->wfdset); 527 PJ_FD_CLR(h->fd, &ioqueue->xfdset); 595 528 596 529 /* Call callback. */ … … 604 537 #endif /* PJ_HAS_TCP */ 605 538 { 606 /* Completion of write(), send(), or sendto() operation. */ 607 608 /* Clear operation. */ 609 h->op &= ~(PJ_IOQUEUE_OP_WRITE | PJ_IOQUEUE_OP_SEND | 610 PJ_IOQUEUE_OP_SEND_TO); 611 PJ_FD_CLR(h->fd, &ioque->wfdset); 539 /* Socket is writable. */ 540 struct write_operation *write_op; 541 pj_ssize_t sent; 542 pj_status_t send_rc; 543 544 /* Get the first in the queue. */ 545 write_op = h->write_list.next; 546 547 /* Send the data. */ 548 sent = write_op->size - write_op->written; 549 if (write_op->op == PJ_IOQUEUE_OP_SEND) { 550 send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written, 551 &sent, write_op->flags); 552 } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) { 553 send_rc = pj_sock_sendto(h->fd, 554 write_op->buf+write_op->written, 555 &sent, write_op->flags, 556 &write_op->rmt_addr, 557 write_op->rmt_addrlen); 558 } else { 559 pj_assert(!"Invalid operation type!"); 560 send_rc = PJ_EBUG; 561 } 562 563 if (send_rc == PJ_SUCCESS) { 564 write_op->written += sent; 565 } else { 566 pj_assert(send_rc > 0); 567 write_op->written = -send_rc; 568 } 569 570 /* In any case we don't need to process this descriptor again. */ 612 571 PJ_FD_CLR(h->fd, &wfdset); 613 572 614 /* Call callback. */ 615 /* All data must have been sent? */ 616 if (h->cb.on_write_complete) 617 (*h->cb.on_write_complete)(h, h->wr_buflen); 618 573 /* Are we finished with this buffer? */ 574 if (send_rc!=PJ_SUCCESS || 575 write_op->written == (pj_ssize_t)write_op->size) 576 { 577 pj_list_erase(write_op); 578 579 /* Clear operation if there's no more data to send. */ 580 if (pj_list_empty(&h->write_list)) 581 PJ_FD_CLR(h->fd, &ioqueue->wfdset); 582 583 /* Call callback. */ 584 if (h->cb.on_write_complete) { 585 (*h->cb.on_write_complete)(h, 586 (pj_ioqueue_op_key_t*)write_op, 587 write_op->written); 588 } 589 } 590 619 591 /* Re-scan writable sockets. */ 620 592 goto do_writable_scan; 621 593 } 622 594 } 595 596 /* Scan for readable socket. */ 597 h = ioqueue->key_list.next; 598 do_readable_scan: 599 for ( ; h!=&ioqueue->key_list; h = h->next) { 600 if ((!pj_list_empty(&h->read_list) 601 #if PJ_HAS_TCP 602 || !pj_list_empty(&h->accept_list) 603 #endif 604 ) && PJ_FD_ISSET(h->fd, &rfdset)) 605 { 606 break; 607 } 608 } 609 if (h != &ioqueue->key_list) { 610 pj_status_t rc; 611 612 #if PJ_HAS_TCP 613 pj_assert(!pj_list_empty(&h->read_list) || 614 !pj_list_empty(&h->accept_list)); 615 #else 616 pj_assert(!pj_list_empty(&h->read_list)); 617 #endif 618 619 # if PJ_HAS_TCP 620 if (!pj_list_empty(&h->accept_list)) { 621 622 struct accept_operation *accept_op; 623 624 /* Get one accept operation from the list. */ 625 accept_op = h->accept_list.next; 626 pj_list_erase(accept_op); 627 628 rc=pj_sock_accept(h->fd, accept_op->accept_fd, 629 accept_op->rmt_addr, accept_op->addrlen); 630 if (rc==PJ_SUCCESS && accept_op->local_addr) { 631 rc = pj_sock_getsockname(*accept_op->accept_fd, 632 accept_op->local_addr, 633 accept_op->addrlen); 634 } 635 636 /* Clear bit in fdset if there is no more pending accept */ 637 if (pj_list_empty(&h->accept_list)) 638 PJ_FD_CLR(h->fd, &ioqueue->rfdset); 639 640 /* Call callback. */ 641 if (h->cb.on_accept_complete) 642 (*h->cb.on_accept_complete)(h, (pj_ioqueue_op_key_t*)accept_op, 643 *accept_op->accept_fd, rc); 644 645 /* Re-scan readable sockets. */ 646 goto do_readable_scan; 647 } 648 else { 649 # endif 650 struct read_operation *read_op; 651 pj_ssize_t bytes_read; 652 653 pj_assert(!pj_list_empty(&h->read_list)); 654 655 /* Get one pending read operation from the list. */ 656 read_op = h->read_list.next; 657 pj_list_erase(read_op); 658 659 bytes_read = read_op->size; 660 661 if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) { 662 rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0, 663 read_op->rmt_addr, 664 read_op->rmt_addrlen); 665 } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) { 666 rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); 667 } else { 668 pj_assert(read_op->op == PJ_IOQUEUE_OP_READ); 669 /* 670 * User has specified pj_ioqueue_read(). 671 * On Win32, we should do ReadFile(). But because we got 672 * here because of select() anyway, user must have put a 673 * socket descriptor on h->fd, which in this case we can 674 * just call pj_sock_recv() instead of ReadFile(). 675 * On Unix, user may put a file in h->fd, so we'll have 676 * to call read() here. 677 * This may not compile on systems which doesn't have 678 * read(). That's why we only specify PJ_LINUX here so 679 * that error is easier to catch. 680 */ 681 # if defined(PJ_WIN32) && PJ_WIN32 != 0 682 rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); 683 //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size, 684 // &bytes_read, NULL); 685 # elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0) 686 bytes_read = read(h->fd, h->rd_buf, bytes_read); 687 rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); 688 # elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0 689 bytes_read = sys_read(h->fd, h->rd_buf, bytes_read); 690 rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read; 691 # else 692 # error "Implement read() for this platform!" 693 # endif 694 } 695 696 if (rc != PJ_SUCCESS) { 697 # if defined(PJ_WIN32) && PJ_WIN32 != 0 698 /* On Win32, for UDP, WSAECONNRESET on the receive side 699 * indicates that previous sending has triggered ICMP Port 700 * Unreachable message. 701 * But we wouldn't know at this point which one of previous 702 * key that has triggered the error, since UDP socket can 703 * be shared! 704 * So we'll just ignore it! 705 */ 706 707 if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) { 708 //PJ_LOG(4,(THIS_FILE, 709 // "Ignored ICMP port unreach. on key=%p", h)); 710 } 711 # endif 712 713 /* In any case we would report this to caller. */ 714 bytes_read = -rc; 715 } 716 717 /* Clear fdset if there is no pending read. */ 718 if (pj_list_empty(&h->read_list)) 719 PJ_FD_CLR(h->fd, &ioqueue->rfdset); 720 721 /* In any case clear from temporary set. */ 722 PJ_FD_CLR(h->fd, &rfdset); 723 724 /* Call callback. */ 725 if (h->cb.on_read_complete) 726 (*h->cb.on_read_complete)(h, (pj_ioqueue_op_key_t*)read_op, 727 bytes_read); 728 729 /* Re-scan readable sockets. */ 730 goto do_readable_scan; 731 732 } 733 } 734 735 #if PJ_HAS_TCP 736 /* Scan for exception socket for TCP connection error. */ 737 h = ioqueue->key_list.next; 738 do_except_scan: 739 for ( ; h!=&ioqueue->key_list; h = h->next) { 740 if (h->connecting && PJ_FD_ISSET(h->fd, &xfdset)) 741 break; 742 } 743 if (h != &ioqueue->key_list) { 744 745 pj_assert(h->connecting); 746 747 /* Clear operation. */ 748 h->connecting = 0; 749 PJ_FD_CLR(h->fd, &ioqueue->wfdset); 750 PJ_FD_CLR(h->fd, &ioqueue->xfdset); 751 PJ_FD_CLR(h->fd, &wfdset); 752 PJ_FD_CLR(h->fd, &xfdset); 753 754 /* Call callback. */ 755 if (h->cb.on_connect_complete) 756 (*h->cb.on_connect_complete)(h, -1); 757 758 /* Re-scan exception list. */ 759 goto do_except_scan; 760 } 761 #endif /* PJ_HAS_TCP */ 623 762 624 763 /* Shouldn't happen. */ … … 629 768 //count = 0; 630 769 631 pj_lock_release(ioque ->lock);770 pj_lock_release(ioqueue->lock); 632 771 return count; 633 772 } 634 773 635 774 /* 636 * pj_ioqueue_read() 637 * 638 * Start asynchronous read from the descriptor. 639 */ 640 PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque, 641 pj_ioqueue_key_t *key, 642 void *buffer, 643 pj_size_t buflen) 644 { 645 PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); 775 * pj_ioqueue_recv() 776 * 777 * Start asynchronous recv() from the socket. 778 */ 779 PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, 780 pj_ioqueue_op_key_t *op_key, 781 void *buffer, 782 pj_ssize_t *length, 783 unsigned flags ) 784 { 785 pj_status_t status; 786 pj_ssize_t size; 787 struct read_operation *read_op; 788 pj_ioqueue_t *ioqueue; 789 790 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); 646 791 PJ_CHECK_STACK(); 647 792 648 /* For consistency with other ioqueue implementation, we would reject 649 * if descriptor has already been submitted for reading before. 650 */ 651 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && 652 (key->op & PJ_IOQUEUE_OP_RECV) == 0 && 653 (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), 654 PJ_EBUSY); 655 656 pj_lock_acquire(ioque->lock); 657 658 key->op |= PJ_IOQUEUE_OP_READ; 659 key->rd_flags = 0; 660 key->rd_buf = buffer; 661 key->rd_buflen = buflen; 662 PJ_FD_SET(key->fd, &ioque->rfdset); 663 664 pj_lock_release(ioque->lock); 793 /* Try to see if there's data immediately available. 794 */ 795 size = *length; 796 status = pj_sock_recv(key->fd, buffer, &size, flags); 797 if (status == PJ_SUCCESS) { 798 /* Yes! Data is available! */ 799 *length = size; 800 return PJ_SUCCESS; 801 } else { 802 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report 803 * the error to caller. 804 */ 805 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) 806 return status; 807 } 808 809 /* 810 * No data is immediately available. 811 * Must schedule asynchronous operation to the ioqueue. 812 */ 813 ioqueue = key->ioqueue; 814 pj_lock_acquire(ioqueue->lock); 815 816 read_op = (struct read_operation*)op_key; 817 818 read_op->op = PJ_IOQUEUE_OP_RECV; 819 read_op->buf = buffer; 820 read_op->size = *length; 821 read_op->flags = flags; 822 823 pj_list_insert_before(&key->read_list, read_op); 824 PJ_FD_SET(key->fd, &ioqueue->rfdset); 825 826 pj_lock_release(ioqueue->lock); 665 827 return PJ_EPENDING; 666 828 } 667 829 668 669 /*670 * pj_ioqueue_recv()671 *672 * Start asynchronous recv() from the socket.673 */674 PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque,675 pj_ioqueue_key_t *key,676 void *buffer,677 pj_size_t buflen,678 unsigned flags )679 {680 PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);681 PJ_CHECK_STACK();682 683 /* For consistency with other ioqueue implementation, we would reject684 * if descriptor has already been submitted for reading before.685 */686 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&687 (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&688 (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),689 PJ_EBUSY);690 691 pj_lock_acquire(ioque->lock);692 693 key->op |= PJ_IOQUEUE_OP_RECV;694 key->rd_buf = buffer;695 key->rd_buflen = buflen;696 key->rd_flags = flags;697 PJ_FD_SET(key->fd, &ioque->rfdset);698 699 pj_lock_release(ioque->lock);700 return PJ_EPENDING;701 }702 703 830 /* 704 831 * pj_ioqueue_recvfrom() … … 706 833 * Start asynchronous recvfrom() from the socket. 707 834 */ 708 PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_ t *ioque,709 pj_ioqueue_key_t *key,835 PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, 836 pj_ioqueue_op_key_t *op_key, 710 837 void *buffer, 711 pj_s ize_t buflen,838 pj_ssize_t *length, 712 839 unsigned flags, 713 840 pj_sockaddr_t *addr, 714 841 int *addrlen) 715 842 { 716 PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); 843 pj_status_t status; 844 pj_ssize_t size; 845 struct read_operation *read_op; 846 pj_ioqueue_t *ioqueue; 847 848 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); 717 849 PJ_CHECK_STACK(); 718 850 719 /* For consistency with other ioqueue implementation, we would reject 720 * if descriptor has already been submitted for reading before. 721 */ 722 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && 723 (key->op & PJ_IOQUEUE_OP_RECV) == 0 && 724 (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), 725 PJ_EBUSY); 726 727 pj_lock_acquire(ioque->lock); 728 729 key->op |= PJ_IOQUEUE_OP_RECV_FROM; 730 key->rd_buf = buffer; 731 key->rd_buflen = buflen; 732 key->rd_flags = flags; 733 key->rmt_addr = addr; 734 key->rmt_addrlen = addrlen; 735 PJ_FD_SET(key->fd, &ioque->rfdset); 736 737 pj_lock_release(ioque->lock); 851 /* Try to see if there's data immediately available. 852 */ 853 size = *length; 854 status = pj_sock_recvfrom(key->fd, buffer, &size, flags, 855 addr, addrlen); 856 if (status == PJ_SUCCESS) { 857 /* Yes! Data is available! */ 858 *length = size; 859 return PJ_SUCCESS; 860 } else { 861 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report 862 * the error to caller. 863 */ 864 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) 865 return status; 866 } 867 868 /* 869 * No data is immediately available. 870 * Must schedule asynchronous operation to the ioqueue. 871 */ 872 ioqueue = key->ioqueue; 873 pj_lock_acquire(ioqueue->lock); 874 875 read_op = (struct read_operation*)op_key; 876 877 read_op->op = PJ_IOQUEUE_OP_RECV_FROM; 878 read_op->buf = buffer; 879 read_op->size = *length; 880 read_op->flags = flags; 881 read_op->rmt_addr = addr; 882 read_op->rmt_addrlen = addrlen; 883 884 pj_list_insert_before(&key->read_list, read_op); 885 PJ_FD_SET(key->fd, &ioqueue->rfdset); 886 887 pj_lock_release(ioqueue->lock); 738 888 return PJ_EPENDING; 739 889 } 740 890 741 891 /* 742 * pj_ioqueue_write() 892 * pj_ioqueue_send() 893 * 894 * Start asynchronous send() to the descriptor. 895 */ 896 PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, 897 pj_ioqueue_op_key_t *op_key, 898 const void *data, 899 pj_ssize_t *length, 900 unsigned flags) 901 { 902 pj_ioqueue_t *ioqueue; 903 struct write_operation *write_op; 904 pj_status_t status; 905 pj_ssize_t sent; 906 907 PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); 908 PJ_CHECK_STACK(); 909 910 /* Fast track: 911 * Try to send data immediately, only if there's no pending write! 912 * Note: 913 * We are speculating that the list is empty here without properly 914 * acquiring ioqueue's mutex first. This is intentional, to maximize 915 * performance via parallelism. 916 * 917 * This should be safe, because: 918 * - by convention, we require caller to make sure that the 919 * key is not unregistered while other threads are invoking 920 * an operation on the same key. 921 * - pj_list_empty() is safe to be invoked by multiple threads, 922 * even when other threads are modifying the list. 923 */ 924 if (pj_list_empty(&key->write_list)) { 925 /* 926 * See if data can be sent immediately. 927 */ 928 sent = *length; 929 status = pj_sock_send(key->fd, data, &sent, flags); 930 if (status == PJ_SUCCESS) { 931 /* Success! */ 932 *length = sent; 933 return PJ_SUCCESS; 934 } else { 935 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report 936 * the error to caller. 937 */ 938 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { 939 return status; 940 } 941 } 942 } 943 944 /* 945 * Schedule asynchronous send. 946 */ 947 ioqueue = key->ioqueue; 948 pj_lock_acquire(ioqueue->lock); 949 950 write_op = (struct write_operation*)op_key; 951 write_op->op = PJ_IOQUEUE_OP_SEND; 952 write_op->buf = NULL; 953 write_op->size = *length; 954 write_op->written = 0; 955 write_op->flags = flags; 956 957 pj_list_insert_before(&key->write_list, write_op); 958 PJ_FD_SET(key->fd, &ioqueue->wfdset); 959 960 pj_lock_release(ioqueue->lock); 961 962 return PJ_EPENDING; 963 } 964 965 966 /* 967 * pj_ioqueue_sendto() 743 968 * 744 969 * Start asynchronous write() to the descriptor. 745 970 */ 746 PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque, 747 pj_ioqueue_key_t *key, 748 const void *data, 749 pj_size_t datalen) 750 { 751 pj_status_t rc; 752 pj_ssize_t sent; 753 754 PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); 755 PJ_CHECK_STACK(); 756 757 /* For consistency with other ioqueue implementation, we would reject 758 * if descriptor has already been submitted for writing before. 759 */ 760 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && 761 (key->op & PJ_IOQUEUE_OP_SEND) == 0 && 762 (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), 763 PJ_EBUSY); 764 765 sent = datalen; 766 /* sent would be -1 after pj_sock_send() if it returns error. */ 767 rc = pj_sock_send(key->fd, data, &sent, 0); 768 if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { 769 return rc; 770 } 771 772 pj_lock_acquire(ioque->lock); 773 774 key->op |= PJ_IOQUEUE_OP_WRITE; 775 key->wr_buf = NULL; 776 key->wr_buflen = datalen; 777 PJ_FD_SET(key->fd, &ioque->wfdset); 778 779 pj_lock_release(ioque->lock); 780 781 return PJ_EPENDING; 782 } 783 784 /* 785 * pj_ioqueue_send() 786 * 787 * Start asynchronous send() to the descriptor. 788 */ 789 PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque, 790 pj_ioqueue_key_t *key, 791 const void *data, 792 pj_size_t datalen, 793 unsigned flags) 794 { 795 pj_status_t rc; 796 pj_ssize_t sent; 797 798 PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); 799 PJ_CHECK_STACK(); 800 801 /* For consistency with other ioqueue implementation, we would reject 802 * if descriptor has already been submitted for writing before. 803 */ 804 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && 805 (key->op & PJ_IOQUEUE_OP_SEND) == 0 && 806 (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), 807 PJ_EBUSY); 808 809 sent = datalen; 810 /* sent would be -1 after pj_sock_send() if it returns error. */ 811 rc = pj_sock_send(key->fd, data, &sent, flags); 812 if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { 813 return rc; 814 } 815 816 pj_lock_acquire(ioque->lock); 817 818 key->op |= PJ_IOQUEUE_OP_SEND; 819 key->wr_buf = NULL; 820 key->wr_buflen = datalen; 821 PJ_FD_SET(key->fd, &ioque->wfdset); 822 823 pj_lock_release(ioque->lock); 824 825 return PJ_EPENDING; 826 } 827 828 829 /* 830 * pj_ioqueue_sendto() 831 * 832 * Start asynchronous write() to the descriptor. 833 */ 834 PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque, 835 pj_ioqueue_key_t *key, 971 PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, 972 pj_ioqueue_op_key_t *op_key, 836 973 const void *data, 837 pj_s ize_t datalen,974 pj_ssize_t *length, 838 975 unsigned flags, 839 976 const pj_sockaddr_t *addr, 840 977 int addrlen) 841 978 { 842 pj_status_t rc; 979 pj_ioqueue_t *ioqueue; 980 struct write_operation *write_op; 981 pj_status_t status; 843 982 pj_ssize_t sent; 844 983 845 PJ_ASSERT_RETURN( ioque && key && data, PJ_EINVAL);984 PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); 846 985 PJ_CHECK_STACK(); 847 986 848 /* For consistency with other ioqueue implementation, we would reject 849 * if descriptor has already been submitted for writing before. 850 */ 851 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && 852 (key->op & PJ_IOQUEUE_OP_SEND) == 0 && 853 (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), 854 PJ_EBUSY); 855 856 sent = datalen; 857 /* sent would be -1 after pj_sock_sendto() if it returns error. */ 858 rc = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen); 859 if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { 860 return rc; 861 } 862 863 pj_lock_acquire(ioque->lock); 864 865 key->op |= PJ_IOQUEUE_OP_SEND_TO; 866 key->wr_buf = NULL; 867 key->wr_buflen = datalen; 868 PJ_FD_SET(key->fd, &ioque->wfdset); 869 870 pj_lock_release(ioque->lock); 987 /* Fast track: 988 * Try to send data immediately, only if there's no pending write! 989 * Note: 990 * We are speculating that the list is empty here without properly 991 * acquiring ioqueue's mutex first. This is intentional, to maximize 992 * performance via parallelism. 993 * 994 * This should be safe, because: 995 * - by convention, we require caller to make sure that the 996 * key is not unregistered while other threads are invoking 997 * an operation on the same key. 998 * - pj_list_empty() is safe to be invoked by multiple threads, 999 * even when other threads are modifying the list. 1000 */ 1001 if (pj_list_empty(&key->write_list)) { 1002 /* 1003 * See if data can be sent immediately. 1004 */ 1005 sent = *length; 1006 status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen); 1007 if (status == PJ_SUCCESS) { 1008 /* Success! */ 1009 *length = sent; 1010 return PJ_SUCCESS; 1011 } else { 1012 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report 1013 * the error to caller. 1014 */ 1015 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { 1016 return status; 1017 } 1018 } 1019 } 1020 1021 /* 1022 * Check that address storage can hold the address parameter. 1023 */ 1024 PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG); 1025 1026 /* 1027 * Schedule asynchronous send. 1028 */ 1029 ioqueue = key->ioqueue; 1030 pj_lock_acquire(ioqueue->lock); 1031 1032 write_op = (struct write_operation*)op_key; 1033 write_op->op = PJ_IOQUEUE_OP_SEND_TO; 1034 write_op->buf = NULL; 1035 write_op->size = *length; 1036 write_op->written = 0; 1037 write_op->flags = flags; 1038 pj_memcpy(&write_op->rmt_addr, addr, addrlen); 1039 write_op->rmt_addrlen = addrlen; 1040 1041 pj_list_insert_before(&key->write_list, write_op); 1042 PJ_FD_SET(key->fd, &ioqueue->wfdset); 1043 1044 pj_lock_release(ioqueue->lock); 1045 871 1046 return PJ_EPENDING; 872 1047 } … … 876 1051 * Initiate overlapped accept() operation. 877 1052 */ 878 PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue, 879 pj_ioqueue_key_t *key, 880 pj_sock_t *new_sock, 881 pj_sockaddr_t *local, 882 pj_sockaddr_t *remote, 883 int *addrlen) 884 { 1053 PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, 1054 pj_ioqueue_op_key_t *op_key, 1055 pj_sock_t *new_sock, 1056 pj_sockaddr_t *local, 1057 pj_sockaddr_t *remote, 1058 int *addrlen) 1059 { 1060 pj_ioqueue_t *ioqueue; 1061 struct accept_operation *accept_op; 1062 pj_status_t status; 1063 885 1064 /* check parameters. All must be specified! */ 886 pj_assert(ioqueue && key && new_sock); 887 888 /* Server socket must have no other operation! */ 889 pj_assert(key->op == 0); 890 1065 PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); 1066 1067 /* Fast track: 1068 * See if there's new connection available immediately. 1069 */ 1070 if (pj_list_empty(&key->accept_list)) { 1071 status = pj_sock_accept(key->fd, new_sock, remote, addrlen); 1072 if (status == PJ_SUCCESS) { 1073 /* Yes! New connection is available! */ 1074 if (local && addrlen) { 1075 status = pj_sock_getsockname(*new_sock, local, addrlen); 1076 if (status != PJ_SUCCESS) { 1077 pj_sock_close(*new_sock); 1078 *new_sock = PJ_INVALID_SOCKET; 1079 return status; 1080 } 1081 } 1082 return PJ_SUCCESS; 1083 } else { 1084 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report 1085 * the error to caller. 1086 */ 1087 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { 1088 return status; 1089 } 1090 } 1091 } 1092 1093 /* 1094 * No connection is available immediately. 1095 * Schedule accept() operation to be completed when there is incoming 1096 * connection available. 1097 */ 1098 ioqueue = key->ioqueue; 1099 accept_op = (struct accept_operation*)op_key; 1100 891 1101 pj_lock_acquire(ioqueue->lock); 892 1102 893 key->op = PJ_IOQUEUE_OP_ACCEPT;894 key->accept_fd = new_sock;895 key->rmt_addr = remote;896 key->rmt_addrlen= addrlen;897 key->local_addr = local;898 key->local_addrlen = addrlen; /* use same addr. as rmt_addrlen */ 899 1103 accept_op->op = PJ_IOQUEUE_OP_ACCEPT; 1104 accept_op->accept_fd = new_sock; 1105 accept_op->rmt_addr = remote; 1106 accept_op->addrlen= addrlen; 1107 accept_op->local_addr = local; 1108 1109 pj_list_insert_before(&key->accept_list, accept_op); 900 1110 PJ_FD_SET(key->fd, &ioqueue->rfdset); 901 1111 902 1112 pj_lock_release(ioqueue->lock); 1113 903 1114 return PJ_EPENDING; 904 1115 } … … 908 1119 * since there's no overlapped version of connect()). 909 1120 */ 910 PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue, 911 pj_ioqueue_key_t *key, 1121 PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, 912 1122 const pj_sockaddr_t *addr, 913 1123 int addrlen ) 914 1124 { 915 pj_status_t rc; 1125 pj_ioqueue_t *ioqueue; 1126 pj_status_t status; 916 1127 917 1128 /* check parameters. All must be specified! */ 918 PJ_ASSERT_RETURN(ioqueue && key && addr && addrlen, PJ_EINVAL); 919 920 /* Connecting socket must have no other operation! */ 921 PJ_ASSERT_RETURN(key->op == 0, PJ_EBUSY); 1129 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); 1130 1131 /* Check if socket has not been marked for connecting */ 1132 if (key->connecting != 0) 1133 return PJ_EPENDING; 922 1134 923 rc= pj_sock_connect(key->fd, addr, addrlen);924 if ( rc== PJ_SUCCESS) {1135 status = pj_sock_connect(key->fd, addr, addrlen); 1136 if (status == PJ_SUCCESS) { 925 1137 /* Connected! */ 926 1138 return PJ_SUCCESS; 927 1139 } else { 928 if (rc == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) || 929 rc == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) 930 { 1140 if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) { 931 1141 /* Pending! */ 1142 ioqueue = key->ioqueue; 932 1143 pj_lock_acquire(ioqueue->lock); 933 key-> op = PJ_IOQUEUE_OP_CONNECT;1144 key->connecting = PJ_TRUE; 934 1145 PJ_FD_SET(key->fd, &ioqueue->wfdset); 935 1146 PJ_FD_SET(key->fd, &ioqueue->xfdset); … … 938 1149 } else { 939 1150 /* Error! */ 940 return rc;1151 return status; 941 1152 } 942 1153 }
Note: See TracChangeset
for help on using the changeset viewer.