- Timestamp:
- Aug 4, 2008 10:52:51 AM (16 years ago)
- Location:
- pjproject/trunk/pjsip/src
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjsip/src/pjsip/sip_transport_tcp.c
r2180 r2188 22 22 #include <pj/compat/socket.h> 23 23 #include <pj/addr_resolv.h> 24 #include <pj/activesock.h> 24 25 #include <pj/assert.h> 25 #include <pj/ioqueue.h>26 26 #include <pj/lock.h> 27 27 #include <pj/log.h> … … 47 47 48 48 /* 49 * This structure is "descendant" of pj_ioqueue_op_key_t, and it is used to50 * track pending/asynchronous accept() operation. TCP transport may have51 * more than one pending accept() operations, depending on the value of52 * async_cnt.53 */54 struct pending_accept55 {56 pj_ioqueue_op_key_t op_key;57 struct tcp_listener *listener;58 unsigned index;59 pj_pool_t *pool;60 pj_sock_t new_sock;61 int addr_len;62 pj_sockaddr_in local_addr;63 pj_sockaddr_in remote_addr;64 };65 66 67 /*68 49 * This is the TCP listener, which is a "descendant" of pjsip_tpfactory (the 69 50 * SIP transport factory). … … 75 56 pjsip_endpoint *endpt; 76 57 pjsip_tpmgr *tpmgr; 77 pj_sock_t sock; 78 pj_ioqueue_key_t *key; 79 unsigned async_cnt; 80 struct pending_accept *accept_op[MAX_ASYNC_CNT]; 58 pj_activesock_t *asock; 81 59 }; 82 60 … … 115 93 pj_status_t close_reason; 116 94 pj_sock_t sock; 117 pj_ ioqueue_key_t *key;95 pj_activesock_t *asock; 118 96 pj_bool_t has_pending_connect; 119 97 … … 140 118 141 119 /* This callback is called when pending accept() operation completes. */ 142 static void on_accept_complete( pj_ioqueue_key_t *key, 143 pj_ioqueue_op_key_t *op_key, 144 pj_sock_t sock, 145 pj_status_t status); 146 147 /* Handle accept() completion */ 148 static pj_status_t handle_accept(pj_ioqueue_key_t *key, 149 pj_ioqueue_op_key_t *op_key, 150 pj_sock_t sock, 151 pj_status_t status); 120 static pj_bool_t on_accept_complete(pj_activesock_t *asock, 121 pj_sock_t newsock, 122 const pj_sockaddr_t *src_addr, 123 int src_addr_len); 152 124 153 125 /* This callback is called by transport manager to destroy listener */ … … 186 158 const pj_sockaddr_in *addr ) 187 159 { 188 enum { M = 48 }; 189 host_port->host.ptr = (char*) pj_pool_alloc(pool, M); 190 host_port->host.slen = pj_ansi_snprintf( host_port->host.ptr, M, "%s", 191 pj_inet_ntoa(addr->sin_addr)); 192 host_port->port = pj_ntohs(addr->sin_port); 160 host_port->host.ptr = (char*) pj_pool_alloc(pool, PJ_INET6_ADDRSTRLEN+4); 161 pj_sockaddr_print(addr, host_port->host.ptr, PJ_INET6_ADDRSTRLEN+4, 2); 162 host_port->host.slen = pj_ansi_strlen(host_port->host.ptr); 163 host_port->port = pj_sockaddr_get_port(addr); 193 164 } 194 165 … … 210 181 { 211 182 pj_pool_t *pool; 183 pj_sock_t sock = PJ_INVALID_SOCKET; 212 184 struct tcp_listener *listener; 213 pj_ioqueue_callback listener_cb; 185 pj_activesock_cfg asock_cfg; 186 pj_activesock_cb listener_cb; 214 187 pj_sockaddr_in *listener_addr; 215 188 int addr_len; 216 unsigned i;217 189 pj_status_t status; 218 190 … … 245 217 listener->factory.flag = 246 218 pjsip_transport_get_flag_from_type(PJSIP_TRANSPORT_TCP); 247 listener->sock = PJ_INVALID_SOCKET;248 219 249 220 pj_ansi_strcpy(listener->factory.obj_name, "tcplis"); … … 256 227 257 228 /* Create and bind socket */ 258 status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0, 259 &listener->sock); 229 status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0, &sock); 260 230 if (status != PJ_SUCCESS) 261 231 goto on_error; … … 268 238 } 269 239 270 status = pj_sock_bind(listener->sock, listener_addr, 271 sizeof(pj_sockaddr_in)); 240 status = pj_sock_bind(sock, listener_addr, sizeof(pj_sockaddr_in)); 272 241 if (status != PJ_SUCCESS) 273 242 goto on_error; … … 275 244 /* Retrieve the bound address */ 276 245 addr_len = sizeof(pj_sockaddr_in); 277 status = pj_sock_getsockname( listener->sock, listener_addr, &addr_len);246 status = pj_sock_getsockname(sock, listener_addr, &addr_len); 278 247 if (status != PJ_SUCCESS) 279 248 goto on_error; … … 321 290 322 291 /* Start listening to the address */ 323 status = pj_sock_listen( listener->sock, PJSIP_TCP_TRANSPORT_BACKLOG);292 status = pj_sock_listen(sock, PJSIP_TCP_TRANSPORT_BACKLOG); 324 293 if (status != PJ_SUCCESS) 325 294 goto on_error; 326 295 327 296 328 /* Register socket to ioqeuue */ 297 /* Create active socket */ 298 if (async_cnt > MAX_ASYNC_CNT) async_cnt = MAX_ASYNC_CNT; 299 pj_activesock_cfg_default(&asock_cfg); 300 asock_cfg.async_cnt = async_cnt; 301 329 302 pj_bzero(&listener_cb, sizeof(listener_cb)); 330 303 listener_cb.on_accept_complete = &on_accept_complete; 331 status = pj_ioqueue_register_sock(pool, pjsip_endpt_get_ioqueue(endpt), 332 listener->sock, listener, 333 &listener_cb, &listener->key); 334 if (status != PJ_SUCCESS) 335 goto on_error; 304 status = pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg, 305 pjsip_endpt_get_ioqueue(endpt), 306 &listener_cb, listener, 307 &listener->asock); 336 308 337 309 /* Register to transport manager */ … … 348 320 } 349 321 350 351 322 /* Start pending accept() operations */ 352 if (async_cnt > MAX_ASYNC_CNT) async_cnt = MAX_ASYNC_CNT; 353 listener->async_cnt = async_cnt; 354 355 for (i=0; i<async_cnt; ++i) { 356 pj_pool_t *pool; 357 358 pool = pjsip_endpt_create_pool(endpt, "tcps%p", POOL_TP_INIT, 359 POOL_TP_INIT); 360 if (!pool) { 361 status = PJ_ENOMEM; 362 goto on_error; 363 } 364 365 listener->accept_op[i] = PJ_POOL_ZALLOC_T(pool, 366 struct pending_accept); 367 pj_ioqueue_op_key_init(&listener->accept_op[i]->op_key, 368 sizeof(listener->accept_op[i]->op_key)); 369 listener->accept_op[i]->pool = pool; 370 listener->accept_op[i]->listener = listener; 371 listener->accept_op[i]->index = i; 372 373 status = handle_accept(listener->key, &listener->accept_op[i]->op_key, 374 listener->sock, PJ_EPENDING); 375 if (status != PJ_SUCCESS) 376 goto on_error; 377 } 323 status = pj_activesock_start_accept(listener->asock, pool); 324 if (status != PJ_SUCCESS) 325 goto on_error; 378 326 379 327 PJ_LOG(4,(listener->factory.obj_name, … … 389 337 390 338 on_error: 339 if (listener->asock==NULL && sock!=PJ_INVALID_SOCKET) 340 pj_sock_close(sock); 391 341 lis_destroy(&listener->factory); 392 342 return status; … … 411 361 { 412 362 struct tcp_listener *listener = (struct tcp_listener *)factory; 413 unsigned i;414 363 415 364 if (listener->is_registered) { … … 418 367 } 419 368 420 if (listener->key) { 421 pj_ioqueue_unregister(listener->key); 422 listener->key = NULL; 423 listener->sock = PJ_INVALID_SOCKET; 424 } 425 426 if (listener->sock != PJ_INVALID_SOCKET) { 427 pj_sock_close(listener->sock); 428 listener->sock = PJ_INVALID_SOCKET; 369 if (listener->asock) { 370 pj_activesock_close(listener->asock); 371 listener->asock = NULL; 429 372 } 430 373 … … 432 375 pj_lock_destroy(listener->factory.lock); 433 376 listener->factory.lock = NULL; 434 }435 436 for (i=0; i<PJ_ARRAY_SIZE(listener->accept_op); ++i) {437 if (listener->accept_op[i] && listener->accept_op[i]->pool) {438 pj_pool_t *pool = listener->accept_op[i]->pool;439 listener->accept_op[i]->pool = NULL;440 pj_pool_release(pool);441 }442 377 } 443 378 … … 481 416 pj_status_t reason); 482 417 483 /* Callback from ioqueue on incoming packet */ 484 static void on_read_complete(pj_ioqueue_key_t *key, 485 pj_ioqueue_op_key_t *op_key, 486 pj_ssize_t bytes_read); 487 488 /* Callback from ioqueue when packet is sent */ 489 static void on_write_complete(pj_ioqueue_key_t *key, 490 pj_ioqueue_op_key_t *op_key, 491 pj_ssize_t bytes_sent); 492 493 /* Callback from ioqueue when connect completes */ 494 static void on_connect_complete(pj_ioqueue_key_t *key, 495 pj_status_t status); 418 /* Callback on incoming data */ 419 static pj_bool_t on_data_read(pj_activesock_t *asock, 420 void *data, 421 pj_size_t size, 422 pj_status_t status, 423 pj_size_t *remainder); 424 425 /* Callback when packet is sent */ 426 static pj_bool_t on_data_sent(pj_activesock_t *asock, 427 pj_ioqueue_op_key_t *send_key, 428 pj_ssize_t sent); 429 430 /* Callback when connect completes */ 431 static pj_bool_t on_connect_complete(pj_activesock_t *asock, 432 pj_status_t status); 496 433 497 434 /* TCP keep-alive timer callback */ … … 511 448 struct tcp_transport *tcp; 512 449 pj_ioqueue_t *ioqueue; 513 pj_ioqueue_callback tcp_callback; 450 pj_activesock_cfg asock_cfg; 451 pj_activesock_cb tcp_callback; 514 452 const pj_str_t ka_pkt = PJSIP_TCP_KEEP_ALIVE_DATA; 515 453 pj_status_t status; … … 529 467 */ 530 468 tcp = PJ_POOL_ZALLOC_T(pool, struct tcp_transport); 469 tcp->is_server = is_server; 531 470 tcp->sock = sock; 532 tcp->is_server = is_server;533 471 /*tcp->listener = listener;*/ 534 472 pj_list_init(&tcp->delayed_list); … … 570 508 571 509 572 /* Register socket to ioqueue */ 573 pj_bzero(&tcp_callback, sizeof(pj_ioqueue_callback)); 574 tcp_callback.on_read_complete = &on_read_complete; 575 tcp_callback.on_write_complete = &on_write_complete; 510 /* Create active socket */ 511 pj_activesock_cfg_default(&asock_cfg); 512 asock_cfg.async_cnt = 1; 513 514 pj_bzero(&tcp_callback, sizeof(tcp_callback)); 515 tcp_callback.on_data_read = &on_data_read; 516 tcp_callback.on_data_sent = &on_data_sent; 576 517 tcp_callback.on_connect_complete = &on_connect_complete; 577 518 578 519 ioqueue = pjsip_endpt_get_ioqueue(listener->endpt); 579 status = pj_ ioqueue_register_sock(pool, ioqueue, sock,580 tcp, &tcp_callback, &tcp->key);520 status = pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg, 521 ioqueue, &tcp_callback, tcp, &tcp->asock); 581 522 if (status != PJ_SUCCESS) { 582 523 goto on_error; … … 628 569 op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key; 629 570 630 /* send to ioqueue! */571 /* send! */ 631 572 size = tdata->buf.cur - tdata->buf.start; 632 status = pj_ioqueue_send(tcp->key, op_key, 633 tdata->buf.start, &size, 0); 634 573 status = pj_activesock_send(tcp->asock, op_key, tdata->buf.start, 574 &size, 0); 635 575 if (status != PJ_EPENDING) { 636 on_ write_complete(tcp->key, op_key, size);576 on_data_sent(tcp->asock, op_key, size); 637 577 } 638 578 … … 694 634 op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key; 695 635 696 on_ write_complete(tcp->key, op_key, -reason);636 on_data_sent(tcp->asock, op_key, -reason); 697 637 } 698 638 … … 702 642 } 703 643 704 if (tcp-> key) {705 pj_ ioqueue_unregister(tcp->key);706 tcp-> key= NULL;644 if (tcp->asock) { 645 pj_activesock_close(tcp->asock); 646 tcp->asock = NULL; 707 647 tcp->sock = PJ_INVALID_SOCKET; 708 } 709 710 if (tcp->sock != PJ_INVALID_SOCKET) { 648 } else if (tcp->sock != PJ_INVALID_SOCKET) { 711 649 pj_sock_close(tcp->sock); 712 650 tcp->sock = PJ_INVALID_SOCKET; … … 760 698 pj_ssize_t size; 761 699 pj_sockaddr_in *rem_addr; 700 void *readbuf[1]; 762 701 pj_status_t status; 763 702 … … 788 727 789 728 size = sizeof(tcp->rdata.pkt_info.packet); 790 status = pj_ioqueue_recv(tcp->key, &tcp->rdata.tp_info.op_key.op_key,791 tcp->rdata.pkt_info.packet, &size,792 PJ_IOQUEUE_ALWAYS_ASYNC);729 readbuf[0] = tcp->rdata.pkt_info.packet; 730 status = pj_activesock_start_read2(tcp->asock, tcp->base.pool, size, 731 readbuf, 0); 793 732 if (status != PJ_SUCCESS && status != PJ_EPENDING) { 794 PJ_LOG(4, (tcp->base.obj_name, "ioqueue recv() error, status=%d", 733 PJ_LOG(4, (tcp->base.obj_name, 734 "pj_activesock_start_read() error, status=%d", 795 735 status)); 796 736 return status; … … 862 802 /* Start asynchronous connect() operation */ 863 803 tcp->has_pending_connect = PJ_TRUE; 864 status = pj_ioqueue_connect(tcp->key, rem_addr, sizeof(pj_sockaddr_in)); 804 status = pj_activesock_start_connect(tcp->asock, tcp->base.pool, rem_addr, 805 sizeof(pj_sockaddr_in)); 865 806 if (status == PJ_SUCCESS) { 866 807 tcp->has_pending_connect = PJ_FALSE; … … 874 815 */ 875 816 addr_len = sizeof(pj_sockaddr_in); 876 if (pj_sock_getsockname( tcp->sock, &local_addr, &addr_len)==PJ_SUCCESS) {817 if (pj_sock_getsockname(sock, &local_addr, &addr_len)==PJ_SUCCESS) { 877 818 pj_sockaddr_in *tp_addr = (pj_sockaddr_in*)&tcp->base.local_addr; 878 819 … … 909 850 910 851 /* 911 * This callback is called by ioqueue when pending accept() operation has 912 * completed. 913 */ 914 static void on_accept_complete( pj_ioqueue_key_t *key, 915 pj_ioqueue_op_key_t *op_key, 916 pj_sock_t sock, 917 pj_status_t status) 918 { 919 handle_accept(key, op_key, sock, status); 920 } 921 922 923 /* Handle accept() completion */ 924 static pj_status_t handle_accept(pj_ioqueue_key_t *key, 925 pj_ioqueue_op_key_t *op_key, 926 pj_sock_t sock, 927 pj_status_t status) 852 * This callback is called by active socket when pending accept() operation 853 * has completed. 854 */ 855 static pj_bool_t on_accept_complete(pj_activesock_t *asock, 856 pj_sock_t sock, 857 const pj_sockaddr_t *src_addr, 858 int src_addr_len) 928 859 { 929 860 struct tcp_listener *listener; 930 861 struct tcp_transport *tcp; 931 struct pending_accept *accept_op; 932 int err_cnt = 0; 933 934 listener = (struct tcp_listener*) pj_ioqueue_get_user_data(key); 935 accept_op = (struct pending_accept*) op_key; 936 937 /* 938 * Loop while there is immediate connection or when there is error. 862 char addr[PJ_INET6_ADDRSTRLEN+10]; 863 pj_status_t status; 864 865 PJ_UNUSED_ARG(src_addr_len); 866 867 listener = (struct tcp_listener*) pj_activesock_get_user_data(asock); 868 869 PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_TRUE); 870 871 PJ_LOG(4,(listener->factory.obj_name, 872 "TCP listener %.*s:%d: got incoming TCP connection " 873 "from %s, sock=%d", 874 (int)listener->factory.addr_name.host.slen, 875 listener->factory.addr_name.host.ptr, 876 listener->factory.addr_name.port, 877 pj_sockaddr_print(src_addr, addr, sizeof(addr), 3), 878 sock)); 879 880 /* 881 * Incoming connection! 882 * Create TCP transport for the new socket. 939 883 */ 940 do { 941 if (status == PJ_EPENDING) { 942 /* 943 * This can only happen when this function is called during 944 * initialization to kick off asynchronous accept(). 945 */ 946 947 } else if (status != PJ_SUCCESS) { 948 949 /* 950 * Error in accept(). 951 */ 952 tcp_perror(listener->factory.obj_name, "Error in accept()", 953 status); 954 955 /* 956 * Prevent endless accept() error loop by limiting the 957 * number of consecutive errors. Once the number of errors 958 * is equal to maximum, we treat this as permanent error, and 959 * we stop the accept() operation. 960 */ 961 ++err_cnt; 962 if (err_cnt >= 20) { 963 PJ_LOG(1, (listener->factory.obj_name, 964 "Too many errors, LISTENER IS STOPPING!")); 965 return status; 884 status = tcp_create( listener, NULL, sock, PJ_TRUE, 885 (const pj_sockaddr_in*)&listener->factory.local_addr, 886 (const pj_sockaddr_in*)src_addr, &tcp); 887 if (status == PJ_SUCCESS) { 888 status = tcp_start_read(tcp); 889 if (status != PJ_SUCCESS) { 890 PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled")); 891 tcp_destroy(&tcp->base, status); 892 } else { 893 /* Start keep-alive timer */ 894 if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) { 895 pj_time_val delay = {PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0}; 896 pjsip_endpt_schedule_timer(listener->endpt, 897 &tcp->ka_timer, 898 &delay); 899 tcp->ka_timer.id = PJ_TRUE; 900 pj_gettimeofday(&tcp->last_activity); 966 901 } 967 968 } else {969 pj_pool_t *pool;970 struct pending_accept *new_op;971 972 if (sock == PJ_INVALID_SOCKET) {973 sock = accept_op->new_sock;974 }975 976 if (sock == PJ_INVALID_SOCKET) {977 pj_assert(!"Should not happen. status should be error");978 goto next_accept;979 }980 981 PJ_LOG(4,(listener->factory.obj_name,982 "TCP listener %.*s:%d: got incoming TCP connection "983 "from %s:%d, sock=%d",984 (int)listener->factory.addr_name.host.slen,985 listener->factory.addr_name.host.ptr,986 listener->factory.addr_name.port,987 pj_inet_ntoa(accept_op->remote_addr.sin_addr),988 pj_ntohs(accept_op->remote_addr.sin_port),989 sock));990 991 /* Create new accept_opt */992 pool = pjsip_endpt_create_pool(listener->endpt, "tcps%p",993 POOL_TP_INIT, POOL_TP_INC);994 new_op = PJ_POOL_ZALLOC_T(pool, struct pending_accept);995 new_op->pool = pool;996 new_op->listener = listener;997 new_op->index = accept_op->index;998 pj_ioqueue_op_key_init(&new_op->op_key, sizeof(new_op->op_key));999 listener->accept_op[accept_op->index] = new_op;1000 1001 /*1002 * Incoming connections!1003 * Create TCP transport for the new socket.1004 */1005 status = tcp_create( listener, accept_op->pool, sock, PJ_TRUE,1006 &accept_op->local_addr,1007 &accept_op->remote_addr, &tcp);1008 if (status == PJ_SUCCESS) {1009 status = tcp_start_read(tcp);1010 if (status != PJ_SUCCESS) {1011 PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled"));1012 tcp_destroy(&tcp->base, status);1013 } else {1014 /* Start keep-alive timer */1015 if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) {1016 pj_time_val delay = {PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0};1017 pjsip_endpt_schedule_timer(listener->endpt,1018 &tcp->ka_timer,1019 &delay);1020 tcp->ka_timer.id = PJ_TRUE;1021 pj_gettimeofday(&tcp->last_activity);1022 }1023 }1024 }1025 1026 accept_op = new_op;1027 902 } 1028 1029 next_accept: 1030 /* 1031 * Start the next asynchronous accept() operation. 1032 */ 1033 accept_op->addr_len = sizeof(pj_sockaddr_in); 1034 accept_op->new_sock = PJ_INVALID_SOCKET; 1035 1036 status = pj_ioqueue_accept(listener->key, 1037 &accept_op->op_key, 1038 &accept_op->new_sock, 1039 &accept_op->local_addr, 1040 &accept_op->remote_addr, 1041 &accept_op->addr_len); 1042 1043 /* 1044 * Loop while we have immediate connection or when there is error. 1045 */ 1046 1047 } while (status != PJ_EPENDING); 1048 1049 return PJ_SUCCESS; 903 } 904 905 return PJ_TRUE; 1050 906 } 1051 907 … … 1054 910 * Callback from ioqueue when packet is sent. 1055 911 */ 1056 static void on_write_complete(pj_ioqueue_key_t *key,1057 pj_ioqueue_op_key_t *op_key, 1058 912 static pj_bool_t on_data_sent(pj_activesock_t *asock, 913 pj_ioqueue_op_key_t *op_key, 914 pj_ssize_t bytes_sent) 1059 915 { 1060 916 struct tcp_transport *tcp = (struct tcp_transport*) 1061 pj_ ioqueue_get_user_data(key);917 pj_activesock_get_user_data(asock); 1062 918 pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key; 1063 919 … … 1079 935 /* Mark last activity time */ 1080 936 pj_gettimeofday(&tcp->last_activity); 937 1081 938 } 1082 939 … … 1092 949 if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; 1093 950 pjsip_transport_shutdown(&tcp->base); 1094 } 1095 951 952 return PJ_FALSE; 953 } 954 955 return PJ_TRUE; 1096 956 } 1097 957 … … 1167 1027 */ 1168 1028 size = tdata->buf.cur - tdata->buf.start; 1169 status = pj_ ioqueue_send(tcp->key,1170 (pj_ioqueue_op_key_t*)&tdata->op_key,1171 tdata->buf.start, &size, 0);1029 status = pj_activesock_send(tcp->asock, 1030 (pj_ioqueue_op_key_t*)&tdata->op_key, 1031 tdata->buf.start, &size, 0); 1172 1032 1173 1033 if (status != PJ_EPENDING) { … … 1213 1073 * Callback from ioqueue that an incoming data is received from the socket. 1214 1074 */ 1215 static void on_read_complete(pj_ioqueue_key_t *key, 1216 pj_ioqueue_op_key_t *op_key, 1217 pj_ssize_t bytes_read) 1075 static pj_bool_t on_data_read(pj_activesock_t *asock, 1076 void *data, 1077 pj_size_t size, 1078 pj_status_t status, 1079 pj_size_t *remainder) 1218 1080 { 1219 1081 enum { MAX_IMMEDIATE_PACKET = 10 }; 1220 pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key; 1221 pjsip_rx_data *rdata = rdata_op_key->rdata; 1222 struct tcp_transport *tcp = 1223 (struct tcp_transport*)rdata->tp_info.transport; 1224 int i; 1225 pj_status_t status; 1082 struct tcp_transport *tcp; 1083 pjsip_rx_data *rdata; 1084 1085 PJ_UNUSED_ARG(data); 1086 1087 tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock); 1088 rdata = &tcp->rdata; 1226 1089 1227 1090 /* Don't do anything if transport is closing. */ 1228 1091 if (tcp->is_closing) { 1229 1092 tcp->is_closing++; 1230 return; 1231 } 1232 1233 /* 1234 * The idea of the loop is to process immediate data received by 1235 * pj_ioqueue_recv(), as long as i < MAX_IMMEDIATE_PACKET. When 1236 * i is >= MAX_IMMEDIATE_PACKET, we force the recv() operation to 1237 * complete asynchronously, to allow other sockets to get their data. 1093 return PJ_FALSE; 1094 } 1095 1096 /* Houston, we have packet! Report the packet to transport manager 1097 * to be parsed. 1238 1098 */ 1239 for (i=0;; ++i) { 1240 pj_uint32_t flags; 1241 1242 /* Houston, we have packet! Report the packet to transport manager 1243 * to be parsed. 1099 if (status == PJ_SUCCESS) { 1100 pj_size_t size_eaten; 1101 1102 /* Mark this as an activity */ 1103 pj_gettimeofday(&tcp->last_activity); 1104 1105 pj_assert((void*)rdata->pkt_info.packet == data); 1106 1107 /* Init pkt_info part. */ 1108 rdata->pkt_info.len = size; 1109 rdata->pkt_info.zero = 0; 1110 pj_gettimeofday(&rdata->pkt_info.timestamp); 1111 1112 /* Report to transport manager. 1113 * The transport manager will tell us how many bytes of the packet 1114 * have been processed (as valid SIP message). 1244 1115 */ 1245 if (bytes_read > 0) { 1246 pj_size_t size_eaten; 1247 1248 /* Mark this as an activity */ 1249 pj_gettimeofday(&tcp->last_activity); 1250 1251 /* Init pkt_info part. */ 1252 rdata->pkt_info.len += bytes_read; 1253 rdata->pkt_info.zero = 0; 1254 pj_gettimeofday(&rdata->pkt_info.timestamp); 1255 1256 /* Report to transport manager. 1257 * The transport manager will tell us how many bytes of the packet 1258 * have been processed (as valid SIP message). 1259 */ 1260 size_eaten = 1261 pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, 1262 rdata); 1263 1264 pj_assert(size_eaten <= (pj_size_t)rdata->pkt_info.len); 1265 1266 /* Move unprocessed data to the front of the buffer */ 1267 if (size_eaten>0 && size_eaten<(pj_size_t)rdata->pkt_info.len) { 1268 pj_memmove(rdata->pkt_info.packet, 1269 rdata->pkt_info.packet + size_eaten, 1270 rdata->pkt_info.len - size_eaten); 1271 } 1272 1273 rdata->pkt_info.len -= size_eaten; 1274 1275 } else if (bytes_read == 0) { 1276 1277 /* Transport is closed */ 1278 PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed")); 1279 1280 /* We can not destroy the transport since high level objects may 1281 * still keep reference to this transport. So we can only 1282 * instruct transport manager to gracefully start the shutdown 1283 * procedure for this transport. 1284 */ 1285 if (tcp->close_reason==PJ_SUCCESS) 1286 tcp->close_reason = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN); 1287 pjsip_transport_shutdown(&tcp->base); 1288 1289 return; 1290 1291 //} else if (bytes_read < 0) { 1292 } else if (-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) && 1293 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) && 1294 -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) 1295 { 1296 1297 /* Socket error. */ 1298 PJ_LOG(4,(tcp->base.obj_name, "TCP connection reset")); 1299 1300 /* We can not destroy the transport since high level objects may 1301 * still keep reference to this transport. So we can only 1302 * instruct transport manager to gracefully start the shutdown 1303 * procedure for this transport. 1304 */ 1305 if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = -bytes_read; 1306 pjsip_transport_shutdown(&tcp->base); 1307 1308 return; 1116 size_eaten = 1117 pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, 1118 rdata); 1119 1120 pj_assert(size_eaten <= (pj_size_t)rdata->pkt_info.len); 1121 1122 /* Move unprocessed data to the front of the buffer */ 1123 *remainder = size - size_eaten; 1124 if (*remainder > 0 && *remainder != size) { 1125 pj_memmove(rdata->pkt_info.packet, 1126 rdata->pkt_info.packet + size_eaten, 1127 *remainder); 1309 1128 } 1310 1129 1311 if (i >= MAX_IMMEDIATE_PACKET) { 1312 /* Receive quota reached. Force ioqueue_recv() to 1313 * return PJ_EPENDING 1314 */ 1315 flags = PJ_IOQUEUE_ALWAYS_ASYNC; 1316 } else { 1317 flags = 0; 1318 } 1319 1320 /* Reset pool. */ 1321 pj_pool_reset(rdata->tp_info.pool); 1322 1323 /* Read next packet. */ 1324 bytes_read = sizeof(rdata->pkt_info.packet) - rdata->pkt_info.len; 1325 rdata->pkt_info.src_addr_len = sizeof(pj_sockaddr_in); 1326 status = pj_ioqueue_recv(key, op_key, 1327 rdata->pkt_info.packet+rdata->pkt_info.len, 1328 &bytes_read, flags); 1329 1330 if (status == PJ_SUCCESS) { 1331 1332 /* Continue loop. */ 1333 pj_assert(i < MAX_IMMEDIATE_PACKET); 1334 1335 } else if (status == PJ_EPENDING) { 1336 break; 1337 1338 } else { 1339 /* Socket error */ 1340 PJ_LOG(4,(tcp->base.obj_name, "TCP connection reset")); 1341 1342 /* We can not destroy the transport since high level objects may 1343 * still keep reference to this transport. So we can only 1344 * instruct transport manager to gracefully start the shutdown 1345 * procedure for this transport. 1346 */ 1347 if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; 1348 pjsip_transport_shutdown(&tcp->base); 1349 1350 return; 1351 } 1352 } 1130 } else { 1131 1132 /* Transport is closed */ 1133 PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed")); 1134 1135 /* We can not destroy the transport since high level objects may 1136 * still keep reference to this transport. So we can only 1137 * instruct transport manager to gracefully start the shutdown 1138 * procedure for this transport. 1139 */ 1140 if (tcp->close_reason==PJ_SUCCESS) 1141 tcp->close_reason = status; 1142 pjsip_transport_shutdown(&tcp->base); 1143 1144 return PJ_FALSE; 1145 1146 } 1147 1148 /* Reset pool. */ 1149 pj_pool_reset(rdata->tp_info.pool); 1150 1151 return PJ_TRUE; 1353 1152 } 1354 1153 … … 1357 1156 * Callback from ioqueue when asynchronous connect() operation completes. 1358 1157 */ 1359 static void on_connect_complete(pj_ioqueue_key_t *key,1360 1158 static pj_bool_t on_connect_complete(pj_activesock_t *asock, 1159 pj_status_t status) 1361 1160 { 1362 1161 struct tcp_transport *tcp; … … 1364 1163 int addrlen; 1365 1164 1366 tcp = (struct tcp_transport*) pj_ ioqueue_get_user_data(key);1165 tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock); 1367 1166 1368 1167 /* Mark that pending connect() operation has completed. */ … … 1384 1183 op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key; 1385 1184 1386 on_ write_complete(tcp->key, op_key, -status);1185 on_data_sent(tcp->asock, op_key, -status); 1387 1186 } 1388 1187 … … 1394 1193 if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; 1395 1194 pjsip_transport_shutdown(&tcp->base); 1396 return ;1195 return PJ_FALSE; 1397 1196 } 1398 1197 … … 1433 1232 if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; 1434 1233 pjsip_transport_shutdown(&tcp->base); 1435 return ;1234 return PJ_FALSE; 1436 1235 } 1437 1236 … … 1447 1246 pj_gettimeofday(&tcp->last_activity); 1448 1247 } 1248 1249 return PJ_TRUE; 1449 1250 } 1450 1251 … … 1483 1284 /* Send the data */ 1484 1285 size = tcp->ka_pkt.slen; 1485 status = pj_ ioqueue_send(tcp->key, &tcp->ka_op_key.key,1486 1286 status = pj_activesock_send(tcp->asock, &tcp->ka_op_key.key, 1287 tcp->ka_pkt.ptr, &size, 0); 1487 1288 1488 1289 if (status != PJ_SUCCESS && status != PJ_EPENDING) { -
pjproject/trunk/pjsip/src/test-pjsip/transport_tcp_test.c
r2039 r2188 84 84 /* Check again that reference counter is 1. */ 85 85 if (pj_atomic_get(tcp->ref_cnt) != 1) 86 return -70; 86 return -40; 87 88 /* Load test */ 89 if (transport_load_test(url) != 0) 90 return -60; 87 91 88 92 /* Basic transport's send/receive loopback test. */ -
pjproject/trunk/pjsip/src/test-pjsip/transport_test.c
r2039 r2188 642 642 return 0; 643 643 } 644 645 /////////////////////////////////////////////////////////////////////////////// 646 /* 647 * Transport load testing 648 */ 649 static pj_bool_t load_on_rx_request(pjsip_rx_data *rdata); 650 651 static struct mod_load_test 652 { 653 pjsip_module mod; 654 pj_uint32_t next_seq; 655 pj_bool_t err; 656 } mod_load = 657 { 658 { 659 NULL, NULL, /* prev and next */ 660 { "mod-load-test", 13}, /* Name. */ 661 -1, /* Id */ 662 PJSIP_MOD_PRIORITY_TSX_LAYER-1, /* Priority */ 663 NULL, /* load() */ 664 NULL, /* start() */ 665 NULL, /* stop() */ 666 NULL, /* unload() */ 667 &load_on_rx_request, /* on_rx_request() */ 668 NULL, /* on_rx_response() */ 669 NULL, /* tsx_handler() */ 670 } 671 }; 672 673 674 static pj_bool_t load_on_rx_request(pjsip_rx_data *rdata) 675 { 676 if (rdata->msg_info.cseq->cseq != mod_load.next_seq) { 677 PJ_LOG(1,("THIS_FILE", " err: expecting cseq %u, got %u", 678 mod_load.next_seq, rdata->msg_info.cseq->cseq)); 679 mod_load.err = PJ_TRUE; 680 mod_load.next_seq = rdata->msg_info.cseq->cseq + 1; 681 } else 682 mod_load.next_seq++; 683 return PJ_TRUE; 684 } 685 686 int transport_load_test(char *target_url) 687 { 688 enum { COUNT = 2000 }; 689 unsigned i; 690 pj_status_t status; 691 692 /* exhaust packets */ 693 do { 694 pj_time_val delay = {1, 0}; 695 i = 0; 696 pjsip_endpt_handle_events2(endpt, &delay, &i); 697 } while (i != 0); 698 699 PJ_LOG(3,(THIS_FILE, " transport load test...")); 700 701 if (mod_load.mod.id == -1) { 702 status = pjsip_endpt_register_module( endpt, &mod_load.mod); 703 if (status != PJ_SUCCESS) { 704 app_perror("error registering module", status); 705 return -1; 706 } 707 } 708 mod_load.err = PJ_FALSE; 709 mod_load.next_seq = 0; 710 711 for (i=0; i<COUNT && !mod_load.err; ++i) { 712 pj_str_t target, from, call_id; 713 pjsip_tx_data *tdata; 714 715 target = pj_str(target_url); 716 from = pj_str("<sip:user@host>"); 717 call_id = pj_str("thecallid"); 718 status = pjsip_endpt_create_request(endpt, &pjsip_invite_method, 719 &target, &from, 720 &target, &from, &call_id, 721 i, NULL, &tdata ); 722 if (status != PJ_SUCCESS) { 723 app_perror("error creating request", status); 724 goto on_return; 725 } 726 727 status = pjsip_endpt_send_request_stateless(endpt, tdata, NULL, NULL); 728 if (status != PJ_SUCCESS) { 729 app_perror("error sending request", status); 730 goto on_return; 731 } 732 } 733 734 do { 735 pj_time_val delay = {1, 0}; 736 i = 0; 737 pjsip_endpt_handle_events2(endpt, &delay, &i); 738 } while (i != 0); 739 740 if (mod_load.next_seq != COUNT) { 741 PJ_LOG(1,("THIS_FILE", " err: expecting %u msg, got only %u", 742 COUNT, mod_load.next_seq)); 743 status = -2; 744 goto on_return; 745 } 746 747 on_return: 748 if (mod_load.mod.id != -1) { 749 pjsip_endpt_unregister_module( endpt, &mod_load.mod); 750 mod_load.mod.id = -1; 751 } 752 if (status != PJ_SUCCESS || mod_load.err) { 753 return -2; 754 } 755 PJ_LOG(3,(THIS_FILE, " success")); 756 return 0; 757 } 758 759
Note: See TracChangeset
for help on using the changeset viewer.