Changeset 14
- Timestamp:
- Nov 6, 2005 4:50:38 PM (19 years ago)
- Location:
- pjproject/main/pjlib
- Files:
-
- 9 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/main/pjlib/build/Makefile
r2 r14 61 61 # Gather all flags. 62 62 # 63 export _CFLAGS := -O2 $(CC_CFLAGS) $(OS_CFLAGS) $(HOST_CFLAGS) $(M_CFLAGS) \63 export _CFLAGS := -O2 -g $(CC_CFLAGS) $(OS_CFLAGS) $(HOST_CFLAGS) $(M_CFLAGS) \ 64 64 $(CFLAGS) $(CC_INC)../include 65 65 export _CXXFLAGS:= $(_CFLAGS) $(CC_CXXFLAGS) $(OS_CXXFLAGS) $(M_CXXFLAGS) \ … … 99 99 100 100 export CC_OUT CC AR RANLIB HOST_MV HOST_RM HOST_RMDIR HOST_MKDIR OBJEXT LD LDOUT 101 102 101 ############################################################################### 103 102 # Main entry … … 125 124 dep: depend 126 125 127 pjlib: 126 pjlib: ../include/pj/config_site.h 128 127 $(MAKE) -f $(RULES_MAK) APP=PJLIB app=pjlib $(PJLIB_LIB) 129 128 129 ../include/pj/config_site.h: 130 touch ../include/pj/config_site.h 131 130 132 pjlib-test: 131 133 $(MAKE) -f $(RULES_MAK) APP=TEST app=pjlib-test $(TEST_EXE) -
pjproject/main/pjlib/build/os-linux.mak
r2 r14 15 15 pool_policy_malloc.o sock_bsd.o sock_select.o 16 16 17 export PJLIB_OBJS += ioqueue_select.o18 #export PJLIB_OBJS += ioqueue_epoll.o17 #export PJLIB_OBJS += ioqueue_select.o 18 export PJLIB_OBJS += ioqueue_epoll.o 19 19 20 20 # -
pjproject/main/pjlib/src/pj/ioqueue_common_abs.c
r13 r14 1 1 /* $Id$ */ 2 2 3 #include <pj/ioqueue.h> 4 #include <pj/errno.h> 5 #include <pj/list.h> 6 #include <pj/sock.h> 7 #include <pj/lock.h> 8 #include <pj/assert.h> 9 #include <pj/string.h> 10 3 /* 4 * ioqueue_common_abs.c 5 * 6 * This contains common functionalities to emulate proactor pattern with 7 * various event dispatching mechanisms (e.g. select, epoll). 8 * 9 * This file will be included by the appropriate ioqueue implementation. 10 * This file is NOT supposed to be compiled as stand-alone source. 11 */ 11 12 12 13 static void ioqueue_init( pj_ioqueue_t *ioqueue ) … … 326 327 327 328 /* Call callback. */ 328 if (h->cb.on_accept_complete) 329 if (h->cb.on_accept_complete) { 329 330 (*h->cb.on_accept_complete)(h, 330 331 (pj_ioqueue_op_key_t*)accept_op, 331 332 *accept_op->accept_fd, rc); 333 } 332 334 333 335 } … … 337 339 struct read_operation *read_op; 338 340 pj_ssize_t bytes_read; 339 340 pj_assert(!pj_list_empty(&h->read_list));341 341 342 342 /* Get one pending read operation from the list. */ … … 378 378 // &bytes_read, NULL); 379 379 # elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0) 380 bytes_read = read(h->fd, h->rd_buf, bytes_read);380 bytes_read = read(h->fd, read_op->buf, bytes_read); 381 381 rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); 382 382 # elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0 383 bytes_read = sys_read(h->fd, h->rd_buf, bytes_read);383 bytes_read = sys_read(h->fd, read_op->buf, bytes_read); 384 384 rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read; 385 385 # else -
pjproject/main/pjlib/src/pj/ioqueue_common_abs.h
r13 r14 15 15 */ 16 16 #if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100) 17 # error " Error reporting must be enabled for this functionto work!"17 # error "Proper error reporting must be enabled for ioqueue to work!" 18 18 #endif 19 19 … … 106 106 pj_sock_t fd, 107 107 enum ioqueue_event_type event_type); 108 -
pjproject/main/pjlib/src/pj/ioqueue_epoll.c
r4 r14 1 1 /* $Id$ 2 *3 2 */ 4 3 /* … … 31 30 # define epoll_data data.ptr 32 31 # define epoll_data_type void* 33 # define ioctl_val_type unsigned long *32 # define ioctl_val_type unsigned long 34 33 # define getsockopt_val_ptr int* 35 34 # define os_getsockopt getsockopt … … 127 126 #define THIS_FILE "ioq_epoll" 128 127 129 #define PJ_IOQUEUE_IS_READ_OP(op) ((op & PJ_IOQUEUE_OP_READ) || \130 (op & PJ_IOQUEUE_OP_RECV) || \131 (op & PJ_IOQUEUE_OP_RECV_FROM))132 #define PJ_IOQUEUE_IS_WRITE_OP(op) ((op & PJ_IOQUEUE_OP_WRITE) || \133 (op & PJ_IOQUEUE_OP_SEND) || \134 (op & PJ_IOQUEUE_OP_SEND_TO))135 136 137 #if PJ_HAS_TCP138 # define PJ_IOQUEUE_IS_ACCEPT_OP(op) (op & PJ_IOQUEUE_OP_ACCEPT)139 # define PJ_IOQUEUE_IS_CONNECT_OP(op) (op & PJ_IOQUEUE_OP_CONNECT)140 #else141 # define PJ_IOQUEUE_IS_ACCEPT_OP(op) 0142 # define PJ_IOQUEUE_IS_CONNECT_OP(op) 0143 #endif144 145 146 128 //#define TRACE_(expr) PJ_LOG(3,expr) 147 129 #define TRACE_(expr) 148 130 131 /* 132 * Include common ioqueue abstraction. 133 */ 134 #include "ioqueue_common_abs.h" 149 135 150 136 /* … … 153 139 struct pj_ioqueue_key_t 154 140 { 155 PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t) 156 pj_sock_t fd; 157 pj_ioqueue_operation_e op; 158 void *user_data; 159 pj_ioqueue_callback cb; 160 161 void *rd_buf; 162 unsigned rd_flags; 163 pj_size_t rd_buflen; 164 void *wr_buf; 165 pj_size_t wr_buflen; 166 167 pj_sockaddr_t *rmt_addr; 168 int *rmt_addrlen; 169 170 pj_sockaddr_t *local_addr; 171 int *local_addrlen; 172 173 pj_sock_t *accept_fd; 141 DECLARE_COMMON_KEY 174 142 }; 175 143 … … 179 147 struct pj_ioqueue_t 180 148 { 181 pj_lock_t *lock;182 pj_bool_t auto_delete_lock; 149 DECLARE_COMMON_IOQUEUE 150 183 151 unsigned max, count; 184 152 pj_ioqueue_key_t hlist; … … 186 154 }; 187 155 156 /* Include implementation for common abstraction after we declare 157 * pj_ioqueue_key_t and pj_ioqueue_t. 158 */ 159 #include "ioqueue_common_abs.c" 160 188 161 /* 189 162 * pj_ioqueue_create() … … 193 166 PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, 194 167 pj_size_t max_fd, 195 int max_threads,196 168 pj_ioqueue_t **p_ioqueue) 197 169 { 198 pj_ioqueue_t *ioque ;170 pj_ioqueue_t *ioqueue; 199 171 pj_status_t rc; 200 201 PJ_UNUSED_ARG(max_threads); 202 203 if (max_fd > PJ_IOQUEUE_MAX_HANDLES) { 204 pj_assert(!"max_fd too large"); 205 return PJ_EINVAL; 206 } 207 208 ioque = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); 209 ioque->max = max_fd; 210 ioque->count = 0; 211 pj_list_init(&ioque->hlist); 212 213 rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioque->lock); 172 pj_lock_t *lock; 173 174 /* Check that arguments are valid. */ 175 PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && 176 max_fd > 0, PJ_EINVAL); 177 178 /* Check that size of pj_ioqueue_op_key_t is sufficient */ 179 PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= 180 sizeof(union operation_key), PJ_EBUG); 181 182 ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); 183 184 ioqueue_init(ioqueue); 185 186 ioqueue->max = max_fd; 187 ioqueue->count = 0; 188 pj_list_init(&ioqueue->hlist); 189 190 rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock); 214 191 if (rc != PJ_SUCCESS) 215 192 return rc; 216 193 217 ioque->auto_delete_lock = PJ_TRUE; 218 ioque->epfd = os_epoll_create(max_fd); 219 if (ioque->epfd < 0) { 194 rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE); 195 if (rc != PJ_SUCCESS) 196 return rc; 197 198 ioqueue->epfd = os_epoll_create(max_fd); 199 if (ioqueue->epfd < 0) { 200 ioqueue_destroy(ioqueue); 220 201 return PJ_RETURN_OS_ERROR(pj_get_native_os_error()); 221 202 } 222 203 223 PJ_LOG(4, ("pjlib", " select() I/O Queue created (%p)", ioque));224 225 *p_ioqueue = ioque ;204 PJ_LOG(4, ("pjlib", "epoll I/O Queue created (%p)", ioqueue)); 205 206 *p_ioqueue = ioqueue; 226 207 return PJ_SUCCESS; 227 208 } … … 232 213 * Destroy ioqueue. 233 214 */ 234 PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioque) 235 { 236 PJ_ASSERT_RETURN(ioque, PJ_EINVAL); 237 PJ_ASSERT_RETURN(ioque->epfd > 0, PJ_EINVALIDOP); 238 239 pj_lock_acquire(ioque->lock); 240 os_close(ioque->epfd); 241 ioque->epfd = 0; 242 if (ioque->auto_delete_lock) 243 pj_lock_destroy(ioque->lock); 244 245 return PJ_SUCCESS; 246 } 247 248 /* 249 * pj_ioqueue_set_lock() 250 */ 251 PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque, 252 pj_lock_t *lock, 253 pj_bool_t auto_delete ) 254 { 255 PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL); 256 257 if (ioque->auto_delete_lock) { 258 pj_lock_destroy(ioque->lock); 259 } 260 261 ioque->lock = lock; 262 ioque->auto_delete_lock = auto_delete; 263 264 return PJ_SUCCESS; 265 } 266 215 PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) 216 { 217 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); 218 PJ_ASSERT_RETURN(ioqueue->epfd > 0, PJ_EINVALIDOP); 219 220 pj_lock_acquire(ioqueue->lock); 221 os_close(ioqueue->epfd); 222 ioqueue->epfd = 0; 223 return ioqueue_destroy(ioqueue); 224 } 267 225 268 226 /* … … 272 230 */ 273 231 PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, 274 pj_ioqueue_t *ioque ,232 pj_ioqueue_t *ioqueue, 275 233 pj_sock_t sock, 276 234 void *user_data, … … 284 242 pj_status_t rc = PJ_SUCCESS; 285 243 286 PJ_ASSERT_RETURN(pool && ioque && sock != PJ_INVALID_SOCKET &&244 PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET && 287 245 cb && p_key, PJ_EINVAL); 288 246 289 pj_lock_acquire(ioque ->lock);290 291 if (ioque ->count >= ioque->max) {247 pj_lock_acquire(ioqueue->lock); 248 249 if (ioqueue->count >= ioqueue->max) { 292 250 rc = PJ_ETOOMANY; 293 251 TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: too many files")); … … 306 264 /* Create key. */ 307 265 key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 308 key->fd = sock; 309 key->user_data = user_data; 310 pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); 266 rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); 267 if (rc != PJ_SUCCESS) { 268 key = NULL; 269 goto on_return; 270 } 311 271 312 272 /* os_epoll_ctl. */ 313 273 ev.events = EPOLLIN | EPOLLOUT | EPOLLERR; 314 274 ev.epoll_data = (epoll_data_type)key; 315 status = os_epoll_ctl(ioque ->epfd, EPOLL_CTL_ADD, sock, &ev);275 status = os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_ADD, sock, &ev); 316 276 if (status < 0) { 317 277 rc = pj_get_os_error(); 278 key = NULL; 318 279 TRACE_((THIS_FILE, 319 280 "pj_ioqueue_register_sock error: os_epoll_ctl rc=%d", … … 323 284 324 285 /* Register */ 325 pj_list_insert_before(&ioque ->hlist, key);326 ++ioque ->count;286 pj_list_insert_before(&ioqueue->hlist, key); 287 ++ioqueue->count; 327 288 328 289 on_return: 329 290 *p_key = key; 330 pj_lock_release(ioque ->lock);291 pj_lock_release(ioqueue->lock); 331 292 332 293 return rc; … … 338 299 * Unregister handle from ioqueue. 339 300 */ 340 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_ t *ioque,341 pj_ioqueue_key_t *key) 342 { 301 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) 302 { 303 pj_ioqueue_t *ioqueue; 343 304 struct epoll_event ev; 344 305 int status; 345 306 346 PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL); 347 348 pj_lock_acquire(ioque->lock); 349 350 pj_assert(ioque->count > 0); 351 --ioque->count; 307 PJ_ASSERT_RETURN(key != NULL, PJ_EINVAL); 308 309 ioqueue = key->ioqueue; 310 pj_lock_acquire(ioqueue->lock); 311 312 pj_assert(ioqueue->count > 0); 313 --ioqueue->count; 352 314 pj_list_erase(key); 353 315 354 316 ev.events = 0; 355 317 ev.epoll_data = (epoll_data_type)key; 356 status = os_epoll_ctl( ioque ->epfd, EPOLL_CTL_DEL, key->fd, &ev);318 status = os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_DEL, key->fd, &ev); 357 319 if (status != 0) { 358 320 pj_status_t rc = pj_get_os_error(); 359 pj_lock_release(ioque ->lock);321 pj_lock_release(ioqueue->lock); 360 322 return rc; 361 323 } 362 324 363 pj_lock_release(ioque->lock); 325 pj_lock_release(ioqueue->lock); 326 327 /* Destroy the key. */ 328 ioqueue_destroy_key(key); 329 364 330 return PJ_SUCCESS; 365 331 } 366 332 367 /* 368 * pj_ioqueue_get_user_data() 369 * 370 * Obtain value associated with a key. 371 */ 372 PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) 373 { 374 PJ_ASSERT_RETURN(key != NULL, NULL); 375 return key->user_data; 376 } 377 333 /* ioqueue_remove_from_set() 334 * This function is called from ioqueue_dispatch_event() to instruct 335 * the ioqueue to remove the specified descriptor from ioqueue's descriptor 336 * set for the specified event. 337 */ 338 static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, 339 pj_sock_t fd, 340 enum ioqueue_event_type event_type) 341 { 342 } 343 344 /* 345 * ioqueue_add_to_set() 346 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc 347 * to instruct the ioqueue to add the specified handle to ioqueue's descriptor 348 * set for the specified event. 349 */ 350 static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, 351 pj_sock_t fd, 352 enum ioqueue_event_type event_type ) 353 { 354 } 378 355 379 356 /* … … 381 358 * 382 359 */ 383 PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque , const pj_time_val *timeout)360 PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) 384 361 { 385 362 int i, count, processed; 386 struct epoll_event events[ 16];363 struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; 387 364 int msec; 365 struct queue { 366 pj_ioqueue_key_t *key; 367 enum ioqueue_event_type event_type; 368 } queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; 388 369 389 370 PJ_CHECK_STACK(); … … 391 372 msec = timeout ? PJ_TIME_VAL_MSEC(*timeout) : 9000; 392 373 393 count = os_epoll_wait( ioque ->epfd, events, PJ_ARRAY_SIZE(events), msec);374 count = os_epoll_wait( ioqueue->epfd, events, PJ_ARRAY_SIZE(events), msec); 394 375 if (count <= 0) 395 376 return count; 396 377 397 378 /* Lock ioqueue. */ 398 pj_lock_acquire(ioque->lock); 399 400 processed = 0; 401 402 for (i=0; i<count; ++i) { 379 pj_lock_acquire(ioqueue->lock); 380 381 for (processed=0, i=0; i<count; ++i) { 403 382 pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type) 404 383 events[i].epoll_data; 405 pj_status_t rc;406 384 407 385 /* 408 * Check for completion of read operations.386 * Check readability. 409 387 */ 410 if ((events[i].events & EPOLLIN) && (PJ_IOQUEUE_IS_READ_OP(h->op))) { 411 pj_ssize_t bytes_read = h->rd_buflen; 412 413 if ((h->op & PJ_IOQUEUE_OP_RECV_FROM)) { 414 rc = pj_sock_recvfrom( h->fd, h->rd_buf, &bytes_read, 0, 415 h->rmt_addr, h->rmt_addrlen); 416 } else if ((h->op & PJ_IOQUEUE_OP_RECV)) { 417 rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0); 418 } else { 419 bytes_read = os_read( h->fd, h->rd_buf, bytes_read); 420 rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); 421 } 422 423 if (rc != PJ_SUCCESS) { 424 bytes_read = -rc; 425 } 426 427 h->op &= ~(PJ_IOQUEUE_OP_READ | PJ_IOQUEUE_OP_RECV | 428 PJ_IOQUEUE_OP_RECV_FROM); 429 430 /* Call callback. */ 431 (*h->cb.on_read_complete)(h, bytes_read); 432 388 if ((events[i].events & EPOLLIN) && 389 (key_has_pending_read(h) || key_has_pending_accept(h))) { 390 queue[processed].key = h; 391 queue[processed].event_type = READABLE_EVENT; 433 392 ++processed; 434 393 } 394 435 395 /* 436 * Check for completion of accept() operation.396 * Check for writeability. 437 397 */ 438 else if ((events[i].events & EPOLLIN) && 439 (h->op & PJ_IOQUEUE_OP_ACCEPT)) 440 { 441 /* accept() must be the only operation specified on 442 * server socket 443 */ 444 pj_assert( h->op == PJ_IOQUEUE_OP_ACCEPT); 445 446 rc = pj_sock_accept( h->fd, h->accept_fd, 447 h->rmt_addr, h->rmt_addrlen); 448 if (rc==PJ_SUCCESS && h->local_addr) { 449 rc = pj_sock_getsockname(*h->accept_fd, h->local_addr, 450 h->local_addrlen); 451 } 452 453 h->op &= ~(PJ_IOQUEUE_OP_ACCEPT); 454 455 /* Call callback. */ 456 (*h->cb.on_accept_complete)(h, *h->accept_fd, rc); 457 398 if ((events[i].events & EPOLLOUT) && key_has_pending_write(h)) { 399 queue[processed].key = h; 400 queue[processed].event_type = WRITEABLE_EVENT; 458 401 ++processed; 459 402 } 460 403 461 /*462 * Check for completion of write operations.463 */464 if ((events[i].events & EPOLLOUT) && PJ_IOQUEUE_IS_WRITE_OP(h->op)) {465 /* Completion of write(), send(), or sendto() operation. */466 467 /* Clear operation. */468 h->op &= ~(PJ_IOQUEUE_OP_WRITE | PJ_IOQUEUE_OP_SEND |469 PJ_IOQUEUE_OP_SEND_TO);470 471 /* Call callback. */472 /* All data must have been sent? */473 (*h->cb.on_write_complete)(h, h->wr_buflen);474 475 ++processed;476 }477 404 #if PJ_HAS_TCP 478 405 /* 479 406 * Check for completion of connect() operation. 480 407 */ 481 else if ((events[i].events & EPOLLOUT) && 482 (h->op & PJ_IOQUEUE_OP_CONNECT)) 483 { 484 /* Completion of connect() operation */ 485 pj_ssize_t bytes_transfered; 486 487 /* from connect(2): 488 * On Linux, use getsockopt to read the SO_ERROR option at 489 * level SOL_SOCKET to determine whether connect() completed 490 * successfully (if SO_ERROR is zero). 491 */ 492 int value; 493 socklen_t vallen = sizeof(value); 494 int gs_rc = os_getsockopt(h->fd, SOL_SOCKET, SO_ERROR, 495 (getsockopt_val_ptr)&value, &vallen); 496 if (gs_rc != 0) { 497 /* Argh!! What to do now??? 498 * Just indicate that the socket is connected. The 499 * application will get error as soon as it tries to use 500 * the socket to send/receive. 501 */ 502 bytes_transfered = 0; 503 } else { 504 bytes_transfered = value; 505 } 506 507 /* Clear operation. */ 508 h->op &= (~PJ_IOQUEUE_OP_CONNECT); 509 510 /* Call callback. */ 511 (*h->cb.on_connect_complete)(h, bytes_transfered); 512 408 if ((events[i].events & EPOLLOUT) && (h->connecting)) { 409 queue[processed].key = h; 410 queue[processed].event_type = WRITEABLE_EVENT; 513 411 ++processed; 514 412 } … … 518 416 * Check for error condition. 519 417 */ 520 if (events[i].events & EPOLLERR) { 521 if (h->op & PJ_IOQUEUE_OP_CONNECT) { 522 h->op &= ~PJ_IOQUEUE_OP_CONNECT; 523 524 /* Call callback. */ 525 (*h->cb.on_connect_complete)(h, -1); 526 527 ++processed; 528 } 529 } 530 } 531 532 pj_lock_release(ioque->lock); 418 if (events[i].events & EPOLLERR && (h->connecting)) { 419 queue[processed].key = h; 420 queue[processed].event_type = EXCEPTION_EVENT; 421 ++processed; 422 } 423 } 424 pj_lock_release(ioqueue->lock); 425 426 /* Now process the events. */ 427 for (i=0; i<processed; ++i) { 428 switch (queue[i].event_type) { 429 case READABLE_EVENT: 430 ioqueue_dispatch_read_event(ioqueue, queue[i].key); 431 break; 432 case WRITEABLE_EVENT: 433 ioqueue_dispatch_write_event(ioqueue, queue[i].key); 434 break; 435 case EXCEPTION_EVENT: 436 ioqueue_dispatch_exception_event(ioqueue, queue[i].key); 437 break; 438 case NO_EVENT: 439 pj_assert(!"Invalid event!"); 440 break; 441 } 442 } 533 443 534 444 return processed; 535 445 } 536 446 537 /*538 * pj_ioqueue_read()539 *540 * Start asynchronous read from the descriptor.541 */542 PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque,543 pj_ioqueue_key_t *key,544 void *buffer,545 pj_size_t buflen)546 {547 PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);548 PJ_CHECK_STACK();549 550 /* For consistency with other ioqueue implementation, we would reject551 * if descriptor has already been submitted for reading before.552 */553 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&554 (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&555 (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),556 PJ_EBUSY);557 558 pj_lock_acquire(ioque->lock);559 560 key->op |= PJ_IOQUEUE_OP_READ;561 key->rd_flags = 0;562 key->rd_buf = buffer;563 key->rd_buflen = buflen;564 565 pj_lock_release(ioque->lock);566 return PJ_EPENDING;567 }568 569 570 /*571 * pj_ioqueue_recv()572 *573 * Start asynchronous recv() from the socket.574 */575 PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque,576 pj_ioqueue_key_t *key,577 void *buffer,578 pj_size_t buflen,579 unsigned flags )580 {581 PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);582 PJ_CHECK_STACK();583 584 /* For consistency with other ioqueue implementation, we would reject585 * if descriptor has already been submitted for reading before.586 */587 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&588 (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&589 (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),590 PJ_EBUSY);591 592 pj_lock_acquire(ioque->lock);593 594 key->op |= PJ_IOQUEUE_OP_RECV;595 key->rd_buf = buffer;596 key->rd_buflen = buflen;597 key->rd_flags = flags;598 599 pj_lock_release(ioque->lock);600 return PJ_EPENDING;601 }602 603 /*604 * pj_ioqueue_recvfrom()605 *606 * Start asynchronous recvfrom() from the socket.607 */608 PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque,609 pj_ioqueue_key_t *key,610 void *buffer,611 pj_size_t buflen,612 unsigned flags,613 pj_sockaddr_t *addr,614 int *addrlen)615 {616 PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);617 PJ_CHECK_STACK();618 619 /* For consistency with other ioqueue implementation, we would reject620 * if descriptor has already been submitted for reading before.621 */622 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&623 (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&624 (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),625 PJ_EBUSY);626 627 pj_lock_acquire(ioque->lock);628 629 key->op |= PJ_IOQUEUE_OP_RECV_FROM;630 key->rd_buf = buffer;631 key->rd_buflen = buflen;632 key->rd_flags = flags;633 key->rmt_addr = addr;634 key->rmt_addrlen = addrlen;635 636 pj_lock_release(ioque->lock);637 return PJ_EPENDING;638 }639 640 /*641 * pj_ioqueue_write()642 *643 * Start asynchronous write() to the descriptor.644 */645 PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque,646 pj_ioqueue_key_t *key,647 const void *data,648 pj_size_t datalen)649 {650 pj_status_t rc;651 pj_ssize_t sent;652 653 PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);654 PJ_CHECK_STACK();655 656 /* For consistency with other ioqueue implementation, we would reject657 * if descriptor has already been submitted for writing before.658 */659 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&660 (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&661 (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),662 PJ_EBUSY);663 664 sent = datalen;665 /* sent would be -1 after pj_sock_send() if it returns error. */666 rc = pj_sock_send(key->fd, data, &sent, 0);667 if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {668 return rc;669 }670 671 pj_lock_acquire(ioque->lock);672 673 key->op |= PJ_IOQUEUE_OP_WRITE;674 key->wr_buf = NULL;675 key->wr_buflen = datalen;676 677 pj_lock_release(ioque->lock);678 679 return PJ_EPENDING;680 }681 682 /*683 * pj_ioqueue_send()684 *685 * Start asynchronous send() to the descriptor.686 */687 PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque,688 pj_ioqueue_key_t *key,689 const void *data,690 pj_size_t datalen,691 unsigned flags)692 {693 pj_status_t rc;694 pj_ssize_t sent;695 696 PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);697 PJ_CHECK_STACK();698 699 /* For consistency with other ioqueue implementation, we would reject700 * if descriptor has already been submitted for writing before.701 */702 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&703 (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&704 (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),705 PJ_EBUSY);706 707 sent = datalen;708 /* sent would be -1 after pj_sock_send() if it returns error. */709 rc = pj_sock_send(key->fd, data, &sent, flags);710 if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {711 return rc;712 }713 714 pj_lock_acquire(ioque->lock);715 716 key->op |= PJ_IOQUEUE_OP_SEND;717 key->wr_buf = NULL;718 key->wr_buflen = datalen;719 720 pj_lock_release(ioque->lock);721 722 return PJ_EPENDING;723 }724 725 726 /*727 * pj_ioqueue_sendto()728 *729 * Start asynchronous write() to the descriptor.730 */731 PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque,732 pj_ioqueue_key_t *key,733 const void *data,734 pj_size_t datalen,735 unsigned flags,736 const pj_sockaddr_t *addr,737 int addrlen)738 {739 pj_status_t rc;740 pj_ssize_t sent;741 742 PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);743 PJ_CHECK_STACK();744 745 /* For consistency with other ioqueue implementation, we would reject746 * if descriptor has already been submitted for writing before.747 */748 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&749 (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&750 (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),751 PJ_EBUSY);752 753 sent = datalen;754 /* sent would be -1 after pj_sock_sendto() if it returns error. */755 rc = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);756 if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {757 return rc;758 }759 760 pj_lock_acquire(ioque->lock);761 762 key->op |= PJ_IOQUEUE_OP_SEND_TO;763 key->wr_buf = NULL;764 key->wr_buflen = datalen;765 766 pj_lock_release(ioque->lock);767 return PJ_EPENDING;768 }769 770 #if PJ_HAS_TCP771 /*772 * Initiate overlapped accept() operation.773 */774 PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue,775 pj_ioqueue_key_t *key,776 pj_sock_t *new_sock,777 pj_sockaddr_t *local,778 pj_sockaddr_t *remote,779 int *addrlen)780 {781 /* check parameters. All must be specified! */782 pj_assert(ioqueue && key && new_sock);783 784 /* Server socket must have no other operation! */785 pj_assert(key->op == 0);786 787 pj_lock_acquire(ioqueue->lock);788 789 key->op = PJ_IOQUEUE_OP_ACCEPT;790 key->accept_fd = new_sock;791 key->rmt_addr = remote;792 key->rmt_addrlen = addrlen;793 key->local_addr = local;794 key->local_addrlen = addrlen; /* use same addr. as rmt_addrlen */795 796 pj_lock_release(ioqueue->lock);797 return PJ_EPENDING;798 }799 800 /*801 * Initiate overlapped connect() operation (well, it's non-blocking actually,802 * since there's no overlapped version of connect()).803 */804 PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue,805 pj_ioqueue_key_t *key,806 const pj_sockaddr_t *addr,807 int addrlen )808 {809 pj_status_t rc;810 811 /* check parameters. All must be specified! */812 PJ_ASSERT_RETURN(ioqueue && key && addr && addrlen, PJ_EINVAL);813 814 /* Connecting socket must have no other operation! */815 PJ_ASSERT_RETURN(key->op == 0, PJ_EBUSY);816 817 rc = pj_sock_connect(key->fd, addr, addrlen);818 if (rc == PJ_SUCCESS) {819 /* Connected! */820 return PJ_SUCCESS;821 } else {822 if (rc == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) ||823 rc == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK))824 {825 /* Pending! */826 pj_lock_acquire(ioqueue->lock);827 key->op = PJ_IOQUEUE_OP_CONNECT;828 pj_lock_release(ioqueue->lock);829 return PJ_EPENDING;830 } else {831 /* Error! */832 return rc;833 }834 }835 }836 #endif /* PJ_HAS_TCP */837 -
pjproject/main/pjlib/src/pj/ioqueue_select.c
r12 r14 206 206 key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); 207 207 rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); 208 if (rc != PJ_SUCCESS) 209 return rc; 208 if (rc != PJ_SUCCESS) { 209 key = NULL; 210 goto on_return; 211 } 210 212 211 213 /* Register */ … … 381 383 struct event 382 384 { 383 pj_ioqueue_key_t 384 enum event_typeevent_type;385 pj_ioqueue_key_t *key; 386 enum ioqueue_event_type event_type; 385 387 } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; 386 388 … … 453 455 event[counter].key = h; 454 456 event[counter].event_type = READABLE_EVENT; 455 ++counter; } 457 ++counter; 458 } 456 459 457 460 #if PJ_HAS_TCP … … 483 486 break; 484 487 case NO_EVENT: 485 default:486 488 pj_assert(!"Invalid event!"); 487 489 break; -
pjproject/main/pjlib/src/pj/os_core_unix.c
r5 r14 716 716 if (type == PJ_MUTEX_SIMPLE) { 717 717 #if defined(PJ_LINUX) && PJ_LINUX!=0 718 extern int pthread_mutexattr_settype(pthread_mutexattr_t*,int); 718 719 rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_FAST_NP); 719 720 #else … … 722 723 } else { 723 724 #if defined(PJ_LINUX) && PJ_LINUX!=0 725 extern int pthread_mutexattr_settype(pthread_mutexattr_t*,int); 724 726 rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE_NP); 725 727 #else -
pjproject/main/pjlib/src/pj/sock_bsd.c
r12 r14 105 105 PJ_DEF(char*) pj_inet_ntoa(pj_in_addr inaddr) 106 106 { 107 return inet_ntoa(*(struct in_addr*)&inaddr); 107 struct in_addr addr; 108 addr.s_addr = inaddr.s_addr; 109 return inet_ntoa(addr); 108 110 } 109 111 -
pjproject/main/pjlib/src/pjlib-test/ioq_perf.c
r12 r14 77 77 bytes_read, errmsg)); 78 78 PJ_LOG(3,(THIS_FILE, 79 ".....additional info: total read=%u, total written=%u",79 ".....additional info: total read=%u, total sent=%u", 80 80 item->bytes_recv, item->bytes_sent)); 81 81 } else { … … 108 108 if (rc != last_error) { 109 109 last_error = rc; 110 app_perror("...error: read error ", rc);110 app_perror("...error: read error(1)", rc); 111 111 } else { 112 112 last_error_counter++;
Note: See TracChangeset
for help on using the changeset viewer.