Changeset 365 for pjproject/trunk/pjlib/src/pj/ioqueue_common_abs.c
- Timestamp:
- Mar 30, 2006 4:32:18 PM (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjlib/src/pj/ioqueue_common_abs.c
r363 r365 28 28 */ 29 29 30 static long ioqueue_tls_id = -1;31 32 typedef struct key_lock_data {33 struct key_lock_data *prev;34 pj_ioqueue_key_t *key;35 int is_alive;36 } key_lock_data;37 38 39 30 static void ioqueue_init( pj_ioqueue_t *ioqueue ) 40 31 { 41 32 ioqueue->lock = NULL; 42 33 ioqueue->auto_delete_lock = 0; 43 44 if (ioqueue_tls_id == -1) {45 pj_status_t status;46 status = pj_thread_local_alloc(&ioqueue_tls_id);47 pj_thread_local_set(ioqueue_tls_id, NULL);48 }49 34 } 50 35 … … 94 79 #if PJ_HAS_TCP 95 80 pj_list_init(&key->accept_list); 81 key->connecting = 0; 96 82 #endif 97 83 98 84 /* Save callback. */ 99 85 pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); 86 87 #if PJ_IOQUEUE_HAS_SAFE_UNREG 88 /* Set initial reference count to 1 */ 89 pj_assert(key->ref_count == 0); 90 ++key->ref_count; 91 92 key->closing = 0; 93 #endif 100 94 101 95 /* Get socket type. When socket type is datagram, some optimization … … 108 102 key->fd_type = PJ_SOCK_STREAM; 109 103 110 key->inside_callback = 0;111 key->destroy_requested = 0;112 113 104 /* Create mutex for the key. */ 114 rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); 105 #if !PJ_IOQUEUE_HAS_SAFE_UNREG 106 rc = pj_mutex_create_simple(pool, NULL, &key->mutex); 107 #endif 115 108 116 109 return rc; 117 }118 119 /* Lock the key and also keep the lock data in thread local storage.120 * The lock data is used to detect if the key is deleted by application121 * when we call its callback.122 */123 static void lock_key(pj_ioqueue_key_t *key, key_lock_data *lck)124 {125 struct key_lock_data *prev_data;126 127 pj_mutex_lock(key->mutex);128 prev_data = (struct key_lock_data *)129 pj_thread_local_get(ioqueue_tls_id);130 lck->prev = prev_data;131 lck->key = key;132 lck->is_alive = 1;133 pj_thread_local_set(ioqueue_tls_id, lck);134 }135 136 /* Unlock the key only if it is still valid. */137 static void unlock_key(pj_ioqueue_key_t *key, key_lock_data *lck)138 {139 pj_assert( (void*)pj_thread_local_get(ioqueue_tls_id) == lck);140 pj_assert( lck->key == key );141 pj_thread_local_set(ioqueue_tls_id, lck->prev);142 if (lck->is_alive)143 pj_mutex_unlock(key->mutex);144 }145 146 /* Destroy key */147 static void ioqueue_destroy_key( pj_ioqueue_key_t *key )148 {149 key_lock_data *lck;150 151 /* Make sure that no other threads are doing something with152 * the key.153 */154 pj_mutex_lock(key->mutex);155 156 /* Check if this function is called within a callback context.157 * If so, then we need to inform the callback that the key has158 * been destroyed so that it doesn't attempt to unlock the159 * key's mutex.160 */161 lck = pj_thread_local_get(ioqueue_tls_id);162 while (lck) {163 if (lck->key == key) {164 lck->is_alive = 0;165 }166 lck = lck->prev;167 }168 169 pj_mutex_destroy(key->mutex);170 110 } 171 111 … … 222 162 223 163 164 #if PJ_IOQUEUE_HAS_SAFE_UNREG 165 # define IS_CLOSING(key) (key->closing) 166 #else 167 # define IS_CLOSING(key) (0) 168 #endif 169 170 224 171 /* 225 172 * ioqueue_dispatch_event() … … 230 177 void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) 231 178 { 232 key_lock_data lck_data;233 234 179 /* Lock the key. */ 235 lock_key(h, &lck_data); 180 pj_mutex_lock(h->mutex); 181 182 if (h->closing) { 183 pj_mutex_unlock(h->mutex); 184 return; 185 } 236 186 237 187 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 … … 246 196 ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); 247 197 248 /* Unlock; from this point we don't need to hold key's mutex. */249 //pj_mutex_unlock(h->mutex);250 198 251 199 #if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) … … 294 242 #endif 295 243 244 /* Unlock; from this point we don't need to hold key's mutex. */ 245 pj_mutex_unlock(h->mutex); 246 296 247 /* Call callback. */ 297 if (h->cb.on_connect_complete )248 if (h->cb.on_connect_complete && !IS_CLOSING(h)) 298 249 (*h->cb.on_connect_complete)(h, bytes_transfered); 299 250 … … 320 271 ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); 321 272 322 //pj_mutex_unlock(h->mutex);323 273 } 324 274 … … 366 316 ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); 367 317 368 /* No need to hold mutex anymore */369 //pj_mutex_unlock(h->mutex);370 318 } 371 319 320 /* No need to hold mutex anymore */ 321 pj_mutex_unlock(h->mutex); 322 372 323 /* Call callback. */ 373 if (h->cb.on_write_complete ) {324 if (h->cb.on_write_complete && !IS_CLOSING(h)) { 374 325 (*h->cb.on_write_complete)(h, 375 326 (pj_ioqueue_op_key_t*)write_op, … … 378 329 379 330 } else { 380 //pj_mutex_unlock(h->mutex);331 pj_mutex_unlock(h->mutex); 381 332 } 382 333 … … 388 339 * able to process the event. 389 340 */ 390 //pj_mutex_unlock(h->mutex); 391 } 392 393 /* Finally unlock key */ 394 unlock_key(h, &lck_data); 341 pj_mutex_unlock(h->mutex); 342 } 395 343 } 396 344 397 345 void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) 398 346 { 399 key_lock_data lck_data;400 347 pj_status_t rc; 401 348 402 349 /* Lock the key. */ 403 lock_key(h, &lck_data); 350 pj_mutex_lock(h->mutex); 351 352 if (h->closing) { 353 pj_mutex_unlock(h->mutex); 354 return; 355 } 404 356 405 357 # if PJ_HAS_TCP … … 416 368 if (pj_list_empty(&h->accept_list)) 417 369 ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); 418 419 /* Unlock; from this point we don't need to hold key's mutex. */420 //pj_mutex_unlock(h->mutex);421 370 422 371 rc=pj_sock_accept(h->fd, accept_op->accept_fd, … … 428 377 } 429 378 379 /* Unlock; from this point we don't need to hold key's mutex. */ 380 pj_mutex_unlock(h->mutex); 381 430 382 /* Call callback. */ 431 if (h->cb.on_accept_complete ) {383 if (h->cb.on_accept_complete && !IS_CLOSING(h)) { 432 384 (*h->cb.on_accept_complete)(h, 433 385 (pj_ioqueue_op_key_t*)accept_op, … … 449 401 if (pj_list_empty(&h->read_list)) 450 402 ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); 451 452 /* Unlock; from this point we don't need to hold key's mutex. */453 //Crash as of revision 353 (since we added pjmedia socket to454 //main ioqueue).455 //pj_mutex_unlock(h->mutex);456 403 457 404 bytes_read = read_op->size; … … 517 464 } 518 465 466 /* Unlock; from this point we don't need to hold key's mutex. */ 467 pj_mutex_unlock(h->mutex); 468 519 469 /* Call callback. */ 520 if (h->cb.on_read_complete ) {470 if (h->cb.on_read_complete && !IS_CLOSING(h)) { 521 471 (*h->cb.on_read_complete)(h, 522 472 (pj_ioqueue_op_key_t*)read_op, … … 530 480 * able to process the event. 531 481 */ 532 //pj_mutex_unlock(h->mutex); 533 } 534 535 /* Unlock handle. */ 536 unlock_key(h, &lck_data); 482 pj_mutex_unlock(h->mutex); 483 } 537 484 } 538 485 … … 541 488 pj_ioqueue_key_t *h ) 542 489 { 543 key_lock_data lck_data; 544 545 lock_key(h, &lck_data); 490 pj_mutex_lock(h->mutex); 546 491 547 492 if (!h->connecting) { … … 550 495 * it has been processed by other thread. 551 496 */ 552 //pj_mutex_unlock(h->mutex); 553 unlock_key(h, &lck_data); 497 pj_mutex_unlock(h->mutex); 554 498 return; 499 } 500 501 if (h->closing) { 502 pj_mutex_unlock(h->mutex); 503 return; 555 504 } 556 505 … … 558 507 h->connecting = 0; 559 508 560 //pj_mutex_unlock(h->mutex);561 562 509 ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); 563 510 ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); 564 511 512 pj_mutex_unlock(h->mutex); 513 565 514 /* Call callback. */ 566 if (h->cb.on_connect_complete )515 if (h->cb.on_connect_complete && !IS_CLOSING(h)) 567 516 (*h->cb.on_connect_complete)(h, -1); 568 569 unlock_key(h, &lck_data);570 517 } 571 518 … … 588 535 read_op = (struct read_operation*)op_key; 589 536 read_op->op = 0; 537 538 /* Check if key is closing. */ 539 if (key->closing) 540 return PJ_ECANCELLED; 590 541 591 542 /* Try to see if there's data immediately available. … … 646 597 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); 647 598 PJ_CHECK_STACK(); 599 600 /* Check if key is closing. */ 601 if (key->closing) 602 return PJ_ECANCELLED; 648 603 649 604 read_op = (struct read_operation*)op_key; … … 710 665 PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); 711 666 PJ_CHECK_STACK(); 667 668 /* Check if key is closing. */ 669 if (key->closing) 670 return PJ_ECANCELLED; 712 671 713 672 write_op = (struct write_operation*)op_key; … … 789 748 PJ_CHECK_STACK(); 790 749 750 /* Check if key is closing. */ 751 if (key->closing) 752 return PJ_ECANCELLED; 753 791 754 write_op = (struct write_operation*)op_key; 792 755 write_op->op = 0; … … 869 832 /* check parameters. All must be specified! */ 870 833 PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); 834 835 /* Check if key is closing. */ 836 if (key->closing) 837 return PJ_ECANCELLED; 871 838 872 839 accept_op = (struct accept_operation*)op_key; … … 930 897 /* check parameters. All must be specified! */ 931 898 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); 899 900 /* Check if key is closing. */ 901 if (key->closing) 902 return PJ_ECANCELLED; 932 903 933 904 /* Check if socket has not been marked for connecting */ … … 987 958 { 988 959 struct generic_operation *op_rec; 989 key_lock_data lck_data;990 960 991 961 /* … … 993 963 * really make sure that it's still there; then call the callback. 994 964 */ 995 lock_key(key, &lck_data);965 pj_mutex_lock(key->mutex); 996 966 997 967 /* Find the operation in the pending read list. */ … … 1001 971 pj_list_erase(op_rec); 1002 972 op_rec->op = 0; 1003 //pj_mutex_unlock(key->mutex);973 pj_mutex_unlock(key->mutex); 1004 974 1005 975 (*key->cb.on_read_complete)(key, op_key, bytes_status); 1006 1007 unlock_key(key, &lck_data);1008 976 return PJ_SUCCESS; 1009 977 } … … 1017 985 pj_list_erase(op_rec); 1018 986 op_rec->op = 0; 1019 //pj_mutex_unlock(key->mutex);987 pj_mutex_unlock(key->mutex); 1020 988 1021 989 (*key->cb.on_write_complete)(key, op_key, bytes_status); 1022 1023 unlock_key(key, &lck_data);1024 990 return PJ_SUCCESS; 1025 991 } … … 1033 999 pj_list_erase(op_rec); 1034 1000 op_rec->op = 0; 1035 //pj_mutex_unlock(key->mutex);1001 pj_mutex_unlock(key->mutex); 1036 1002 1037 1003 (*key->cb.on_accept_complete)(key, op_key, 1038 1004 PJ_INVALID_SOCKET, 1039 1005 bytes_status); 1040 1041 unlock_key(key, &lck_data);1042 1006 return PJ_SUCCESS; 1043 1007 } … … 1045 1009 } 1046 1010 1047 unlock_key(key, &lck_data);1011 pj_mutex_unlock(key->mutex); 1048 1012 1049 1013 return PJ_EINVALIDOP;
Note: See TracChangeset
for help on using the changeset viewer.