Changeset 349


Ignore:
Timestamp:
Mar 22, 2006 11:49:19 AM (19 years ago)
Author:
bennylp
Message:

Fixed bug in ioqueue with IO Completion Port backend, where events may still be called after key is unregistered

Location:
pjproject/trunk/pjlib
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjlib/build/pjlib.dsp

    r313 r349  
    239239 
    240240SOURCE=..\src\pj\ioqueue_winnt.c 
     241 
     242!IF  "$(CFG)" == "pjlib - Win32 Release" 
     243 
     244!ELSEIF  "$(CFG)" == "pjlib - Win32 Debug" 
     245 
     246!ENDIF  
     247 
    241248# End Source File 
    242249# Begin Source File 
  • pjproject/trunk/pjlib/build/pjlib_test.dsp

    r126 r349  
    129129 
    130130SOURCE="..\src\pjlib-test\main.c" 
    131  
    132 !IF  "$(CFG)" == "pjlib_test - Win32 Release" 
    133  
    134 !ELSEIF  "$(CFG)" == "pjlib_test - Win32 Debug" 
    135  
    136 # PROP Exclude_From_Build 1 
    137  
    138 !ENDIF  
    139  
    140131# End Source File 
    141132# Begin Source File 
     
    147138 
    148139SOURCE="..\src\pjlib-test\main_win32.c" 
     140# PROP Exclude_From_Build 1 
    149141# End Source File 
    150142# Begin Source File 
  • pjproject/trunk/pjlib/include/pj/ioqueue.h

    r66 r349  
    327327 * Note that asynchronous connect operation will automatically be  
    328328 * cancelled during the unregistration. 
     329 * 
     330 * Also note that when I/O Completion Port backend is used, application 
     331 * MUST close the handle immediately after unregistering the key. This is 
     332 * because there is no unregistering API for IOCP. The only way to 
     333 * unregister the handle from IOCP is to close the handle. 
    329334 * 
    330335 * @param key       The key that was previously obtained from registration. 
  • pjproject/trunk/pjlib/src/pj/ioqueue_winnt.c

    r83 r349  
    100100}; 
    101101 
     102enum { POST_QUIT_LEN = 0xFFFFDEADUL }; 
     103 
    102104/* 
    103105 * Structure for individual socket. 
     
    113115#endif 
    114116    pj_ioqueue_callback cb; 
     117    pj_bool_t           has_quit_signal; 
    115118}; 
    116119 
     
    393396} 
    394397 
    395 /* 
    396  * pj_ioqueue_unregister() 
    397  */ 
    398 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) 
    399 { 
    400     PJ_ASSERT_RETURN(key, PJ_EINVAL); 
    401  
    402 #if PJ_HAS_TCP 
    403     if (key->connecting) { 
    404         unsigned pos; 
    405         pj_ioqueue_t *ioqueue; 
    406  
    407         ioqueue = key->ioqueue; 
    408  
    409         /* Erase from connecting_handles */ 
    410         pj_lock_acquire(ioqueue->lock); 
    411         for (pos=0; pos < ioqueue->connecting_count; ++pos) { 
    412             if (ioqueue->connecting_keys[pos] == key) { 
    413                 erase_connecting_socket(ioqueue, pos); 
    414                 break; 
    415             } 
    416         } 
    417         key->connecting = 0; 
    418         pj_lock_release(ioqueue->lock); 
    419     } 
    420 #endif 
    421     if (key->hnd_type == HND_IS_FILE) { 
    422         CloseHandle(key->hnd); 
    423     } 
    424     return PJ_SUCCESS; 
    425 } 
    426398 
    427399/* 
     
    450422} 
    451423 
    452 /* 
    453  * pj_ioqueue_poll() 
    454  * 
    455  * Poll for events. 
    456  */ 
    457 PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) 
    458 { 
    459     DWORD dwMsec, dwBytesTransfered, dwKey; 
     424 
     425 
     426/* 
     427 * Internal function to poll the I/O Completion Port, execute callback,  
     428 * and return the key and bytes transfered of the last operation. 
     429 */ 
     430static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,  
     431                            pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key ) 
     432{ 
     433    DWORD dwBytesTransfered, dwKey; 
    460434    generic_overlapped *pOv; 
    461435    pj_ioqueue_key_t *key; 
    462     int connect_count; 
    463436    pj_ssize_t size_status = -1; 
    464     BOOL rcGetQueued;; 
    465  
    466     PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); 
    467  
    468     /* Check the connecting array. */ 
    469 #if PJ_HAS_TCP 
    470     connect_count = check_connecting(ioqueue); 
    471 #endif 
    472  
    473     /* Calculate miliseconds timeout for GetQueuedCompletionStatus */ 
    474     dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE; 
     437    BOOL rcGetQueued; 
    475438 
    476439    /* Poll for completion status. */ 
    477     rcGetQueued = GetQueuedCompletionStatus(ioqueue->iocp, &dwBytesTransfered, 
     440    rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransfered, 
    478441                                            &dwKey, (OVERLAPPED**)&pOv,  
    479                                             dwMsec); 
     442                                            dwTimeout); 
    480443 
    481444    /* The return value is: 
     
    488451        key = (pj_ioqueue_key_t*)dwKey; 
    489452        size_status = dwBytesTransfered; 
     453 
     454        /* Report to caller regardless */ 
     455        if (p_bytes) 
     456            *p_bytes = size_status; 
     457        if (p_key) 
     458            *p_key = key; 
     459 
     460        /* If size_status is POST_QUIT_LEN, mark the key as quitting */ 
     461        if (size_status == POST_QUIT_LEN) { 
     462            key->has_quit_signal = 1; 
     463            return PJ_TRUE; 
     464        } 
     465 
     466        /* We shouldn't call callbacks if key is quitting.  
     467         * But this should have been taken care by unregister function 
     468         * (the unregister function should have cleared out the callbacks) 
     469         */ 
     470 
     471        /* Carry out the callback */ 
    490472        switch (pOv->operation) { 
    491473        case PJ_IOQUEUE_OP_READ: 
     
    523505            break; 
    524506        } 
    525         return connect_count+1; 
     507        return PJ_TRUE; 
    526508    } 
    527509 
    528510    /* No event was queued. */ 
    529     return connect_count; 
     511    return PJ_FALSE; 
     512} 
     513 
     514/* 
     515 * pj_ioqueue_unregister() 
     516 */ 
     517PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) 
     518{ 
     519    pj_ssize_t polled_len; 
     520    pj_ioqueue_key_t *polled_key; 
     521    generic_overlapped ov; 
     522    BOOL rc; 
     523 
     524    PJ_ASSERT_RETURN(key, PJ_EINVAL); 
     525 
     526#if PJ_HAS_TCP 
     527    if (key->connecting) { 
     528        unsigned pos; 
     529        pj_ioqueue_t *ioqueue; 
     530 
     531        ioqueue = key->ioqueue; 
     532 
     533        /* Erase from connecting_handles */ 
     534        pj_lock_acquire(ioqueue->lock); 
     535        for (pos=0; pos < ioqueue->connecting_count; ++pos) { 
     536            if (ioqueue->connecting_keys[pos] == key) { 
     537                erase_connecting_socket(ioqueue, pos); 
     538                break; 
     539            } 
     540        } 
     541        key->connecting = 0; 
     542        pj_lock_release(ioqueue->lock); 
     543    } 
     544#endif 
     545 
     546 
     547    /* Unregistering handle from IOCP is pretty tricky. 
     548     * 
     549     * Even after the socket has been closed, GetQueuedCompletionStatus 
     550     * may still return events for the handle. This will likely to 
     551     * cause crash in pjlib, because the key associated with the handle 
     552     * most likely will have been destroyed. 
     553     * 
     554     * The solution is to poll the IOCP until we're sure that there are 
     555     * no further events for the handle. 
     556     */ 
     557 
     558    /* Clear up callbacks for the key.  
     559     * We don't want the callback to be called for this key. 
     560     */ 
     561    key->cb.on_read_complete = NULL; 
     562    key->cb.on_write_complete = NULL; 
     563    key->cb.on_accept_complete = NULL; 
     564    key->cb.on_connect_complete = NULL; 
     565 
     566    /* Init overlapped struct */ 
     567    pj_memset(&ov, 0, sizeof(ov)); 
     568    ov.operation = PJ_IOQUEUE_OP_READ; 
     569 
     570    /* Post queued completion status with a special length. */ 
     571    rc = PostQueuedCompletionStatus( key->ioqueue->iocp, (DWORD)POST_QUIT_LEN, 
     572                                     (DWORD)key, &ov.overlapped); 
     573 
     574    /* Poll IOCP until has_quit_signal is set in the key. 
     575     * The has_quit_signal flag is set in poll_iocp() when POST_QUIT_LEN 
     576     * is detected. We need to have this flag because POST_QUIT_LEN may be 
     577     * detected by other threads. 
     578     */ 
     579    do { 
     580        polled_len = 0; 
     581        polled_key = NULL; 
     582 
     583        rc = poll_iocp(key->ioqueue->iocp, 0, &polled_len, &polled_key); 
     584 
     585    } while (rc && !key->has_quit_signal); 
     586 
     587 
     588    /* Close handle if this is a file. */ 
     589    if (key->hnd_type == HND_IS_FILE) { 
     590        CloseHandle(key->hnd); 
     591    } 
     592 
     593    return PJ_SUCCESS; 
     594} 
     595 
     596/* 
     597 * pj_ioqueue_poll() 
     598 * 
     599 * Poll for events. 
     600 */ 
     601PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) 
     602{ 
     603    DWORD dwMsec; 
     604    int connect_count = 0; 
     605    pj_bool_t has_event; 
     606 
     607    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); 
     608 
     609    /* Check the connecting array. */ 
     610#if PJ_HAS_TCP 
     611    connect_count = check_connecting(ioqueue); 
     612#endif 
     613 
     614    /* Calculate miliseconds timeout for GetQueuedCompletionStatus */ 
     615    dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE; 
     616 
     617    /* Poll for completion status. */ 
     618    has_event = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL); 
     619 
     620    /* Return number of events. */ 
     621    return connect_count + has_event; 
    530622} 
    531623 
  • pjproject/trunk/pjlib/src/pjlib-test/ioq_udp.c

    r126 r349  
    314314} 
    315315 
     316 
     317static void on_read_complete(pj_ioqueue_key_t *key,  
     318                             pj_ioqueue_op_key_t *op_key,  
     319                             pj_ssize_t bytes_read) 
     320{ 
     321    unsigned *p_packet_cnt = pj_ioqueue_get_user_data(key); 
     322 
     323    PJ_UNUSED_ARG(op_key); 
     324    PJ_UNUSED_ARG(bytes_read); 
     325 
     326    (*p_packet_cnt)++; 
     327} 
     328 
     329/* 
     330 * unregister_test() 
     331 * Check if callback is still called after socket has been unregistered or  
     332 * closed. 
     333 */  
     334static int unregister_test(void) 
     335{ 
     336    enum { RPORT = 50000, SPORT = 50001 }; 
     337    pj_pool_t *pool; 
     338    pj_ioqueue_t *ioqueue; 
     339    pj_sock_t ssock; 
     340    pj_sock_t rsock; 
     341    int addrlen; 
     342    pj_sockaddr_in addr; 
     343    pj_ioqueue_key_t *key; 
     344    pj_ioqueue_op_key_t opkey; 
     345    pj_ioqueue_callback cb; 
     346    unsigned packet_cnt; 
     347    char sendbuf[10], recvbuf[10]; 
     348    pj_ssize_t bytes; 
     349    pj_time_val timeout; 
     350    pj_status_t status; 
     351 
     352    pool = pj_pool_create(mem, "test", 4000, 4000, NULL); 
     353    if (!pool) { 
     354        app_perror("Unable to create pool", PJ_ENOMEM); 
     355        return -100; 
     356    } 
     357 
     358    status = pj_ioqueue_create(pool, 16, &ioqueue); 
     359    if (status != PJ_SUCCESS) { 
     360        app_perror("Error creating ioqueue", status); 
     361        return -110; 
     362    } 
     363 
     364    /* Create sender socket */ 
     365    status = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, SPORT, &ssock); 
     366    if (status != PJ_SUCCESS) { 
     367        app_perror("Error initializing socket", status); 
     368        return -120; 
     369    } 
     370 
     371    /* Create receiver socket. */ 
     372    status = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, RPORT, &rsock); 
     373    if (status != PJ_SUCCESS) { 
     374        app_perror("Error initializing socket", status); 
     375        return -130; 
     376    } 
     377 
     378    /* Register rsock to ioqueue. */ 
     379    pj_memset(&cb, 0, sizeof(cb)); 
     380    cb.on_read_complete = &on_read_complete; 
     381    packet_cnt = 0; 
     382    status = pj_ioqueue_register_sock(pool, ioqueue, rsock, &packet_cnt, 
     383                                      &cb, &key); 
     384    if (status != PJ_SUCCESS) { 
     385        app_perror("Error registering to ioqueue", status); 
     386        return -140; 
     387    } 
     388 
     389    /* Init operation key. */ 
     390    pj_ioqueue_op_key_init(&opkey, sizeof(opkey)); 
     391 
     392    /* Start reading. */ 
     393    bytes = sizeof(recvbuf); 
     394    status = pj_ioqueue_recv( key, &opkey, recvbuf, &bytes, 0); 
     395    if (status != PJ_EPENDING) { 
     396        app_perror("Expecting PJ_EPENDING, but got this", status); 
     397        return -150; 
     398    } 
     399 
     400    /* Init destination address. */ 
     401    addrlen = sizeof(addr); 
     402    status = pj_sock_getsockname(rsock, &addr, &addrlen); 
     403    if (status != PJ_SUCCESS) { 
     404        app_perror("getsockname error", status); 
     405        return -160; 
     406    } 
     407 
     408    /* Override address with 127.0.0.1, since getsockname will return 
     409     * zero in the address field. 
     410     */ 
     411    addr.sin_addr = pj_inet_addr2("127.0.0.1"); 
     412 
     413    /* Init buffer to send */ 
     414    pj_ansi_strcpy(sendbuf, "Hello0123"); 
     415 
     416    /* Send one packet. */ 
     417    bytes = sizeof(sendbuf); 
     418    status = pj_sock_sendto(ssock, sendbuf, &bytes, 0, 
     419                            &addr, sizeof(addr)); 
     420 
     421    if (status != PJ_SUCCESS) { 
     422        app_perror("sendto error", status); 
     423        return -170; 
     424    } 
     425 
     426    /* Check if packet is received. */ 
     427    timeout.sec = 1; timeout.msec = 0; 
     428    pj_ioqueue_poll(ioqueue, &timeout); 
     429 
     430    if (packet_cnt != 1) { 
     431        return -180; 
     432    } 
     433 
     434    /* Just to make sure things are settled.. */ 
     435    pj_thread_sleep(100); 
     436 
     437    /* Start reading again. */ 
     438    bytes = sizeof(recvbuf); 
     439    status = pj_ioqueue_recv( key, &opkey, recvbuf, &bytes, 0); 
     440    if (status != PJ_EPENDING) { 
     441        app_perror("Expecting PJ_EPENDING, but got this", status); 
     442        return -190; 
     443    } 
     444 
     445    /* Reset packet counter */ 
     446    packet_cnt = 0; 
     447 
     448    /* Send one packet. */ 
     449    bytes = sizeof(sendbuf); 
     450    status = pj_sock_sendto(ssock, sendbuf, &bytes, 0, 
     451                            &addr, sizeof(addr)); 
     452 
     453    if (status != PJ_SUCCESS) { 
     454        app_perror("sendto error", status); 
     455        return -200; 
     456    } 
     457 
     458    /* Now unregister and close socket. */ 
     459    pj_ioqueue_unregister(key); 
     460    pj_sock_close(rsock); 
     461 
     462    /* Poll ioqueue. */ 
     463    timeout.sec = 1; timeout.msec = 0; 
     464    pj_ioqueue_poll(ioqueue, &timeout); 
     465 
     466    /* Must NOT receive any packets after socket is closed! */ 
     467    if (packet_cnt > 0) { 
     468        PJ_LOG(3,(THIS_FILE, "....errror: not expecting to receive packet " 
     469                             "after socket has been closed")); 
     470        return -210; 
     471    } 
     472 
     473    /* Success */ 
     474    pj_sock_close(ssock); 
     475    pj_ioqueue_destroy(ioqueue); 
     476 
     477    pj_pool_release(pool); 
     478 
     479    return 0; 
     480} 
     481 
     482 
    316483/* 
    317484 * Testing with many handles. 
     
    626793    PJ_LOG(3, (THIS_FILE, "....compliance test ok")); 
    627794 
     795 
     796    PJ_LOG(3, (THIS_FILE, "...unregister test (%s)", pj_ioqueue_name())); 
     797    if ((status=unregister_test()) != 0) { 
     798        return status; 
     799    } 
     800    PJ_LOG(3, (THIS_FILE, "....unregister test ok")); 
     801 
    628802    if ((status=many_handles_test()) != 0) { 
    629803        return status; 
  • pjproject/trunk/pjlib/src/pjlib-test/main_win32.c

    r126 r349  
    5656    PJ_UNUSED_ARG(len); 
    5757    SendMessage(hwndLog, EM_REPLACESEL, FALSE,  
    58                 (LPARAM)PJ_STRING_TO_NATIVE(data,wdata)); 
     58                (LPARAM)PJ_STRING_TO_NATIVE(data,wdata,256)); 
    5959} 
    6060 
Note: See TracChangeset for help on using the changeset viewer.