- Timestamp:
- Jul 29, 2008 8:15:15 PM (16 years ago)
- Location:
- pjproject/trunk/pjlib
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjlib/include/pj/activesock.h
r2177 r2185 202 202 int concurrency; 203 203 204 /** 205 * If this option is specified, the active socket will make sure that 206 * asynchronous send operation with stream oriented socket will only 207 * call the callback after all data has been sent. This means that the 208 * active socket will automatically resend the remaining data until 209 * all data has been sent. 210 * 211 * Please note that when this option is specified, it is possible that 212 * error is reported after partial data has been sent. Also setting 213 * this will disable the ioqueue concurrency for the socket. 214 * 215 * Default value is 1. 216 */ 217 pj_bool_t whole_data; 218 204 219 } pj_activesock_cfg; 205 220 -
pjproject/trunk/pjlib/include/pj/ioqueue.h
r2039 r2185 209 209 { 210 210 void *internal__[32]; /**< Internal I/O Queue data. */ 211 void *activesock_data; /**< Active socket data. */ 211 212 void *user_data; /**< Application data. */ 212 213 } pj_ioqueue_op_key_t; -
pjproject/trunk/pjlib/src/pj/activesock.c
r2177 r2185 52 52 }; 53 53 54 struct send_data 55 { 56 pj_uint8_t *data; 57 pj_ssize_t len; 58 pj_ssize_t sent; 59 unsigned flags; 60 }; 61 54 62 struct pj_activesock_t 55 63 { 56 64 pj_ioqueue_key_t *key; 57 65 pj_bool_t stream_oriented; 66 pj_bool_t whole_data; 58 67 pj_ioqueue_t *ioqueue; 59 68 void *user_data; … … 61 70 unsigned max_loop; 62 71 pj_activesock_cb cb; 72 73 struct send_data send_data; 63 74 64 75 struct read_op *read_op; … … 90 101 cfg->async_cnt = 1; 91 102 cfg->concurrency = -1; 103 cfg->whole_data = PJ_TRUE; 92 104 } 93 105 … … 116 128 asock->stream_oriented = (sock_type == pj_SOCK_STREAM()); 117 129 asock->async_count = (opt? opt->async_cnt : 1); 130 asock->whole_data = (opt? opt->whole_data : 1); 118 131 asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP; 119 132 asock->user_data = user_data; … … 135 148 } 136 149 137 if (opt && opt->concurrency >= 0) { 150 if (asock->whole_data) { 151 /* Must disable concurrency otherwise there is a race condition */ 152 pj_ioqueue_set_concurrency(asock->key, 0); 153 } else if (opt && opt->concurrency >= 0) { 138 154 pj_ioqueue_set_concurrency(asock->key, opt->concurrency); 139 155 } … … 444 460 445 461 462 static pj_status_t send_remaining(pj_activesock_t *asock, 463 pj_ioqueue_op_key_t *send_key) 464 { 465 struct send_data *sd = (struct send_data*)send_key->activesock_data; 466 pj_status_t status; 467 468 do { 469 pj_ssize_t size; 470 471 size = sd->len - sd->sent; 472 status = pj_ioqueue_send(asock->key, send_key, 473 sd->data+sd->sent, &size, sd->flags); 474 if (status != PJ_SUCCESS) { 475 /* Pending or error */ 476 break; 477 } 478 479 sd->sent += size; 480 if (sd->sent == sd->len) { 481 /* The whole data has been sent. */ 482 return PJ_SUCCESS; 483 } 484 485 } while (sd->sent < sd->len); 486 487 return status; 488 } 489 490 446 491 PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock, 447 492 pj_ioqueue_op_key_t *send_key, … … 452 497 PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL); 453 498 454 return pj_ioqueue_send(asock->key, send_key, data, size, flags); 499 send_key->activesock_data = NULL; 500 501 if (asock->whole_data) { 502 pj_ssize_t whole; 503 pj_status_t status; 504 505 whole = *size; 506 507 status = pj_ioqueue_send(asock->key, send_key, data, size, flags); 508 if (status != PJ_SUCCESS) { 509 /* Pending or error */ 510 return status; 511 } 512 513 if (*size == whole) { 514 /* The whole data has been sent. */ 515 return PJ_SUCCESS; 516 } 517 518 /* Data was partially sent */ 519 asock->send_data.data = (pj_uint8_t*)data; 520 asock->send_data.len = whole; 521 asock->send_data.sent = *size; 522 asock->send_data.flags = flags; 523 send_key->activesock_data = &asock->send_data; 524 525 /* Try again */ 526 status = send_remaining(asock, send_key); 527 if (status == PJ_SUCCESS) { 528 *size = whole; 529 } 530 return status; 531 532 } else { 533 return pj_ioqueue_send(asock->key, send_key, data, size, flags); 534 } 455 535 } 456 536 … … 479 559 480 560 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); 561 562 if (bytes_sent > 0 && op_key->activesock_data) { 563 /* whole_data is requested. Make sure we send all the data */ 564 struct send_data *sd = (struct send_data*)op_key->activesock_data; 565 566 sd->sent += bytes_sent; 567 if (sd->sent == sd->len) { 568 /* all has been sent */ 569 bytes_sent = sd->sent; 570 op_key->activesock_data = NULL; 571 } else { 572 /* send remaining data */ 573 pj_status_t status; 574 575 status = send_remaining(asock, op_key); 576 if (status == PJ_EPENDING) 577 return; 578 else if (status == PJ_SUCCESS) 579 bytes_sent = sd->sent; 580 else 581 bytes_sent = -status; 582 583 op_key->activesock_data = NULL; 584 } 585 } 481 586 482 587 if (asock->cb.on_data_sent) { -
pjproject/trunk/pjlib/src/pjlib-test/activesock.c
r2039 r2185 247 247 248 248 249 250 #define SIGNATURE 0xdeadbeef 251 struct tcp_pkt 252 { 253 pj_uint32_t signature; 254 pj_uint32_t seq; 255 char fill[513]; 256 }; 257 258 struct tcp_state 259 { 260 pj_bool_t err; 261 pj_bool_t sent; 262 pj_uint32_t next_recv_seq; 263 pj_uint8_t pkt[600]; 264 }; 265 266 struct send_key 267 { 268 pj_ioqueue_op_key_t op_key; 269 }; 270 271 272 static pj_bool_t tcp_on_data_read(pj_activesock_t *asock, 273 void *data, 274 pj_size_t size, 275 pj_status_t status, 276 pj_size_t *remainder) 277 { 278 struct tcp_state *st = (struct tcp_state*) pj_activesock_get_user_data(asock); 279 char *next = (char*) data; 280 281 if (status != PJ_SUCCESS && status != PJ_EPENDING) { 282 PJ_LOG(1,("", " err: status=%d", status)); 283 st->err = PJ_TRUE; 284 return PJ_FALSE; 285 } 286 287 while (size >= sizeof(struct tcp_pkt)) { 288 struct tcp_pkt *tcp_pkt = (struct tcp_pkt*) next; 289 290 if (tcp_pkt->signature != SIGNATURE) { 291 PJ_LOG(1,("", " err: invalid signature at seq=%d", 292 st->next_recv_seq)); 293 st->err = PJ_TRUE; 294 return PJ_FALSE; 295 } 296 if (tcp_pkt->seq != st->next_recv_seq) { 297 PJ_LOG(1,("", " err: wrong sequence")); 298 st->err = PJ_TRUE; 299 return PJ_FALSE; 300 } 301 302 st->next_recv_seq++; 303 next += sizeof(struct tcp_pkt); 304 size -= sizeof(struct tcp_pkt); 305 } 306 307 if (size) { 308 pj_memmove(data, next, size); 309 *remainder = size; 310 } 311 312 return PJ_TRUE; 313 } 314 315 static pj_bool_t tcp_on_data_sent(pj_activesock_t *asock, 316 pj_ioqueue_op_key_t *op_key, 317 pj_ssize_t sent) 318 { 319 struct tcp_state *st=(struct tcp_state*)pj_activesock_get_user_data(asock); 320 321 st->sent = 1; 322 323 if (sent < 1) { 324 st->err = PJ_TRUE; 325 return PJ_FALSE; 326 } 327 328 return PJ_TRUE; 329 } 330 331 static int tcp_perf_test(void) 332 { 333 enum { COUNT=100000 }; 334 pj_pool_t *pool = NULL; 335 pj_ioqueue_t *ioqueue = NULL; 336 pj_sock_t sock1=PJ_INVALID_SOCKET, sock2=PJ_INVALID_SOCKET; 337 pj_activesock_t *asock1 = NULL, *asock2 = NULL; 338 pj_activesock_cb cb; 339 struct tcp_state *state1, *state2; 340 unsigned i; 341 pj_status_t status; 342 343 pool = pj_pool_create(mem, "tcpperf", 256, 256, NULL); 344 345 status = app_socketpair(pj_AF_INET(), pj_SOCK_STREAM(), 0, &sock1, 346 &sock2); 347 if (status != PJ_SUCCESS) { 348 status = -100; 349 goto on_return; 350 } 351 352 status = pj_ioqueue_create(pool, 4, &ioqueue); 353 if (status != PJ_SUCCESS) { 354 status = -110; 355 goto on_return; 356 } 357 358 pj_bzero(&cb, sizeof(cb)); 359 cb.on_data_read = &tcp_on_data_read; 360 cb.on_data_sent = &tcp_on_data_sent; 361 362 state1 = PJ_POOL_ZALLOC_T(pool, struct tcp_state); 363 status = pj_activesock_create(pool, sock1, pj_SOCK_STREAM(), NULL, ioqueue, 364 &cb, state1, &asock1); 365 if (status != PJ_SUCCESS) { 366 status = -120; 367 goto on_return; 368 } 369 370 state2 = PJ_POOL_ZALLOC_T(pool, struct tcp_state); 371 status = pj_activesock_create(pool, sock2, pj_SOCK_STREAM(), NULL, ioqueue, 372 &cb, state2, &asock2); 373 if (status != PJ_SUCCESS) { 374 status = -130; 375 goto on_return; 376 } 377 378 status = pj_activesock_start_read(asock1, pool, 1000, 0); 379 if (status != PJ_SUCCESS) { 380 status = -140; 381 goto on_return; 382 } 383 384 /* Send packet as quickly as possible */ 385 for (i=0; i<COUNT && !state1->err && !state2->err; ++i) { 386 struct tcp_pkt *pkt; 387 struct send_key send_key[2], *op_key; 388 pj_ssize_t len; 389 390 pkt = (struct tcp_pkt*)state2->pkt; 391 pkt->signature = SIGNATURE; 392 pkt->seq = i; 393 pj_memset(pkt->fill, 'a', sizeof(pkt->fill)); 394 395 op_key = &send_key[i%2]; 396 pj_ioqueue_op_key_init(&op_key->op_key, sizeof(*op_key)); 397 398 state2->sent = PJ_FALSE; 399 len = sizeof(*pkt); 400 status = pj_activesock_send(asock2, &op_key->op_key, pkt, &len, 0); 401 if (status == PJ_EPENDING) { 402 do { 403 pj_ioqueue_poll(ioqueue, NULL); 404 } while (!state2->sent); 405 } else if (status != PJ_SUCCESS) { 406 PJ_LOG(1,("", " err: send status=%d", status)); 407 status = -180; 408 break; 409 } else if (status == PJ_SUCCESS) { 410 if (len != sizeof(*pkt)) { 411 PJ_LOG(1,("", " err: shouldn't report partial sent")); 412 status = -190; 413 break; 414 } 415 } 416 } 417 418 /* Wait until everything has been sent/received */ 419 if (state1->next_recv_seq < COUNT) { 420 pj_time_val delay = {0, 100}; 421 while (pj_ioqueue_poll(ioqueue, &delay) > 0) 422 ; 423 } 424 425 if (status == PJ_EPENDING) 426 status = PJ_SUCCESS; 427 428 if (status != 0) 429 goto on_return; 430 431 if (state1->err) { 432 status = -183; 433 goto on_return; 434 } 435 if (state2->err) { 436 status = -186; 437 goto on_return; 438 } 439 if (state1->next_recv_seq != COUNT) { 440 PJ_LOG(3,("", " err: only %u packets received, expecting %u", 441 state1->next_recv_seq, COUNT)); 442 status = -195; 443 goto on_return; 444 } 445 446 on_return: 447 if (asock2) 448 pj_activesock_close(asock2); 449 if (asock1) 450 pj_activesock_close(asock1); 451 if (ioqueue) 452 pj_ioqueue_destroy(ioqueue); 453 if (pool) 454 pj_pool_release(pool); 455 456 return status; 457 } 458 459 460 249 461 int activesock_test(void) 250 462 { 251 463 int ret; 464 465 ret = (int)&udp_ping_pong_test; 252 466 253 467 PJ_LOG(3,("", "..udp ping/pong test")); 254 468 ret = udp_ping_pong_test(); 469 if (ret != 0) 470 return ret; 471 472 PJ_LOG(3,("", "..tcp perf test")); 473 ret = tcp_perf_test(); 255 474 if (ret != 0) 256 475 return ret;
Note: See TracChangeset
for help on using the changeset viewer.