- Timestamp:
- Mar 30, 2006 4:32:18 PM (19 years ago)
- Location:
- pjproject/trunk/pjlib
- Files:
-
- 1 added
- 1 deleted
- 11 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjlib/build/Makefile
r338 r365 24 24 array.o config.o ctype.o errno.o except.o fifobuf.o guid.o \ 25 25 hash.o list.o lock.o log.o os_time_common.o \ 26 pool.o pool_caching.o rand.o \26 pool.o pool_caching.o pool_dbg.o rand.o \ 27 27 rbtree.o string.o timer.o \ 28 28 types.o symbols.o … … 35 35 export TEST_OBJS += atomic.o echo_clt.o errno.o exception.o \ 36 36 fifobuf.o file.o \ 37 ioq_perf.o ioq_udp.o ioq_ tcp.o \37 ioq_perf.o ioq_udp.o ioq_unreg.o ioq_tcp.o \ 38 38 list.o mutex.o os.o pool.o pool_perf.o rand.o rbtree.o \ 39 39 select.o sleep.o sock.o sock_perf.o \ -
pjproject/trunk/pjlib/build/pjlib.dsp
r363 r365 234 234 235 235 SOURCE=..\src\pj\ioqueue_select.c 236 237 !IF "$(CFG)" == "pjlib - Win32 Release" 238 239 !ELSEIF "$(CFG)" == "pjlib - Win32 Debug" 240 241 !ENDIF 242 236 243 # End Source File 237 244 # Begin Source File … … 286 293 # Begin Source File 287 294 288 SOURCE=..\src\pj\pool_dbg _win32.c295 SOURCE=..\src\pj\pool_dbg.c 289 296 # End Source File 290 297 # Begin Source File … … 523 530 524 531 SOURCE=..\include\pj\pool.h 532 # End Source File 533 # Begin Source File 534 535 SOURCE=..\include\pj\pool_alt.h 525 536 # End Source File 526 537 # Begin Source File -
pjproject/trunk/pjlib/build/pjlib_test.dsp
r349 r365 124 124 # Begin Source File 125 125 126 SOURCE="..\src\pjlib-test\ioq_unreg.c" 127 # End Source File 128 # Begin Source File 129 126 130 SOURCE="..\src\pjlib-test\list.c" 127 131 # End Source File -
pjproject/trunk/pjlib/src/pj/ioqueue_common_abs.c
r363 r365 28 28 */ 29 29 30 static long ioqueue_tls_id = -1;31 32 typedef struct key_lock_data {33 struct key_lock_data *prev;34 pj_ioqueue_key_t *key;35 int is_alive;36 } key_lock_data;37 38 39 30 static void ioqueue_init( pj_ioqueue_t *ioqueue ) 40 31 { 41 32 ioqueue->lock = NULL; 42 33 ioqueue->auto_delete_lock = 0; 43 44 if (ioqueue_tls_id == -1) {45 pj_status_t status;46 status = pj_thread_local_alloc(&ioqueue_tls_id);47 pj_thread_local_set(ioqueue_tls_id, NULL);48 }49 34 } 50 35 … … 94 79 #if PJ_HAS_TCP 95 80 pj_list_init(&key->accept_list); 81 key->connecting = 0; 96 82 #endif 97 83 98 84 /* Save callback. */ 99 85 pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); 86 87 #if PJ_IOQUEUE_HAS_SAFE_UNREG 88 /* Set initial reference count to 1 */ 89 pj_assert(key->ref_count == 0); 90 ++key->ref_count; 91 92 key->closing = 0; 93 #endif 100 94 101 95 /* Get socket type. When socket type is datagram, some optimization … … 108 102 key->fd_type = PJ_SOCK_STREAM; 109 103 110 key->inside_callback = 0;111 key->destroy_requested = 0;112 113 104 /* Create mutex for the key. */ 114 rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); 105 #if !PJ_IOQUEUE_HAS_SAFE_UNREG 106 rc = pj_mutex_create_simple(pool, NULL, &key->mutex); 107 #endif 115 108 116 109 return rc; 117 }118 119 /* Lock the key and also keep the lock data in thread local storage.120 * The lock data is used to detect if the key is deleted by application121 * when we call its callback.122 */123 static void lock_key(pj_ioqueue_key_t *key, key_lock_data *lck)124 {125 struct key_lock_data *prev_data;126 127 pj_mutex_lock(key->mutex);128 prev_data = (struct key_lock_data *)129 pj_thread_local_get(ioqueue_tls_id);130 lck->prev = prev_data;131 lck->key = key;132 lck->is_alive = 1;133 pj_thread_local_set(ioqueue_tls_id, lck);134 }135 136 /* Unlock the key only if it is still valid. */137 static void unlock_key(pj_ioqueue_key_t *key, key_lock_data *lck)138 {139 pj_assert( (void*)pj_thread_local_get(ioqueue_tls_id) == lck);140 pj_assert( lck->key == key );141 pj_thread_local_set(ioqueue_tls_id, lck->prev);142 if (lck->is_alive)143 pj_mutex_unlock(key->mutex);144 }145 146 /* Destroy key */147 static void ioqueue_destroy_key( pj_ioqueue_key_t *key )148 {149 key_lock_data *lck;150 151 /* Make sure that no other threads are doing something with152 * the key.153 */154 pj_mutex_lock(key->mutex);155 156 /* Check if this function is called within a callback context.157 * If so, then we need to inform the callback that the key has158 * been destroyed so that it doesn't attempt to unlock the159 * key's mutex.160 */161 lck = pj_thread_local_get(ioqueue_tls_id);162 while (lck) {163 if (lck->key == key) {164 lck->is_alive = 0;165 }166 lck = lck->prev;167 }168 169 pj_mutex_destroy(key->mutex);170 110 } 171 111 … … 222 162 223 163 164 #if PJ_IOQUEUE_HAS_SAFE_UNREG 165 # define IS_CLOSING(key) (key->closing) 166 #else 167 # define IS_CLOSING(key) (0) 168 #endif 169 170 224 171 /* 225 172 * ioqueue_dispatch_event() … … 230 177 void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) 231 178 { 232 key_lock_data lck_data;233 234 179 /* Lock the key. */ 235 lock_key(h, &lck_data); 180 pj_mutex_lock(h->mutex); 181 182 if (h->closing) { 183 pj_mutex_unlock(h->mutex); 184 return; 185 } 236 186 237 187 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 … … 246 196 ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); 247 197 248 /* Unlock; from this point we don't need to hold key's mutex. */249 //pj_mutex_unlock(h->mutex);250 198 251 199 #if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) … … 294 242 #endif 295 243 244 /* Unlock; from this point we don't need to hold key's mutex. */ 245 pj_mutex_unlock(h->mutex); 246 296 247 /* Call callback. */ 297 if (h->cb.on_connect_complete )248 if (h->cb.on_connect_complete && !IS_CLOSING(h)) 298 249 (*h->cb.on_connect_complete)(h, bytes_transfered); 299 250 … … 320 271 ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); 321 272 322 //pj_mutex_unlock(h->mutex);323 273 } 324 274 … … 366 316 ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); 367 317 368 /* No need to hold mutex anymore */369 //pj_mutex_unlock(h->mutex);370 318 } 371 319 320 /* No need to hold mutex anymore */ 321 pj_mutex_unlock(h->mutex); 322 372 323 /* Call callback. */ 373 if (h->cb.on_write_complete ) {324 if (h->cb.on_write_complete && !IS_CLOSING(h)) { 374 325 (*h->cb.on_write_complete)(h, 375 326 (pj_ioqueue_op_key_t*)write_op, … … 378 329 379 330 } else { 380 //pj_mutex_unlock(h->mutex);331 pj_mutex_unlock(h->mutex); 381 332 } 382 333 … … 388 339 * able to process the event. 389 340 */ 390 //pj_mutex_unlock(h->mutex); 391 } 392 393 /* Finally unlock key */ 394 unlock_key(h, &lck_data); 341 pj_mutex_unlock(h->mutex); 342 } 395 343 } 396 344 397 345 void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) 398 346 { 399 key_lock_data lck_data;400 347 pj_status_t rc; 401 348 402 349 /* Lock the key. */ 403 lock_key(h, &lck_data); 350 pj_mutex_lock(h->mutex); 351 352 if (h->closing) { 353 pj_mutex_unlock(h->mutex); 354 return; 355 } 404 356 405 357 # if PJ_HAS_TCP … … 416 368 if (pj_list_empty(&h->accept_list)) 417 369 ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); 418 419 /* Unlock; from this point we don't need to hold key's mutex. */420 //pj_mutex_unlock(h->mutex);421 370 422 371 rc=pj_sock_accept(h->fd, accept_op->accept_fd, … … 428 377 } 429 378 379 /* Unlock; from this point we don't need to hold key's mutex. */ 380 pj_mutex_unlock(h->mutex); 381 430 382 /* Call callback. */ 431 if (h->cb.on_accept_complete ) {383 if (h->cb.on_accept_complete && !IS_CLOSING(h)) { 432 384 (*h->cb.on_accept_complete)(h, 433 385 (pj_ioqueue_op_key_t*)accept_op, … … 449 401 if (pj_list_empty(&h->read_list)) 450 402 ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); 451 452 /* Unlock; from this point we don't need to hold key's mutex. */453 //Crash as of revision 353 (since we added pjmedia socket to454 //main ioqueue).455 //pj_mutex_unlock(h->mutex);456 403 457 404 bytes_read = read_op->size; … … 517 464 } 518 465 466 /* Unlock; from this point we don't need to hold key's mutex. */ 467 pj_mutex_unlock(h->mutex); 468 519 469 /* Call callback. */ 520 if (h->cb.on_read_complete ) {470 if (h->cb.on_read_complete && !IS_CLOSING(h)) { 521 471 (*h->cb.on_read_complete)(h, 522 472 (pj_ioqueue_op_key_t*)read_op, … … 530 480 * able to process the event. 531 481 */ 532 //pj_mutex_unlock(h->mutex); 533 } 534 535 /* Unlock handle. */ 536 unlock_key(h, &lck_data); 482 pj_mutex_unlock(h->mutex); 483 } 537 484 } 538 485 … … 541 488 pj_ioqueue_key_t *h ) 542 489 { 543 key_lock_data lck_data; 544 545 lock_key(h, &lck_data); 490 pj_mutex_lock(h->mutex); 546 491 547 492 if (!h->connecting) { … … 550 495 * it has been processed by other thread. 551 496 */ 552 //pj_mutex_unlock(h->mutex); 553 unlock_key(h, &lck_data); 497 pj_mutex_unlock(h->mutex); 554 498 return; 499 } 500 501 if (h->closing) { 502 pj_mutex_unlock(h->mutex); 503 return; 555 504 } 556 505 … … 558 507 h->connecting = 0; 559 508 560 //pj_mutex_unlock(h->mutex);561 562 509 ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); 563 510 ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); 564 511 512 pj_mutex_unlock(h->mutex); 513 565 514 /* Call callback. */ 566 if (h->cb.on_connect_complete )515 if (h->cb.on_connect_complete && !IS_CLOSING(h)) 567 516 (*h->cb.on_connect_complete)(h, -1); 568 569 unlock_key(h, &lck_data);570 517 } 571 518 … … 588 535 read_op = (struct read_operation*)op_key; 589 536 read_op->op = 0; 537 538 /* Check if key is closing. */ 539 if (key->closing) 540 return PJ_ECANCELLED; 590 541 591 542 /* Try to see if there's data immediately available. … … 646 597 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); 647 598 PJ_CHECK_STACK(); 599 600 /* Check if key is closing. */ 601 if (key->closing) 602 return PJ_ECANCELLED; 648 603 649 604 read_op = (struct read_operation*)op_key; … … 710 665 PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); 711 666 PJ_CHECK_STACK(); 667 668 /* Check if key is closing. */ 669 if (key->closing) 670 return PJ_ECANCELLED; 712 671 713 672 write_op = (struct write_operation*)op_key; … … 789 748 PJ_CHECK_STACK(); 790 749 750 /* Check if key is closing. */ 751 if (key->closing) 752 return PJ_ECANCELLED; 753 791 754 write_op = (struct write_operation*)op_key; 792 755 write_op->op = 0; … … 869 832 /* check parameters. All must be specified! */ 870 833 PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); 834 835 /* Check if key is closing. */ 836 if (key->closing) 837 return PJ_ECANCELLED; 871 838 872 839 accept_op = (struct accept_operation*)op_key; … … 930 897 /* check parameters. All must be specified! */ 931 898 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); 899 900 /* Check if key is closing. */ 901 if (key->closing) 902 return PJ_ECANCELLED; 932 903 933 904 /* Check if socket has not been marked for connecting */ … … 987 958 { 988 959 struct generic_operation *op_rec; 989 key_lock_data lck_data;990 960 991 961 /* … … 993 963 * really make sure that it's still there; then call the callback. 994 964 */ 995 lock_key(key, &lck_data);965 pj_mutex_lock(key->mutex); 996 966 997 967 /* Find the operation in the pending read list. */ … … 1001 971 pj_list_erase(op_rec); 1002 972 op_rec->op = 0; 1003 //pj_mutex_unlock(key->mutex);973 pj_mutex_unlock(key->mutex); 1004 974 1005 975 (*key->cb.on_read_complete)(key, op_key, bytes_status); 1006 1007 unlock_key(key, &lck_data);1008 976 return PJ_SUCCESS; 1009 977 } … … 1017 985 pj_list_erase(op_rec); 1018 986 op_rec->op = 0; 1019 //pj_mutex_unlock(key->mutex);987 pj_mutex_unlock(key->mutex); 1020 988 1021 989 (*key->cb.on_write_complete)(key, op_key, bytes_status); 1022 1023 unlock_key(key, &lck_data);1024 990 return PJ_SUCCESS; 1025 991 } … … 1033 999 pj_list_erase(op_rec); 1034 1000 op_rec->op = 0; 1035 //pj_mutex_unlock(key->mutex);1001 pj_mutex_unlock(key->mutex); 1036 1002 1037 1003 (*key->cb.on_accept_complete)(key, op_key, 1038 1004 PJ_INVALID_SOCKET, 1039 1005 bytes_status); 1040 1041 unlock_key(key, &lck_data);1042 1006 return PJ_SUCCESS; 1043 1007 } … … 1045 1009 } 1046 1010 1047 unlock_key(key, &lck_data);1011 pj_mutex_unlock(key->mutex); 1048 1012 1049 1013 return PJ_EINVALIDOP; -
pjproject/trunk/pjlib/src/pj/ioqueue_common_abs.h
r363 r365 88 88 }; 89 89 90 #if PJ_IOQUEUE_HAS_SAFE_UNREG 91 # define UNREG_FIELDS \ 92 unsigned ref_count; \ 93 pj_bool_t closing; \ 94 pj_time_val free_time; \ 95 96 #else 97 # define UNREG_FIELDS 98 #endif 99 90 100 #define DECLARE_COMMON_KEY \ 91 101 PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); \ … … 101 111 struct read_operation read_list; \ 102 112 struct write_operation write_list; \ 103 struct accept_operation accept_list; 113 struct accept_operation accept_list; \ 114 UNREG_FIELDS 104 115 105 116 -
pjproject/trunk/pjlib/src/pj/ioqueue_select.c
r126 r365 110 110 111 111 unsigned max, count; 112 pj_ioqueue_key_t key_list;112 pj_ioqueue_key_t active_list; 113 113 pj_fd_set_t rfdset; 114 114 pj_fd_set_t wfdset; … … 116 116 pj_fd_set_t xfdset; 117 117 #endif 118 119 #if PJ_IOQUEUE_HAS_SAFE_UNREG 120 pj_mutex_t *ref_cnt_mutex; 121 pj_ioqueue_key_t closing_list; 122 pj_ioqueue_key_t free_list; 123 #endif 118 124 }; 119 125 … … 142 148 pj_ioqueue_t *ioqueue; 143 149 pj_lock_t *lock; 150 unsigned i; 144 151 pj_status_t rc; 145 152 … … 153 160 sizeof(union operation_key), PJ_EBUG); 154 161 162 /* Create and init common ioqueue stuffs */ 155 163 ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); 156 157 164 ioqueue_init(ioqueue); 158 165 … … 164 171 PJ_FD_ZERO(&ioqueue->xfdset); 165 172 #endif 166 pj_list_init(&ioqueue->key_list); 167 173 pj_list_init(&ioqueue->active_list); 174 175 #if PJ_IOQUEUE_HAS_SAFE_UNREG 176 /* When safe unregistration is used (the default), we pre-create 177 * all keys and put them in the free list. 178 */ 179 180 /* Mutex to protect key's reference counter 181 * We don't want to use key's mutex or ioqueue's mutex because 182 * that would create deadlock situation in some cases. 183 */ 184 rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex); 185 if (rc != PJ_SUCCESS) 186 return rc; 187 188 189 /* Init key list */ 190 pj_list_init(&ioqueue->free_list); 191 pj_list_init(&ioqueue->closing_list); 192 193 194 /* Pre-create all keys according to max_fd */ 195 for (i=0; i<max_fd; ++i) { 196 pj_ioqueue_key_t *key; 197 198 key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t)); 199 key->ref_count = 0; 200 rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); 201 if (rc != PJ_SUCCESS) { 202 key = ioqueue->free_list.next; 203 while (key != &ioqueue->free_list) { 204 pj_mutex_destroy(key->mutex); 205 key = key->next; 206 } 207 pj_mutex_destroy(ioqueue->ref_cnt_mutex); 208 return rc; 209 } 210 211 pj_list_push_back(&ioqueue->free_list, key); 212 } 213 #endif 214 215 /* Create and init ioqueue mutex */ 168 216 rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock); 169 217 if (rc != PJ_SUCCESS) … … 187 235 PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) 188 236 { 237 pj_ioqueue_key_t *key; 238 189 239 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 190 240 191 241 pj_lock_acquire(ioqueue->lock); 242 243 #if PJ_IOQUEUE_HAS_SAFE_UNREG 244 /* Destroy reference counters */ 245 key = ioqueue->active_list.next; 246 while (key != &ioqueue->active_list) { 247 pj_mutex_destroy(key->mutex); 248 key = key->next; 249 } 250 251 key = ioqueue->closing_list.next; 252 while (key != &ioqueue->closing_list) { 253 pj_mutex_destroy(key->mutex); 254 key = key->next; 255 } 256 257 key = ioqueue->free_list.next; 258 while (key != &ioqueue->free_list) { 259 pj_mutex_destroy(key->mutex); 260 key = key->next; 261 } 262 263 pj_mutex_destroy(ioqueue->ref_cnt_mutex); 264 #endif 265 192 266 return ioqueue_destroy(ioqueue); 193 267 } … … 197 271 * pj_ioqueue_register_sock() 198 272 * 199 * Register ahandle to ioqueue.273 * Register socket handle to ioqueue. 200 274 */ 201 275 PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, … … 217 291 if (ioqueue->count >= ioqueue->max) { 218 292 rc = PJ_ETOOMANY; 293 goto on_return; 294 } 295 296 /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get 297 * the key from the free list. Otherwise allocate a new one. 298 */ 299 #if PJ_IOQUEUE_HAS_SAFE_UNREG 300 pj_assert(!pj_list_empty(&ioqueue->free_list)); 301 if (pj_list_empty(&ioqueue->free_list)) { 302 rc = PJ_ETOOMANY; 303 goto on_return; 304 } 305 306 key = ioqueue->free_list.next; 307 pj_list_erase(key); 308 #else 309 key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 310 #endif 311 312 rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); 313 if (rc != PJ_SUCCESS) { 314 key = NULL; 219 315 goto on_return; 220 316 } … … 232 328 } 233 329 234 /* Create key. */ 235 key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 236 rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); 237 if (rc != PJ_SUCCESS) { 238 key = NULL; 239 goto on_return; 240 } 241 242 /* Register */ 243 pj_list_insert_before(&ioqueue->key_list, key); 330 331 /* Put in active list. */ 332 pj_list_insert_before(&ioqueue->active_list, key); 244 333 ++ioqueue->count; 245 334 … … 252 341 } 253 342 343 #if PJ_IOQUEUE_HAS_SAFE_UNREG 344 /* Increment key's reference counter */ 345 static void increment_counter(pj_ioqueue_key_t *key) 346 { 347 pj_mutex_lock(key->ioqueue->ref_cnt_mutex); 348 ++key->ref_count; 349 pj_mutex_unlock(key->ioqueue->ref_cnt_mutex); 350 } 351 352 /* Decrement the key's reference counter, and when the counter reach zero, 353 * destroy the key. 354 * 355 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK. 356 */ 357 static void decrement_counter(pj_ioqueue_key_t *key) 358 { 359 pj_mutex_lock(key->ioqueue->ref_cnt_mutex); 360 --key->ref_count; 361 if (key->ref_count == 0) { 362 363 pj_assert(key->closing == 1); 364 pj_gettimeofday(&key->free_time); 365 key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY; 366 pj_time_val_normalize(&key->free_time); 367 368 pj_lock_acquire(key->ioqueue->lock); 369 pj_list_erase(key); 370 pj_list_push_back(&key->ioqueue->closing_list, key); 371 pj_lock_release(key->ioqueue->lock); 372 } 373 pj_mutex_unlock(key->ioqueue->ref_cnt_mutex); 374 } 375 #endif 376 377 254 378 /* 255 379 * pj_ioqueue_unregister() … … 265 389 ioqueue = key->ioqueue; 266 390 391 /* Lock the key to make sure no callback is simultaneously modifying 392 * the key. We need to lock the key before ioqueue here to prevent 393 * deadlock. 394 */ 395 pj_mutex_lock(key->mutex); 396 397 /* Also lock ioqueue */ 267 398 pj_lock_acquire(ioqueue->lock); 268 399 … … 276 407 #endif 277 408 278 /* ioqueue_destroy may try to acquire key's mutex. 279 * Since normally the order of locking is to lock key's mutex first 280 * then ioqueue's mutex, ioqueue_destroy may deadlock unless we 281 * release ioqueue's mutex first. 409 /* Close socket. */ 410 pj_sock_close(key->fd); 411 412 /* Clear callback */ 413 key->cb.on_accept_complete = NULL; 414 key->cb.on_connect_complete = NULL; 415 key->cb.on_read_complete = NULL; 416 key->cb.on_write_complete = NULL; 417 418 /* Must release ioqueue lock first before decrementing counter, to 419 * prevent deadlock. 282 420 */ 283 421 pj_lock_release(ioqueue->lock); 284 422 285 /* Destroy the key. */ 286 ioqueue_destroy_key(key); 423 #if PJ_IOQUEUE_HAS_SAFE_UNREG 424 /* Mark key is closing. */ 425 key->closing = 1; 426 427 /* Decrement counter. */ 428 decrement_counter(key); 429 430 /* Done. */ 431 pj_mutex_unlock(key->mutex); 432 #else 433 pj_mutex_destroy(key->mutex); 434 #endif 287 435 288 436 return PJ_SUCCESS; … … 309 457 pj_assert(0); 310 458 311 key = ioqueue-> key_list.next;312 while (key != &ioqueue-> key_list) {459 key = ioqueue->active_list.next; 460 while (key != &ioqueue->active_list) { 313 461 if (!pj_list_empty(&key->read_list) 314 462 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 … … 396 544 } 397 545 546 #if PJ_IOQUEUE_HAS_SAFE_UNREG 547 /* Scan closing keys to be put to free list again */ 548 static void scan_closing_keys(pj_ioqueue_t *ioqueue) 549 { 550 pj_time_val now; 551 pj_ioqueue_key_t *h; 552 553 pj_gettimeofday(&now); 554 h = ioqueue->closing_list.next; 555 while (h != &ioqueue->closing_list) { 556 pj_ioqueue_key_t *next = h->next; 557 558 pj_assert(h->closing != 0); 559 560 if (PJ_TIME_VAL_GTE(now, h->free_time)) { 561 pj_list_erase(h); 562 pj_list_push_back(&ioqueue->free_list, h); 563 } 564 h = next; 565 } 566 } 567 #endif 568 569 398 570 /* 399 571 * pj_ioqueue_poll() … … 436 608 PJ_FD_COUNT(&ioqueue->xfdset)==0) 437 609 { 438 pj_lock_release(ioqueue->lock); 610 #if PJ_IOQUEUE_HAS_SAFE_UNREG 611 scan_closing_keys(ioqueue); 612 #endif 613 pj_lock_release(ioqueue->lock); 439 614 if (timeout) 440 615 pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout)); … … 476 651 * coming with accept(). 477 652 */ 478 h = ioqueue->key_list.next; 479 for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) { 653 h = ioqueue->active_list.next; 654 for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) { 655 480 656 if ( (key_has_pending_write(h) || key_has_pending_connect(h)) 481 && PJ_FD_ISSET(h->fd, &wfdset) )657 && PJ_FD_ISSET(h->fd, &wfdset) && !h->closing) 482 658 { 659 #if PJ_IOQUEUE_HAS_SAFE_UNREG 660 increment_counter(h); 661 #endif 483 662 event[counter].key = h; 484 663 event[counter].event_type = WRITEABLE_EVENT; … … 488 667 /* Scan for readable socket. */ 489 668 if ((key_has_pending_read(h) || key_has_pending_accept(h)) 490 && PJ_FD_ISSET(h->fd, &rfdset) )669 && PJ_FD_ISSET(h->fd, &rfdset) && !h->closing) 491 670 { 671 #if PJ_IOQUEUE_HAS_SAFE_UNREG 672 increment_counter(h); 673 #endif 492 674 event[counter].key = h; 493 675 event[counter].event_type = READABLE_EVENT; … … 496 678 497 679 #if PJ_HAS_TCP 498 if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) { 680 if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) && 681 !h->closing) 682 { 683 #if PJ_IOQUEUE_HAS_SAFE_UNREG 684 increment_counter(h); 685 #endif 499 686 event[counter].key = h; 500 687 event[counter].event_type = EXCEPTION_EVENT; … … 526 713 break; 527 714 } 528 } 715 716 #if PJ_IOQUEUE_HAS_SAFE_UNREG 717 decrement_counter(event[counter].key); 718 #endif 719 } 720 529 721 530 722 return count; -
pjproject/trunk/pjlib/src/pj/ioqueue_winnt.c
r349 r365 107 107 struct pj_ioqueue_key_t 108 108 { 109 PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); 110 109 111 pj_ioqueue_t *ioqueue; 110 112 HANDLE hnd; 111 113 void *user_data; 112 114 enum handle_type hnd_type; 115 pj_ioqueue_callback cb; 116 113 117 #if PJ_HAS_TCP 114 118 int connecting; 115 119 #endif 116 pj_ioqueue_callback cb; 117 pj_bool_t has_quit_signal; 120 121 #if PJ_IOQUEUE_HAS_SAFE_UNREG 122 pj_atomic_t *ref_count; 123 pj_bool_t closing; 124 pj_time_val free_time; 125 #endif 126 118 127 }; 119 128 … … 126 135 pj_lock_t *lock; 127 136 pj_bool_t auto_delete_lock; 137 138 #if PJ_IOQUEUE_HAS_SAFE_UNREG 139 pj_ioqueue_key_t active_list; 140 pj_ioqueue_key_t free_list; 141 pj_ioqueue_key_t closing_list; 142 #endif 143 144 /* These are to keep track of connecting sockets */ 145 #if PJ_HAS_TCP 128 146 unsigned event_count; 129 147 HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1]; 130 #if PJ_HAS_TCP131 148 unsigned connecting_count; 132 149 HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1]; … … 280 297 { 281 298 pj_ioqueue_t *ioqueue; 299 unsigned i; 282 300 pj_status_t rc; 283 301 … … 291 309 sizeof(union operation_key), PJ_EBUG); 292 310 311 /* Create IOCP */ 293 312 ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue)); 294 313 ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); … … 296 315 return PJ_RETURN_OS_ERROR(GetLastError()); 297 316 317 /* Create IOCP mutex */ 298 318 rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock); 299 319 if (rc != PJ_SUCCESS) { … … 304 324 ioqueue->auto_delete_lock = PJ_TRUE; 305 325 326 #if PJ_IOQUEUE_HAS_SAFE_UNREG 327 /* 328 * Create and initialize key pools. 329 */ 330 pj_list_init(&ioqueue->active_list); 331 pj_list_init(&ioqueue->free_list); 332 pj_list_init(&ioqueue->closing_list); 333 334 /* Preallocate keys according to max_fd setting, and put them 335 * in free_list. 336 */ 337 for (i=0; i<max_fd; ++i) { 338 pj_ioqueue_key_t *key; 339 340 key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t)); 341 342 rc = pj_atomic_create(pool, 0, &key->ref_count); 343 if (rc != PJ_SUCCESS) { 344 key = ioqueue->free_list.next; 345 while (key != &ioqueue->free_list) { 346 pj_atomic_destroy(key->ref_count); 347 key = key->next; 348 } 349 CloseHandle(ioqueue->iocp); 350 return rc; 351 } 352 353 pj_list_push_back(&ioqueue->free_list, key); 354 355 } 356 #endif 357 306 358 *p_ioqueue = ioqueue; 307 359 … … 316 368 { 317 369 unsigned i; 370 pj_ioqueue_key_t *key; 318 371 319 372 PJ_CHECK_STACK(); 320 373 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 321 374 375 pj_lock_acquire(ioqueue->lock); 376 377 #if PJ_HAS_TCP 322 378 /* Destroy events in the pool */ 323 379 for (i=0; i<ioqueue->event_count; ++i) { … … 325 381 } 326 382 ioqueue->event_count = 0; 383 #endif 327 384 328 385 if (CloseHandle(ioqueue->iocp) != TRUE) 329 386 return PJ_RETURN_OS_ERROR(GetLastError()); 387 388 #if PJ_IOQUEUE_HAS_SAFE_UNREG 389 /* Destroy reference counters */ 390 key = ioqueue->active_list.next; 391 while (key != &ioqueue->active_list) { 392 pj_atomic_destroy(key->ref_count); 393 key = key->next; 394 } 395 396 key = ioqueue->closing_list.next; 397 while (key != &ioqueue->closing_list) { 398 pj_atomic_destroy(key->ref_count); 399 key = key->next; 400 } 401 402 key = ioqueue->free_list.next; 403 while (key != &ioqueue->free_list) { 404 pj_atomic_destroy(key->ref_count); 405 key = key->next; 406 } 407 #endif 330 408 331 409 if (ioqueue->auto_delete_lock) … … 371 449 PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL); 372 450 451 pj_lock_acquire(ioqueue->lock); 452 453 #if PJ_IOQUEUE_HAS_SAFE_UNREG 454 /* If safe unregistration is used, then get the key record from 455 * the free list. 456 */ 457 if (pj_list_empty(&ioqueue->free_list)) { 458 pj_lock_release(ioqueue->lock); 459 return PJ_ETOOMANY; 460 } 461 462 rec = ioqueue->free_list.next; 463 pj_list_erase(rec); 464 465 /* Set initial reference count to 1 */ 466 pj_assert(pj_atomic_get(rec->ref_count) == 0); 467 pj_atomic_inc(rec->ref_count); 468 469 rec->closing = 0; 470 471 #else 472 rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 473 #endif 474 373 475 /* Build the key for this socket. */ 374 rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));375 476 rec->ioqueue = ioqueue; 376 477 rec->hnd = (HANDLE)sock; … … 379 480 pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback)); 380 481 482 #if PJ_HAS_TCP 483 rec->connecting = 0; 484 #endif 485 381 486 /* Set socket to nonblocking. */ 382 487 value = 1; 383 488 rc = ioctlsocket(sock, FIONBIO, &value); 384 489 if (rc != 0) { 490 pj_lock_release(ioqueue->lock); 385 491 return PJ_RETURN_OS_ERROR(WSAGetLastError()); 386 492 } … … 389 495 hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0); 390 496 if (!hioq) { 497 pj_lock_release(ioqueue->lock); 391 498 return PJ_RETURN_OS_ERROR(GetLastError()); 392 499 } 393 500 394 501 *key = rec; 502 503 #if PJ_IOQUEUE_HAS_SAFE_UNREG 504 pj_list_push_back(&ioqueue->active_list, rec); 505 #endif 506 507 pj_lock_release(ioqueue->lock); 508 395 509 return PJ_SUCCESS; 396 510 } … … 423 537 424 538 425 426 /* 427 * Internal function to poll the I/O Completion Port, execute callback, 539 #if PJ_IOQUEUE_HAS_SAFE_UNREG 540 /* Decrement the key's reference counter, and when the counter reach zero, 541 * destroy the key. 542 */ 543 static void decrement_counter(pj_ioqueue_key_t *key) 544 { 545 if (pj_atomic_dec_and_get(key->ref_count) == 0) { 546 547 pj_lock_acquire(key->ioqueue->lock); 548 549 pj_assert(key->closing == 1); 550 pj_gettimeofday(&key->free_time); 551 key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY; 552 pj_time_val_normalize(&key->free_time); 553 554 pj_list_erase(key); 555 pj_list_push_back(&key->ioqueue->closing_list, key); 556 557 pj_lock_release(key->ioqueue->lock); 558 } 559 } 560 #endif 561 562 /* 563 * Poll the I/O Completion Port, execute callback, 428 564 * and return the key and bytes transfered of the last operation. 429 565 */ … … 458 594 *p_key = key; 459 595 460 /* If size_status is POST_QUIT_LEN, mark the key as quitting */ 461 if (size_status == POST_QUIT_LEN) {462 key->has_quit_signal = 1;596 #if PJ_IOQUEUE_HAS_SAFE_UNREG 597 /* We shouldn't call callbacks if key is quitting. */ 598 if (key->closing) 463 599 return PJ_TRUE; 464 } 465 466 /* We shouldn't call callbacks if key is quitting. 467 * But this should have been taken care by unregister function 468 * (the unregister function should have cleared out the callbacks) 600 601 /* Increment reference counter to prevent this key from being 602 * deleted 469 603 */ 604 pj_atomic_inc(key->ref_count); 605 #endif 470 606 471 607 /* Carry out the callback */ … … 505 641 break; 506 642 } 643 644 #if PJ_IOQUEUE_HAS_SAFE_UNREG 645 decrement_counter(key); 646 #endif 647 507 648 return PJ_TRUE; 508 649 } … … 517 658 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) 518 659 { 519 pj_ssize_t polled_len;520 pj_ioqueue_key_t *polled_key;521 generic_overlapped ov;522 BOOL rc;523 524 660 PJ_ASSERT_RETURN(key, PJ_EINVAL); 525 661 … … 543 679 } 544 680 #endif 545 546 547 /* Unregistering handle from IOCP is pretty tricky. 548 * 549 * Even after the socket has been closed, GetQueuedCompletionStatus 550 * may still return events for the handle. This will likely to 551 * cause crash in pjlib, because the key associated with the handle 552 * most likely will have been destroyed. 553 * 554 * The solution is to poll the IOCP until we're sure that there are 555 * no further events for the handle. 556 */ 557 558 /* Clear up callbacks for the key. 559 * We don't want the callback to be called for this key. 560 */ 681 682 /* Close handle (the only way to disassociate handle from IOCP). 683 * We also need to close handle to make sure that no further events 684 * will come to the handle. 685 */ 686 CloseHandle(key->hnd); 687 688 /* Reset callbacks */ 689 key->cb.on_accept_complete = NULL; 690 key->cb.on_connect_complete = NULL; 561 691 key->cb.on_read_complete = NULL; 562 692 key->cb.on_write_complete = NULL; 563 key->cb.on_accept_complete = NULL; 564 key->cb.on_connect_complete = NULL; 565 566 /* Init overlapped struct */ 567 pj_memset(&ov, 0, sizeof(ov)); 568 ov.operation = PJ_IOQUEUE_OP_READ; 569 570 /* Post queued completion status with a special length. */ 571 rc = PostQueuedCompletionStatus( key->ioqueue->iocp, (DWORD)POST_QUIT_LEN, 572 (DWORD)key, &ov.overlapped); 573 574 /* Poll IOCP until has_quit_signal is set in the key. 575 * The has_quit_signal flag is set in poll_iocp() when POST_QUIT_LEN 576 * is detected. We need to have this flag because POST_QUIT_LEN may be 577 * detected by other threads. 578 */ 579 do { 580 polled_len = 0; 581 polled_key = NULL; 582 583 rc = poll_iocp(key->ioqueue->iocp, 0, &polled_len, &polled_key); 584 585 } while (rc && !key->has_quit_signal); 586 587 588 /* Close handle if this is a file. */ 589 if (key->hnd_type == HND_IS_FILE) { 590 CloseHandle(key->hnd); 591 } 693 694 #if PJ_IOQUEUE_HAS_SAFE_UNREG 695 /* Mark key as closing. */ 696 key->closing = 1; 697 698 /* Decrement reference counter. */ 699 decrement_counter(key); 700 701 /* Even after handle is closed, I suspect that IOCP may still try to 702 * do something with the handle, causing memory corruption when pool 703 * debugging is enabled. 704 * 705 * Forcing context switch seems to have fixed that, but this is quite 706 * an ugly solution.. 707 */ 708 pj_thread_sleep(0); 709 #endif 592 710 593 711 return PJ_SUCCESS; … … 603 721 DWORD dwMsec; 604 722 int connect_count = 0; 605 pj_bool_t has_event;723 int event_count = 0; 606 724 607 725 PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); 608 609 /* Check the connecting array. */610 #if PJ_HAS_TCP611 connect_count = check_connecting(ioqueue);612 #endif613 726 614 727 /* Calculate miliseconds timeout for GetQueuedCompletionStatus */ … … 616 729 617 730 /* Poll for completion status. */ 618 has_event = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL); 731 event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL); 732 733 #if PJ_HAS_TCP 734 /* Check the connecting array, only when there's no activity. */ 735 if (event_count == 0) { 736 connect_count = check_connecting(ioqueue); 737 if (connect_count > 0) 738 event_count += connect_count; 739 } 740 #endif 741 742 #if PJ_IOQUEUE_HAS_SAFE_UNREG 743 /* Check the closing keys only when there's no activity and when there are 744 * pending closing keys. 745 */ 746 if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) { 747 pj_time_val now; 748 pj_ioqueue_key_t *key; 749 750 pj_gettimeofday(&now); 751 752 /* Move closing keys to free list when they've finished the closing 753 * idle time. 754 */ 755 pj_lock_acquire(ioqueue->lock); 756 key = ioqueue->closing_list.next; 757 while (key != &ioqueue->closing_list) { 758 pj_ioqueue_key_t *next = key->next; 759 760 pj_assert(key->closing != 0); 761 762 if (PJ_TIME_VAL_GTE(now, key->free_time)) { 763 pj_list_erase(key); 764 pj_list_push_back(&ioqueue->free_list, key); 765 } 766 key = next; 767 } 768 pj_lock_release(ioqueue->lock); 769 } 770 #endif 619 771 620 772 /* Return number of events. */ 621 return connect_count + has_event;773 return event_count; 622 774 } 623 775 … … 645 797 PJ_CHECK_STACK(); 646 798 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); 799 800 #if PJ_IOQUEUE_HAS_SAFE_UNREG 801 /* Check key is not closing */ 802 if (key->closing) 803 return PJ_ECANCELLED; 804 #endif 647 805 648 806 op_key_rec = (union operation_key*)op_key->internal__; … … 716 874 PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL); 717 875 876 #if PJ_IOQUEUE_HAS_SAFE_UNREG 877 /* Check key is not closing */ 878 if (key->closing) 879 return PJ_ECANCELLED; 880 #endif 881 718 882 op_key_rec = (union operation_key*)op_key->internal__; 719 883 op_key_rec->overlapped.wsabuf.buf = buffer; … … 800 964 PJ_CHECK_STACK(); 801 965 PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL); 802 966 967 #if PJ_IOQUEUE_HAS_SAFE_UNREG 968 /* Check key is not closing */ 969 if (key->closing) 970 return PJ_ECANCELLED; 971 #endif 972 803 973 op_key_rec = (union operation_key*)op_key->internal__; 804 974 … … 873 1043 PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); 874 1044 1045 #if PJ_IOQUEUE_HAS_SAFE_UNREG 1046 /* Check key is not closing */ 1047 if (key->closing) 1048 return PJ_ECANCELLED; 1049 #endif 1050 875 1051 /* 876 1052 * See if there is a new connection immediately available. … … 962 1138 PJ_CHECK_STACK(); 963 1139 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); 1140 1141 #if PJ_IOQUEUE_HAS_SAFE_UNREG 1142 /* Check key is not closing */ 1143 if (key->closing) 1144 return PJ_ECANCELLED; 1145 #endif 964 1146 965 1147 /* Initiate connect() */ -
pjproject/trunk/pjlib/src/pjlib-test/ioq_perf.c
r363 r365 399 399 pj_ioqueue_unregister(items[i].server_key); 400 400 pj_ioqueue_unregister(items[i].client_key); 401 pj_sock_close(items[i].server_fd);402 pj_sock_close(items[i].client_fd);403 401 } 404 402 -
pjproject/trunk/pjlib/src/pjlib-test/ioq_udp.c
r349 r365 458 458 /* Now unregister and close socket. */ 459 459 pj_ioqueue_unregister(key); 460 pj_sock_close(rsock);461 460 462 461 /* Poll ioqueue. */ … … 539 538 if (rc != PJ_SUCCESS) { 540 539 app_perror("...error in pj_ioqueue_unregister", rc); 541 }542 rc = pj_sock_close(sock[i]);543 if (rc != PJ_SUCCESS) {544 app_perror("...error in pj_sock_close", rc);545 540 } 546 541 } -
pjproject/trunk/pjlib/src/pjlib-test/test.c
r126 r365 146 146 #endif 147 147 148 #if INCLUDE_IOQUEUE_UNREG_TEST 149 DO_TEST( udp_ioqueue_unreg_test() ); 150 #endif 151 148 152 #if INCLUDE_FILE_TEST 149 153 DO_TEST( file_test() ); -
pjproject/trunk/pjlib/src/pjlib-test/test.h
r65 r365 49 49 #define INCLUDE_TCP_IOQUEUE_TEST GROUP_NETWORK 50 50 #define INCLUDE_IOQUEUE_PERF_TEST GROUP_NETWORK 51 #define INCLUDE_IOQUEUE_UNREG_TEST GROUP_NETWORK 51 52 #define INCLUDE_FILE_TEST GROUP_FILE 52 53 … … 83 84 extern int select_test(void); 84 85 extern int udp_ioqueue_test(void); 86 extern int udp_ioqueue_unreg_test(void); 85 87 extern int tcp_ioqueue_test(void); 86 88 extern int ioqueue_perf_test(void);
Note: See TracChangeset
for help on using the changeset viewer.