Ignore:
Timestamp:
Nov 6, 2005 9:37:47 AM (18 years ago)
Author:
bennylp
Message:

Changed ioqueue to allow simultaneous operations on the same key

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/main/pjlib/src/pjlib-test/ioq_udp.c

    r6 r11  
    3535#define TRACE_(msg)         PJ_LOG(3,(THIS_FILE,"....." msg)) 
    3636 
    37 static pj_ssize_t callback_read_size, 
    38                   callback_write_size, 
    39                   callback_accept_status, 
    40                   callback_connect_status; 
    41 static pj_ioqueue_key_t *callback_read_key, 
    42                         *callback_write_key, 
    43                         *callback_accept_key, 
    44                         *callback_connect_key; 
    45  
    46 static void on_ioqueue_read(pj_ioqueue_key_t *key, pj_ssize_t bytes_read) 
     37static pj_ssize_t            callback_read_size, 
     38                             callback_write_size, 
     39                             callback_accept_status, 
     40                             callback_connect_status; 
     41static pj_ioqueue_key_t     *callback_read_key, 
     42                            *callback_write_key, 
     43                            *callback_accept_key, 
     44                            *callback_connect_key; 
     45static pj_ioqueue_op_key_t  *callback_read_op, 
     46                            *callback_write_op, 
     47                            *callback_accept_op; 
     48 
     49static void on_ioqueue_read(pj_ioqueue_key_t *key,  
     50                            pj_ioqueue_op_key_t *op_key, 
     51                            pj_ssize_t bytes_read) 
    4752{ 
    4853    callback_read_key = key; 
     54    callback_read_op = op_key; 
    4955    callback_read_size = bytes_read; 
    5056} 
    5157 
    52 static void on_ioqueue_write(pj_ioqueue_key_t *key, pj_ssize_t bytes_written) 
     58static void on_ioqueue_write(pj_ioqueue_key_t *key,  
     59                             pj_ioqueue_op_key_t *op_key, 
     60                             pj_ssize_t bytes_written) 
    5361{ 
    5462    callback_write_key = key; 
     63    callback_write_op = op_key; 
    5564    callback_write_size = bytes_written; 
    5665} 
    5766 
    58 static void on_ioqueue_accept(pj_ioqueue_key_t *key, pj_sock_t sock, int status) 
     67static void on_ioqueue_accept(pj_ioqueue_key_t *key,  
     68                              pj_ioqueue_op_key_t *op_key, 
     69                              pj_sock_t sock, int status) 
    5970{ 
    6071    PJ_UNUSED_ARG(sock); 
    6172    callback_accept_key = key; 
     73    callback_accept_op = op_key; 
    6274    callback_accept_status = status; 
    6375} 
     
    8294#  define S_ADDR s_addr 
    8395#endif 
    84  
    85 /* 
    86  * native_format_test() 
    87  * This is just a simple test to verify that various structures in sock.h 
    88  * are really compatible with operating system's definitions. 
    89  */ 
    90 static int native_format_test(void) 
    91 { 
    92     pj_status_t rc; 
    93  
    94     // Test that PJ_INVALID_SOCKET is working. 
    95     { 
    96         pj_sock_t sock; 
    97         rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, -1, &sock); 
    98         if (rc == PJ_SUCCESS) 
    99             return -1020; 
    100     } 
    101  
    102     // Previous func will set errno var. 
    103     pj_set_os_error(PJ_SUCCESS); 
    104  
    105     return 0; 
    106 } 
    10796 
    10897/* 
     
    120109    pj_ioqueue_t *ioque = NULL; 
    121110    pj_ioqueue_key_t *skey, *ckey; 
     111    pj_ioqueue_op_key_t read_op, write_op; 
    122112    int bufsize = BUF_MIN_SIZE; 
    123113    pj_ssize_t bytes, status = -1; 
     
    158148    // Create I/O Queue. 
    159149    TRACE_("create ioqueue..."); 
    160     rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES,  
    161                            PJ_IOQUEUE_DEFAULT_THREADS, &ioque); 
     150    rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque); 
    162151    if (rc != PJ_SUCCESS) { 
    163152        status=-20; goto on_error; 
     
    196185    TRACE_("start recvfrom..."); 
    197186    addrlen = sizeof(addr); 
    198     bytes = pj_ioqueue_recvfrom(ioque, skey, recv_buf, bufsize, 0, 
    199                                 &addr, &addrlen); 
    200     if (bytes < 0 && bytes != PJ_EPENDING) { 
     187    bytes = bufsize; 
     188    rc = pj_ioqueue_recvfrom(skey, &read_op, recv_buf, &bytes, 0, 
     189                             &addr, &addrlen); 
     190    if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 
     191        app_perror("...error: pj_ioqueue_recvfrom", rc); 
    201192        status=-28; goto on_error; 
    202     } else if (bytes == PJ_EPENDING) { 
     193    } else if (rc == PJ_EPENDING) { 
    203194        recv_pending = 1; 
    204195        PJ_LOG(3, (THIS_FILE,  
     
    212203    // Write must return the number of bytes. 
    213204    TRACE_("start sendto..."); 
    214     bytes = pj_ioqueue_sendto(ioque, ckey, send_buf, bufsize, 0, &addr,  
    215                               sizeof(addr)); 
    216     if (bytes != bufsize && bytes != PJ_EPENDING) { 
    217         PJ_LOG(1,(THIS_FILE,  
    218                   "......error: sendto returned %d", bytes)); 
     205    bytes = bufsize; 
     206    rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0, &addr,  
     207                           sizeof(addr)); 
     208    if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 
     209        app_perror("...error: pj_ioqueue_sendto", rc); 
    219210        status=-30; goto on_error; 
    220     } else if (bytes == PJ_EPENDING) { 
     211    } else if (rc == PJ_EPENDING) { 
    221212        send_pending = 1; 
    222213        PJ_LOG(3, (THIS_FILE,  
     
    233224    callback_read_key = callback_write_key =  
    234225        callback_accept_key = callback_connect_key = NULL; 
     226    callback_read_op = callback_write_op = NULL; 
    235227 
    236228    // Poll if pending. 
    237     while (send_pending && recv_pending) { 
     229    while (send_pending || recv_pending) { 
    238230        int rc; 
    239231        pj_time_val timeout = { 5, 0 }; 
     
    254246                status=-61; goto on_error; 
    255247            } 
    256  
    257248            if (callback_read_key != skey) { 
    258249                status=-65; goto on_error; 
     250            } 
     251            if (callback_read_op != &read_op) { 
     252                status=-66; goto on_error; 
    259253            } 
    260254 
     
    271265                status=-73; goto on_error; 
    272266            } 
    273  
    274267            if (callback_write_key != ckey) { 
    275268                status=-75; goto on_error; 
     269            } 
     270            if (callback_write_op != &write_op) { 
     271                status=-76; goto on_error; 
    276272            } 
    277273 
     
    327323     
    328324    /* Create IOQueue */ 
    329     rc = pj_ioqueue_create(pool, MAX, 
    330                            PJ_IOQUEUE_DEFAULT_THREADS, 
    331                            &ioqueue); 
     325    rc = pj_ioqueue_create(pool, MAX, &ioqueue); 
    332326    if (rc != PJ_SUCCESS || ioqueue == NULL) { 
    333327        app_perror("...error in pj_ioqueue_create", rc); 
     
    359353 
    360354    for (i=0; i<count; ++i) { 
    361         rc = pj_ioqueue_unregister(ioqueue, key[i]); 
     355        rc = pj_ioqueue_unregister(key[i]); 
    362356        if (rc != PJ_SUCCESS) { 
    363357            app_perror("...error in pj_ioqueue_unregister", rc); 
     
    394388    pj_pool_t *pool = NULL; 
    395389    pj_sock_t *inactive_sock=NULL; 
     390    pj_ioqueue_op_key_t *inactive_read_op; 
    396391    char *send_buf, *recv_buf; 
    397392    pj_ioqueue_t *ioque = NULL; 
     
    430425 
    431426    // Create I/O Queue. 
    432     rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES,  
    433                            PJ_IOQUEUE_DEFAULT_THREADS, &ioque); 
     427    rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque); 
    434428    if (rc != PJ_SUCCESS) { 
    435429        app_perror("...error: pj_ioqueue_create()", rc); 
     
    441435    inactive_sock = (pj_sock_t*)pj_pool_alloc(pool,  
    442436                                    inactive_sock_count*sizeof(pj_sock_t)); 
     437    inactive_read_op = (pj_ioqueue_op_key_t*)pj_pool_alloc(pool, 
     438                              inactive_sock_count*sizeof(pj_ioqueue_op_key_t)); 
    443439    memset(&addr, 0, sizeof(addr)); 
    444440    addr.sin_family = PJ_AF_INET; 
    445441    for (i=0; i<inactive_sock_count; ++i) { 
     442        pj_ssize_t bytes; 
     443 
    446444        rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &inactive_sock[i]); 
    447445        if (rc != PJ_SUCCESS || inactive_sock[i] < 0) { 
     
    464462            goto on_error; 
    465463        } 
    466         rc = pj_ioqueue_read(ioque, key, recv_buf, bufsize); 
     464        bytes = bufsize; 
     465        rc = pj_ioqueue_recv(key, &inactive_read_op[i], recv_buf, &bytes, 0); 
    467466        if ( rc < 0 && rc != PJ_EPENDING) { 
    468467            pj_sock_close(inactive_sock[i]); 
     
    497496    for (i=0; i<LOOP; ++i) { 
    498497        pj_ssize_t bytes; 
     498        pj_ioqueue_op_key_t read_op, write_op; 
    499499 
    500500        // Randomize send buffer. 
     
    502502 
    503503        // Start reading on the server side. 
    504         rc = pj_ioqueue_read(ioque, skey, recv_buf, bufsize); 
     504        bytes = bufsize; 
     505        rc = pj_ioqueue_recv(skey, &read_op, recv_buf, &bytes, 0); 
    505506        if (rc < 0 && rc != PJ_EPENDING) { 
    506507            app_perror("...error: pj_ioqueue_read()", rc); 
     
    509510 
    510511        // Starts send on the client side. 
    511         bytes = pj_ioqueue_sendto(ioque, ckey, send_buf, bufsize, 0, 
    512                                         &addr, sizeof(addr)); 
    513         if (bytes != bufsize && bytes != PJ_EPENDING) { 
     512        bytes = bufsize; 
     513        rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0, 
     514                               &addr, sizeof(addr)); 
     515        if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 
    514516            app_perror("...error: pj_ioqueue_write()", bytes); 
    515517            rc = -1; 
     
    600602    int bufsize, sock_count; 
    601603 
    602     PJ_LOG(3, (THIS_FILE, "...format test")); 
    603     if ((status = native_format_test()) != 0) 
    604         return status; 
    605     PJ_LOG(3, (THIS_FILE, "....native format test ok")); 
    606  
    607604    PJ_LOG(3, (THIS_FILE, "...compliance test")); 
    608605    if ((status=compliance_test()) != 0) { 
Note: See TracChangeset for help on using the changeset viewer.