Changeset 11 for pjproject/main/pjlib/src/pjlib-test/ioq_perf.c
- Timestamp:
- Nov 6, 2005 9:37:47 AM (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/main/pjlib/src/pjlib-test/ioq_perf.c
r6 r11 35 35 typedef struct test_item 36 36 { 37 pj_sock_t server_fd, 38 client_fd; 39 pj_ioqueue_t *ioqueue; 40 pj_ioqueue_key_t *server_key, 41 *client_key; 42 pj_size_t buffer_size; 43 char *outgoing_buffer; 44 char *incoming_buffer; 45 pj_size_t bytes_sent, 46 bytes_recv; 37 pj_sock_t server_fd, 38 client_fd; 39 pj_ioqueue_t *ioqueue; 40 pj_ioqueue_key_t *server_key, 41 *client_key; 42 pj_ioqueue_op_key_t recv_op, 43 send_op; 44 int has_pending_send; 45 pj_size_t buffer_size; 46 char *outgoing_buffer; 47 char *incoming_buffer; 48 pj_size_t bytes_sent, 49 bytes_recv; 47 50 } test_item; 48 51 … … 50 53 * Increment item->bytes_recv and ready to read the next data. 51 54 */ 52 static void on_read_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_read) 55 static void on_read_complete(pj_ioqueue_key_t *key, 56 pj_ioqueue_op_key_t *op_key, 57 pj_ssize_t bytes_read) 53 58 { 54 59 test_item *item = pj_ioqueue_get_user_data(key); 55 60 pj_status_t rc; 61 int data_is_available = 1; 56 62 57 63 //TRACE_((THIS_FILE, " read complete, bytes_read=%d", bytes_read)); 58 64 59 if (thread_quit_flag) 60 return; 61 62 if (bytes_read < 0) { 63 pj_status_t rc = -bytes_read; 64 char errmsg[128]; 65 66 if (rc != last_error) { 67 last_error = rc; 68 pj_strerror(rc, errmsg, sizeof(errmsg)); 69 PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)", 70 bytes_read, errmsg)); 71 PJ_LOG(3,(THIS_FILE, 72 ".....additional info: total read=%u, total written=%u", 73 item->bytes_recv, item->bytes_sent)); 74 } else { 75 last_error_counter++; 76 } 77 bytes_read = 0; 78 79 } else if (bytes_read == 0) { 80 PJ_LOG(3,(THIS_FILE, "...socket has closed!")); 81 } 82 83 item->bytes_recv += bytes_read; 65 do { 66 if (thread_quit_flag) 67 return; 68 69 if (bytes_read < 0) { 70 pj_status_t rc = -bytes_read; 71 char errmsg[128]; 72 73 if (rc != last_error) { 74 last_error = rc; 75 pj_strerror(rc, errmsg, sizeof(errmsg)); 76 PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)", 77 bytes_read, errmsg)); 78 PJ_LOG(3,(THIS_FILE, 79 ".....additional info: total read=%u, total written=%u", 80 item->bytes_recv, item->bytes_sent)); 81 } else { 82 last_error_counter++; 83 } 84 bytes_read = 0; 85 86 } else if (bytes_read == 0) { 87 PJ_LOG(3,(THIS_FILE, "...socket has closed!")); 88 } 89 90 item->bytes_recv += bytes_read; 84 91 85 /* To assure that the test quits, even if main thread 86 * doesn't have time to run. 87 */ 88 if (item->bytes_recv > item->buffer_size * 10000) 89 thread_quit_flag = 1; 90 91 rc = pj_ioqueue_recv( item->ioqueue, item->server_key, 92 item->incoming_buffer, item->buffer_size, 0 ); 93 94 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 95 if (rc != last_error) { 96 last_error = rc; 97 app_perror("...error: read error", rc); 98 } else { 99 last_error_counter++; 100 } 101 } 92 /* To assure that the test quits, even if main thread 93 * doesn't have time to run. 94 */ 95 if (item->bytes_recv > item->buffer_size * 10000) 96 thread_quit_flag = 1; 97 98 bytes_read = item->buffer_size; 99 rc = pj_ioqueue_recv( key, op_key, 100 item->incoming_buffer, &bytes_read, 0 ); 101 102 if (rc == PJ_SUCCESS) { 103 data_is_available = 1; 104 } else if (rc == PJ_EPENDING) { 105 data_is_available = 0; 106 } else { 107 data_is_available = 0; 108 if (rc != last_error) { 109 last_error = rc; 110 app_perror("...error: read error", rc); 111 } else { 112 last_error_counter++; 113 } 114 } 115 116 if (!item->has_pending_send) { 117 pj_ssize_t sent = item->buffer_size; 118 rc = pj_ioqueue_send(item->client_key, &item->send_op, 119 item->outgoing_buffer, &sent, 0); 120 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 121 app_perror("...error: write error", rc); 122 } 123 124 item->has_pending_send = (rc==PJ_EPENDING); 125 } 126 127 } while (data_is_available); 102 128 } 103 129 … … 105 131 * Increment item->bytes_sent and write the next data. 106 132 */ 107 static void on_write_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent) 133 static void on_write_complete(pj_ioqueue_key_t *key, 134 pj_ioqueue_op_key_t *op_key, 135 pj_ssize_t bytes_sent) 108 136 { 109 137 test_item *item = pj_ioqueue_get_user_data(key); … … 114 142 return; 115 143 144 item->has_pending_send = 0; 116 145 item->bytes_sent += bytes_sent; 117 146 … … 123 152 pj_status_t rc; 124 153 125 rc = pj_ioqueue_write(item->ioqueue, item->client_key, 126 item->outgoing_buffer, item->buffer_size); 154 bytes_sent = item->buffer_size; 155 rc = pj_ioqueue_send( item->client_key, op_key, 156 item->outgoing_buffer, &bytes_sent, 0); 127 157 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 128 158 app_perror("...error: write error", rc); 129 159 } 160 161 item->has_pending_send = (rc==PJ_EPENDING); 130 162 } 131 163 } … … 192 224 193 225 TRACE_((THIS_FILE, " creating ioqueue..")); 194 rc = pj_ioqueue_create(pool, sockpair_cnt*2, thread_cnt,&ioqueue);226 rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue); 195 227 if (rc != PJ_SUCCESS) { 196 228 app_perror("...error: unable to create ioqueue", rc); … … 200 232 /* Initialize each producer-consumer pair. */ 201 233 for (i=0; i<sockpair_cnt; ++i) { 234 pj_ssize_t bytes; 202 235 203 236 items[i].ioqueue = ioqueue; … … 243 276 /* Start reading. */ 244 277 TRACE_((THIS_FILE, " pj_ioqueue_recv..")); 245 rc = pj_ioqueue_recv(ioqueue, items[i].server_key, 246 items[i].incoming_buffer, items[i].buffer_size, 278 bytes = items[i].buffer_size; 279 rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op, 280 items[i].incoming_buffer, &bytes, 247 281 0); 248 if (rc != PJ_ SUCCESS && rc != PJ_EPENDING) {282 if (rc != PJ_EPENDING) { 249 283 app_perror("...error: pj_ioqueue_recv", rc); 250 284 return -73; … … 253 287 /* Start writing. */ 254 288 TRACE_((THIS_FILE, " pj_ioqueue_write..")); 255 rc = pj_ioqueue_write(ioqueue, items[i].client_key, 256 items[i].outgoing_buffer, items[i].buffer_size); 289 bytes = items[i].buffer_size; 290 rc = pj_ioqueue_send(items[i].client_key, &items[i].recv_op, 291 items[i].outgoing_buffer, &bytes, 0); 257 292 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { 258 293 app_perror("...error: pj_ioqueue_write", rc); … … 260 295 } 261 296 297 items[i].has_pending_send = (rc==PJ_EPENDING); 262 298 } 263 299 … … 325 361 TRACE_((THIS_FILE, " closing all sockets..")); 326 362 for (i=0; i<sockpair_cnt; ++i) { 327 pj_ioqueue_unregister(i oqueue, items[i].server_key);328 pj_ioqueue_unregister(i oqueue, items[i].client_key);363 pj_ioqueue_unregister(items[i].server_key); 364 pj_ioqueue_unregister(items[i].client_key); 329 365 pj_sock_close(items[i].server_fd); 330 366 pj_sock_close(items[i].client_fd);
Note: See TracChangeset
for help on using the changeset viewer.