Changeset 363
- Timestamp:
- Mar 25, 2006 10:06:00 AM (19 years ago)
- Location:
- pjproject/trunk/pjlib
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjlib/build/pjlib.dsp
r349 r363 234 234 235 235 SOURCE=..\src\pj\ioqueue_select.c 236 # PROP Exclude_From_Build 1237 236 # End Source File 238 237 # Begin Source File 239 238 240 239 SOURCE=..\src\pj\ioqueue_winnt.c 241 242 !IF "$(CFG)" == "pjlib - Win32 Release" 243 244 !ELSEIF "$(CFG)" == "pjlib - Win32 Debug" 245 246 !ENDIF 247 240 # PROP Exclude_From_Build 1 248 241 # End Source File 249 242 # Begin Source File -
pjproject/trunk/pjlib/src/pj/ioqueue_common_abs.c
r126 r363 28 28 */ 29 29 30 static long ioqueue_tls_id = -1; 31 32 typedef struct key_lock_data { 33 struct key_lock_data *prev; 34 pj_ioqueue_key_t *key; 35 int is_alive; 36 } key_lock_data; 37 38 30 39 static void ioqueue_init( pj_ioqueue_t *ioqueue ) 31 40 { 32 41 ioqueue->lock = NULL; 33 42 ioqueue->auto_delete_lock = 0; 43 44 if (ioqueue_tls_id == -1) { 45 pj_status_t status; 46 status = pj_thread_local_alloc(&ioqueue_tls_id); 47 pj_thread_local_set(ioqueue_tls_id, NULL); 48 } 34 49 } 35 50 … … 93 108 key->fd_type = PJ_SOCK_STREAM; 94 109 110 key->inside_callback = 0; 111 key->destroy_requested = 0; 112 95 113 /* Create mutex for the key. */ 96 rc = pj_mutex_create_ simple(pool, NULL, &key->mutex);114 rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); 97 115 98 116 return rc; 99 117 } 100 118 119 /* Lock the key and also keep the lock data in thread local storage. 120 * The lock data is used to detect if the key is deleted by application 121 * when we call its callback. 122 */ 123 static void lock_key(pj_ioqueue_key_t *key, key_lock_data *lck) 124 { 125 struct key_lock_data *prev_data; 126 127 pj_mutex_lock(key->mutex); 128 prev_data = (struct key_lock_data *) 129 pj_thread_local_get(ioqueue_tls_id); 130 lck->prev = prev_data; 131 lck->key = key; 132 lck->is_alive = 1; 133 pj_thread_local_set(ioqueue_tls_id, lck); 134 } 135 136 /* Unlock the key only if it is still valid. */ 137 static void unlock_key(pj_ioqueue_key_t *key, key_lock_data *lck) 138 { 139 pj_assert( (void*)pj_thread_local_get(ioqueue_tls_id) == lck); 140 pj_assert( lck->key == key ); 141 pj_thread_local_set(ioqueue_tls_id, lck->prev); 142 if (lck->is_alive) 143 pj_mutex_unlock(key->mutex); 144 } 145 146 /* Destroy key */ 101 147 static void ioqueue_destroy_key( pj_ioqueue_key_t *key ) 102 148 { 149 key_lock_data *lck; 150 151 /* Make sure that no other threads are doing something with 152 * the key. 153 */ 154 pj_mutex_lock(key->mutex); 155 156 /* Check if this function is called within a callback context. 157 * If so, then we need to inform the callback that the key has 158 * been destroyed so that it doesn't attempt to unlock the 159 * key's mutex. 160 */ 161 lck = pj_thread_local_get(ioqueue_tls_id); 162 while (lck) { 163 if (lck->key == key) { 164 lck->is_alive = 0; 165 } 166 lck = lck->prev; 167 } 168 103 169 pj_mutex_destroy(key->mutex); 104 170 } … … 164 230 void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) 165 231 { 232 key_lock_data lck_data; 233 166 234 /* Lock the key. */ 167 pj_mutex_lock(h->mutex);235 lock_key(h, &lck_data); 168 236 169 237 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 … … 179 247 180 248 /* Unlock; from this point we don't need to hold key's mutex. */ 181 pj_mutex_unlock(h->mutex);249 //pj_mutex_unlock(h->mutex); 182 250 183 251 #if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) … … 252 320 ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); 253 321 254 pj_mutex_unlock(h->mutex);322 //pj_mutex_unlock(h->mutex); 255 323 } 256 324 … … 299 367 300 368 /* No need to hold mutex anymore */ 301 pj_mutex_unlock(h->mutex);369 //pj_mutex_unlock(h->mutex); 302 370 } 303 371 … … 310 378 311 379 } else { 312 pj_mutex_unlock(h->mutex);380 //pj_mutex_unlock(h->mutex); 313 381 } 314 382 … … 320 388 * able to process the event. 321 389 */ 322 pj_mutex_unlock(h->mutex); 323 } 390 //pj_mutex_unlock(h->mutex); 391 } 392 393 /* Finally unlock key */ 394 unlock_key(h, &lck_data); 324 395 } 325 396 326 397 void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) 327 398 { 399 key_lock_data lck_data; 328 400 pj_status_t rc; 329 401 330 402 /* Lock the key. */ 331 pj_mutex_lock(h->mutex);403 lock_key(h, &lck_data); 332 404 333 405 # if PJ_HAS_TCP … … 346 418 347 419 /* Unlock; from this point we don't need to hold key's mutex. */ 348 pj_mutex_unlock(h->mutex);420 //pj_mutex_unlock(h->mutex); 349 421 350 422 rc=pj_sock_accept(h->fd, accept_op->accept_fd, … … 379 451 380 452 /* Unlock; from this point we don't need to hold key's mutex. */ 381 pj_mutex_unlock(h->mutex); 453 //Crash as of revision 353 (since we added pjmedia socket to 454 //main ioqueue). 455 //pj_mutex_unlock(h->mutex); 382 456 383 457 bytes_read = read_op->size; … … 456 530 * able to process the event. 457 531 */ 458 pj_mutex_unlock(h->mutex); 459 } 532 //pj_mutex_unlock(h->mutex); 533 } 534 535 /* Unlock handle. */ 536 unlock_key(h, &lck_data); 460 537 } 461 538 … … 464 541 pj_ioqueue_key_t *h ) 465 542 { 466 pj_mutex_lock(h->mutex); 543 key_lock_data lck_data; 544 545 lock_key(h, &lck_data); 467 546 468 547 if (!h->connecting) { … … 471 550 * it has been processed by other thread. 472 551 */ 473 pj_mutex_unlock(h->mutex); 552 //pj_mutex_unlock(h->mutex); 553 unlock_key(h, &lck_data); 474 554 return; 475 555 } … … 478 558 h->connecting = 0; 479 559 480 pj_mutex_unlock(h->mutex);560 //pj_mutex_unlock(h->mutex); 481 561 482 562 ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); … … 486 566 if (h->cb.on_connect_complete) 487 567 (*h->cb.on_connect_complete)(h, -1); 568 569 unlock_key(h, &lck_data); 488 570 } 489 571 … … 905 987 { 906 988 struct generic_operation *op_rec; 989 key_lock_data lck_data; 907 990 908 991 /* … … 910 993 * really make sure that it's still there; then call the callback. 911 994 */ 912 pj_mutex_lock(key->mutex);995 lock_key(key, &lck_data); 913 996 914 997 /* Find the operation in the pending read list. */ … … 918 1001 pj_list_erase(op_rec); 919 1002 op_rec->op = 0; 920 pj_mutex_unlock(key->mutex);1003 //pj_mutex_unlock(key->mutex); 921 1004 922 1005 (*key->cb.on_read_complete)(key, op_key, bytes_status); 1006 1007 unlock_key(key, &lck_data); 923 1008 return PJ_SUCCESS; 924 1009 } … … 932 1017 pj_list_erase(op_rec); 933 1018 op_rec->op = 0; 934 pj_mutex_unlock(key->mutex);1019 //pj_mutex_unlock(key->mutex); 935 1020 936 1021 (*key->cb.on_write_complete)(key, op_key, bytes_status); 1022 1023 unlock_key(key, &lck_data); 937 1024 return PJ_SUCCESS; 938 1025 } … … 946 1033 pj_list_erase(op_rec); 947 1034 op_rec->op = 0; 948 pj_mutex_unlock(key->mutex);1035 //pj_mutex_unlock(key->mutex); 949 1036 950 1037 (*key->cb.on_accept_complete)(key, op_key, 951 1038 PJ_INVALID_SOCKET, 952 1039 bytes_status); 1040 1041 unlock_key(key, &lck_data); 953 1042 return PJ_SUCCESS; 954 1043 } … … 956 1045 } 957 1046 958 pj_mutex_unlock(key->mutex);1047 unlock_key(key, &lck_data); 959 1048 960 1049 return PJ_EINVALIDOP; -
pjproject/trunk/pjlib/src/pj/ioqueue_common_abs.h
r165 r363 92 92 pj_ioqueue_t *ioqueue; \ 93 93 pj_mutex_t *mutex; \ 94 pj_bool_t inside_callback; \ 95 pj_bool_t destroy_requested; \ 94 96 pj_sock_t fd; \ 95 97 int fd_type; \ -
pjproject/trunk/pjlib/src/pjlib-test/ioq_perf.c
r351 r363 392 392 TRACE_((THIS_FILE, " join thread %d..", i)); 393 393 pj_thread_join(thread[i]); 394 pj_thread_destroy(thread[i]);395 394 } 396 395 … … 429 428 *p_bandwidth = (pj_uint32_t)bandwidth; 430 429 431 PJ_LOG(3,(THIS_FILE, " %.4s % d %d%8d KB/s",430 PJ_LOG(3,(THIS_FILE, " %.4s %2d %2d %8d KB/s", 432 431 type_name, thread_cnt, sockpair_cnt, 433 432 *p_bandwidth));
Note: See TracChangeset
for help on using the changeset viewer.