Changeset 4359
- Timestamp:
- Feb 21, 2013 11:18:36 AM (12 years ago)
- Location:
- pjproject/trunk/pjlib
- Files:
-
- 16 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjlib/include/pj/activesock.h
r3553 r4359 175 175 { 176 176 /** 177 * Optional group lock to be assigned to the ioqueue key. 178 */ 179 pj_grp_lock_t *grp_lock; 180 181 /** 177 182 * Number of concurrent asynchronous operations that is to be supported 178 183 * by the active socket. This value only affects socket receive and … … 291 296 pj_sockaddr *bound_addr); 292 297 293 294 298 /** 295 299 * Close the active socket. This will unregister the socket from the … … 549 553 int addr_len); 550 554 555 551 556 #endif /* PJ_HAS_TCP */ 552 557 -
pjproject/trunk/pjlib/include/pj/config.h
r4191 r4359 485 485 #ifndef PJ_TIMER_DEBUG 486 486 # define PJ_TIMER_DEBUG 0 487 #endif 488 489 490 /** 491 * Set this to 1 to enable debugging on the group lock. Default: 0 492 */ 493 #ifndef PJ_GRP_LOCK_DEBUG 494 # define PJ_GRP_LOCK_DEBUG 0 487 495 #endif 488 496 … … 1123 1131 1124 1132 /** 1133 * Simulate race condition by sleeping the thread in strategic locations. 1134 * Default: no! 1135 */ 1136 #ifndef PJ_RACE_ME 1137 # define PJ_RACE_ME(x) 1138 #endif 1139 1140 /** 1125 1141 * Function attributes to inform that the function may throw exception. 1126 1142 * -
pjproject/trunk/pjlib/include/pj/errno.h
r3664 r4359 423 423 */ 424 424 #define PJ_EAFNOTSUP (PJ_ERRNO_START_STATUS + 22)/* 70022 */ 425 /** 426 * @hideinitializer 427 * Object no longer exists 428 */ 429 #define PJ_EGONE (PJ_ERRNO_START_STATUS + 23)/* 70023 */ 425 430 426 431 /** @} */ /* pj_errnum */ -
pjproject/trunk/pjlib/include/pj/ioqueue.h
r3553 r4359 401 401 pj_ioqueue_t *ioque, 402 402 pj_sock_t sock, 403 void *user_data, 404 const pj_ioqueue_callback *cb, 405 pj_ioqueue_key_t **key ); 406 407 /** 408 * Variant of pj_ioqueue_register_sock() with additional group lock parameter. 409 * If group lock is set for the key, the key will add the reference counter 410 * when the socket is registered and decrease it when it is destroyed. 411 */ 412 PJ_DECL(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool, 413 pj_ioqueue_t *ioque, 414 pj_sock_t sock, 415 pj_grp_lock_t *grp_lock, 403 416 void *user_data, 404 417 const pj_ioqueue_callback *cb, -
pjproject/trunk/pjlib/include/pj/lock.h
r3553 r4359 148 148 /** @} */ 149 149 150 151 /** 152 * @defgroup PJ_GRP_LOCK Group Lock 153 * @ingroup PJ_LOCK 154 * @{ 155 * 156 * Group lock is a synchronization object to manage concurrency among members 157 * within the same logical group. Example of such groups are: 158 * 159 * - dialog, which has members such as the dialog itself, an invite session, 160 * and several transactions 161 * - ICE, which has members such as ICE stream transport, ICE session, STUN 162 * socket, TURN socket, and down to ioqueue key 163 * 164 * Group lock has three functions: 165 * 166 * - mutual exclusion: to protect resources from being accessed by more than 167 * one threads at the same time 168 * - session management: to make sure that the resource is not destroyed 169 * while others are still using or about to use it. 170 * - lock coordinator: to provide uniform lock ordering among more than one 171 * lock objects, which is necessary to avoid deadlock. 172 * 173 * The requirements of the group lock are: 174 * 175 * - must satisfy all the functions above 176 * - must allow members to join or leave the group (for example, 177 * transaction may be added or removed from a dialog) 178 * - must be able to synchronize with external lock (for example, a dialog 179 * lock must be able to sync itself with PJSUA lock) 180 * 181 * Please see https://trac.pjsip.org/repos/wiki/Group_Lock for more info. 182 */ 183 184 /** 185 * Settings for creating the group lock. 186 */ 187 typedef struct pj_grp_lock_config 188 { 189 /** 190 * Creation flags, currently must be zero. 191 */ 192 unsigned flags; 193 194 } pj_grp_lock_config; 195 196 197 /** 198 * Initialize the config with the default values. 199 * 200 * @param cfg The config to be initialized. 201 */ 202 PJ_DECL(void) pj_grp_lock_config_default(pj_grp_lock_config *cfg); 203 204 /** 205 * Create a group lock object. Initially the group lock will have reference 206 * counter of one. 207 * 208 * @param pool The group lock only uses the pool parameter to get 209 * the pool factory, from which it will create its own 210 * pool. 211 * @param cfg Optional configuration. 212 * @param p_grp_lock Pointer to receive the newly created group lock. 213 * 214 * @return PJ_SUCCESS or the appropriate error code. 215 */ 216 PJ_DECL(pj_status_t) pj_grp_lock_create(pj_pool_t *pool, 217 const pj_grp_lock_config *cfg, 218 pj_grp_lock_t **p_grp_lock); 219 220 /** 221 * Forcibly destroy the group lock, ignoring the reference counter value. 222 * 223 * @param grp_lock The group lock. 224 * 225 * @return PJ_SUCCESS or the appropriate error code. 226 */ 227 PJ_DECL(pj_status_t) pj_grp_lock_destroy( pj_grp_lock_t *grp_lock); 228 229 /** 230 * Move the contents of the old lock to the new lock and destroy the 231 * old lock. 232 * 233 * @param old_lock The old group lock to be destroyed. 234 * @param new_lock The new group lock. 235 * 236 * @return PJ_SUCCESS or the appropriate error code. 237 */ 238 PJ_DECL(pj_status_t) pj_grp_lock_replace(pj_grp_lock_t *old_lock, 239 pj_grp_lock_t *new_lock); 240 241 /** 242 * Acquire lock on the specified group lock. 243 * 244 * @param grp_lock The group lock. 245 * 246 * @return PJ_SUCCESS or the appropriate error code. 247 */ 248 PJ_DECL(pj_status_t) pj_grp_lock_acquire( pj_grp_lock_t *grp_lock); 249 250 /** 251 * Acquire lock on the specified group lock if it is available, otherwise 252 * return immediately wihout waiting. 253 * 254 * @param grp_lock The group lock. 255 * 256 * @return PJ_SUCCESS or the appropriate error code. 257 */ 258 PJ_DECL(pj_status_t) pj_grp_lock_tryacquire( pj_grp_lock_t *grp_lock); 259 260 /** 261 * Release the previously held lock. This may cause the group lock 262 * to be destroyed if it is the last one to hold the reference counter. 263 * In that case, the function will return PJ_EGONE. 264 * 265 * @param grp_lock The group lock. 266 * 267 * @return PJ_SUCCESS or the appropriate error code. 268 */ 269 PJ_DECL(pj_status_t) pj_grp_lock_release( pj_grp_lock_t *grp_lock); 270 271 /** 272 * Add a destructor handler, to be called by the group lock when it is 273 * about to be destroyed. 274 * 275 * @param grp_lock The group lock. 276 * @param pool Pool to allocate memory for the handler. 277 * @param member A pointer to be passed to the handler. 278 * @param handler The destroy handler. 279 * 280 * @return PJ_SUCCESS or the appropriate error code. 281 */ 282 PJ_DECL(pj_status_t) pj_grp_lock_add_handler(pj_grp_lock_t *grp_lock, 283 pj_pool_t *pool, 284 void *member, 285 void (*handler)(void *member)); 286 287 /** 288 * Remove previously registered handler. All parameters must be the same 289 * as when the handler was added. 290 * 291 * @param grp_lock The group lock. 292 * @param member A pointer to be passed to the handler. 293 * @param handler The destroy handler. 294 * 295 * @return PJ_SUCCESS or the appropriate error code. 296 */ 297 PJ_DECL(pj_status_t) pj_grp_lock_del_handler(pj_grp_lock_t *grp_lock, 298 void *member, 299 void (*handler)(void *member)); 300 301 /** 302 * Increment reference counter to prevent the group lock grom being destroyed. 303 * 304 * @param grp_lock The group lock. 305 * 306 * @return PJ_SUCCESS or the appropriate error code. 307 */ 308 #if !PJ_GRP_LOCK_DEBUG 309 PJ_DECL(pj_status_t) pj_grp_lock_add_ref(pj_grp_lock_t *grp_lock); 310 311 #define pj_grp_lock_add_ref_dbg(grp_lock, x, y) pj_grp_lock_add_ref(grp_lock) 312 313 #else 314 315 #define pj_grp_lock_add_ref(g) pj_grp_lock_add_ref_dbg(g, __FILE__, __LINE__) 316 317 PJ_DECL(pj_status_t) pj_grp_lock_add_ref_dbg(pj_grp_lock_t *grp_lock, 318 const char *file, 319 int line); 320 #endif 321 322 /** 323 * Decrement the reference counter. When the counter value reaches zero, the 324 * group lock will be destroyed and all destructor handlers will be called. 325 * 326 * @param grp_lock The group lock. 327 * 328 * @return PJ_SUCCESS or the appropriate error code. 329 */ 330 #if !PJ_GRP_LOCK_DEBUG 331 PJ_DECL(pj_status_t) pj_grp_lock_dec_ref(pj_grp_lock_t *grp_lock); 332 333 #define pj_grp_lock_dec_ref_dbg(grp_lock, x, y) pj_grp_lock_dec_ref(grp_lock) 334 #else 335 336 #define pj_grp_lock_dec_ref(g) pj_grp_lock_dec_ref_dbg(g, __FILE__, __LINE__) 337 338 PJ_DECL(pj_status_t) pj_grp_lock_dec_ref_dbg(pj_grp_lock_t *grp_lock, 339 const char *file, 340 int line); 341 342 #endif 343 344 /** 345 * Get current reference count value. This normally is only used for 346 * debugging purpose. 347 * 348 * @param grp_lock The group lock. 349 * 350 * @return The reference count value. 351 */ 352 PJ_DECL(int) pj_grp_lock_get_ref(pj_grp_lock_t *grp_lock); 353 354 355 /** 356 * Dump group lock info for debugging purpose. If group lock debugging is 357 * enabled (via PJ_GRP_LOCK_DEBUG) macro, this will print the group lock 358 * reference counter value along with the source file and line. If 359 * debugging is disabled, this will only print the reference counter. 360 * 361 * @param grp_lock The group lock. 362 */ 363 PJ_DECL(void) pj_grp_lock_dump(pj_grp_lock_t *grp_lock); 364 365 366 /** 367 * Synchronize an external lock with the group lock, by adding it to the 368 * list of locks to be acquired by the group lock when the group lock is 369 * acquired. 370 * 371 * The ''pos'' argument specifies the lock order and also the relative 372 * position with regard to lock ordering against the group lock. Locks with 373 * lower ''pos'' value will be locked first, and those with negative value 374 * will be locked before the group lock (the group lock's ''pos'' value is 375 * zero). 376 * 377 * @param grp_lock The group lock. 378 * @param ext_lock The external lock 379 * @param pos The position. 380 * 381 * @return PJ_SUCCESS or the appropriate error code. 382 */ 383 PJ_DECL(pj_status_t) pj_grp_lock_chain_lock(pj_grp_lock_t *grp_lock, 384 pj_lock_t *ext_lock, 385 int pos); 386 387 /** 388 * Remove an external lock from group lock's list of synchronized locks. 389 * 390 * @param grp_lock The group lock. 391 * @param ext_lock The external lock 392 * 393 * @return PJ_SUCCESS or the appropriate error code. 394 */ 395 PJ_DECL(pj_status_t) pj_grp_lock_unchain_lock(pj_grp_lock_t *grp_lock, 396 pj_lock_t *ext_lock); 397 398 399 /** @} */ 400 401 150 402 PJ_END_DECL 151 403 -
pjproject/trunk/pjlib/include/pj/timer.h
r4154 r4359 25 25 26 26 #include <pj/types.h> 27 #include <pj/lock.h> 27 28 28 29 PJ_BEGIN_DECL … … 118 119 */ 119 120 pj_time_val _timer_value; 121 122 /** 123 * Internal: the group lock used by this entry, set when 124 * pj_timer_heap_schedule_w_lock() is used. 125 */ 126 pj_grp_lock_t *_grp_lock; 120 127 121 128 #if PJ_TIMER_DEBUG … … 230 237 231 238 /** 232 * Cancel a previously registered timer. 239 * Schedule a timer entry which will expire AFTER the specified delay, and 240 * increment the reference counter of the group lock while the timer entry 241 * is active. The group lock reference counter will automatically be released 242 * after the timer callback is called or when the timer is cancelled. 243 * 244 * @param ht The timer heap. 245 * @param entry The entry to be registered. 246 * @param id_val The value to be set to the "id" field of the timer entry 247 * once the timer is scheduled. 248 * @param delay The interval to expire. 249 * @param grp_lock The group lock. 250 * 251 * @return PJ_SUCCESS, or the appropriate error code. 252 */ 253 #if PJ_TIMER_DEBUG 254 # define pj_timer_heap_schedule_w_grp_lock(ht,e,d,id,g) \ 255 pj_timer_heap_schedule_w_grp_lock_dbg(ht,e,d,id,g,__FILE__,__LINE__) 256 257 PJ_DECL(pj_status_t) pj_timer_heap_schedule_w_grp_lock_dbg( 258 pj_timer_heap_t *ht, 259 pj_timer_entry *entry, 260 const pj_time_val *delay, 261 int id_val, 262 pj_grp_lock_t *grp_lock, 263 const char *src_file, 264 int src_line); 265 #else 266 PJ_DECL(pj_status_t) pj_timer_heap_schedule_w_grp_lock( 267 pj_timer_heap_t *ht, 268 pj_timer_entry *entry, 269 const pj_time_val *delay, 270 int id_val, 271 pj_grp_lock_t *grp_lock); 272 #endif /* PJ_TIMER_DEBUG */ 273 274 275 /** 276 * Cancel a previously registered timer. This will also decrement the 277 * reference counter of the group lock associated with the timer entry, 278 * if the entry was scheduled with one. 233 279 * 234 280 * @param ht The timer heap. … … 242 288 243 289 /** 290 * Cancel only if the previously registered timer is active. This will 291 * also decrement the reference counter of the group lock associated 292 * with the timer entry, if the entry was scheduled with one. In any 293 * case, set the "id" to the specified value. 294 * 295 * @param ht The timer heap. 296 * @param entry The entry to be cancelled. 297 * @param id_val Value to be set to "id" 298 * 299 * @return The number of timer cancelled, which should be one if the 300 * entry has really been registered, or zero if no timer was 301 * cancelled. 302 */ 303 PJ_DECL(int) pj_timer_heap_cancel_if_active(pj_timer_heap_t *ht, 304 pj_timer_entry *entry, 305 int id_val); 306 307 /** 244 308 * Get the number of timer entries. 245 309 * -
pjproject/trunk/pjlib/include/pj/types.h
r4154 r4359 232 232 typedef struct pj_lock_t pj_lock_t; 233 233 234 /** Group lock */ 235 typedef struct pj_grp_lock_t pj_grp_lock_t; 236 234 237 /** Mutex handle. */ 235 238 typedef struct pj_mutex_t pj_mutex_t; -
pjproject/trunk/pjlib/src/pj/activesock.c
r3553 r4359 44 44 }; 45 45 46 enum shutdown_dir 47 { 48 SHUT_NONE = 0, 49 SHUT_RX = 1, 50 SHUT_TX = 2 51 }; 52 46 53 struct read_op 47 54 { … … 78 85 void *user_data; 79 86 unsigned async_count; 87 unsigned shutdown; 80 88 unsigned max_loop; 81 89 pj_activesock_cb cb; … … 210 218 #endif 211 219 212 status = pj_ioqueue_register_sock(pool, ioqueue, sock, asock, 213 &ioq_cb, &asock->key); 220 status = pj_ioqueue_register_sock2(pool, ioqueue, sock, 221 (opt? opt->grp_lock : NULL), 222 asock, &ioq_cb, &asock->key); 214 223 if (status != PJ_SUCCESS) { 215 224 pj_activesock_close(asock); … … 284 293 } 285 294 286 287 295 PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock) 288 296 { 289 297 PJ_ASSERT_RETURN(asock, PJ_EINVAL); 298 asock->shutdown = SHUT_RX | SHUT_TX; 290 299 if (asock->key) { 291 300 #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ … … 448 457 449 458 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); 459 460 /* Ignore if we've been shutdown */ 461 if (asock->shutdown & SHUT_RX) 462 return; 450 463 451 464 do { … … 570 583 return; 571 584 585 /* Also stop further read if we've been shutdown */ 586 if (asock->shutdown & SHUT_RX) 587 return; 588 572 589 /* Only stream oriented socket may leave data in the packet */ 573 590 if (asock->stream_oriented) { … … 649 666 PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL); 650 667 668 if (asock->shutdown & SHUT_TX) 669 return PJ_EINVALIDOP; 670 651 671 send_key->activesock_data = NULL; 652 672 … … 699 719 PJ_EINVAL); 700 720 721 if (asock->shutdown & SHUT_TX) 722 return PJ_EINVALIDOP; 723 701 724 return pj_ioqueue_sendto(asock->key, send_key, data, size, flags, 702 725 addr, addr_len); … … 711 734 712 735 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); 736 737 /* Ignore if we've been shutdown. This may cause data to be partially 738 * sent even when 'wholedata' was requested if the OS only sent partial 739 * buffer. 740 */ 741 if (asock->shutdown & SHUT_TX) 742 return; 713 743 714 744 if (bytes_sent > 0 && op_key->activesock_data) { … … 756 786 PJ_ASSERT_RETURN(asock, PJ_EINVAL); 757 787 PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP); 788 789 /* Ignore if we've been shutdown */ 790 if (asock->shutdown) 791 return PJ_EINVALIDOP; 758 792 759 793 asock->accept_op = (struct accept_op*) … … 798 832 799 833 PJ_UNUSED_ARG(new_sock); 834 835 /* Ignore if we've been shutdown */ 836 if (asock->shutdown) 837 return; 800 838 801 839 do { … … 836 874 } 837 875 876 /* Don't start another accept() if we've been shutdown */ 877 if (asock->shutdown) 878 return; 879 838 880 /* Prepare next accept() */ 839 881 accept_op->new_sock = PJ_INVALID_SOCKET; … … 854 896 { 855 897 PJ_UNUSED_ARG(pool); 898 899 if (asock->shutdown) 900 return PJ_EINVALIDOP; 901 856 902 return pj_ioqueue_connect(asock->key, remaddr, addr_len); 857 903 } … … 861 907 { 862 908 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); 909 910 /* Ignore if we've been shutdown */ 911 if (asock->shutdown) 912 return; 863 913 864 914 if (asock->cb.on_connect_complete) { -
pjproject/trunk/pjlib/src/pj/errno.c
r3664 r4359 78 78 PJ_BUILD_ERR(PJ_EIGNORED, "Ignored"), 79 79 PJ_BUILD_ERR(PJ_EIPV6NOTSUP, "IPv6 is not supported"), 80 PJ_BUILD_ERR(PJ_EAFNOTSUP, "Unsupported address family") 80 PJ_BUILD_ERR(PJ_EAFNOTSUP, "Unsupported address family"), 81 PJ_BUILD_ERR(PJ_EGONE, "Object no longer exists") 81 82 }; 82 83 #endif /* PJ_HAS_ERROR_STRING */ -
pjproject/trunk/pjlib/src/pj/ioqueue_common_abs.c
r3666 r4359 71 71 pj_ioqueue_key_t *key, 72 72 pj_sock_t sock, 73 pj_grp_lock_t *grp_lock, 73 74 void *user_data, 74 75 const pj_ioqueue_callback *cb) … … 115 116 /* Create mutex for the key. */ 116 117 #if !PJ_IOQUEUE_HAS_SAFE_UNREG 117 rc = pj_ mutex_create_simple(pool, NULL, &key->mutex);118 rc = pj_lock_create_simple_mutex(poll, NULL, &key->lock); 118 119 #endif 120 if (rc != PJ_SUCCESS) 121 return rc; 122 123 /* Group lock */ 124 key->grp_lock = grp_lock; 125 if (key->grp_lock) { 126 pj_grp_lock_add_ref_dbg(key->grp_lock, "ioqueue", 0); 127 } 119 128 120 return rc;129 return PJ_SUCCESS; 121 130 } 122 131 … … 190 199 { 191 200 /* Lock the key. */ 192 pj_ mutex_lock(h->mutex);201 pj_ioqueue_lock_key(h); 193 202 194 203 if (IS_CLOSING(h)) { 195 pj_ mutex_unlock(h->mutex);204 pj_ioqueue_unlock_key(h); 196 205 return; 197 206 } … … 262 271 */ 263 272 has_lock = PJ_FALSE; 264 pj_ mutex_unlock(h->mutex);273 pj_ioqueue_unlock_key(h); 265 274 } else { 266 275 has_lock = PJ_TRUE; … … 273 282 /* Unlock if we still hold the lock */ 274 283 if (has_lock) { 275 pj_ mutex_unlock(h->mutex);284 pj_ioqueue_unlock_key(h); 276 285 } 277 286 … … 380 389 */ 381 390 has_lock = PJ_FALSE; 382 pj_mutex_unlock(h->mutex); 391 pj_ioqueue_unlock_key(h); 392 PJ_RACE_ME(5); 383 393 } else { 384 394 has_lock = PJ_TRUE; … … 393 403 394 404 if (has_lock) { 395 pj_ mutex_unlock(h->mutex);405 pj_ioqueue_unlock_key(h); 396 406 } 397 407 398 408 } else { 399 pj_ mutex_unlock(h->mutex);409 pj_ioqueue_unlock_key(h); 400 410 } 401 411 … … 407 417 * able to process the event. 408 418 */ 409 pj_mutex_unlock(h->mutex);419 pj_ioqueue_unlock_key(h); 410 420 } 411 421 } … … 416 426 417 427 /* Lock the key. */ 418 pj_ mutex_lock(h->mutex);428 pj_ioqueue_lock_key(h); 419 429 420 430 if (IS_CLOSING(h)) { 421 pj_ mutex_unlock(h->mutex);431 pj_ioqueue_unlock_key(h); 422 432 return; 423 433 } … … 454 464 */ 455 465 has_lock = PJ_FALSE; 456 pj_mutex_unlock(h->mutex); 466 pj_ioqueue_unlock_key(h); 467 PJ_RACE_ME(5); 457 468 } else { 458 469 has_lock = PJ_TRUE; … … 467 478 468 479 if (has_lock) { 469 pj_ mutex_unlock(h->mutex);480 pj_ioqueue_unlock_key(h); 470 481 } 471 482 } … … 568 579 */ 569 580 has_lock = PJ_FALSE; 570 pj_mutex_unlock(h->mutex); 581 pj_ioqueue_unlock_key(h); 582 PJ_RACE_ME(5); 571 583 } else { 572 584 has_lock = PJ_TRUE; … … 581 593 582 594 if (has_lock) { 583 pj_ mutex_unlock(h->mutex);595 pj_ioqueue_unlock_key(h); 584 596 } 585 597 … … 590 602 * able to process the event. 591 603 */ 592 pj_mutex_unlock(h->mutex);604 pj_ioqueue_unlock_key(h); 593 605 } 594 606 } … … 600 612 pj_bool_t has_lock; 601 613 602 pj_ mutex_lock(h->mutex);614 pj_ioqueue_lock_key(h); 603 615 604 616 if (!h->connecting) { … … 607 619 * it has been processed by other thread. 608 620 */ 609 pj_mutex_unlock(h->mutex);621 pj_ioqueue_unlock_key(h); 610 622 return; 611 623 } 612 624 613 625 if (IS_CLOSING(h)) { 614 pj_ mutex_unlock(h->mutex);626 pj_ioqueue_unlock_key(h); 615 627 return; 616 628 } … … 630 642 */ 631 643 has_lock = PJ_FALSE; 632 pj_mutex_unlock(h->mutex); 644 pj_ioqueue_unlock_key(h); 645 PJ_RACE_ME(5); 633 646 } else { 634 647 has_lock = PJ_TRUE; … … 652 665 653 666 if (has_lock) { 654 pj_ mutex_unlock(h->mutex);667 pj_ioqueue_unlock_key(h); 655 668 } 656 669 } … … 714 727 read_op->flags = flags; 715 728 716 pj_ mutex_lock(key->mutex);729 pj_ioqueue_lock_key(key); 717 730 /* Check again. Handle may have been closed after the previous check 718 731 * in multithreaded app. If we add bad handle to the set it will … … 720 733 */ 721 734 if (IS_CLOSING(key)) { 722 pj_ mutex_unlock(key->mutex);735 pj_ioqueue_unlock_key(key); 723 736 return PJ_ECANCELLED; 724 737 } 725 738 pj_list_insert_before(&key->read_list, read_op); 726 739 ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT); 727 pj_ mutex_unlock(key->mutex);740 pj_ioqueue_unlock_key(key); 728 741 729 742 return PJ_EPENDING; … … 790 803 read_op->rmt_addrlen = addrlen; 791 804 792 pj_ mutex_lock(key->mutex);805 pj_ioqueue_lock_key(key); 793 806 /* Check again. Handle may have been closed after the previous check 794 807 * in multithreaded app. If we add bad handle to the set it will … … 796 809 */ 797 810 if (IS_CLOSING(key)) { 798 pj_ mutex_unlock(key->mutex);811 pj_ioqueue_unlock_key(key); 799 812 return PJ_ECANCELLED; 800 813 } 801 814 pj_list_insert_before(&key->read_list, read_op); 802 815 ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT); 803 pj_ mutex_unlock(key->mutex);816 pj_ioqueue_unlock_key(key); 804 817 805 818 return PJ_EPENDING; … … 904 917 write_op->flags = flags; 905 918 906 pj_ mutex_lock(key->mutex);919 pj_ioqueue_lock_key(key); 907 920 /* Check again. Handle may have been closed after the previous check 908 921 * in multithreaded app. If we add bad handle to the set it will … … 910 923 */ 911 924 if (IS_CLOSING(key)) { 912 pj_ mutex_unlock(key->mutex);925 pj_ioqueue_unlock_key(key); 913 926 return PJ_ECANCELLED; 914 927 } 915 928 pj_list_insert_before(&key->write_list, write_op); 916 929 ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT); 917 pj_ mutex_unlock(key->mutex);930 pj_ioqueue_unlock_key(key); 918 931 919 932 return PJ_EPENDING; … … 1051 1064 write_op->rmt_addrlen = addrlen; 1052 1065 1053 pj_ mutex_lock(key->mutex);1066 pj_ioqueue_lock_key(key); 1054 1067 /* Check again. Handle may have been closed after the previous check 1055 1068 * in multithreaded app. If we add bad handle to the set it will … … 1057 1070 */ 1058 1071 if (IS_CLOSING(key)) { 1059 pj_ mutex_unlock(key->mutex);1072 pj_ioqueue_unlock_key(key); 1060 1073 return PJ_ECANCELLED; 1061 1074 } 1062 1075 pj_list_insert_before(&key->write_list, write_op); 1063 1076 ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT); 1064 pj_ mutex_unlock(key->mutex);1077 pj_ioqueue_unlock_key(key); 1065 1078 1066 1079 return PJ_EPENDING; … … 1128 1141 accept_op->local_addr = local; 1129 1142 1130 pj_ mutex_lock(key->mutex);1143 pj_ioqueue_lock_key(key); 1131 1144 /* Check again. Handle may have been closed after the previous check 1132 1145 * in multithreaded app. If we add bad handle to the set it will … … 1134 1147 */ 1135 1148 if (IS_CLOSING(key)) { 1136 pj_ mutex_unlock(key->mutex);1149 pj_ioqueue_unlock_key(key); 1137 1150 return PJ_ECANCELLED; 1138 1151 } 1139 1152 pj_list_insert_before(&key->accept_list, accept_op); 1140 1153 ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT); 1141 pj_ mutex_unlock(key->mutex);1154 pj_ioqueue_unlock_key(key); 1142 1155 1143 1156 return PJ_EPENDING; … … 1172 1185 if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) { 1173 1186 /* Pending! */ 1174 pj_mutex_lock(key->mutex);1187 pj_ioqueue_lock_key(key); 1175 1188 /* Check again. Handle may have been closed after the previous 1176 1189 * check in multithreaded app. See #913 1177 1190 */ 1178 1191 if (IS_CLOSING(key)) { 1179 pj_ mutex_unlock(key->mutex);1192 pj_ioqueue_unlock_key(key); 1180 1193 return PJ_ECANCELLED; 1181 1194 } … … 1183 1196 ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT); 1184 1197 ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT); 1185 pj_ mutex_unlock(key->mutex);1198 pj_ioqueue_unlock_key(key); 1186 1199 return PJ_EPENDING; 1187 1200 } else { … … 1229 1242 * really make sure that it's still there; then call the callback. 1230 1243 */ 1231 pj_ mutex_lock(key->mutex);1244 pj_ioqueue_lock_key(key); 1232 1245 1233 1246 /* Find the operation in the pending read list. */ … … 1237 1250 pj_list_erase(op_rec); 1238 1251 op_rec->op = PJ_IOQUEUE_OP_NONE; 1239 pj_ mutex_unlock(key->mutex);1252 pj_ioqueue_unlock_key(key); 1240 1253 1241 1254 (*key->cb.on_read_complete)(key, op_key, bytes_status); … … 1251 1264 pj_list_erase(op_rec); 1252 1265 op_rec->op = PJ_IOQUEUE_OP_NONE; 1253 pj_ mutex_unlock(key->mutex);1266 pj_ioqueue_unlock_key(key); 1254 1267 1255 1268 (*key->cb.on_write_complete)(key, op_key, bytes_status); … … 1265 1278 pj_list_erase(op_rec); 1266 1279 op_rec->op = PJ_IOQUEUE_OP_NONE; 1267 pj_ mutex_unlock(key->mutex);1280 pj_ioqueue_unlock_key(key); 1268 1281 1269 1282 (*key->cb.on_accept_complete)(key, op_key, … … 1275 1288 } 1276 1289 1277 pj_ mutex_unlock(key->mutex);1290 pj_ioqueue_unlock_key(key); 1278 1291 1279 1292 return PJ_EINVALIDOP; … … 1305 1318 PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key) 1306 1319 { 1307 return pj_mutex_lock(key->mutex); 1320 if (key->grp_lock) 1321 return pj_grp_lock_acquire(key->grp_lock); 1322 else 1323 return pj_lock_acquire(key->lock); 1308 1324 } 1309 1325 1310 1326 PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) 1311 1327 { 1312 return pj_mutex_unlock(key->mutex); 1313 } 1314 1328 if (key->grp_lock) 1329 return pj_grp_lock_release(key->grp_lock); 1330 else 1331 return pj_lock_release(key->lock); 1332 } 1333 1334 -
pjproject/trunk/pjlib/src/pj/ioqueue_common_abs.h
r3553 r4359 102 102 PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); \ 103 103 pj_ioqueue_t *ioqueue; \ 104 pj_mutex_t *mutex; \ 104 pj_grp_lock_t *grp_lock; \ 105 pj_lock_t *lock; \ 105 106 pj_bool_t inside_callback; \ 106 107 pj_bool_t destroy_requested; \ -
pjproject/trunk/pjlib/src/pj/ioqueue_epoll.c
r3553 r4359 263 263 key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t); 264 264 key->ref_count = 0; 265 rc = pj_ mutex_create_recursive(pool, NULL, &key->mutex);265 rc = pj_lock_create_recursive_mutex(pool, NULL, &key->lock); 266 266 if (rc != PJ_SUCCESS) { 267 267 key = ioqueue->free_list.next; 268 268 while (key != &ioqueue->free_list) { 269 pj_ mutex_destroy(key->mutex);269 pj_lock_destroy(key->lock); 270 270 key = key->next; 271 271 } … … 324 324 key = ioqueue->active_list.next; 325 325 while (key != &ioqueue->active_list) { 326 pj_ mutex_destroy(key->mutex);326 pj_lock_destroy(key->lock); 327 327 key = key->next; 328 328 } … … 330 330 key = ioqueue->closing_list.next; 331 331 while (key != &ioqueue->closing_list) { 332 pj_ mutex_destroy(key->mutex);332 pj_lock_destroy(key->lock); 333 333 key = key->next; 334 334 } … … 336 336 key = ioqueue->free_list.next; 337 337 while (key != &ioqueue->free_list) { 338 pj_ mutex_destroy(key->mutex);338 pj_lock_destroy(key->lock); 339 339 key = key->next; 340 340 } … … 423 423 if (status < 0) { 424 424 rc = pj_get_os_error(); 425 pj_ mutex_destroy(key->mutex);425 pj_lock_destroy(key->lock); 426 426 key = NULL; 427 427 TRACE_((THIS_FILE, … … 498 498 * deadlock. 499 499 */ 500 pj_ mutex_lock(key->mutex);500 pj_lock_acquire(key->lock); 501 501 502 502 /* Also lock ioqueue */ … … 532 532 533 533 /* Done. */ 534 pj_ mutex_unlock(key->mutex);534 pj_lock_release(key->lock); 535 535 #else 536 pj_ mutex_destroy(key->mutex);536 pj_lock_destroy(key->lock); 537 537 #endif 538 538 -
pjproject/trunk/pjlib/src/pj/ioqueue_select.c
r3553 r4359 40 40 #include <pj/sock_qos.h> 41 41 #include <pj/errno.h> 42 #include <pj/rand.h> 42 43 43 44 /* Now that we have access to OS'es <sys/select>, lets check again that … … 238 239 key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t); 239 240 key->ref_count = 0; 240 rc = pj_ mutex_create_recursive(pool, NULL, &key->mutex);241 rc = pj_lock_create_recursive_mutex(pool, NULL, &key->lock); 241 242 if (rc != PJ_SUCCESS) { 242 243 key = ioqueue->free_list.next; 243 244 while (key != &ioqueue->free_list) { 244 pj_ mutex_destroy(key->mutex);245 pj_lock_destroy(key->lock); 245 246 key = key->next; 246 247 } … … 285 286 key = ioqueue->active_list.next; 286 287 while (key != &ioqueue->active_list) { 287 pj_ mutex_destroy(key->mutex);288 pj_lock_destroy(key->lock); 288 289 key = key->next; 289 290 } … … 291 292 key = ioqueue->closing_list.next; 292 293 while (key != &ioqueue->closing_list) { 293 pj_ mutex_destroy(key->mutex);294 pj_lock_destroy(key->lock); 294 295 key = key->next; 295 296 } … … 297 298 key = ioqueue->free_list.next; 298 299 while (key != &ioqueue->free_list) { 299 pj_ mutex_destroy(key->mutex);300 pj_lock_destroy(key->lock); 300 301 key = key->next; 301 302 } … … 313 314 * Register socket handle to ioqueue. 314 315 */ 315 PJ_DEF(pj_status_t) pj_ioqueue_register_sock (pj_pool_t *pool,316 PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool, 316 317 pj_ioqueue_t *ioqueue, 317 318 pj_sock_t sock, 319 pj_grp_lock_t *grp_lock, 318 320 void *user_data, 319 321 const pj_ioqueue_callback *cb, … … 359 361 #endif 360 362 361 rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);363 rc = ioqueue_init_key(pool, ioqueue, key, sock, grp_lock, user_data, cb); 362 364 if (rc != PJ_SUCCESS) { 363 365 key = NULL; … … 387 389 on_return: 388 390 /* On error, socket may be left in non-blocking mode. */ 391 if (rc != PJ_SUCCESS) { 392 if (key->grp_lock) 393 pj_grp_lock_dec_ref_dbg(key->grp_lock, "ioqueue", 0); 394 } 389 395 *p_key = key; 390 396 pj_lock_release(ioqueue->lock); … … 393 399 } 394 400 401 PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, 402 pj_ioqueue_t *ioqueue, 403 pj_sock_t sock, 404 void *user_data, 405 const pj_ioqueue_callback *cb, 406 pj_ioqueue_key_t **p_key) 407 { 408 return pj_ioqueue_register_sock2(pool, ioqueue, sock, NULL, user_data, 409 cb, p_key); 410 } 411 395 412 #if PJ_IOQUEUE_HAS_SAFE_UNREG 396 413 /* Increment key's reference counter */ … … 447 464 * deadlock. 448 465 */ 449 pj_ mutex_lock(key->mutex);466 pj_ioqueue_lock_key(key); 450 467 451 468 /* Also lock ioqueue */ … … 486 503 487 504 /* Done. */ 488 pj_mutex_unlock(key->mutex); 505 if (key->grp_lock) { 506 /* just dec_ref and unlock. we will set grp_lock to NULL 507 * elsewhere */ 508 pj_grp_lock_t *grp_lock = key->grp_lock; 509 // Don't set grp_lock to NULL otherwise the other thread 510 // will crash. Just leave it as dangling pointer, but this 511 // should be safe 512 //key->grp_lock = NULL; 513 pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0); 514 pj_grp_lock_release(grp_lock); 515 } else { 516 pj_ioqueue_unlock_key(key); 517 } 489 518 #else 490 pj_mutex_destroy(key->mutex); 519 if (key->grp_lock) { 520 /* set grp_lock to NULL and unlock */ 521 pj_grp_lock_t *grp_lock = key->grp_lock; 522 // Don't set grp_lock to NULL otherwise the other thread 523 // will crash. Just leave it as dangling pointer, but this 524 // should be safe 525 //key->grp_lock = NULL; 526 pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0); 527 pj_grp_lock_release(grp_lock); 528 } else { 529 pj_ioqueue_unlock_key(key); 530 } 531 532 pj_lock_destroy(key->lock); 491 533 #endif 492 534 … … 621 663 if (PJ_TIME_VAL_GTE(now, h->free_time)) { 622 664 pj_list_erase(h); 665 // Don't set grp_lock to NULL otherwise the other thread 666 // will crash. Just leave it as dangling pointer, but this 667 // should be safe 668 //h->grp_lock = NULL; 623 669 pj_list_push_back(&ioqueue->free_list, h); 624 670 } … … 782 828 { 783 829 pj_fd_set_t rfdset, wfdset, xfdset; 784 int count, counter;830 int count, i, counter; 785 831 pj_ioqueue_key_t *h; 786 832 struct event … … 893 939 } 894 940 941 for (i=0; i<counter; ++i) { 942 if (event[i].key->grp_lock) 943 pj_grp_lock_add_ref_dbg(event[i].key->grp_lock, "ioqueue", 0); 944 } 945 946 PJ_RACE_ME(5); 947 895 948 pj_lock_release(ioqueue->lock); 949 950 PJ_RACE_ME(5); 896 951 897 952 count = counter; … … 919 974 decrement_counter(event[counter].key); 920 975 #endif 976 977 if (event[counter].key->grp_lock) 978 pj_grp_lock_dec_ref_dbg(event[counter].key->grp_lock, 979 "ioqueue", 0); 921 980 } 922 981 -
pjproject/trunk/pjlib/src/pj/lock.c
r3553 r4359 21 21 #include <pj/os.h> 22 22 #include <pj/assert.h> 23 #include <pj/log.h> 23 24 #include <pj/pool.h> 24 25 #include <pj/string.h> 25 26 #include <pj/errno.h> 26 27 28 #define THIS_FILE "lock.c" 27 29 28 30 typedef void LOCK_OBJ; … … 197 199 } 198 200 201 202 /****************************************************************************** 203 * Group lock 204 */ 205 206 /* Individual lock in the group lock */ 207 typedef struct grp_lock_item 208 { 209 PJ_DECL_LIST_MEMBER(struct grp_lock_item); 210 int prio; 211 pj_lock_t *lock; 212 213 } grp_lock_item; 214 215 /* Destroy callbacks */ 216 typedef struct grp_destroy_callback 217 { 218 PJ_DECL_LIST_MEMBER(struct grp_destroy_callback); 219 void *comp; 220 void (*handler)(void*); 221 } grp_destroy_callback; 222 223 #if PJ_GRP_LOCK_DEBUG 224 /* Store each add_ref caller */ 225 typedef struct grp_lock_ref 226 { 227 PJ_DECL_LIST_MEMBER(struct grp_lock_ref); 228 const char *file; 229 int line; 230 } grp_lock_ref; 231 #endif 232 233 /* The group lock */ 234 struct pj_grp_lock_t 235 { 236 pj_lock_t base; 237 238 pj_pool_t *pool; 239 pj_atomic_t *ref_cnt; 240 pj_lock_t *own_lock; 241 242 pj_thread_t *owner; 243 int owner_cnt; 244 245 grp_lock_item lock_list; 246 grp_destroy_callback destroy_list; 247 248 #if PJ_GRP_LOCK_DEBUG 249 grp_lock_ref ref_list; 250 grp_lock_ref ref_free_list; 251 #endif 252 }; 253 254 255 PJ_DEF(void) pj_grp_lock_config_default(pj_grp_lock_config *cfg) 256 { 257 pj_bzero(cfg, sizeof(*cfg)); 258 } 259 260 static void grp_lock_set_owner_thread(pj_grp_lock_t *glock) 261 { 262 if (!glock->owner) { 263 glock->owner = pj_thread_this(); 264 glock->owner_cnt = 1; 265 } else { 266 pj_assert(glock->owner == pj_thread_this()); 267 glock->owner_cnt++; 268 } 269 } 270 271 static void grp_lock_unset_owner_thread(pj_grp_lock_t *glock) 272 { 273 pj_assert(glock->owner == pj_thread_this()); 274 pj_assert(glock->owner_cnt > 0); 275 if (--glock->owner_cnt <= 0) { 276 glock->owner = NULL; 277 glock->owner_cnt = 0; 278 } 279 } 280 281 static pj_status_t grp_lock_acquire(LOCK_OBJ *p) 282 { 283 pj_grp_lock_t *glock = (pj_grp_lock_t*)p; 284 grp_lock_item *lck; 285 286 pj_assert(pj_atomic_get(glock->ref_cnt) > 0); 287 288 lck = glock->lock_list.next; 289 while (lck != &glock->lock_list) { 290 pj_lock_acquire(lck->lock); 291 lck = lck->next; 292 } 293 grp_lock_set_owner_thread(glock); 294 pj_grp_lock_add_ref(glock); 295 return PJ_SUCCESS; 296 } 297 298 static pj_status_t grp_lock_tryacquire(LOCK_OBJ *p) 299 { 300 pj_grp_lock_t *glock = (pj_grp_lock_t*)p; 301 grp_lock_item *lck; 302 303 pj_assert(pj_atomic_get(glock->ref_cnt) > 0); 304 305 lck = glock->lock_list.next; 306 while (lck != &glock->lock_list) { 307 pj_status_t status = pj_lock_tryacquire(lck->lock); 308 if (status != PJ_SUCCESS) { 309 lck = lck->prev; 310 while (lck != &glock->lock_list) { 311 pj_lock_release(lck->lock); 312 lck = lck->prev; 313 } 314 return status; 315 } 316 lck = lck->next; 317 } 318 grp_lock_set_owner_thread(glock); 319 pj_grp_lock_add_ref(glock); 320 return PJ_SUCCESS; 321 } 322 323 static pj_status_t grp_lock_release(LOCK_OBJ *p) 324 { 325 pj_grp_lock_t *glock = (pj_grp_lock_t*)p; 326 grp_lock_item *lck; 327 328 grp_lock_unset_owner_thread(glock); 329 330 lck = glock->lock_list.prev; 331 while (lck != &glock->lock_list) { 332 pj_lock_release(lck->lock); 333 lck = lck->prev; 334 } 335 return pj_grp_lock_dec_ref(glock); 336 } 337 338 static pj_status_t grp_lock_destroy(LOCK_OBJ *p) 339 { 340 pj_grp_lock_t *glock = (pj_grp_lock_t*)p; 341 pj_pool_t *pool = glock->pool; 342 grp_lock_item *lck; 343 grp_destroy_callback *cb; 344 345 if (!glock->pool) { 346 /* already destroyed?! */ 347 return PJ_EINVAL; 348 } 349 350 /* Release all chained locks */ 351 lck = glock->lock_list.next; 352 while (lck != &glock->lock_list) { 353 if (lck->lock != glock->own_lock) { 354 unsigned i; 355 for (i=0; i<glock->owner_cnt; ++i) 356 pj_lock_release(lck->lock); 357 } 358 lck = lck->next; 359 } 360 361 /* Call callbacks */ 362 cb = glock->destroy_list.next; 363 while (cb != &glock->destroy_list) { 364 grp_destroy_callback *next = cb->next; 365 cb->handler(cb->comp); 366 cb = next; 367 } 368 369 pj_lock_destroy(glock->own_lock); 370 pj_atomic_destroy(glock->ref_cnt); 371 glock->pool = NULL; 372 pj_pool_release(pool); 373 374 return PJ_SUCCESS; 375 } 376 377 378 PJ_DEF(pj_status_t) pj_grp_lock_create( pj_pool_t *pool, 379 const pj_grp_lock_config *cfg, 380 pj_grp_lock_t **p_grp_lock) 381 { 382 pj_grp_lock_t *glock; 383 grp_lock_item *own_lock; 384 pj_status_t status; 385 386 PJ_ASSERT_RETURN(pool && p_grp_lock, PJ_EINVAL); 387 388 PJ_UNUSED_ARG(cfg); 389 390 pool = pj_pool_create(pool->factory, "glck%p", 512, 512, NULL); 391 if (!pool) 392 return PJ_ENOMEM; 393 394 glock = PJ_POOL_ZALLOC_T(pool, pj_grp_lock_t); 395 glock->base.lock_object = glock; 396 glock->base.acquire = &grp_lock_acquire; 397 glock->base.tryacquire = &grp_lock_tryacquire; 398 glock->base.release = &grp_lock_release; 399 glock->base.destroy = &grp_lock_destroy; 400 401 glock->pool = pool; 402 pj_list_init(&glock->lock_list); 403 pj_list_init(&glock->destroy_list); 404 #if PJ_GRP_LOCK_DEBUG 405 pj_list_init(&glock->ref_list); 406 pj_list_init(&glock->ref_free_list); 407 #endif 408 409 status = pj_atomic_create(pool, 0, &glock->ref_cnt); 410 if (status != PJ_SUCCESS) 411 goto on_error; 412 413 status = pj_lock_create_recursive_mutex(pool, pool->obj_name, 414 &glock->own_lock); 415 if (status != PJ_SUCCESS) 416 goto on_error; 417 418 own_lock = PJ_POOL_ZALLOC_T(pool, grp_lock_item); 419 own_lock->lock = glock->own_lock; 420 pj_list_push_back(&glock->lock_list, own_lock); 421 422 *p_grp_lock = glock; 423 return PJ_SUCCESS; 424 425 on_error: 426 grp_lock_destroy(glock); 427 return status; 428 } 429 430 PJ_DEF(pj_status_t) pj_grp_lock_destroy( pj_grp_lock_t *grp_lock) 431 { 432 return grp_lock_destroy(grp_lock); 433 } 434 435 PJ_DEF(pj_status_t) pj_grp_lock_acquire( pj_grp_lock_t *grp_lock) 436 { 437 return grp_lock_acquire(grp_lock); 438 } 439 440 PJ_DEF(pj_status_t) pj_grp_lock_tryacquire( pj_grp_lock_t *grp_lock) 441 { 442 return grp_lock_tryacquire(grp_lock); 443 } 444 445 PJ_DEF(pj_status_t) pj_grp_lock_release( pj_grp_lock_t *grp_lock) 446 { 447 return grp_lock_release(grp_lock); 448 } 449 450 PJ_DEF(pj_status_t) pj_grp_lock_replace( pj_grp_lock_t *old, 451 pj_grp_lock_t *new) 452 { 453 grp_destroy_callback *ocb; 454 455 /* Move handlers from old to new */ 456 ocb = old->destroy_list.next; 457 while (ocb != &old->destroy_list) { 458 grp_destroy_callback *ncb; 459 460 ncb = PJ_POOL_ALLOC_T(new->pool, grp_destroy_callback); 461 ncb->comp = ocb->comp; 462 ncb->handler = ocb->handler; 463 pj_list_push_back(&new->destroy_list, ncb); 464 465 ocb = ocb->next; 466 } 467 468 pj_list_init(&old->destroy_list); 469 470 grp_lock_destroy(old); 471 return PJ_SUCCESS; 472 } 473 474 PJ_DEF(pj_status_t) pj_grp_lock_add_handler( pj_grp_lock_t *glock, 475 pj_pool_t *pool, 476 void *comp, 477 void (*destroy)(void *comp)) 478 { 479 grp_destroy_callback *cb; 480 481 grp_lock_acquire(glock); 482 483 if (pool == NULL) 484 pool = glock->pool; 485 486 cb = PJ_POOL_ZALLOC_T(pool, grp_destroy_callback); 487 cb->comp = comp; 488 cb->handler = destroy; 489 pj_list_push_back(&glock->destroy_list, cb); 490 491 grp_lock_release(glock); 492 return PJ_SUCCESS; 493 } 494 495 PJ_DEF(pj_status_t) pj_grp_lock_del_handler( pj_grp_lock_t *glock, 496 void *comp, 497 void (*destroy)(void *comp)) 498 { 499 grp_destroy_callback *cb; 500 501 grp_lock_acquire(glock); 502 503 cb = glock->destroy_list.next; 504 while (cb != &glock->destroy_list) { 505 if (cb->comp == comp && cb->handler == destroy) 506 break; 507 cb = cb->next; 508 } 509 510 if (cb != &glock->destroy_list) 511 pj_list_erase(cb); 512 513 grp_lock_release(glock); 514 return PJ_SUCCESS; 515 } 516 517 static pj_status_t grp_lock_add_ref(pj_grp_lock_t *glock) 518 { 519 pj_atomic_inc(glock->ref_cnt); 520 return PJ_SUCCESS; 521 } 522 523 static pj_status_t grp_lock_dec_ref(pj_grp_lock_t *glock) 524 { 525 int cnt; /* for debugging */ 526 if ((cnt=pj_atomic_dec_and_get(glock->ref_cnt)) == 0) { 527 grp_lock_destroy(glock); 528 return PJ_EGONE; 529 } 530 pj_assert(cnt > 0); 531 pj_grp_lock_dump(glock); 532 return PJ_SUCCESS; 533 } 534 535 #if PJ_GRP_LOCK_DEBUG 536 PJ_DEF(pj_status_t) pj_grp_lock_add_ref_dbg(pj_grp_lock_t *glock, 537 const char *file, 538 int line) 539 { 540 grp_lock_ref *ref; 541 pj_status_t status; 542 543 pj_enter_critical_section(); 544 if (!pj_list_empty(&glock->ref_free_list)) { 545 ref = glock->ref_free_list.next; 546 pj_list_erase(ref); 547 } else { 548 ref = PJ_POOL_ALLOC_T(glock->pool, grp_lock_ref); 549 } 550 551 ref->file = file; 552 ref->line = line; 553 pj_list_push_back(&glock->ref_list, ref); 554 555 pj_leave_critical_section(); 556 557 status = grp_lock_add_ref(glock); 558 559 if (status != PJ_SUCCESS) { 560 pj_enter_critical_section(); 561 pj_list_erase(ref); 562 pj_list_push_back(&glock->ref_free_list, ref); 563 pj_leave_critical_section(); 564 } 565 566 return status; 567 } 568 569 PJ_DEF(pj_status_t) pj_grp_lock_dec_ref_dbg(pj_grp_lock_t *glock, 570 const char *file, 571 int line) 572 { 573 grp_lock_ref *ref; 574 575 pj_enter_critical_section(); 576 /* Find the same source file */ 577 ref = glock->ref_list.next; 578 while (ref != &glock->ref_list) { 579 if (strcmp(ref->file, file) == 0) { 580 pj_list_erase(ref); 581 pj_list_push_back(&glock->ref_free_list, ref); 582 break; 583 } 584 ref = ref->next; 585 } 586 pj_leave_critical_section(); 587 588 if (ref == &glock->ref_list) { 589 PJ_LOG(2,(THIS_FILE, "pj_grp_lock_dec_ref_dbg() could not find " 590 "matching ref for %s", file)); 591 } 592 593 return grp_lock_dec_ref(glock); 594 } 595 #else 596 PJ_DEF(pj_status_t) pj_grp_lock_add_ref(pj_grp_lock_t *glock) 597 { 598 return grp_lock_add_ref(glock); 599 } 600 601 PJ_DEF(pj_status_t) pj_grp_lock_dec_ref(pj_grp_lock_t *glock) 602 { 603 return grp_lock_dec_ref(glock); 604 } 605 #endif 606 607 PJ_DEF(int) pj_grp_lock_get_ref(pj_grp_lock_t *glock) 608 { 609 return pj_atomic_get(glock->ref_cnt); 610 } 611 612 PJ_DEF(pj_status_t) pj_grp_lock_chain_lock( pj_grp_lock_t *glock, 613 pj_lock_t *lock, 614 int pos) 615 { 616 grp_lock_item *lck, *new_lck; 617 unsigned i; 618 619 grp_lock_acquire(glock); 620 621 for (i=0; i<glock->owner_cnt; ++i) 622 pj_lock_acquire(lock); 623 624 lck = glock->lock_list.next; 625 while (lck != &glock->lock_list) { 626 if (lck->prio >= pos) 627 break; 628 lck = lck->next; 629 } 630 631 new_lck = PJ_POOL_ZALLOC_T(glock->pool, grp_lock_item); 632 new_lck->prio = pos; 633 new_lck->lock = lock; 634 pj_list_insert_before(lck, new_lck); 635 636 /* this will also release the new lock */ 637 grp_lock_release(glock); 638 return PJ_SUCCESS; 639 } 640 641 PJ_DEF(pj_status_t) pj_grp_lock_unchain_lock( pj_grp_lock_t *glock, 642 pj_lock_t *lock) 643 { 644 grp_lock_item *lck; 645 646 grp_lock_acquire(glock); 647 648 lck = glock->lock_list.next; 649 while (lck != &glock->lock_list) { 650 if (lck->lock == lock) 651 break; 652 lck = lck->next; 653 } 654 655 if (lck != &glock->lock_list) { 656 unsigned i; 657 658 pj_list_erase(lck); 659 for (i=0; i<glock->owner_cnt; ++i) 660 pj_lock_release(lck->lock); 661 } 662 663 grp_lock_release(glock); 664 return PJ_SUCCESS; 665 } 666 667 PJ_DEF(void) pj_grp_lock_dump(pj_grp_lock_t *grp_lock) 668 { 669 #if PJ_GRP_LOCK_DEBUG 670 grp_lock_ref *ref = grp_lock->ref_list.next; 671 char info_buf[1000]; 672 pj_str_t info; 673 674 info.ptr = info_buf; 675 info.slen = 0; 676 677 pj_grp_lock_acquire(grp_lock); 678 pj_enter_critical_section(); 679 680 while (ref != &grp_lock->ref_list && info.slen < sizeof(info_buf)) { 681 char *start = info.ptr + info.slen; 682 int max_len = sizeof(info_buf) - info.slen; 683 int len; 684 685 len = pj_ansi_snprintf(start, max_len, "%s:%d ", ref->file, ref->line); 686 if (len < 1 || len > max_len) { 687 len = strlen(ref->file); 688 if (len > max_len - 1) 689 len = max_len - 1; 690 691 memcpy(start, ref->file, len); 692 start[len++] = ' '; 693 } 694 695 info.slen += len; 696 697 ref = ref->next; 698 } 699 700 if (ref != &grp_lock->ref_list) { 701 int i; 702 for (i=0; i<4; ++i) 703 info_buf[sizeof(info_buf)-i-1] = '.'; 704 } 705 info.ptr[info.slen-1] = '\0'; 706 707 pj_leave_critical_section(); 708 pj_grp_lock_release(grp_lock); 709 710 PJ_LOG(4,(THIS_FILE, "Group lock %p, ref_cnt=%d. Reference holders: %s", 711 grp_lock, pj_grp_lock_get_ref(grp_lock), info.ptr)); 712 #endif 713 } -
pjproject/trunk/pjlib/src/pj/os_core_unix.c
r3999 r4359 98 98 struct pj_event_t 99 99 { 100 char obj_name[PJ_MAX_OBJ_NAME]; 100 enum event_state { 101 EV_STATE_OFF, 102 EV_STATE_SET, 103 EV_STATE_PULSED 104 } state; 105 106 pj_mutex_t mutex; 107 pthread_cond_t cond; 108 109 pj_bool_t auto_reset; 110 unsigned threads_waiting; 111 unsigned threads_to_release; 101 112 }; 102 113 #endif /* PJ_HAS_EVENT_OBJ */ … … 1701 1712 pj_event_t **ptr_event) 1702 1713 { 1703 pj_assert(!"Not supported!"); 1704 PJ_UNUSED_ARG(pool); 1705 PJ_UNUSED_ARG(name); 1706 PJ_UNUSED_ARG(manual_reset); 1707 PJ_UNUSED_ARG(initial); 1708 PJ_UNUSED_ARG(ptr_event); 1709 return PJ_EINVALIDOP; 1714 pj_event_t *event; 1715 1716 event = PJ_POOL_ALLOC_T(pool, pj_event_t); 1717 1718 init_mutex(&event->mutex, name, PJ_MUTEX_SIMPLE); 1719 pthread_cond_init(&event->cond, 0); 1720 event->auto_reset = !manual_reset; 1721 event->threads_waiting = 0; 1722 1723 if (initial) { 1724 event->state = EV_STATE_SET; 1725 event->threads_to_release = 1; 1726 } else { 1727 event->state = EV_STATE_OFF; 1728 event->threads_to_release = 0; 1729 } 1730 1731 *ptr_event = event; 1732 return PJ_SUCCESS; 1733 } 1734 1735 static void event_on_one_release(pj_event_t *event) 1736 { 1737 if (event->state == EV_STATE_SET) { 1738 if (event->auto_reset) { 1739 event->threads_to_release = 0; 1740 event->state = EV_STATE_OFF; 1741 } else { 1742 /* Manual reset remains on */ 1743 } 1744 } else { 1745 if (event->auto_reset) { 1746 /* Only release one */ 1747 event->threads_to_release = 0; 1748 event->state = EV_STATE_OFF; 1749 } else { 1750 event->threads_to_release--; 1751 pj_assert(event->threads_to_release >= 0); 1752 if (event->threads_to_release==0) 1753 event->state = EV_STATE_OFF; 1754 } 1755 } 1710 1756 } 1711 1757 … … 1715 1761 PJ_DEF(pj_status_t) pj_event_wait(pj_event_t *event) 1716 1762 { 1717 PJ_UNUSED_ARG(event); 1718 return PJ_EINVALIDOP; 1763 pthread_mutex_lock(&event->mutex.mutex); 1764 event->threads_waiting++; 1765 while (event->state == EV_STATE_OFF) 1766 pthread_cond_wait(&event->cond, &event->mutex.mutex); 1767 event->threads_waiting--; 1768 event_on_one_release(event); 1769 pthread_mutex_unlock(&event->mutex.mutex); 1770 return PJ_SUCCESS; 1719 1771 } 1720 1772 … … 1724 1776 PJ_DEF(pj_status_t) pj_event_trywait(pj_event_t *event) 1725 1777 { 1726 PJ_UNUSED_ARG(event); 1727 return PJ_EINVALIDOP; 1778 pj_status_t status; 1779 1780 pthread_mutex_lock(&event->mutex.mutex); 1781 status = event->state != EV_STATE_OFF ? PJ_SUCCESS : -1; 1782 if (status==PJ_SUCCESS) { 1783 event_on_one_release(event); 1784 } 1785 pthread_mutex_unlock(&event->mutex.mutex); 1786 1787 return status; 1728 1788 } 1729 1789 … … 1733 1793 PJ_DEF(pj_status_t) pj_event_set(pj_event_t *event) 1734 1794 { 1735 PJ_UNUSED_ARG(event); 1736 return PJ_EINVALIDOP; 1795 pthread_mutex_lock(&event->mutex.mutex); 1796 event->threads_to_release = 1; 1797 event->state = EV_STATE_SET; 1798 if (event->auto_reset) 1799 pthread_cond_signal(&event->cond); 1800 else 1801 pthread_cond_broadcast(&event->cond); 1802 pthread_mutex_unlock(&event->mutex.mutex); 1803 return PJ_SUCCESS; 1737 1804 } 1738 1805 … … 1742 1809 PJ_DEF(pj_status_t) pj_event_pulse(pj_event_t *event) 1743 1810 { 1744 PJ_UNUSED_ARG(event); 1745 return PJ_EINVALIDOP; 1811 pthread_mutex_lock(&event->mutex.mutex); 1812 if (event->threads_waiting) { 1813 event->threads_to_release = event->auto_reset ? 1 : 1814 event->threads_waiting; 1815 event->state = EV_STATE_PULSED; 1816 if (event->threads_to_release==1) 1817 pthread_cond_signal(&event->cond); 1818 else 1819 pthread_cond_broadcast(&event->cond); 1820 } 1821 pthread_mutex_unlock(&event->mutex.mutex); 1822 return PJ_SUCCESS; 1746 1823 } 1747 1824 … … 1751 1828 PJ_DEF(pj_status_t) pj_event_reset(pj_event_t *event) 1752 1829 { 1753 PJ_UNUSED_ARG(event); 1754 return PJ_EINVALIDOP; 1830 pthread_mutex_lock(&event->mutex.mutex); 1831 event->state = EV_STATE_OFF; 1832 event->threads_to_release = 0; 1833 pthread_mutex_unlock(&event->mutex.mutex); 1834 return PJ_SUCCESS; 1755 1835 } 1756 1836 … … 1760 1840 PJ_DEF(pj_status_t) pj_event_destroy(pj_event_t *event) 1761 1841 { 1762 PJ_UNUSED_ARG(event); 1763 return PJ_EINVALIDOP; 1842 pj_mutex_destroy(&event->mutex); 1843 pthread_cond_destroy(&event->cond); 1844 return PJ_SUCCESS; 1764 1845 } 1765 1846 -
pjproject/trunk/pjlib/src/pj/timer.c
r4281 r4359 36 36 #include <pj/lock.h> 37 37 #include <pj/log.h> 38 #include <pj/rand.h> 38 39 39 40 #define THIS_FILE "timer.c" … … 452 453 entry->user_data = user_data; 453 454 entry->cb = cb; 455 entry->_grp_lock = NULL; 454 456 455 457 return entry; 456 458 } 459 460 #if PJ_TIMER_DEBUG 461 static pj_status_t schedule_w_grp_lock_dbg(pj_timer_heap_t *ht, 462 pj_timer_entry *entry, 463 const pj_time_val *delay, 464 pj_bool_t set_id, 465 int id_val, 466 pj_grp_lock_t *grp_lock, 467 const char *src_file, 468 int src_line) 469 #else 470 static pj_status_t schedule_w_grp_lock(pj_timer_heap_t *ht, 471 pj_timer_entry *entry, 472 const pj_time_val *delay, 473 pj_bool_t set_id, 474 int id_val, 475 pj_grp_lock_t *grp_lock) 476 #endif 477 { 478 pj_status_t status; 479 pj_time_val expires; 480 481 PJ_ASSERT_RETURN(ht && entry && delay, PJ_EINVAL); 482 PJ_ASSERT_RETURN(entry->cb != NULL, PJ_EINVAL); 483 484 /* Prevent same entry from being scheduled more than once */ 485 PJ_ASSERT_RETURN(entry->_timer_id < 1, PJ_EINVALIDOP); 486 487 #if PJ_TIMER_DEBUG 488 entry->src_file = src_file; 489 entry->src_line = src_line; 490 #endif 491 pj_gettickcount(&expires); 492 PJ_TIME_VAL_ADD(expires, *delay); 493 494 lock_timer_heap(ht); 495 status = schedule_entry(ht, entry, &expires); 496 if (status == PJ_SUCCESS) { 497 if (set_id) 498 entry->id = id_val; 499 entry->_grp_lock = grp_lock; 500 if (entry->_grp_lock) { 501 pj_grp_lock_add_ref(entry->_grp_lock); 502 } 503 } 504 unlock_timer_heap(ht); 505 506 return status; 507 } 508 457 509 458 510 #if PJ_TIMER_DEBUG … … 462 514 const char *src_file, 463 515 int src_line) 516 { 517 return schedule_w_grp_lock_dbg(ht, entry, delay, PJ_FALSE, 1, NULL, 518 src_file, src_line); 519 } 520 521 PJ_DEF(pj_status_t) pj_timer_heap_schedule_w_grp_lock_dbg( 522 pj_timer_heap_t *ht, 523 pj_timer_entry *entry, 524 const pj_time_val *delay, 525 int id_val, 526 pj_grp_lock_t *grp_lock, 527 const char *src_file, 528 int src_line) 529 { 530 return schedule_w_grp_lock_dbg(ht, entry, delay, PJ_TRUE, id_val, 531 grp_lock, src_file, src_line); 532 } 533 464 534 #else 465 535 PJ_DEF(pj_status_t) pj_timer_heap_schedule( pj_timer_heap_t *ht, 466 pj_timer_entry *entry, 467 const pj_time_val *delay) 536 pj_timer_entry *entry, 537 const pj_time_val *delay) 538 { 539 return schedule_w_grp_lock(ht, entry, delay, PJ_FALSE, 1, NULL); 540 } 541 542 PJ_DEF(pj_status_t) pj_timer_heap_schedule_w_grp_lock(pj_timer_heap_t *ht, 543 pj_timer_entry *entry, 544 const pj_time_val *delay, 545 int id_val, 546 pj_grp_lock_t *grp_lock) 547 { 548 return schedule_w_grp_lock(ht, entry, delay, PJ_TRUE, id_val, grp_lock); 549 } 468 550 #endif 469 { 470 pj_status_t status; 471 pj_time_val expires; 472 473 PJ_ASSERT_RETURN(ht && entry && delay, PJ_EINVAL); 474 PJ_ASSERT_RETURN(entry->cb != NULL, PJ_EINVAL); 475 476 /* Prevent same entry from being scheduled more than once */ 477 PJ_ASSERT_RETURN(entry->_timer_id < 1, PJ_EINVALIDOP); 478 479 #if PJ_TIMER_DEBUG 480 entry->src_file = src_file; 481 entry->src_line = src_line; 482 #endif 483 pj_gettickcount(&expires); 484 PJ_TIME_VAL_ADD(expires, *delay); 485 551 552 static int cancel_timer(pj_timer_heap_t *ht, 553 pj_timer_entry *entry, 554 pj_bool_t set_id, 555 int id_val) 556 { 557 int count; 558 559 PJ_ASSERT_RETURN(ht && entry, PJ_EINVAL); 560 486 561 lock_timer_heap(ht); 487 status = schedule_entry(ht, entry, &expires); 562 count = cancel(ht, entry, 1); 563 if (set_id) { 564 entry->id = id_val; 565 } 566 if (entry->_grp_lock) { 567 pj_grp_lock_t *grp_lock = entry->_grp_lock; 568 entry->_grp_lock = NULL; 569 pj_grp_lock_dec_ref(grp_lock); 570 } 488 571 unlock_timer_heap(ht); 489 572 490 return status;573 return count; 491 574 } 492 575 … … 494 577 pj_timer_entry *entry) 495 578 { 496 int count; 497 498 PJ_ASSERT_RETURN(ht && entry, PJ_EINVAL); 499 500 lock_timer_heap(ht); 501 count = cancel(ht, entry, 1); 502 unlock_timer_heap(ht); 503 504 return count; 579 return cancel_timer(ht, entry, PJ_FALSE, 0); 580 } 581 582 PJ_DEF(int) pj_timer_heap_cancel_if_active(pj_timer_heap_t *ht, 583 pj_timer_entry *entry, 584 int id_val) 585 { 586 return cancel_timer(ht, entry, PJ_TRUE, id_val); 505 587 } 506 588 … … 528 610 { 529 611 pj_timer_entry *node = remove_node(ht, 0); 612 pj_grp_lock_t *grp_lock; 613 530 614 ++count; 531 615 616 grp_lock = node->_grp_lock; 617 node->_grp_lock = NULL; 618 532 619 unlock_timer_heap(ht); 620 621 PJ_RACE_ME(5); 622 533 623 if (node->cb) 534 624 (*node->cb)(ht, node); 625 626 if (grp_lock) 627 pj_grp_lock_dec_ref(grp_lock); 628 535 629 lock_timer_heap(ht); 536 630 }
Note: See TracChangeset
for help on using the changeset viewer.