Changeset 5194
- Timestamp:
- Nov 6, 2015 4:18:46 AM (9 years ago)
- Location:
- pjproject/trunk/pjlib
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjlib/include/pj/ioqueue.h
r4724 r5194 304 304 #endif 305 305 306 307 /** 308 * This macro specifies the maximum event candidates collected by each 309 * polling thread to be able to reach maximum number of processed events 310 * (i.e: PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) in each poll cycle. 311 * An event candidate will be dispatched to application as event unless 312 * it is already being dispatched by other polling thread. So in order to 313 * anticipate such race condition, each poll operation should collects its 314 * event candidates more than PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL, the 315 * recommended value is (PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL * 316 * number of polling threads). 317 * 318 * The value is only meaningfull when specified during PJLIB build and 319 * is only effective on multiple polling threads environment. 320 */ 321 #if !defined(PJ_IOQUEUE_MAX_CAND_EVENTS) || \ 322 PJ_IOQUEUE_MAX_CAND_EVENTS < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL 323 # undef PJ_IOQUEUE_MAX_CAND_EVENTS 324 # define PJ_IOQUEUE_MAX_CAND_EVENTS PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL 325 #endif 326 327 306 328 /** 307 329 * When this flag is specified in ioqueue's recv() or send() operations, … … 503 525 */ 504 526 PJ_DECL(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key); 527 528 /** 529 * Try to acquire the key's mutex. When the key's concurrency is disabled, 530 * application may call this function to synchronize its operation 531 * with the key's callback. 532 * 533 * @param key The key that was previously obtained from registration. 534 * 535 * @return PJ_SUCCESS on success or the appropriate error code. 536 */ 537 PJ_DECL(pj_status_t) pj_ioqueue_trylock_key(pj_ioqueue_key_t *key); 505 538 506 539 /** -
pjproject/trunk/pjlib/src/pj/ioqueue_common_abs.c
r4890 r5194 196 196 * framework. 197 197 */ 198 void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) 199 { 200 /* Lock the key. */ 201 pj_ioqueue_lock_key(h); 198 pj_bool_t ioqueue_dispatch_write_event( pj_ioqueue_t *ioqueue, 199 pj_ioqueue_key_t *h) 200 { 201 pj_status_t rc; 202 203 /* Try lock the key. */ 204 rc = pj_ioqueue_trylock_key(h); 205 if (rc != PJ_SUCCESS) { 206 return PJ_FALSE; 207 } 202 208 203 209 if (IS_CLOSING(h)) { 204 210 pj_ioqueue_unlock_key(h); 205 return ;211 return PJ_TRUE; 206 212 } 207 213 … … 418 424 */ 419 425 pj_ioqueue_unlock_key(h); 420 } 421 } 422 423 void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) 426 427 return PJ_FALSE; 428 } 429 430 return PJ_TRUE; 431 } 432 433 pj_bool_t ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, 434 pj_ioqueue_key_t *h ) 424 435 { 425 436 pj_status_t rc; 426 437 427 /* Lock the key. */ 428 pj_ioqueue_lock_key(h); 438 /* Try lock the key. */ 439 rc = pj_ioqueue_trylock_key(h); 440 if (rc != PJ_SUCCESS) { 441 return PJ_FALSE; 442 } 429 443 430 444 if (IS_CLOSING(h)) { 431 445 pj_ioqueue_unlock_key(h); 432 return ;446 return PJ_TRUE; 433 447 } 434 448 … … 605 619 */ 606 620 pj_ioqueue_unlock_key(h); 607 } 608 } 609 610 611 void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, 612 pj_ioqueue_key_t *h ) 621 622 return PJ_FALSE; 623 } 624 625 return PJ_TRUE; 626 } 627 628 629 pj_bool_t ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, 630 pj_ioqueue_key_t *h ) 613 631 { 614 632 pj_bool_t has_lock; 615 616 pj_ioqueue_lock_key(h); 633 pj_status_t rc; 634 635 /* Try lock the key. */ 636 rc = pj_ioqueue_trylock_key(h); 637 if (rc != PJ_SUCCESS) { 638 return PJ_FALSE; 639 } 617 640 618 641 if (!h->connecting) { … … 622 645 */ 623 646 pj_ioqueue_unlock_key(h); 624 return;647 return PJ_TRUE; 625 648 } 626 649 627 650 if (IS_CLOSING(h)) { 628 651 pj_ioqueue_unlock_key(h); 629 return ;652 return PJ_TRUE; 630 653 } 631 654 … … 669 692 pj_ioqueue_unlock_key(h); 670 693 } 694 695 return PJ_TRUE; 671 696 } 672 697 … … 1325 1350 } 1326 1351 1352 PJ_DEF(pj_status_t) pj_ioqueue_trylock_key(pj_ioqueue_key_t *key) 1353 { 1354 if (key->grp_lock) 1355 return pj_grp_lock_tryacquire(key->grp_lock); 1356 else 1357 return pj_lock_tryacquire(key->lock); 1358 } 1359 1327 1360 PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) 1328 1361 { -
pjproject/trunk/pjlib/src/pj/ioqueue_epoll.c
r4704 r5194 652 652 PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) 653 653 { 654 int i, count, processed;654 int i, count, event_cnt, processed_cnt; 655 655 int msec; 656 656 //struct epoll_event *events = ioqueue->events; 657 657 //struct queue *queue = ioqueue->queue; 658 struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; 659 struct queue queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; 658 enum { MAX_EVENTS = PJ_IOQUEUE_MAX_CAND_EVENTS }; 659 struct epoll_event events[MAX_EVENTS]; 660 struct queue queue[MAX_EVENTS]; 660 661 pj_timestamp t1, t2; 661 662 … … 668 669 669 670 //count = os_epoll_wait( ioqueue->epfd, events, ioqueue->max, msec); 670 count = os_epoll_wait( ioqueue->epfd, events, PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL, msec);671 count = os_epoll_wait( ioqueue->epfd, events, MAX_EVENTS, msec); 671 672 if (count == 0) { 672 673 #if PJ_IOQUEUE_HAS_SAFE_UNREG … … 695 696 pj_lock_acquire(ioqueue->lock); 696 697 697 for ( processed=0, i=0; i<count; ++i) {698 for (event_cnt=0, i=0; i<count; ++i) { 698 699 pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type) 699 700 events[i].epoll_data; … … 710 711 increment_counter(h); 711 712 #endif 712 queue[ processed].key = h;713 queue[ processed].event_type = READABLE_EVENT;714 ++ processed;713 queue[event_cnt].key = h; 714 queue[event_cnt].event_type = READABLE_EVENT; 715 ++event_cnt; 715 716 continue; 716 717 } … … 724 725 increment_counter(h); 725 726 #endif 726 queue[ processed].key = h;727 queue[ processed].event_type = WRITEABLE_EVENT;728 ++ processed;727 queue[event_cnt].key = h; 728 queue[event_cnt].event_type = WRITEABLE_EVENT; 729 ++event_cnt; 729 730 continue; 730 731 } … … 739 740 increment_counter(h); 740 741 #endif 741 queue[ processed].key = h;742 queue[ processed].event_type = WRITEABLE_EVENT;743 ++ processed;742 queue[event_cnt].key = h; 743 queue[event_cnt].event_type = WRITEABLE_EVENT; 744 ++event_cnt; 744 745 continue; 745 746 } … … 759 760 increment_counter(h); 760 761 #endif 761 queue[ processed].key = h;762 queue[ processed].event_type = EXCEPTION_EVENT;763 ++ processed;762 queue[event_cnt].key = h; 763 queue[event_cnt].event_type = EXCEPTION_EVENT; 764 ++event_cnt; 764 765 } else if (key_has_pending_read(h) || key_has_pending_accept(h)) { 765 766 #if PJ_IOQUEUE_HAS_SAFE_UNREG 766 767 increment_counter(h); 767 768 #endif 768 queue[ processed].key = h;769 queue[ processed].event_type = READABLE_EVENT;770 ++ processed;769 queue[event_cnt].key = h; 770 queue[event_cnt].event_type = READABLE_EVENT; 771 ++event_cnt; 771 772 } 772 773 continue; 773 774 } 774 775 } 775 for (i=0; i< processed; ++i) {776 for (i=0; i<event_cnt; ++i) { 776 777 if (queue[i].key->grp_lock) 777 778 pj_grp_lock_add_ref_dbg(queue[i].key->grp_lock, "ioqueue", 0); … … 784 785 PJ_RACE_ME(5); 785 786 787 processed_cnt = 0; 788 786 789 /* Now process the events. */ 787 for (i=0; i<processed; ++i) { 788 switch (queue[i].event_type) { 789 case READABLE_EVENT: 790 ioqueue_dispatch_read_event(ioqueue, queue[i].key); 791 break; 792 case WRITEABLE_EVENT: 793 ioqueue_dispatch_write_event(ioqueue, queue[i].key); 794 break; 795 case EXCEPTION_EVENT: 796 ioqueue_dispatch_exception_event(ioqueue, queue[i].key); 797 break; 798 case NO_EVENT: 799 pj_assert(!"Invalid event!"); 800 break; 801 } 790 for (i=0; i<event_cnt; ++i) { 791 792 /* Just do not exceed PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL */ 793 if (processed_cnt < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) { 794 switch (queue[i].event_type) { 795 case READABLE_EVENT: 796 if (ioqueue_dispatch_read_event(ioqueue, queue[i].key)) 797 ++processed_cnt; 798 break; 799 case WRITEABLE_EVENT: 800 if (ioqueue_dispatch_write_event(ioqueue, queue[i].key)) 801 ++processed_cnt; 802 break; 803 case EXCEPTION_EVENT: 804 if (ioqueue_dispatch_exception_event(ioqueue, queue[i].key)) 805 ++processed_cnt; 806 break; 807 case NO_EVENT: 808 pj_assert(!"Invalid event!"); 809 break; 810 } 811 } 802 812 803 813 #if PJ_IOQUEUE_HAS_SAFE_UNREG … … 813 823 * When epoll returns > 0 but no descriptors are actually set! 814 824 */ 815 if (count > 0 && ! processed&& msec > 0) {825 if (count > 0 && !event_cnt && msec > 0) { 816 826 pj_thread_sleep(msec); 817 827 } 828 829 TRACE_((THIS_FILE, " poll: count=%d events=%d processed=%d", 830 count, event_cnt, processed_cnt)); 818 831 819 832 pj_get_timestamp(&t1); … … 821 834 processed, pj_elapsed_usec(&t2, &t1))); 822 835 823 return processed ;824 } 825 836 return processed_cnt; 837 } 838 -
pjproject/trunk/pjlib/src/pj/ioqueue_select.c
r4991 r5194 334 334 PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET && 335 335 cb && p_key, PJ_EINVAL); 336 337 /* On platforms with fd_set containing fd bitmap such as *nix family, 338 * avoid potential memory corruption caused by select() when given 339 * an fd that is higher than FD_SETSIZE. 340 */ 341 if (sizeof(fd_set) < FD_SETSIZE && sock >= PJ_IOQUEUE_MAX_HANDLES) 342 return PJ_ETOOBIG; 336 343 337 344 pj_lock_acquire(ioqueue->lock); … … 832 839 pj_fd_set_t rfdset, wfdset, xfdset; 833 840 int nfds; 834 int count, i, counter;841 int i, count, event_cnt, processed_cnt; 835 842 pj_ioqueue_key_t *h; 843 enum { MAX_EVENTS = PJ_IOQUEUE_MAX_CAND_EVENTS }; 836 844 struct event 837 845 { 838 846 pj_ioqueue_key_t *key; 839 847 enum ioqueue_event_type event_type; 840 } event[ PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];848 } event[MAX_EVENTS]; 841 849 842 850 PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); … … 890 898 else if (count < 0) 891 899 return -pj_get_netos_error(); 892 else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)893 count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;894 900 895 901 /* Scan descriptor sets for event and add the events in the event … … 899 905 pj_lock_acquire(ioqueue->lock); 900 906 901 counter= 0;907 event_cnt = 0; 902 908 903 909 /* Scan for writable sockets first to handle piggy-back data 904 910 * coming with accept(). 905 911 */ 906 h = ioqueue->active_list.next; 907 for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) { 912 for (h = ioqueue->active_list.next; 913 h != &ioqueue->active_list && event_cnt < MAX_EVENTS; 914 h = h->next) 915 { 908 916 909 917 if ( (key_has_pending_write(h) || key_has_pending_connect(h)) … … 913 921 increment_counter(h); 914 922 #endif 915 event[ counter].key = h;916 event[ counter].event_type = WRITEABLE_EVENT;917 ++ counter;923 event[event_cnt].key = h; 924 event[event_cnt].event_type = WRITEABLE_EVENT; 925 ++event_cnt; 918 926 } 919 927 … … 921 929 if ((key_has_pending_read(h) || key_has_pending_accept(h)) 922 930 && PJ_FD_ISSET(h->fd, &rfdset) && !IS_CLOSING(h) && 923 counter<count)931 event_cnt < MAX_EVENTS) 924 932 { 925 933 #if PJ_IOQUEUE_HAS_SAFE_UNREG 926 934 increment_counter(h); 927 935 #endif 928 event[ counter].key = h;929 event[ counter].event_type = READABLE_EVENT;930 ++ counter;936 event[event_cnt].key = h; 937 event[event_cnt].event_type = READABLE_EVENT; 938 ++event_cnt; 931 939 } 932 940 933 941 #if PJ_HAS_TCP 934 942 if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) && 935 !IS_CLOSING(h) && counter<count)943 !IS_CLOSING(h) && event_cnt < MAX_EVENTS) 936 944 { 937 945 #if PJ_IOQUEUE_HAS_SAFE_UNREG 938 946 increment_counter(h); 939 947 #endif 940 event[ counter].key = h;941 event[ counter].event_type = EXCEPTION_EVENT;942 ++ counter;948 event[event_cnt].key = h; 949 event[event_cnt].event_type = EXCEPTION_EVENT; 950 ++event_cnt; 943 951 } 944 952 #endif 945 953 } 946 954 947 for (i=0; i< counter; ++i) {955 for (i=0; i<event_cnt; ++i) { 948 956 if (event[i].key->grp_lock) 949 957 pj_grp_lock_add_ref_dbg(event[i].key->grp_lock, "ioqueue", 0); … … 956 964 PJ_RACE_ME(5); 957 965 958 count = counter;966 processed_cnt = 0; 959 967 960 968 /* Now process all events. The dispatch functions will take care 961 969 * of locking in each of the key 962 970 */ 963 for (counter=0; counter<count; ++counter) { 964 switch (event[counter].event_type) { 965 case READABLE_EVENT: 966 ioqueue_dispatch_read_event(ioqueue, event[counter].key); 967 break; 968 case WRITEABLE_EVENT: 969 ioqueue_dispatch_write_event(ioqueue, event[counter].key); 970 break; 971 case EXCEPTION_EVENT: 972 ioqueue_dispatch_exception_event(ioqueue, event[counter].key); 973 break; 974 case NO_EVENT: 975 pj_assert(!"Invalid event!"); 976 break; 977 } 978 979 #if PJ_IOQUEUE_HAS_SAFE_UNREG 980 decrement_counter(event[counter].key); 981 #endif 982 983 if (event[counter].key->grp_lock) 984 pj_grp_lock_dec_ref_dbg(event[counter].key->grp_lock, 971 for (i=0; i<event_cnt; ++i) { 972 973 /* Just do not exceed PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL */ 974 if (processed_cnt < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) { 975 switch (event[i].event_type) { 976 case READABLE_EVENT: 977 if (ioqueue_dispatch_read_event(ioqueue, event[i].key)) 978 ++processed_cnt; 979 break; 980 case WRITEABLE_EVENT: 981 if (ioqueue_dispatch_write_event(ioqueue, event[i].key)) 982 ++processed_cnt; 983 break; 984 case EXCEPTION_EVENT: 985 if (ioqueue_dispatch_exception_event(ioqueue, event[i].key)) 986 ++processed_cnt; 987 break; 988 case NO_EVENT: 989 pj_assert(!"Invalid event!"); 990 break; 991 } 992 } 993 994 #if PJ_IOQUEUE_HAS_SAFE_UNREG 995 decrement_counter(event[i].key); 996 #endif 997 998 if (event[i].key->grp_lock) 999 pj_grp_lock_dec_ref_dbg(event[i].key->grp_lock, 985 1000 "ioqueue", 0); 986 1001 } 987 1002 988 989 return count; 990 } 991 1003 TRACE__((THIS_FILE, " poll: count=%d events=%d processed=%d", 1004 count, event_cnt, processed_cnt)); 1005 1006 return processed_cnt; 1007 } 1008
Note: See TracChangeset
for help on using the changeset viewer.