Changeset 349
- Timestamp:
- Mar 22, 2006 11:49:19 AM (19 years ago)
- Location:
- pjproject/trunk/pjlib
- Files:
-
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjlib/build/pjlib.dsp
r313 r349 239 239 240 240 SOURCE=..\src\pj\ioqueue_winnt.c 241 242 !IF "$(CFG)" == "pjlib - Win32 Release" 243 244 !ELSEIF "$(CFG)" == "pjlib - Win32 Debug" 245 246 !ENDIF 247 241 248 # End Source File 242 249 # Begin Source File -
pjproject/trunk/pjlib/build/pjlib_test.dsp
r126 r349 129 129 130 130 SOURCE="..\src\pjlib-test\main.c" 131 132 !IF "$(CFG)" == "pjlib_test - Win32 Release"133 134 !ELSEIF "$(CFG)" == "pjlib_test - Win32 Debug"135 136 # PROP Exclude_From_Build 1137 138 !ENDIF139 140 131 # End Source File 141 132 # Begin Source File … … 147 138 148 139 SOURCE="..\src\pjlib-test\main_win32.c" 140 # PROP Exclude_From_Build 1 149 141 # End Source File 150 142 # Begin Source File -
pjproject/trunk/pjlib/include/pj/ioqueue.h
r66 r349 327 327 * Note that asynchronous connect operation will automatically be 328 328 * cancelled during the unregistration. 329 * 330 * Also note that when I/O Completion Port backend is used, application 331 * MUST close the handle immediately after unregistering the key. This is 332 * because there is no unregistering API for IOCP. The only way to 333 * unregister the handle from IOCP is to close the handle. 329 334 * 330 335 * @param key The key that was previously obtained from registration. -
pjproject/trunk/pjlib/src/pj/ioqueue_winnt.c
r83 r349 100 100 }; 101 101 102 enum { POST_QUIT_LEN = 0xFFFFDEADUL }; 103 102 104 /* 103 105 * Structure for individual socket. … … 113 115 #endif 114 116 pj_ioqueue_callback cb; 117 pj_bool_t has_quit_signal; 115 118 }; 116 119 … … 393 396 } 394 397 395 /*396 * pj_ioqueue_unregister()397 */398 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )399 {400 PJ_ASSERT_RETURN(key, PJ_EINVAL);401 402 #if PJ_HAS_TCP403 if (key->connecting) {404 unsigned pos;405 pj_ioqueue_t *ioqueue;406 407 ioqueue = key->ioqueue;408 409 /* Erase from connecting_handles */410 pj_lock_acquire(ioqueue->lock);411 for (pos=0; pos < ioqueue->connecting_count; ++pos) {412 if (ioqueue->connecting_keys[pos] == key) {413 erase_connecting_socket(ioqueue, pos);414 break;415 }416 }417 key->connecting = 0;418 pj_lock_release(ioqueue->lock);419 }420 #endif421 if (key->hnd_type == HND_IS_FILE) {422 CloseHandle(key->hnd);423 }424 return PJ_SUCCESS;425 }426 398 427 399 /* … … 450 422 } 451 423 452 /* 453 * pj_ioqueue_poll() 454 * 455 * Poll for events. 456 */ 457 PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) 458 { 459 DWORD dwMsec, dwBytesTransfered, dwKey; 424 425 426 /* 427 * Internal function to poll the I/O Completion Port, execute callback, 428 * and return the key and bytes transfered of the last operation. 429 */ 430 static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, 431 pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key ) 432 { 433 DWORD dwBytesTransfered, dwKey; 460 434 generic_overlapped *pOv; 461 435 pj_ioqueue_key_t *key; 462 int connect_count;463 436 pj_ssize_t size_status = -1; 464 BOOL rcGetQueued;; 465 466 PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); 467 468 /* Check the connecting array. */ 469 #if PJ_HAS_TCP 470 connect_count = check_connecting(ioqueue); 471 #endif 472 473 /* Calculate miliseconds timeout for GetQueuedCompletionStatus */ 474 dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE; 437 BOOL rcGetQueued; 475 438 476 439 /* Poll for completion status. */ 477 rcGetQueued = GetQueuedCompletionStatus( ioqueue->iocp, &dwBytesTransfered,440 rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransfered, 478 441 &dwKey, (OVERLAPPED**)&pOv, 479 dw Msec);442 dwTimeout); 480 443 481 444 /* The return value is: … … 488 451 key = (pj_ioqueue_key_t*)dwKey; 489 452 size_status = dwBytesTransfered; 453 454 /* Report to caller regardless */ 455 if (p_bytes) 456 *p_bytes = size_status; 457 if (p_key) 458 *p_key = key; 459 460 /* If size_status is POST_QUIT_LEN, mark the key as quitting */ 461 if (size_status == POST_QUIT_LEN) { 462 key->has_quit_signal = 1; 463 return PJ_TRUE; 464 } 465 466 /* We shouldn't call callbacks if key is quitting. 467 * But this should have been taken care by unregister function 468 * (the unregister function should have cleared out the callbacks) 469 */ 470 471 /* Carry out the callback */ 490 472 switch (pOv->operation) { 491 473 case PJ_IOQUEUE_OP_READ: … … 523 505 break; 524 506 } 525 return connect_count+1;507 return PJ_TRUE; 526 508 } 527 509 528 510 /* No event was queued. */ 529 return connect_count; 511 return PJ_FALSE; 512 } 513 514 /* 515 * pj_ioqueue_unregister() 516 */ 517 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) 518 { 519 pj_ssize_t polled_len; 520 pj_ioqueue_key_t *polled_key; 521 generic_overlapped ov; 522 BOOL rc; 523 524 PJ_ASSERT_RETURN(key, PJ_EINVAL); 525 526 #if PJ_HAS_TCP 527 if (key->connecting) { 528 unsigned pos; 529 pj_ioqueue_t *ioqueue; 530 531 ioqueue = key->ioqueue; 532 533 /* Erase from connecting_handles */ 534 pj_lock_acquire(ioqueue->lock); 535 for (pos=0; pos < ioqueue->connecting_count; ++pos) { 536 if (ioqueue->connecting_keys[pos] == key) { 537 erase_connecting_socket(ioqueue, pos); 538 break; 539 } 540 } 541 key->connecting = 0; 542 pj_lock_release(ioqueue->lock); 543 } 544 #endif 545 546 547 /* Unregistering handle from IOCP is pretty tricky. 548 * 549 * Even after the socket has been closed, GetQueuedCompletionStatus 550 * may still return events for the handle. This will likely to 551 * cause crash in pjlib, because the key associated with the handle 552 * most likely will have been destroyed. 553 * 554 * The solution is to poll the IOCP until we're sure that there are 555 * no further events for the handle. 556 */ 557 558 /* Clear up callbacks for the key. 559 * We don't want the callback to be called for this key. 560 */ 561 key->cb.on_read_complete = NULL; 562 key->cb.on_write_complete = NULL; 563 key->cb.on_accept_complete = NULL; 564 key->cb.on_connect_complete = NULL; 565 566 /* Init overlapped struct */ 567 pj_memset(&ov, 0, sizeof(ov)); 568 ov.operation = PJ_IOQUEUE_OP_READ; 569 570 /* Post queued completion status with a special length. */ 571 rc = PostQueuedCompletionStatus( key->ioqueue->iocp, (DWORD)POST_QUIT_LEN, 572 (DWORD)key, &ov.overlapped); 573 574 /* Poll IOCP until has_quit_signal is set in the key. 575 * The has_quit_signal flag is set in poll_iocp() when POST_QUIT_LEN 576 * is detected. We need to have this flag because POST_QUIT_LEN may be 577 * detected by other threads. 578 */ 579 do { 580 polled_len = 0; 581 polled_key = NULL; 582 583 rc = poll_iocp(key->ioqueue->iocp, 0, &polled_len, &polled_key); 584 585 } while (rc && !key->has_quit_signal); 586 587 588 /* Close handle if this is a file. */ 589 if (key->hnd_type == HND_IS_FILE) { 590 CloseHandle(key->hnd); 591 } 592 593 return PJ_SUCCESS; 594 } 595 596 /* 597 * pj_ioqueue_poll() 598 * 599 * Poll for events. 600 */ 601 PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) 602 { 603 DWORD dwMsec; 604 int connect_count = 0; 605 pj_bool_t has_event; 606 607 PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); 608 609 /* Check the connecting array. */ 610 #if PJ_HAS_TCP 611 connect_count = check_connecting(ioqueue); 612 #endif 613 614 /* Calculate miliseconds timeout for GetQueuedCompletionStatus */ 615 dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE; 616 617 /* Poll for completion status. */ 618 has_event = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL); 619 620 /* Return number of events. */ 621 return connect_count + has_event; 530 622 } 531 623 -
pjproject/trunk/pjlib/src/pjlib-test/ioq_udp.c
r126 r349 314 314 } 315 315 316 317 static void on_read_complete(pj_ioqueue_key_t *key, 318 pj_ioqueue_op_key_t *op_key, 319 pj_ssize_t bytes_read) 320 { 321 unsigned *p_packet_cnt = pj_ioqueue_get_user_data(key); 322 323 PJ_UNUSED_ARG(op_key); 324 PJ_UNUSED_ARG(bytes_read); 325 326 (*p_packet_cnt)++; 327 } 328 329 /* 330 * unregister_test() 331 * Check if callback is still called after socket has been unregistered or 332 * closed. 333 */ 334 static int unregister_test(void) 335 { 336 enum { RPORT = 50000, SPORT = 50001 }; 337 pj_pool_t *pool; 338 pj_ioqueue_t *ioqueue; 339 pj_sock_t ssock; 340 pj_sock_t rsock; 341 int addrlen; 342 pj_sockaddr_in addr; 343 pj_ioqueue_key_t *key; 344 pj_ioqueue_op_key_t opkey; 345 pj_ioqueue_callback cb; 346 unsigned packet_cnt; 347 char sendbuf[10], recvbuf[10]; 348 pj_ssize_t bytes; 349 pj_time_val timeout; 350 pj_status_t status; 351 352 pool = pj_pool_create(mem, "test", 4000, 4000, NULL); 353 if (!pool) { 354 app_perror("Unable to create pool", PJ_ENOMEM); 355 return -100; 356 } 357 358 status = pj_ioqueue_create(pool, 16, &ioqueue); 359 if (status != PJ_SUCCESS) { 360 app_perror("Error creating ioqueue", status); 361 return -110; 362 } 363 364 /* Create sender socket */ 365 status = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, SPORT, &ssock); 366 if (status != PJ_SUCCESS) { 367 app_perror("Error initializing socket", status); 368 return -120; 369 } 370 371 /* Create receiver socket. */ 372 status = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, RPORT, &rsock); 373 if (status != PJ_SUCCESS) { 374 app_perror("Error initializing socket", status); 375 return -130; 376 } 377 378 /* Register rsock to ioqueue. */ 379 pj_memset(&cb, 0, sizeof(cb)); 380 cb.on_read_complete = &on_read_complete; 381 packet_cnt = 0; 382 status = pj_ioqueue_register_sock(pool, ioqueue, rsock, &packet_cnt, 383 &cb, &key); 384 if (status != PJ_SUCCESS) { 385 app_perror("Error registering to ioqueue", status); 386 return -140; 387 } 388 389 /* Init operation key. */ 390 pj_ioqueue_op_key_init(&opkey, sizeof(opkey)); 391 392 /* Start reading. */ 393 bytes = sizeof(recvbuf); 394 status = pj_ioqueue_recv( key, &opkey, recvbuf, &bytes, 0); 395 if (status != PJ_EPENDING) { 396 app_perror("Expecting PJ_EPENDING, but got this", status); 397 return -150; 398 } 399 400 /* Init destination address. */ 401 addrlen = sizeof(addr); 402 status = pj_sock_getsockname(rsock, &addr, &addrlen); 403 if (status != PJ_SUCCESS) { 404 app_perror("getsockname error", status); 405 return -160; 406 } 407 408 /* Override address with 127.0.0.1, since getsockname will return 409 * zero in the address field. 410 */ 411 addr.sin_addr = pj_inet_addr2("127.0.0.1"); 412 413 /* Init buffer to send */ 414 pj_ansi_strcpy(sendbuf, "Hello0123"); 415 416 /* Send one packet. */ 417 bytes = sizeof(sendbuf); 418 status = pj_sock_sendto(ssock, sendbuf, &bytes, 0, 419 &addr, sizeof(addr)); 420 421 if (status != PJ_SUCCESS) { 422 app_perror("sendto error", status); 423 return -170; 424 } 425 426 /* Check if packet is received. */ 427 timeout.sec = 1; timeout.msec = 0; 428 pj_ioqueue_poll(ioqueue, &timeout); 429 430 if (packet_cnt != 1) { 431 return -180; 432 } 433 434 /* Just to make sure things are settled.. */ 435 pj_thread_sleep(100); 436 437 /* Start reading again. */ 438 bytes = sizeof(recvbuf); 439 status = pj_ioqueue_recv( key, &opkey, recvbuf, &bytes, 0); 440 if (status != PJ_EPENDING) { 441 app_perror("Expecting PJ_EPENDING, but got this", status); 442 return -190; 443 } 444 445 /* Reset packet counter */ 446 packet_cnt = 0; 447 448 /* Send one packet. */ 449 bytes = sizeof(sendbuf); 450 status = pj_sock_sendto(ssock, sendbuf, &bytes, 0, 451 &addr, sizeof(addr)); 452 453 if (status != PJ_SUCCESS) { 454 app_perror("sendto error", status); 455 return -200; 456 } 457 458 /* Now unregister and close socket. */ 459 pj_ioqueue_unregister(key); 460 pj_sock_close(rsock); 461 462 /* Poll ioqueue. */ 463 timeout.sec = 1; timeout.msec = 0; 464 pj_ioqueue_poll(ioqueue, &timeout); 465 466 /* Must NOT receive any packets after socket is closed! */ 467 if (packet_cnt > 0) { 468 PJ_LOG(3,(THIS_FILE, "....errror: not expecting to receive packet " 469 "after socket has been closed")); 470 return -210; 471 } 472 473 /* Success */ 474 pj_sock_close(ssock); 475 pj_ioqueue_destroy(ioqueue); 476 477 pj_pool_release(pool); 478 479 return 0; 480 } 481 482 316 483 /* 317 484 * Testing with many handles. … … 626 793 PJ_LOG(3, (THIS_FILE, "....compliance test ok")); 627 794 795 796 PJ_LOG(3, (THIS_FILE, "...unregister test (%s)", pj_ioqueue_name())); 797 if ((status=unregister_test()) != 0) { 798 return status; 799 } 800 PJ_LOG(3, (THIS_FILE, "....unregister test ok")); 801 628 802 if ((status=many_handles_test()) != 0) { 629 803 return status; -
pjproject/trunk/pjlib/src/pjlib-test/main_win32.c
r126 r349 56 56 PJ_UNUSED_ARG(len); 57 57 SendMessage(hwndLog, EM_REPLACESEL, FALSE, 58 (LPARAM)PJ_STRING_TO_NATIVE(data,wdata ));58 (LPARAM)PJ_STRING_TO_NATIVE(data,wdata,256)); 59 59 } 60 60
Note: See TracChangeset
for help on using the changeset viewer.