Ignore:
Timestamp:
Nov 6, 2005 9:37:47 AM (18 years ago)
Author:
bennylp
Message:

Changed ioqueue to allow simultaneous operations on the same key

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/main/pjlib/src/pjlib-test/ioq_perf.c

    r6 r11  
    3535typedef struct test_item 
    3636{ 
    37     pj_sock_t       server_fd,  
    38                     client_fd; 
    39     pj_ioqueue_t    *ioqueue; 
    40     pj_ioqueue_key_t *server_key, 
    41                    *client_key; 
    42     pj_size_t       buffer_size; 
    43     char           *outgoing_buffer; 
    44     char           *incoming_buffer; 
    45     pj_size_t       bytes_sent,  
    46                     bytes_recv; 
     37    pj_sock_t            server_fd,  
     38                         client_fd; 
     39    pj_ioqueue_t        *ioqueue; 
     40    pj_ioqueue_key_t    *server_key, 
     41                        *client_key; 
     42    pj_ioqueue_op_key_t  recv_op, 
     43                         send_op; 
     44    int                  has_pending_send; 
     45    pj_size_t            buffer_size; 
     46    char                *outgoing_buffer; 
     47    char                *incoming_buffer; 
     48    pj_size_t            bytes_sent,  
     49                         bytes_recv; 
    4750} test_item; 
    4851 
     
    5053 * Increment item->bytes_recv and ready to read the next data. 
    5154 */ 
    52 static void on_read_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_read) 
     55static void on_read_complete(pj_ioqueue_key_t *key,  
     56                             pj_ioqueue_op_key_t *op_key, 
     57                             pj_ssize_t bytes_read) 
    5358{ 
    5459    test_item *item = pj_ioqueue_get_user_data(key); 
    5560    pj_status_t rc; 
     61    int data_is_available = 1; 
    5662 
    5763    //TRACE_((THIS_FILE, "     read complete, bytes_read=%d", bytes_read)); 
    5864 
    59     if (thread_quit_flag) 
    60         return; 
    61  
    62     if (bytes_read < 0) { 
    63         pj_status_t rc = -bytes_read; 
    64         char errmsg[128]; 
    65  
    66         if (rc != last_error) { 
    67             last_error = rc; 
    68             pj_strerror(rc, errmsg, sizeof(errmsg)); 
    69             PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)",  
    70                       bytes_read, errmsg)); 
    71             PJ_LOG(3,(THIS_FILE,  
    72                       ".....additional info: total read=%u, total written=%u", 
    73                       item->bytes_recv, item->bytes_sent)); 
    74         } else { 
    75             last_error_counter++; 
    76         } 
    77         bytes_read = 0; 
    78  
    79     } else if (bytes_read == 0) { 
    80         PJ_LOG(3,(THIS_FILE, "...socket has closed!")); 
    81     } 
    82  
    83     item->bytes_recv += bytes_read; 
     65    do { 
     66        if (thread_quit_flag) 
     67            return; 
     68 
     69        if (bytes_read < 0) { 
     70            pj_status_t rc = -bytes_read; 
     71            char errmsg[128]; 
     72 
     73            if (rc != last_error) { 
     74                last_error = rc; 
     75                pj_strerror(rc, errmsg, sizeof(errmsg)); 
     76                PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)",  
     77                          bytes_read, errmsg)); 
     78                PJ_LOG(3,(THIS_FILE,  
     79                          ".....additional info: total read=%u, total written=%u", 
     80                          item->bytes_recv, item->bytes_sent)); 
     81            } else { 
     82                last_error_counter++; 
     83            } 
     84            bytes_read = 0; 
     85 
     86        } else if (bytes_read == 0) { 
     87            PJ_LOG(3,(THIS_FILE, "...socket has closed!")); 
     88        } 
     89 
     90        item->bytes_recv += bytes_read; 
    8491     
    85     /* To assure that the test quits, even if main thread 
    86      * doesn't have time to run. 
    87      */ 
    88     if (item->bytes_recv > item->buffer_size * 10000)  
    89         thread_quit_flag = 1; 
    90  
    91     rc = pj_ioqueue_recv( item->ioqueue, item->server_key, 
    92                           item->incoming_buffer, item->buffer_size, 0 ); 
    93  
    94     if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 
    95         if (rc != last_error) { 
    96             last_error = rc; 
    97             app_perror("...error: read error", rc); 
    98         } else { 
    99             last_error_counter++; 
    100         } 
    101     } 
     92        /* To assure that the test quits, even if main thread 
     93         * doesn't have time to run. 
     94         */ 
     95        if (item->bytes_recv > item->buffer_size * 10000)  
     96            thread_quit_flag = 1; 
     97 
     98        bytes_read = item->buffer_size; 
     99        rc = pj_ioqueue_recv( key, op_key, 
     100                              item->incoming_buffer, &bytes_read, 0 ); 
     101 
     102        if (rc == PJ_SUCCESS) { 
     103            data_is_available = 1; 
     104        } else if (rc == PJ_EPENDING) { 
     105            data_is_available = 0; 
     106        } else { 
     107            data_is_available = 0; 
     108            if (rc != last_error) { 
     109                last_error = rc; 
     110                app_perror("...error: read error", rc); 
     111            } else { 
     112                last_error_counter++; 
     113            } 
     114        } 
     115 
     116        if (!item->has_pending_send) { 
     117            pj_ssize_t sent = item->buffer_size; 
     118            rc = pj_ioqueue_send(item->client_key, &item->send_op, 
     119                                 item->outgoing_buffer, &sent, 0); 
     120            if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 
     121                app_perror("...error: write error", rc); 
     122            } 
     123 
     124            item->has_pending_send = (rc==PJ_EPENDING); 
     125        } 
     126 
     127    } while (data_is_available); 
    102128} 
    103129 
     
    105131 * Increment item->bytes_sent and write the next data. 
    106132 */ 
    107 static void on_write_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent) 
     133static void on_write_complete(pj_ioqueue_key_t *key,  
     134                              pj_ioqueue_op_key_t *op_key, 
     135                              pj_ssize_t bytes_sent) 
    108136{ 
    109137    test_item *item = pj_ioqueue_get_user_data(key); 
     
    114142        return; 
    115143 
     144    item->has_pending_send = 0; 
    116145    item->bytes_sent += bytes_sent; 
    117146 
     
    123152        pj_status_t rc; 
    124153 
    125         rc = pj_ioqueue_write(item->ioqueue, item->client_key,  
    126                               item->outgoing_buffer, item->buffer_size); 
     154        bytes_sent = item->buffer_size; 
     155        rc = pj_ioqueue_send( item->client_key, op_key, 
     156                              item->outgoing_buffer, &bytes_sent, 0); 
    127157        if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 
    128158            app_perror("...error: write error", rc); 
    129159        } 
     160 
     161        item->has_pending_send = (rc==PJ_EPENDING); 
    130162    } 
    131163} 
     
    192224 
    193225    TRACE_((THIS_FILE, "     creating ioqueue..")); 
    194     rc = pj_ioqueue_create(pool, sockpair_cnt*2, thread_cnt, &ioqueue); 
     226    rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue); 
    195227    if (rc != PJ_SUCCESS) { 
    196228        app_perror("...error: unable to create ioqueue", rc); 
     
    200232    /* Initialize each producer-consumer pair. */ 
    201233    for (i=0; i<sockpair_cnt; ++i) { 
     234        pj_ssize_t bytes; 
    202235 
    203236        items[i].ioqueue = ioqueue; 
     
    243276        /* Start reading. */ 
    244277        TRACE_((THIS_FILE, "      pj_ioqueue_recv..")); 
    245         rc = pj_ioqueue_recv(ioqueue, items[i].server_key, 
    246                              items[i].incoming_buffer, items[i].buffer_size, 
     278        bytes = items[i].buffer_size; 
     279        rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op, 
     280                             items[i].incoming_buffer, &bytes, 
    247281                             0); 
    248         if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 
     282        if (rc != PJ_EPENDING) { 
    249283            app_perror("...error: pj_ioqueue_recv", rc); 
    250284            return -73; 
     
    253287        /* Start writing. */ 
    254288        TRACE_((THIS_FILE, "      pj_ioqueue_write..")); 
    255         rc = pj_ioqueue_write(ioqueue, items[i].client_key, 
    256                               items[i].outgoing_buffer, items[i].buffer_size); 
     289        bytes = items[i].buffer_size; 
     290        rc = pj_ioqueue_send(items[i].client_key, &items[i].recv_op, 
     291                             items[i].outgoing_buffer, &bytes, 0); 
    257292        if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 
    258293            app_perror("...error: pj_ioqueue_write", rc); 
     
    260295        } 
    261296 
     297        items[i].has_pending_send = (rc==PJ_EPENDING); 
    262298    } 
    263299 
     
    325361    TRACE_((THIS_FILE, "     closing all sockets..")); 
    326362    for (i=0; i<sockpair_cnt; ++i) { 
    327         pj_ioqueue_unregister(ioqueue, items[i].server_key); 
    328         pj_ioqueue_unregister(ioqueue, items[i].client_key); 
     363        pj_ioqueue_unregister(items[i].server_key); 
     364        pj_ioqueue_unregister(items[i].client_key); 
    329365        pj_sock_close(items[i].server_fd); 
    330366        pj_sock_close(items[i].client_fd); 
Note: See TracChangeset for help on using the changeset viewer.