Changeset 12
- Timestamp:
- Nov 6, 2005 1:32:11 PM (19 years ago)
- Location:
- pjproject/main/pjlib
- Files:
-
- 2 added
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/main/pjlib/build/pjlib.dsp
r11 r12 207 207 # Begin Source File 208 208 209 SOURCE=..\src\pj\ioqueue_common_abs.c 210 # PROP Exclude_From_Build 1 211 # End Source File 212 # Begin Source File 213 214 SOURCE=..\src\pj\ioqueue_common_abs.h 215 # End Source File 216 # Begin Source File 217 209 218 SOURCE=..\src\pj\ioqueue_select.c 210 211 !IF "$(CFG)" == "pjlib - Win32 Release"212 213 # PROP Exclude_From_Build 1214 215 !ELSEIF "$(CFG)" == "pjlib - Win32 Debug"216 217 !ENDIF218 219 219 # End Source File 220 220 # Begin Source File 221 221 222 222 SOURCE=..\src\pj\ioqueue_winnt.c 223 224 !IF "$(CFG)" == "pjlib - Win32 Release" 225 226 !ELSEIF "$(CFG)" == "pjlib - Win32 Debug" 227 228 # PROP Exclude_From_Build 1 229 230 !ENDIF 231 223 # PROP Exclude_From_Build 1 232 224 # End Source File 233 225 # Begin Source File -
pjproject/main/pjlib/include/pj/ioqueue.h
r11 r12 83 83 * The items below describe rules that must be obeyed when using the I/O 84 84 * queue, with regard to concurrency: 85 * - in general, the I/O queue is thread safe (assuming the lock strategy 86 * is not changed to disable mutex protection). All operations, except 87 * unregistration which is described below, can be safely invoked 88 * simultaneously by multiple threads. 89 * - however, <b>care must be taken when unregistering a key</b> from the 85 * - simultaneous operations (by different threads) to different key is safe. 86 * - simultaneous operations to the same key is also safe, except 87 * <b>unregistration</b>, which is described below. 88 * - <b>care must be taken when unregistering a key</b> from the 90 89 * ioqueue. Application must take care that when one thread is issuing 91 90 * an unregistration, other thread is not simultaneously invoking an … … 207 206 208 207 /** 209 * Indicates that the I/O Queue should be created to handle reasonable 210 * number of threads. 211 */ 212 #define PJ_IOQUEUE_DEFAULT_THREADS 0 208 * This macro specifies the maximum number of events that can be 209 * processed by the ioqueue on a single poll cycle, on implementation 210 * that supports it. The value is only meaningfull when specified 211 * during PJLIB build. 212 */ 213 #ifndef PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL 214 # define PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL (16) 215 #endif 216 213 217 214 218 /** -
pjproject/main/pjlib/include/pj/sock.h
r4 r12 70 70 71 71 /** 72 * Socket level specified in #pj_sock_setsockopt() .72 * Socket level specified in #pj_sock_setsockopt() or #pj_sock_getsockopt(). 73 73 * APPLICATION MUST USE THESE VALUES INSTEAD OF NORMAL SOL_*, BECAUSE 74 74 * THE LIBRARY WILL TRANSLATE THE VALUE TO THE NATIVE VALUE. … … 79 79 extern const pj_uint16_t PJ_SOL_UDP; /**< UDP level. */ 80 80 extern const pj_uint16_t PJ_SOL_IPV6; /**< IP version 6 */ 81 82 /** 83 * Values to be specified as \c optname when calling #pj_sock_setsockopt() 84 * or #pj_sock_getsockopt(). 85 */ 86 extern const pj_uint16_t PJ_SO_TYPE; /**< Socket type. */ 87 extern const pj_uint16_t PJ_SO_RCVBUF; /**< Buffer size for receive. */ 88 extern const pj_uint16_t PJ_SO_SNDBUF; /**< Buffer size for send. */ 89 81 90 82 91 /** … … 540 549 * @param sockfd The socket descriptor. 541 550 * @param level The level which to get the option from. 542 * @param optname The option name, which will be passed uninterpreted 543 * by the library. 551 * @param optname The option name. 544 552 * @param optval Identifies the buffer which the value will be 545 553 * returned. … … 550 558 */ 551 559 PJ_DECL(pj_status_t) pj_sock_getsockopt( pj_sock_t sockfd, 552 int level,553 int optname,560 pj_uint16_t level, 561 pj_uint16_t optname, 554 562 void *optval, 555 563 int *optlen); … … 561 569 * @param sockfd The socket descriptor. 562 570 * @param level The level which to get the option from. 563 * @param optname The option name, which will be passed uninterpreted 564 * by the library. 571 * @param optname The option name. 565 572 * @param optval Identifies the buffer which contain the value. 566 573 * @param optlen The length of the value. … … 569 576 */ 570 577 PJ_DECL(pj_status_t) pj_sock_setsockopt( pj_sock_t sockfd, 571 int level,572 int optname,578 pj_uint16_t level, 579 pj_uint16_t optname, 573 580 const void *optval, 574 581 int optlen); -
pjproject/main/pjlib/src/pj/ioqueue_select.c
r11 r12 23 23 24 24 /* 25 * Include declaration from common abstraction. 26 */ 27 #include "ioqueue_common_abs.h" 28 29 /* 25 30 * ISSUES with ioqueue_select() 26 31 * … … 59 64 60 65 61 62 63 66 /* 64 67 * During debugging build, VALIDATE_FD_SET is set. … … 71 74 #endif 72 75 73 struct generic_operation 74 { 75 PJ_DECL_LIST_MEMBER(struct generic_operation); 76 pj_ioqueue_operation_e op; 76 /* 77 * This describes each key. 78 */ 79 struct pj_ioqueue_key_t 80 { 81 DECLARE_COMMON_KEY 77 82 }; 78 83 79 struct read_operation80 {81 PJ_DECL_LIST_MEMBER(struct read_operation);82 pj_ioqueue_operation_e op;83 84 void *buf;85 pj_size_t size;86 unsigned flags;87 pj_sockaddr_t *rmt_addr;88 int *rmt_addrlen;89 };90 91 struct write_operation92 {93 PJ_DECL_LIST_MEMBER(struct write_operation);94 pj_ioqueue_operation_e op;95 96 char *buf;97 pj_size_t size;98 pj_ssize_t written;99 unsigned flags;100 pj_sockaddr_in rmt_addr;101 int rmt_addrlen;102 };103 104 #if PJ_HAS_TCP105 struct accept_operation106 {107 PJ_DECL_LIST_MEMBER(struct accept_operation);108 pj_ioqueue_operation_e op;109 110 pj_sock_t *accept_fd;111 pj_sockaddr_t *local_addr;112 pj_sockaddr_t *rmt_addr;113 int *addrlen;114 };115 #endif116 117 union operation_key118 {119 struct generic_operation generic;120 struct read_operation read;121 struct write_operation write;122 #if PJ_HAS_TCP123 struct accept_operation accept;124 #endif125 };126 127 /*128 * This describes each key.129 */130 struct pj_ioqueue_key_t131 {132 PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);133 pj_ioqueue_t *ioqueue;134 pj_sock_t fd;135 void *user_data;136 pj_ioqueue_callback cb;137 int connecting;138 struct read_operation read_list;139 struct write_operation write_list;140 #if PJ_HAS_TCP141 struct accept_operation accept_list;142 #endif143 };144 145 84 /* 146 85 * This describes the I/O queue itself. … … 148 87 struct pj_ioqueue_t 149 88 { 150 pj_lock_t *lock;151 pj_bool_t auto_delete_lock; 89 DECLARE_COMMON_IOQUEUE 90 152 91 unsigned max, count; 153 92 pj_ioqueue_key_t key_list; … … 159 98 }; 160 99 100 /* Include implementation for common abstraction after we declare 101 * pj_ioqueue_key_t and pj_ioqueue_t. 102 */ 103 #include "ioqueue_common_abs.c" 104 161 105 /* 162 106 * pj_ioqueue_create() … … 169 113 { 170 114 pj_ioqueue_t *ioqueue; 115 pj_lock_t *lock; 171 116 pj_status_t rc; 172 117 … … 181 126 182 127 ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); 128 129 ioqueue_init(ioqueue); 130 183 131 ioqueue->max = max_fd; 184 132 ioqueue->count = 0; … … 190 138 pj_list_init(&ioqueue->key_list); 191 139 192 rc = pj_lock_create_ recursive_mutex(pool, "ioq%p", &ioqueue->lock);140 rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock); 193 141 if (rc != PJ_SUCCESS) 194 142 return rc; 195 143 196 ioqueue->auto_delete_lock = PJ_TRUE; 144 rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE); 145 if (rc != PJ_SUCCESS) 146 return rc; 197 147 198 148 PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue)); … … 209 159 PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) 210 160 { 211 pj_status_t rc = PJ_SUCCESS;212 213 161 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 214 162 215 163 pj_lock_acquire(ioqueue->lock); 216 217 if (ioqueue->auto_delete_lock) 218 rc = pj_lock_destroy(ioqueue->lock); 219 220 return rc; 164 return ioqueue_destroy(ioqueue); 221 165 } 222 166 … … 261 205 /* Create key. */ 262 206 key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 263 key->ioqueue = ioqueue; 264 key->fd = sock; 265 key->user_data = user_data; 266 pj_list_init(&key->read_list); 267 pj_list_init(&key->write_list); 268 #if PJ_HAS_TCP 269 pj_list_init(&key->accept_list); 270 #endif 271 272 /* Save callback. */ 273 pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); 207 rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); 208 if (rc != PJ_SUCCESS) 209 return rc; 274 210 275 211 /* Register */ … … 309 245 #endif 310 246 311 pj_lock_release(ioqueue->lock); 312 return PJ_SUCCESS; 313 } 314 315 /* 316 * pj_ioqueue_get_user_data() 317 * 318 * Obtain value associated with a key. 319 */ 320 PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) 321 { 322 PJ_ASSERT_RETURN(key != NULL, NULL); 323 return key->user_data; 324 } 325 326 327 /* 328 * pj_ioqueue_set_user_data() 329 */ 330 PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, 331 void *user_data, 332 void **old_data) 333 { 334 PJ_ASSERT_RETURN(key, PJ_EINVAL); 335 336 if (old_data) 337 *old_data = key->user_data; 338 key->user_data = user_data; 247 /* ioqueue_destroy may try to acquire key's mutex. 248 * Since normally the order of locking is to lock key's mutex first 249 * then ioqueue's mutex, ioqueue_destroy may deadlock unless we 250 * release ioqueue's mutex first. 251 */ 252 pj_lock_release(ioqueue->lock); 253 254 /* Destroy the key. */ 255 ioqueue_destroy_key(key); 339 256 340 257 return PJ_SUCCESS; … … 393 310 394 311 312 /* ioqueue_remove_from_set() 313 * This function is called from ioqueue_dispatch_event() to instruct 314 * the ioqueue to remove the specified descriptor from ioqueue's descriptor 315 * set for the specified event. 316 */ 317 static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, 318 pj_sock_t fd, 319 enum ioqueue_event_type event_type) 320 { 321 pj_lock_acquire(ioqueue->lock); 322 323 if (event_type == READABLE_EVENT) 324 PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset); 325 else if (event_type == WRITEABLE_EVENT) 326 PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset); 327 else if (event_type == EXCEPTION_EVENT) 328 PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset); 329 else 330 pj_assert(0); 331 332 pj_lock_release(ioqueue->lock); 333 } 334 335 /* 336 * ioqueue_add_to_set() 337 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc 338 * to instruct the ioqueue to add the specified handle to ioqueue's descriptor 339 * set for the specified event. 340 */ 341 static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, 342 pj_sock_t fd, 343 enum ioqueue_event_type event_type ) 344 { 345 pj_lock_acquire(ioqueue->lock); 346 347 if (event_type == READABLE_EVENT) 348 PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset); 349 else if (event_type == WRITEABLE_EVENT) 350 PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset); 351 else if (event_type == EXCEPTION_EVENT) 352 PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset); 353 else 354 pj_assert(0); 355 356 pj_lock_release(ioqueue->lock); 357 } 358 395 359 /* 396 360 * pj_ioqueue_poll() … … 413 377 { 414 378 pj_fd_set_t rfdset, wfdset, xfdset; 415 int count ;379 int count, counter; 416 380 pj_ioqueue_key_t *h; 381 struct event 382 { 383 pj_ioqueue_key_t *key; 384 enum event_type event_type; 385 } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; 417 386 418 387 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); … … 454 423 if (count <= 0) 455 424 return count; 456 457 /* Lock ioqueue again before scanning for signalled sockets. 458 * We must strictly use recursive mutex since application may invoke 459 * the ioqueue again inside the callback. 425 else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) 426 count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL; 427 428 /* Scan descriptor sets for event and add the events in the event 429 * array to be processed later in this function. We do this so that 430 * events can be processed in parallel without holding ioqueue lock. 460 431 */ 461 432 pj_lock_acquire(ioqueue->lock); 433 434 counter = 0; 462 435 463 436 /* Scan for writable sockets first to handle piggy-back data … … 465 438 */ 466 439 h = ioqueue->key_list.next; 467 do_writable_scan: 468 for ( ; h!=&ioqueue->key_list; h = h->next) { 469 if ( (!pj_list_empty(&h->write_list) || h->connecting) 440 for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) { 441 if ( (key_has_pending_write(h) || key_has_pending_connect(h)) 470 442 && PJ_FD_ISSET(h->fd, &wfdset)) 471 443 { 472 break; 444 event[counter].key = h; 445 event[counter].event_type = WRITEABLE_EVENT; 446 ++counter; 473 447 } 474 } 475 if (h != &ioqueue->key_list) { 476 pj_assert(!pj_list_empty(&h->write_list) || h->connecting); 477 478 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 479 if (h->connecting) { 480 /* Completion of connect() operation */ 481 pj_ssize_t bytes_transfered; 482 483 #if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) 484 /* from connect(2): 485 * On Linux, use getsockopt to read the SO_ERROR option at 486 * level SOL_SOCKET to determine whether connect() completed 487 * successfully (if SO_ERROR is zero). 488 */ 489 int value; 490 socklen_t vallen = sizeof(value); 491 int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR, 492 &value, &vallen); 493 if (gs_rc != 0) { 494 /* Argh!! What to do now??? 495 * Just indicate that the socket is connected. The 496 * application will get error as soon as it tries to use 497 * the socket to send/receive. 498 */ 499 bytes_transfered = 0; 500 } else { 501 bytes_transfered = value; 502 } 503 #elif defined(PJ_WIN32) && PJ_WIN32!=0 504 bytes_transfered = 0; /* success */ 505 #else 506 /* Excellent information in D.J. Bernstein page: 507 * http://cr.yp.to/docs/connect.html 508 * 509 * Seems like the most portable way of detecting connect() 510 * failure is to call getpeername(). If socket is connected, 511 * getpeername() will return 0. If the socket is not connected, 512 * it will return ENOTCONN, and read(fd, &ch, 1) will produce 513 * the right errno through error slippage. This is a combination 514 * of suggestions from Douglas C. Schmidt and Ken Keys. 515 */ 516 int gp_rc; 517 struct sockaddr_in addr; 518 socklen_t addrlen = sizeof(addr); 519 520 gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen); 521 bytes_transfered = gp_rc; 522 #endif 523 524 /* Clear operation. */ 525 h->connecting = 0; 526 PJ_FD_CLR(h->fd, &ioqueue->wfdset); 527 PJ_FD_CLR(h->fd, &ioqueue->xfdset); 528 529 /* Call callback. */ 530 if (h->cb.on_connect_complete) 531 (*h->cb.on_connect_complete)(h, bytes_transfered); 532 533 /* Re-scan writable sockets. */ 534 goto do_writable_scan; 535 536 } else 537 #endif /* PJ_HAS_TCP */ 538 { 539 /* Socket is writable. */ 540 struct write_operation *write_op; 541 pj_ssize_t sent; 542 pj_status_t send_rc; 543 544 /* Get the first in the queue. */ 545 write_op = h->write_list.next; 546 547 /* Send the data. */ 548 sent = write_op->size - write_op->written; 549 if (write_op->op == PJ_IOQUEUE_OP_SEND) { 550 send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written, 551 &sent, write_op->flags); 552 } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) { 553 send_rc = pj_sock_sendto(h->fd, 554 write_op->buf+write_op->written, 555 &sent, write_op->flags, 556 &write_op->rmt_addr, 557 write_op->rmt_addrlen); 558 } else { 559 pj_assert(!"Invalid operation type!"); 560 send_rc = PJ_EBUG; 561 } 562 563 if (send_rc == PJ_SUCCESS) { 564 write_op->written += sent; 565 } else { 566 pj_assert(send_rc > 0); 567 write_op->written = -send_rc; 568 } 569 570 /* In any case we don't need to process this descriptor again. */ 571 PJ_FD_CLR(h->fd, &wfdset); 572 573 /* Are we finished with this buffer? */ 574 if (send_rc!=PJ_SUCCESS || 575 write_op->written == (pj_ssize_t)write_op->size) 576 { 577 pj_list_erase(write_op); 578 579 /* Clear operation if there's no more data to send. */ 580 if (pj_list_empty(&h->write_list)) 581 PJ_FD_CLR(h->fd, &ioqueue->wfdset); 582 583 /* Call callback. */ 584 if (h->cb.on_write_complete) { 585 (*h->cb.on_write_complete)(h, 586 (pj_ioqueue_op_key_t*)write_op, 587 write_op->written); 588 } 589 } 590 591 /* Re-scan writable sockets. */ 592 goto do_writable_scan; 593 } 594 } 595 596 /* Scan for readable socket. */ 597 h = ioqueue->key_list.next; 598 do_readable_scan: 599 for ( ; h!=&ioqueue->key_list; h = h->next) { 600 if ((!pj_list_empty(&h->read_list) 448 449 /* Scan for readable socket. */ 450 if ((key_has_pending_read(h) || key_has_pending_accept(h)) 451 && PJ_FD_ISSET(h->fd, &rfdset)) 452 { 453 event[counter].key = h; 454 event[counter].event_type = READABLE_EVENT; 455 ++counter; } 456 601 457 #if PJ_HAS_TCP 602 || !pj_list_empty(&h->accept_list) 603 #endif 604 ) && PJ_FD_ISSET(h->fd, &rfdset)) 605 { 606 break; 458 if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) { 459 event[counter].key = h; 460 event[counter].event_type = EXCEPTION_EVENT; 461 ++counter; 607 462 } 608 } 609 if (h != &ioqueue->key_list) { 610 pj_status_t rc; 611 612 #if PJ_HAS_TCP 613 pj_assert(!pj_list_empty(&h->read_list) || 614 !pj_list_empty(&h->accept_list)); 615 #else 616 pj_assert(!pj_list_empty(&h->read_list)); 617 #endif 618 619 # if PJ_HAS_TCP 620 if (!pj_list_empty(&h->accept_list)) { 621 622 struct accept_operation *accept_op; 623 624 /* Get one accept operation from the list. */ 625 accept_op = h->accept_list.next; 626 pj_list_erase(accept_op); 627 628 rc=pj_sock_accept(h->fd, accept_op->accept_fd, 629 accept_op->rmt_addr, accept_op->addrlen); 630 if (rc==PJ_SUCCESS && accept_op->local_addr) { 631 rc = pj_sock_getsockname(*accept_op->accept_fd, 632 accept_op->local_addr, 633 accept_op->addrlen); 634 } 635 636 /* Clear bit in fdset if there is no more pending accept */ 637 if (pj_list_empty(&h->accept_list)) 638 PJ_FD_CLR(h->fd, &ioqueue->rfdset); 639 640 /* Call callback. */ 641 if (h->cb.on_accept_complete) 642 (*h->cb.on_accept_complete)(h, (pj_ioqueue_op_key_t*)accept_op, 643 *accept_op->accept_fd, rc); 644 645 /* Re-scan readable sockets. */ 646 goto do_readable_scan; 463 #endif 464 } 465 466 pj_lock_release(ioqueue->lock); 467 468 count = counter; 469 470 /* Now process all events. The dispatch functions will take care 471 * of locking in each of the key 472 */ 473 for (counter=0; counter<count; ++counter) { 474 switch (event[counter].event_type) { 475 case READABLE_EVENT: 476 ioqueue_dispatch_read_event(ioqueue, event[counter].key); 477 break; 478 case WRITEABLE_EVENT: 479 ioqueue_dispatch_write_event(ioqueue, event[counter].key); 480 break; 481 case EXCEPTION_EVENT: 482 ioqueue_dispatch_exception_event(ioqueue, event[counter].key); 483 break; 484 case NO_EVENT: 485 default: 486 pj_assert(!"Invalid event!"); 487 break; 647 488 } 648 else { 649 # endif 650 struct read_operation *read_op; 651 pj_ssize_t bytes_read; 652 653 pj_assert(!pj_list_empty(&h->read_list)); 654 655 /* Get one pending read operation from the list. */ 656 read_op = h->read_list.next; 657 pj_list_erase(read_op); 658 659 bytes_read = read_op->size; 660 661 if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) { 662 rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0, 663 read_op->rmt_addr, 664 read_op->rmt_addrlen); 665 } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) { 666 rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); 667 } else { 668 pj_assert(read_op->op == PJ_IOQUEUE_OP_READ); 669 /* 670 * User has specified pj_ioqueue_read(). 671 * On Win32, we should do ReadFile(). But because we got 672 * here because of select() anyway, user must have put a 673 * socket descriptor on h->fd, which in this case we can 674 * just call pj_sock_recv() instead of ReadFile(). 675 * On Unix, user may put a file in h->fd, so we'll have 676 * to call read() here. 677 * This may not compile on systems which doesn't have 678 * read(). That's why we only specify PJ_LINUX here so 679 * that error is easier to catch. 680 */ 681 # if defined(PJ_WIN32) && PJ_WIN32 != 0 682 rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); 683 //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size, 684 // &bytes_read, NULL); 685 # elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0) 686 bytes_read = read(h->fd, h->rd_buf, bytes_read); 687 rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); 688 # elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0 689 bytes_read = sys_read(h->fd, h->rd_buf, bytes_read); 690 rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read; 691 # else 692 # error "Implement read() for this platform!" 693 # endif 694 } 695 696 if (rc != PJ_SUCCESS) { 697 # if defined(PJ_WIN32) && PJ_WIN32 != 0 698 /* On Win32, for UDP, WSAECONNRESET on the receive side 699 * indicates that previous sending has triggered ICMP Port 700 * Unreachable message. 701 * But we wouldn't know at this point which one of previous 702 * key that has triggered the error, since UDP socket can 703 * be shared! 704 * So we'll just ignore it! 705 */ 706 707 if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) { 708 //PJ_LOG(4,(THIS_FILE, 709 // "Ignored ICMP port unreach. on key=%p", h)); 710 } 711 # endif 712 713 /* In any case we would report this to caller. */ 714 bytes_read = -rc; 715 } 716 717 /* Clear fdset if there is no pending read. */ 718 if (pj_list_empty(&h->read_list)) 719 PJ_FD_CLR(h->fd, &ioqueue->rfdset); 720 721 /* In any case clear from temporary set. */ 722 PJ_FD_CLR(h->fd, &rfdset); 723 724 /* Call callback. */ 725 if (h->cb.on_read_complete) 726 (*h->cb.on_read_complete)(h, (pj_ioqueue_op_key_t*)read_op, 727 bytes_read); 728 729 /* Re-scan readable sockets. */ 730 goto do_readable_scan; 731 732 } 733 } 734 735 #if PJ_HAS_TCP 736 /* Scan for exception socket for TCP connection error. */ 737 h = ioqueue->key_list.next; 738 do_except_scan: 739 for ( ; h!=&ioqueue->key_list; h = h->next) { 740 if (h->connecting && PJ_FD_ISSET(h->fd, &xfdset)) 741 break; 742 } 743 if (h != &ioqueue->key_list) { 744 745 pj_assert(h->connecting); 746 747 /* Clear operation. */ 748 h->connecting = 0; 749 PJ_FD_CLR(h->fd, &ioqueue->wfdset); 750 PJ_FD_CLR(h->fd, &ioqueue->xfdset); 751 PJ_FD_CLR(h->fd, &wfdset); 752 PJ_FD_CLR(h->fd, &xfdset); 753 754 /* Call callback. */ 755 if (h->cb.on_connect_complete) 756 (*h->cb.on_connect_complete)(h, -1); 757 758 /* Re-scan exception list. */ 759 goto do_except_scan; 760 } 761 #endif /* PJ_HAS_TCP */ 762 763 /* Shouldn't happen. */ 764 /* For strange reason on WinXP select() can return 1 while there is no 765 * pj_fd_set_t signaled. */ 766 /* pj_assert(0); */ 767 768 //count = 0; 769 770 pj_lock_release(ioqueue->lock); 489 } 490 771 491 return count; 772 492 } 773 493 774 /*775 * pj_ioqueue_recv()776 *777 * Start asynchronous recv() from the socket.778 */779 PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,780 pj_ioqueue_op_key_t *op_key,781 void *buffer,782 pj_ssize_t *length,783 unsigned flags )784 {785 pj_status_t status;786 pj_ssize_t size;787 struct read_operation *read_op;788 pj_ioqueue_t *ioqueue;789 790 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);791 PJ_CHECK_STACK();792 793 /* Try to see if there's data immediately available.794 */795 size = *length;796 status = pj_sock_recv(key->fd, buffer, &size, flags);797 if (status == PJ_SUCCESS) {798 /* Yes! Data is available! */799 *length = size;800 return PJ_SUCCESS;801 } else {802 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report803 * the error to caller.804 */805 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))806 return status;807 }808 809 /*810 * No data is immediately available.811 * Must schedule asynchronous operation to the ioqueue.812 */813 ioqueue = key->ioqueue;814 pj_lock_acquire(ioqueue->lock);815 816 read_op = (struct read_operation*)op_key;817 818 read_op->op = PJ_IOQUEUE_OP_RECV;819 read_op->buf = buffer;820 read_op->size = *length;821 read_op->flags = flags;822 823 pj_list_insert_before(&key->read_list, read_op);824 PJ_FD_SET(key->fd, &ioqueue->rfdset);825 826 pj_lock_release(ioqueue->lock);827 return PJ_EPENDING;828 }829 830 /*831 * pj_ioqueue_recvfrom()832 *833 * Start asynchronous recvfrom() from the socket.834 */835 PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,836 pj_ioqueue_op_key_t *op_key,837 void *buffer,838 pj_ssize_t *length,839 unsigned flags,840 pj_sockaddr_t *addr,841 int *addrlen)842 {843 pj_status_t status;844 pj_ssize_t size;845 struct read_operation *read_op;846 pj_ioqueue_t *ioqueue;847 848 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);849 PJ_CHECK_STACK();850 851 /* Try to see if there's data immediately available.852 */853 size = *length;854 status = pj_sock_recvfrom(key->fd, buffer, &size, flags,855 addr, addrlen);856 if (status == PJ_SUCCESS) {857 /* Yes! Data is available! */858 *length = size;859 return PJ_SUCCESS;860 } else {861 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report862 * the error to caller.863 */864 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))865 return status;866 }867 868 /*869 * No data is immediately available.870 * Must schedule asynchronous operation to the ioqueue.871 */872 ioqueue = key->ioqueue;873 pj_lock_acquire(ioqueue->lock);874 875 read_op = (struct read_operation*)op_key;876 877 read_op->op = PJ_IOQUEUE_OP_RECV_FROM;878 read_op->buf = buffer;879 read_op->size = *length;880 read_op->flags = flags;881 read_op->rmt_addr = addr;882 read_op->rmt_addrlen = addrlen;883 884 pj_list_insert_before(&key->read_list, read_op);885 PJ_FD_SET(key->fd, &ioqueue->rfdset);886 887 pj_lock_release(ioqueue->lock);888 return PJ_EPENDING;889 }890 891 /*892 * pj_ioqueue_send()893 *894 * Start asynchronous send() to the descriptor.895 */896 PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,897 pj_ioqueue_op_key_t *op_key,898 const void *data,899 pj_ssize_t *length,900 unsigned flags)901 {902 pj_ioqueue_t *ioqueue;903 struct write_operation *write_op;904 pj_status_t status;905 pj_ssize_t sent;906 907 PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);908 PJ_CHECK_STACK();909 910 /* Fast track:911 * Try to send data immediately, only if there's no pending write!912 * Note:913 * We are speculating that the list is empty here without properly914 * acquiring ioqueue's mutex first. This is intentional, to maximize915 * performance via parallelism.916 *917 * This should be safe, because:918 * - by convention, we require caller to make sure that the919 * key is not unregistered while other threads are invoking920 * an operation on the same key.921 * - pj_list_empty() is safe to be invoked by multiple threads,922 * even when other threads are modifying the list.923 */924 if (pj_list_empty(&key->write_list)) {925 /*926 * See if data can be sent immediately.927 */928 sent = *length;929 status = pj_sock_send(key->fd, data, &sent, flags);930 if (status == PJ_SUCCESS) {931 /* Success! */932 *length = sent;933 return PJ_SUCCESS;934 } else {935 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report936 * the error to caller.937 */938 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {939 return status;940 }941 }942 }943 944 /*945 * Schedule asynchronous send.946 */947 ioqueue = key->ioqueue;948 pj_lock_acquire(ioqueue->lock);949 950 write_op = (struct write_operation*)op_key;951 write_op->op = PJ_IOQUEUE_OP_SEND;952 write_op->buf = NULL;953 write_op->size = *length;954 write_op->written = 0;955 write_op->flags = flags;956 957 pj_list_insert_before(&key->write_list, write_op);958 PJ_FD_SET(key->fd, &ioqueue->wfdset);959 960 pj_lock_release(ioqueue->lock);961 962 return PJ_EPENDING;963 }964 965 966 /*967 * pj_ioqueue_sendto()968 *969 * Start asynchronous write() to the descriptor.970 */971 PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,972 pj_ioqueue_op_key_t *op_key,973 const void *data,974 pj_ssize_t *length,975 unsigned flags,976 const pj_sockaddr_t *addr,977 int addrlen)978 {979 pj_ioqueue_t *ioqueue;980 struct write_operation *write_op;981 pj_status_t status;982 pj_ssize_t sent;983 984 PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);985 PJ_CHECK_STACK();986 987 /* Fast track:988 * Try to send data immediately, only if there's no pending write!989 * Note:990 * We are speculating that the list is empty here without properly991 * acquiring ioqueue's mutex first. This is intentional, to maximize992 * performance via parallelism.993 *994 * This should be safe, because:995 * - by convention, we require caller to make sure that the996 * key is not unregistered while other threads are invoking997 * an operation on the same key.998 * - pj_list_empty() is safe to be invoked by multiple threads,999 * even when other threads are modifying the list.1000 */1001 if (pj_list_empty(&key->write_list)) {1002 /*1003 * See if data can be sent immediately.1004 */1005 sent = *length;1006 status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);1007 if (status == PJ_SUCCESS) {1008 /* Success! */1009 *length = sent;1010 return PJ_SUCCESS;1011 } else {1012 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report1013 * the error to caller.1014 */1015 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {1016 return status;1017 }1018 }1019 }1020 1021 /*1022 * Check that address storage can hold the address parameter.1023 */1024 PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);1025 1026 /*1027 * Schedule asynchronous send.1028 */1029 ioqueue = key->ioqueue;1030 pj_lock_acquire(ioqueue->lock);1031 1032 write_op = (struct write_operation*)op_key;1033 write_op->op = PJ_IOQUEUE_OP_SEND_TO;1034 write_op->buf = NULL;1035 write_op->size = *length;1036 write_op->written = 0;1037 write_op->flags = flags;1038 pj_memcpy(&write_op->rmt_addr, addr, addrlen);1039 write_op->rmt_addrlen = addrlen;1040 1041 pj_list_insert_before(&key->write_list, write_op);1042 PJ_FD_SET(key->fd, &ioqueue->wfdset);1043 1044 pj_lock_release(ioqueue->lock);1045 1046 return PJ_EPENDING;1047 }1048 1049 #if PJ_HAS_TCP1050 /*1051 * Initiate overlapped accept() operation.1052 */1053 PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,1054 pj_ioqueue_op_key_t *op_key,1055 pj_sock_t *new_sock,1056 pj_sockaddr_t *local,1057 pj_sockaddr_t *remote,1058 int *addrlen)1059 {1060 pj_ioqueue_t *ioqueue;1061 struct accept_operation *accept_op;1062 pj_status_t status;1063 1064 /* check parameters. All must be specified! */1065 PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);1066 1067 /* Fast track:1068 * See if there's new connection available immediately.1069 */1070 if (pj_list_empty(&key->accept_list)) {1071 status = pj_sock_accept(key->fd, new_sock, remote, addrlen);1072 if (status == PJ_SUCCESS) {1073 /* Yes! New connection is available! */1074 if (local && addrlen) {1075 status = pj_sock_getsockname(*new_sock, local, addrlen);1076 if (status != PJ_SUCCESS) {1077 pj_sock_close(*new_sock);1078 *new_sock = PJ_INVALID_SOCKET;1079 return status;1080 }1081 }1082 return PJ_SUCCESS;1083 } else {1084 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report1085 * the error to caller.1086 */1087 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {1088 return status;1089 }1090 }1091 }1092 1093 /*1094 * No connection is available immediately.1095 * Schedule accept() operation to be completed when there is incoming1096 * connection available.1097 */1098 ioqueue = key->ioqueue;1099 accept_op = (struct accept_operation*)op_key;1100 1101 pj_lock_acquire(ioqueue->lock);1102 1103 accept_op->op = PJ_IOQUEUE_OP_ACCEPT;1104 accept_op->accept_fd = new_sock;1105 accept_op->rmt_addr = remote;1106 accept_op->addrlen= addrlen;1107 accept_op->local_addr = local;1108 1109 pj_list_insert_before(&key->accept_list, accept_op);1110 PJ_FD_SET(key->fd, &ioqueue->rfdset);1111 1112 pj_lock_release(ioqueue->lock);1113 1114 return PJ_EPENDING;1115 }1116 1117 /*1118 * Initiate overlapped connect() operation (well, it's non-blocking actually,1119 * since there's no overlapped version of connect()).1120 */1121 PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,1122 const pj_sockaddr_t *addr,1123 int addrlen )1124 {1125 pj_ioqueue_t *ioqueue;1126 pj_status_t status;1127 1128 /* check parameters. All must be specified! */1129 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);1130 1131 /* Check if socket has not been marked for connecting */1132 if (key->connecting != 0)1133 return PJ_EPENDING;1134 1135 status = pj_sock_connect(key->fd, addr, addrlen);1136 if (status == PJ_SUCCESS) {1137 /* Connected! */1138 return PJ_SUCCESS;1139 } else {1140 if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {1141 /* Pending! */1142 ioqueue = key->ioqueue;1143 pj_lock_acquire(ioqueue->lock);1144 key->connecting = PJ_TRUE;1145 PJ_FD_SET(key->fd, &ioqueue->wfdset);1146 PJ_FD_SET(key->fd, &ioqueue->xfdset);1147 pj_lock_release(ioqueue->lock);1148 return PJ_EPENDING;1149 } else {1150 /* Error! */1151 return status;1152 }1153 }1154 }1155 #endif /* PJ_HAS_TCP */1156 -
pjproject/main/pjlib/src/pj/sock_bsd.c
r6 r12 60 60 const pj_uint16_t PJ_SOL_IPV6 = 0xFFFF; 61 61 #endif 62 63 /* optname values. */ 64 const pj_uint16_t PJ_SO_TYPE = SO_TYPE; 65 const pj_uint16_t PJ_SO_RCVBUF = SO_RCVBUF; 66 const pj_uint16_t PJ_SO_SNDBUF = SO_SNDBUF; 62 67 63 68 … … 465 470 */ 466 471 PJ_DEF(pj_status_t) pj_sock_getsockopt( pj_sock_t sock, 467 int level,468 int optname,472 pj_uint16_t level, 473 pj_uint16_t optname, 469 474 void *optval, 470 475 int *optlen) … … 483 488 */ 484 489 PJ_DEF(pj_status_t) pj_sock_setsockopt( pj_sock_t sock, 485 int level,486 int optname,490 pj_uint16_t level, 491 pj_uint16_t optname, 487 492 const void *optval, 488 493 int optlen) -
pjproject/main/pjlib/src/pj/sock_linux_kernel.c
r4 r12 72 72 # error "SOL_IPV6 undeclared!" 73 73 #endif 74 75 /* optname values. */ 76 const pj_uint16_t PJ_SO_TYPE = SO_TYPE; 77 const pj_uint16_t PJ_SO_RCVBUF = SO_RCVBUF; 78 const pj_uint16_t PJ_SO_SNDBUF = SO_SNDBUF; 74 79 75 80 /* … … 554 559 */ 555 560 PJ_DEF(pj_status_t) pj_sock_getsockopt( pj_sock_t sockfd, 556 int level,557 int optname,561 pj_uint16_t level, 562 pj_uint16_t optname, 558 563 void *optval, 559 564 int *optlen) … … 581 586 */ 582 587 PJ_DEF(pj_status_t) pj_sock_setsockopt( pj_sock_t sockfd, 583 int level,584 int optname,588 pj_uint16_t level, 589 pj_uint16_t optname, 585 590 const void *optval, 586 591 int optlen) -
pjproject/main/pjlib/src/pjlib-test/ioq_perf.c
r11 r12 72 72 73 73 if (rc != last_error) { 74 last_error = rc;74 //last_error = rc; 75 75 pj_strerror(rc, errmsg, sizeof(errmsg)); 76 76 PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)", -
pjproject/main/pjlib/src/pjlib-test/ioq_tcp.c
r11 r12 180 180 t_elapsed->u32.lo += (t2.u32.lo - t1.u32.lo); 181 181 182 if (status < 0) {183 return -176;184 }185 186 182 // Compare recv buffer with send buffer. 187 183 if (pj_memcmp(send_buf, recv_buf, bufsize) != 0) {
Note: See TracChangeset
for help on using the changeset viewer.