Changeset 54 for pjproject/trunk/pjsip/src/pjsip/sip_transport.c
- Timestamp:
- Nov 18, 2005 10:43:42 PM (17 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjsip/src/pjsip/sip_transport.c
r51 r54 30 30 #include <pj/pool.h> 31 31 #include <pj/assert.h> 32 33 #define MGR_IDLE_CHECK_INTERVAL 30 34 #define MGR_HASH_TABLE_SIZE PJSIP_MAX_DIALOG_COUNT 35 #define BACKLOG 5 36 #define DEFAULT_SO_SNDBUF (8 * 1024 * 1024) 37 #define DEFAULT_SO_RCVBUF (8 * 1024 * 1024) 38 39 #define LOG_TRANSPORT_MGR "trmgr" 40 #define THIS_FILE "sip_transport" 41 42 static void destroy_transport( pjsip_transport_mgr *mgr, pjsip_transport_t *tr ); 43 44 45 /** 46 * New TCP socket for accept. 47 */ 48 typedef struct incoming_socket_rec 49 { 50 pj_sock_t sock; 51 pj_sockaddr_in remote; 52 pj_sockaddr_in local; 53 int addrlen; 54 } incoming_socket_rec; 55 56 /** 57 * SIP Transport. 58 */ 59 struct pjsip_transport_t 60 { 61 /** Standard list members, for chaining the transport in the 62 * listener list. 63 */ 64 PJ_DECL_LIST_MEMBER(struct pjsip_transport_t); 65 66 /** Transport's pool. */ 67 pj_pool_t *pool; 68 69 /** Mutex */ 70 pj_mutex_t *tr_mutex; 71 72 /** Transport name for logging purpose */ 73 char obj_name[PJ_MAX_OBJ_NAME]; 74 75 /** Socket handle */ 76 pj_sock_t sock; 77 78 /** Transport type. */ 79 pjsip_transport_type_e type; 80 81 /** Flags to keep various states (see pjsip_transport_flags_e). */ 82 pj_uint32_t flag; 83 84 /** I/O Queue key */ 85 pj_ioqueue_key_t *key; 86 87 /** Accept key. */ 88 pj_ioqueue_op_key_t accept_op; 89 90 /** Receive data buffer */ 91 pjsip_rx_data *rdata; 92 93 /** Pointer to transport manager */ 94 pjsip_transport_mgr *mgr; 95 96 /** Reference counter, to prevent this transport from being closed while 97 * it's being used. 98 */ 99 pj_atomic_t *ref_cnt; 100 101 /** Local address. */ 102 pj_sockaddr_in local_addr; 103 104 /** Address name (what to put in Via address field). */ 105 pj_sockaddr_in addr_name; 106 107 /** Remote address (can be zero for UDP and for listeners). UDP listener 108 * bound to local loopback interface (127.0.0.1) has remote address set 109 * to 127.0.0.1 to prevent client from using it to send to remote hosts, 110 * because remote host then will receive 127.0.0.1 as the packet's 111 * source address. 112 */ 113 pj_sockaddr_in remote_addr; 114 115 /** Struct to save incoming socket information. */ 116 incoming_socket_rec accept_data; 117 118 /** When this transport should be closed. */ 119 pj_time_val close_time; 120 121 /** List of callbacks to be called when client attempt to use this 122 * transport while it's not connected (i.e. still connecting). 123 */ 124 pj_list cb_list; 125 }; 126 32 #include <pj/lock.h> 33 34 35 #define THIS_FILE "transport" 127 36 128 37 /* 129 38 * Transport manager. 130 39 */ 131 struct pjsip_t ransport_mgr132 { 133 pj_hash_table_t *t ransport_table;134 pj_ mutex_t *mutex;40 struct pjsip_tpmgr 41 { 42 pj_hash_table_t *table; 43 pj_lock_t *lock; 135 44 pjsip_endpoint *endpt; 136 45 pj_ioqueue_t *ioqueue; 137 pj_time_val next_idle_check; 138 pj_size_t send_buf_size; 139 pj_size_t recv_buf_size; 140 void (*message_callback)(pjsip_endpoint*, pjsip_rx_data *rdata); 46 pj_timer_heap_t *timer_heap; 47 pjsip_tpfactory factory_list; 48 void (*msg_cb)(pjsip_endpoint*, pj_status_t, pjsip_rx_data*); 141 49 }; 142 50 143 /* 144 * Transport role. 145 */ 146 typedef enum transport_role_e 147 { 148 TRANSPORT_ROLE_LISTENER, 149 TRANSPORT_ROLE_TRANSPORT, 150 } transport_role_e; 51 52 53 /***************************************************************************** 54 * 55 * GENERAL TRANSPORT (NAMES, TYPES, ETC.) 56 * 57 *****************************************************************************/ 58 59 /* 60 * Transport names. 61 */ 62 const struct 63 { 64 pjsip_transport_type_e type; 65 pj_uint16_t port; 66 pj_str_t name; 67 unsigned flag; 68 } transport_names[] = 69 { 70 { PJSIP_TRANSPORT_UNSPECIFIED, 0, {NULL, 0}, 0}, 71 { PJSIP_TRANSPORT_UDP, 5060, {"UDP", 3}, PJSIP_TRANSPORT_DATAGRAM}, 72 { PJSIP_TRANSPORT_TCP, 5060, {"TCP", 3}, PJSIP_TRANSPORT_RELIABLE}, 73 { PJSIP_TRANSPORT_TLS, 5061, {"TLS", 3}, PJSIP_TRANSPORT_RELIABLE | PJSIP_TRANSPORT_SECURE}, 74 { PJSIP_TRANSPORT_SCTP, 5060, {"SCTP", 4}, PJSIP_TRANSPORT_RELIABLE} 75 }; 76 77 78 /* 79 * Get transport type from name. 80 */ 81 PJ_DEF(pjsip_transport_type_e) 82 pjsip_transport_get_type_from_name(const pj_str_t *name) 83 { 84 unsigned i; 85 86 for (i=0; i<PJ_ARRAY_SIZE(transport_names); ++i) { 87 if (pj_stricmp(name, &transport_names[i].name) == 0) { 88 return transport_names[i].type; 89 } 90 } 91 92 pj_assert(!"Invalid transport name"); 93 return PJSIP_TRANSPORT_UNSPECIFIED; 94 } 95 96 97 /* 98 * Get the transport type for the specified flags. 99 */ 100 PJ_DEF(pjsip_transport_type_e) 101 pjsip_transport_get_type_from_flag(unsigned flag) 102 { 103 unsigned i; 104 105 for (i=0; i<PJ_ARRAY_SIZE(transport_names); ++i) { 106 if (transport_names[i].flag == flag) { 107 return transport_names[i].type; 108 } 109 } 110 111 pj_assert(!"Invalid transport type"); 112 return PJSIP_TRANSPORT_UNSPECIFIED; 113 } 114 115 PJ_DEF(unsigned) 116 pjsip_transport_get_flag_from_type( pjsip_transport_type_e type ) 117 { 118 PJ_ASSERT_RETURN(type < PJ_ARRAY_SIZE(transport_names), 0); 119 return transport_names[type].flag; 120 } 121 122 /* 123 * Get the default SIP port number for the specified type. 124 */ 125 PJ_DEF(int) 126 pjsip_transport_get_default_port_for_type(pjsip_transport_type_e type) 127 { 128 PJ_ASSERT_RETURN(type < PJ_ARRAY_SIZE(transport_names), 5060); 129 return transport_names[type].port; 130 } 131 132 133 /***************************************************************************** 134 * 135 * TRANSMIT DATA BUFFER MANIPULATION. 136 * 137 *****************************************************************************/ 138 139 /* 140 * Create new transmit buffer. 141 */ 142 PJ_DEF(pj_status_t) pjsip_tx_data_create( pjsip_tpmgr *mgr, 143 pjsip_tx_data **p_tdata ) 144 { 145 pj_pool_t *pool; 146 pjsip_tx_data *tdata; 147 pj_status_t status; 148 149 PJ_ASSERT_RETURN(mgr && p_tdata, PJ_EINVAL); 150 151 PJ_LOG(5, ("", "pjsip_tx_data_create")); 152 153 pool = pjsip_endpt_create_pool( mgr->endpt, "tdta%p", 154 PJSIP_POOL_LEN_TDATA, 155 PJSIP_POOL_INC_TDATA ); 156 if (!pool) 157 return PJ_ENOMEM; 158 159 tdata = pj_pool_zalloc(pool, sizeof(pjsip_tx_data)); 160 tdata->pool = pool; 161 tdata->mgr = mgr; 162 pj_snprintf(tdata->obj_name, PJ_MAX_OBJ_NAME, "tdta%p", tdata); 163 164 status = pj_atomic_create(tdata->pool, 0, &tdata->ref_cnt); 165 if (status != PJ_SUCCESS) { 166 pjsip_endpt_destroy_pool( mgr->endpt, tdata->pool ); 167 return status; 168 } 169 170 //status = pj_lock_create_simple_mutex(pool, "tdta%p", &tdata->lock); 171 status = pj_lock_create_null_mutex(pool, "tdta%p", &tdata->lock); 172 if (status != PJ_SUCCESS) { 173 pjsip_endpt_destroy_pool( mgr->endpt, tdata->pool ); 174 return status; 175 } 176 177 pj_ioqueue_op_key_init(&tdata->op_key, sizeof(tdata->op_key)); 178 179 *p_tdata = tdata; 180 return PJ_SUCCESS; 181 } 182 183 184 /* 185 * Add reference to tx buffer. 186 */ 187 PJ_DEF(void) pjsip_tx_data_add_ref( pjsip_tx_data *tdata ) 188 { 189 pj_atomic_inc(tdata->ref_cnt); 190 } 191 192 /* 193 * Decrease transport data reference, destroy it when the reference count 194 * reaches zero. 195 */ 196 PJ_DEF(void) pjsip_tx_data_dec_ref( pjsip_tx_data *tdata ) 197 { 198 pj_assert( pj_atomic_get(tdata->ref_cnt) > 0); 199 if (pj_atomic_dec_and_get(tdata->ref_cnt) <= 0) { 200 PJ_LOG(5,(tdata->obj_name, "destroying txdata")); 201 pj_atomic_destroy( tdata->ref_cnt ); 202 pj_lock_destroy( tdata->lock ); 203 pjsip_endpt_destroy_pool( tdata->mgr->endpt, tdata->pool ); 204 } 205 } 206 207 /* 208 * Invalidate the content of the print buffer to force the message to be 209 * re-printed when sent. 210 */ 211 PJ_DEF(void) pjsip_tx_data_invalidate_msg( pjsip_tx_data *tdata ) 212 { 213 tdata->buf.cur = tdata->buf.start; 214 } 215 216 PJ_DEF(pj_bool_t) pjsip_tx_data_is_valid( pjsip_tx_data *tdata ) 217 { 218 return tdata->buf.cur != tdata->buf.start; 219 } 220 221 222 223 /***************************************************************************** 224 * 225 * TRANSPORT KEY 226 * 227 *****************************************************************************/ 151 228 152 229 /* 153 230 * Transport key for indexing in the hash table. 154 * WATCH OUT FOR ALIGNMENT PROBLEM HERE!155 231 */ 156 232 typedef struct transport_key … … 162 238 } transport_key; 163 239 164 /* 165 * Transport callback. 166 */ 167 struct transport_callback 168 { 169 PJ_DECL_LIST_MEMBER(struct transport_callback); 170 171 /** User defined token to be passed to the callback. */ 172 void *token; 173 174 /** The callback function. */ 175 void (*cb)(pjsip_transport_t *tr, void *token, pj_status_t status); 176 177 }; 178 179 /* 180 * Transport names. 181 */ 182 const struct 183 { 184 pjsip_transport_type_e type; 185 pj_uint16_t port; 186 pj_str_t name; 187 } transport_names[] = 188 { 189 { PJSIP_TRANSPORT_UNSPECIFIED, 0, {NULL, 0}}, 190 { PJSIP_TRANSPORT_UDP, 5060, {"UDP", 3}}, 191 #if PJ_HAS_TCP 192 { PJSIP_TRANSPORT_TCP, 5060, {"TCP", 3}}, 193 { PJSIP_TRANSPORT_TLS, 5061, {"TLS", 3}}, 194 { PJSIP_TRANSPORT_SCTP, 5060, {"SCTP", 4}} 195 #endif 196 }; 197 198 static void on_ioqueue_read(pj_ioqueue_key_t *key, 199 pj_ioqueue_op_key_t *op_key, 200 pj_ssize_t bytes_read); 201 static void on_ioqueue_write(pj_ioqueue_key_t *key, 202 pj_ioqueue_op_key_t *op_key, 203 pj_ssize_t bytes_sent); 204 static void on_ioqueue_accept(pj_ioqueue_key_t *key, 205 pj_ioqueue_op_key_t *op_key, 206 pj_sock_t newsock, 207 int status); 208 static void on_ioqueue_connect(pj_ioqueue_key_t *key, 209 int status); 210 211 static pj_ioqueue_callback ioqueue_transport_callback = 212 { 213 &on_ioqueue_read, 214 &on_ioqueue_write, 215 &on_ioqueue_accept, 216 &on_ioqueue_connect 217 }; 218 219 static void init_key_from_transport(transport_key *key, 220 const pjsip_transport_t *tr) 221 { 222 /* This is to detect alignment problems. */ 223 pj_assert(sizeof(transport_key) == 8); 224 225 key->type = (pj_uint8_t)tr->type; 226 key->zero = 0; 227 key->addr = pj_sockaddr_in_get_addr(&tr->remote_addr).s_addr; 228 key->port = pj_sockaddr_in_get_port(&tr->remote_addr); 229 /* 230 if (key->port == 0) { 231 key->port = pj_sockaddr_in_get_port(&tr->local_addr); 232 } 233 */ 234 } 235 236 #if PJ_HAS_TCP 237 static void init_tcp_key(transport_key *key, pjsip_transport_type_e type, 238 const pj_sockaddr_in *addr) 239 { 240 /* This is to detect alignment problems. */ 241 pj_assert(sizeof(transport_key) == 8); 242 243 key->type = (pj_uint8_t)type; 244 key->zero = 0; 245 key->addr = pj_sockaddr_in_get_addr(addr).s_addr; 246 key->port = pj_sockaddr_in_get_port(addr); 247 } 248 #endif 249 250 static void init_udp_key(transport_key *key, pjsip_transport_type_e type, 251 const pj_sockaddr_in *addr) 252 { 253 PJ_UNUSED_ARG(addr); 254 255 /* This is to detect alignment problems. */ 256 pj_assert(sizeof(transport_key) == 8); 257 258 pj_memset(key, 0, sizeof(*key)); 259 key->type = (pj_uint8_t)type; 260 261 #if 0 /* Not sure why we need to make 127.0.0.1 a special case */ 262 if (addr->sin_addr.s_addr == inet_addr("127.0.0.1")) { 263 /* This looks more complicated than it is because key->addr uses 264 * the host version of the address (i.e. converted with ntohl()). 265 */ 266 pj_str_t localaddr = pj_str("127.0.0.1"); 267 pj_sockaddr_in addr; 268 pj_sockaddr_set_str_addr(&addr, &localaddr); 269 key->addr = pj_sockaddr_in_get_addr(&addr); 270 } 271 #endif 272 } 273 274 /* 275 * Get type format name (for pool name). 276 */ 277 static const char *transport_get_name_format( int type ) 278 { 279 switch (type) { 280 case PJSIP_TRANSPORT_UDP: 281 return " udp%p"; 282 #if PJ_HAS_TCP 283 case PJSIP_TRANSPORT_TCP: 284 return " tcp%p"; 285 case PJSIP_TRANSPORT_TLS: 286 return " tls%p"; 287 case PJSIP_TRANSPORT_SCTP: 288 return "sctp%p"; 289 #endif 290 } 291 pj_assert(0); 292 return 0; 293 } 294 295 /* 296 * Get the default SIP port number for the specified type. 297 */ 298 PJ_DEF(int) pjsip_transport_get_default_port_for_type(pjsip_transport_type_e type) 299 { 300 return transport_names[type].port; 301 } 302 303 /* 304 * Get transport name. 305 */ 306 static const char *get_type_name(int type) 307 { 308 return transport_names[type].name.ptr; 309 } 310 311 /* 312 * Get transport type from name. 313 */ 314 PJ_DEF(pjsip_transport_type_e) 315 pjsip_transport_get_type_from_name(const pj_str_t *name) 316 { 317 unsigned i; 318 319 for (i=0; i<PJ_ARRAY_SIZE(transport_names); ++i) { 320 if (pj_stricmp(name, &transport_names[i].name) == 0) { 321 return transport_names[i].type; 322 } 323 } 324 return PJSIP_TRANSPORT_UNSPECIFIED; 325 } 326 327 /* 328 * Create new transmit buffer. 329 */ 330 pj_status_t pjsip_tx_data_create( pjsip_transport_mgr *mgr, 331 pjsip_tx_data **p_tdata ) 332 { 333 pj_pool_t *pool; 334 pjsip_tx_data *tdata; 240 241 /***************************************************************************** 242 * 243 * TRANSPORT 244 * 245 *****************************************************************************/ 246 247 static void transport_send_callback(pjsip_transport *transport, 248 void *token, 249 pj_status_t status) 250 { 251 pjsip_tx_data *tdata = token; 252 253 PJ_UNUSED_ARG(transport); 254 255 /* Mark pending off so that app can resend/reuse txdata from inside 256 * the callback. 257 */ 258 tdata->is_pending = 0; 259 260 /* Call callback, if any. */ 261 if (tdata->cb) { 262 (*tdata->cb)(tdata->token, tdata, status); 263 } 264 265 /* Decrement reference count. */ 266 pjsip_tx_data_dec_ref(tdata); 267 } 268 269 /* 270 * Send a SIP message using the specified transport. 271 */ 272 PJ_DEF(pj_status_t) pjsip_transport_send( pjsip_transport *tr, 273 pjsip_tx_data *tdata, 274 const pj_sockaddr_in *addr, 275 void *token, 276 void (*cb)(void *token, 277 pjsip_tx_data *tdata, 278 pj_status_t)) 279 { 335 280 pj_status_t status; 336 281 337 PJ_LOG(5, ("", "pjsip_tx_data_create")); 338 339 PJ_ASSERT_RETURN(mgr && p_tdata, PJ_EINVAL); 340 341 pool = pjsip_endpt_create_pool( mgr->endpt, "ptdt%p", 342 PJSIP_POOL_LEN_TDATA, 343 PJSIP_POOL_INC_TDATA ); 344 if (!pool) { 345 return PJ_ENOMEM; 346 } 347 tdata = pj_pool_calloc(pool, 1, sizeof(pjsip_tx_data)); 348 tdata->pool = pool; 349 tdata->mgr = mgr; 350 pj_sprintf(tdata->obj_name,"txd%p", tdata); 351 352 status = pj_atomic_create(tdata->pool, 0, &tdata->ref_cnt); 353 if (status != PJ_SUCCESS) { 354 pjsip_endpt_destroy_pool( mgr->endpt, tdata->pool ); 355 return status; 356 } 357 358 *p_tdata = tdata; 359 return PJ_SUCCESS; 360 } 361 362 /* 363 * Add reference to tx buffer. 364 */ 365 PJ_DEF(void) pjsip_tx_data_add_ref( pjsip_tx_data *tdata ) 366 { 367 pj_atomic_inc(tdata->ref_cnt); 368 } 369 370 /* 371 * Decrease transport data reference, destroy it when the reference count 372 * reaches zero. 373 */ 374 PJ_DEF(void) pjsip_tx_data_dec_ref( pjsip_tx_data *tdata ) 375 { 376 pj_assert( pj_atomic_get(tdata->ref_cnt) > 0); 377 if (pj_atomic_dec_and_get(tdata->ref_cnt) <= 0) { 378 PJ_LOG(6,(tdata->obj_name, "destroying txdata")); 379 pj_atomic_destroy( tdata->ref_cnt ); 380 pjsip_endpt_destroy_pool( tdata->mgr->endpt, tdata->pool ); 381 } 382 } 383 384 /* 385 * Invalidate the content of the print buffer to force the message to be 386 * re-printed when sent. 387 */ 388 PJ_DEF(void) pjsip_tx_data_invalidate_msg( pjsip_tx_data *tdata ) 389 { 390 tdata->buf.cur = tdata->buf.start; 391 } 392 393 /* 394 * Get the transport type. 395 */ 396 PJ_DEF(pjsip_transport_type_e) pjsip_transport_get_type( const pjsip_transport_t * tr) 397 { 398 return tr->type; 399 } 400 401 /* 402 * Get transport type from transport flag. 403 */ 404 PJ_DEF(pjsip_transport_type_e) pjsip_get_transport_type_from_flag(unsigned flag) 405 { 406 #if PJ_HAS_TCP 407 if (flag & PJSIP_TRANSPORT_SECURE) { 408 return PJSIP_TRANSPORT_TLS; 409 } else if (flag & PJSIP_TRANSPORT_RELIABLE) { 410 return PJSIP_TRANSPORT_TCP; 411 } else 412 #else 413 PJ_UNUSED_ARG(flag); 414 #endif 415 { 416 return PJSIP_TRANSPORT_UDP; 417 } 418 } 419 420 /* 421 * Get the transport type name. 422 */ 423 PJ_DEF(const char *) pjsip_transport_get_type_name( const pjsip_transport_t * tr) 424 { 425 return get_type_name(tr->type); 426 } 427 428 /* 429 * Get the transport's object name. 430 */ 431 PJ_DEF(const char*) pjsip_transport_get_obj_name( const pjsip_transport_t *tr ) 432 { 433 return tr->obj_name; 434 } 435 436 /* 437 * Get the transport's reference counter. 438 */ 439 PJ_DEF(int) pjsip_transport_get_ref_cnt( const pjsip_transport_t *tr ) 440 { 441 return pj_atomic_get(tr->ref_cnt); 442 } 443 444 /* 445 * Get transport local address. 446 */ 447 PJ_DEF(const pj_sockaddr_in*) pjsip_transport_get_local_addr( pjsip_transport_t *tr ) 448 { 449 return &tr->local_addr; 450 } 451 452 /* 453 * Get address name. 454 */ 455 PJ_DEF(const pj_sockaddr_in*) pjsip_transport_get_addr_name (pjsip_transport_t *tr) 456 { 457 return &tr->addr_name; 458 } 459 460 /* 461 * Get transport remote address. 462 */ 463 PJ_DEF(const pj_sockaddr_in*) pjsip_transport_get_remote_addr( const pjsip_transport_t *tr ) 464 { 465 return &tr->remote_addr; 466 } 467 468 /* 469 * Get transport flag. 470 */ 471 PJ_DEF(unsigned) pjsip_transport_get_flag( const pjsip_transport_t * tr ) 472 { 473 return tr->flag; 474 } 475 476 /* 477 * Add reference to the specified transport. 478 */ 479 PJ_DEF(void) pjsip_transport_add_ref( pjsip_transport_t * tr ) 480 { 481 pj_atomic_inc(tr->ref_cnt); 482 } 483 484 /* 485 * Decrease the reference time of the transport. 486 */ 487 PJ_DEF(void) pjsip_transport_dec_ref( pjsip_transport_t *tr ) 488 { 489 pj_assert(tr->ref_cnt > 0); 490 if (pj_atomic_dec_and_get(tr->ref_cnt) == 0) { 491 pj_gettimeofday(&tr->close_time); 492 tr->close_time.sec += PJSIP_TRANSPORT_CLOSE_TIMEOUT; 493 } 494 } 495 496 /* 497 * Open the underlying transport. 498 */ 499 static pj_status_t create_socket( pjsip_transport_type_e type, 500 pj_sockaddr_in *local, 501 pj_sock_t *p_sock) 502 { 503 int sock_family; 504 int sock_type; 505 int sock_proto; 506 int len; 507 pj_status_t status; 508 pj_sock_t sock; 509 510 /* Set socket parameters */ 511 if (type == PJSIP_TRANSPORT_UDP) { 512 sock_family = PJ_AF_INET; 513 sock_type = PJ_SOCK_DGRAM; 514 sock_proto = 0; 515 516 #if PJ_HAS_TCP 517 } else if (type == PJSIP_TRANSPORT_TCP) { 518 sock_family = PJ_AF_INET; 519 sock_type = PJ_SOCK_STREAM; 520 sock_proto = 0; 521 #endif 522 } else { 523 return PJ_EINVAL; 524 } 525 526 /* Create socket. */ 527 status = pj_sock_socket( sock_family, sock_type, sock_proto, &sock); 528 if (status != PJ_SUCCESS) 529 return status; 530 531 /* Bind the socket to the requested address, or if no address is 532 * specified, let the operating system chooses the address. 533 */ 534 if (/*local->sin_addr.s_addr != 0 &&*/ local->sin_port != 0) { 535 /* Bind to the requested address. */ 536 status = pj_sock_bind(sock, local, sizeof(*local)); 537 if (status != PJ_SUCCESS) { 538 pj_sock_close(sock); 539 return status; 540 } 541 } else if (type == PJSIP_TRANSPORT_UDP) { 542 /* Only for UDP sockets: bind to any address so that the operating 543 * system allocates the port for us. For TCP, let the OS implicitly 544 * bind the socket with connect() syscall (if we bind now, then we'll 545 * get 0.0.0.0 as local address). 546 */ 547 pj_memset(local, 0, sizeof(*local)); 548 local->sin_family = PJ_AF_INET; 549 status = pj_sock_bind(sock, local, sizeof(*local)); 550 if (status != PJ_SUCCESS) { 551 pj_sock_close(sock); 552 return status; 553 } 554 555 /* Get the local address. */ 556 len = sizeof(pj_sockaddr_in); 557 status = pj_sock_getsockname(sock, local, &len); 558 if (status != PJ_SUCCESS) { 559 pj_sock_close(sock); 560 return status; 561 } 562 } 563 564 *p_sock = sock; 565 return PJ_SUCCESS; 566 } 567 568 /* 569 * Close the transport. 570 */ 571 static void destroy_socket( pjsip_transport_t * tr) 572 { 573 pj_assert( pj_atomic_get(tr->ref_cnt) == 0); 574 pj_sock_close(tr->sock); 575 tr->sock = -1; 576 } 577 578 /* 579 * Create a new transport object. 580 */ 581 static pj_status_t create_transport( pjsip_transport_mgr *mgr, 582 pjsip_transport_type_e type, 583 pj_sock_t sock_hnd, 584 const pj_sockaddr_in *local_addr, 585 const pj_sockaddr_in *addr_name, 586 pjsip_transport_t **p_transport ) 587 { 588 pj_pool_t *tr_pool=NULL, *rdata_pool=NULL; 589 pjsip_transport_t *tr = NULL; 590 pj_status_t status; 591 592 /* Allocate pool for transport from endpoint. */ 593 tr_pool = pjsip_endpt_create_pool( mgr->endpt, 594 transport_get_name_format(type), 595 PJSIP_POOL_LEN_TRANSPORT, 596 PJSIP_POOL_INC_TRANSPORT ); 597 if (!tr_pool) { 598 status = PJ_ENOMEM; 599 goto on_error; 600 } 601 602 /* Allocate pool for rdata from endpoint. */ 603 rdata_pool = pjsip_endpt_create_pool( mgr->endpt, 604 "prdt%p", 605 PJSIP_POOL_LEN_RDATA, 606 PJSIP_POOL_INC_RDATA ); 607 if (!rdata_pool) { 608 status = PJ_ENOMEM; 609 goto on_error; 610 } 611 612 /* Allocate and initialize the transport. */ 613 tr = pj_pool_calloc(tr_pool, 1, sizeof(*tr)); 614 tr->pool = tr_pool; 615 tr->type = type; 616 tr->mgr = mgr; 617 tr->sock = sock_hnd; 618 pj_memcpy(&tr->local_addr, local_addr, sizeof(pj_sockaddr_in)); 619 pj_list_init(&tr->cb_list); 620 pj_sprintf(tr->obj_name, transport_get_name_format(type), tr); 621 622 if (type != PJSIP_TRANSPORT_UDP) { 623 tr->flag |= PJSIP_TRANSPORT_RELIABLE; 624 } 625 626 /* Address name. */ 627 if (addr_name == NULL) { 628 addr_name = &tr->local_addr; 629 } 630 pj_memcpy(&tr->addr_name, addr_name, sizeof(*addr_name)); 631 632 /* Create atomic */ 633 status = pj_atomic_create(tr_pool, 0, &tr->ref_cnt); 634 if (status != PJ_SUCCESS) 635 goto on_error; 636 637 /* Init rdata in the transport. */ 638 tr->rdata = pj_pool_alloc(rdata_pool, sizeof(*tr->rdata)); 639 tr->rdata->pool = rdata_pool; 640 tr->rdata->len = 0; 641 tr->rdata->transport = tr; 642 643 /* Init transport mutex. */ 644 status = pj_mutex_create_recursive(tr_pool, "mtr%p", &tr->tr_mutex); 645 if (status != PJ_SUCCESS) 646 goto on_error; 647 648 /* Register to I/O Queue */ 649 status = pj_ioqueue_register_sock( tr_pool, mgr->ioqueue, 650 tr->sock, tr, 651 &ioqueue_transport_callback, 652 &tr->key); 653 if (status != PJ_SUCCESS) 654 goto on_error; 655 656 *p_transport = tr; 657 return PJ_SUCCESS; 658 659 on_error: 660 if (tr && tr->tr_mutex) { 661 pj_mutex_destroy(tr->tr_mutex); 662 } 663 if (tr_pool) { 664 pjsip_endpt_destroy_pool(mgr->endpt, tr_pool); 665 } 666 if (rdata_pool) { 667 pjsip_endpt_destroy_pool(mgr->endpt, rdata_pool); 668 } 669 return status; 670 } 671 672 /* 673 * Destroy transport. 674 */ 675 static void destroy_transport( pjsip_transport_mgr *mgr, pjsip_transport_t *tr) 676 { 677 transport_key hash_key; 678 679 /* Remove from I/O queue. */ 680 pj_ioqueue_unregister( tr->key ); 681 682 /* Remove from hash table */ 683 init_key_from_transport(&hash_key, tr); 684 pj_hash_set(NULL, mgr->transport_table, &hash_key, sizeof(hash_key), NULL); 685 686 /* Close transport. */ 687 destroy_socket(tr); 688 689 /* Destroy the transport mutex. */ 690 pj_mutex_destroy(tr->tr_mutex); 691 692 /* Destroy atomic */ 693 pj_atomic_destroy( tr->ref_cnt ); 694 695 /* Release the pool associated with the rdata. */ 696 pjsip_endpt_destroy_pool(mgr->endpt, tr->rdata->pool ); 697 698 /* Release the pool associated with the transport. */ 699 pjsip_endpt_destroy_pool(mgr->endpt, tr->pool ); 700 } 701 702 703 static pj_status_t transport_send_msg( pjsip_transport_t *tr, 704 pjsip_tx_data *tdata, 705 const pj_sockaddr_in *addr, 706 pj_ssize_t *p_sent) 707 { 708 const char *buf = tdata->buf.start; 709 pj_ssize_t size; 710 pj_status_t status; 711 712 /* Can only send if tdata is not being sent! */ 713 if (pj_ioqueue_is_pending(tr->key, &tdata->op_key)) 282 PJ_ASSERT_RETURN(tr && tdata && addr, PJ_EINVAL); 283 284 /* Is it currently being sent? */ 285 if (tdata->is_pending) { 286 pj_assert(!"Invalid operation step!"); 714 287 return PJSIP_EPENDINGTX; 288 } 715 289 716 290 /* Allocate buffer if necessary. */ … … 721 295 } 722 296 723 /* Print the message if it's not printed */ 724 if (tdata->buf.cur <= tdata->buf.start) { 297 /* Do we need to reprint? */ 298 if (!pjsip_tx_data_is_valid(tdata)) { 299 pj_ssize_t size; 300 725 301 size = pjsip_msg_print( tdata->msg, tdata->buf.start, 726 302 tdata->buf.end - tdata->buf.start); … … 733 309 } 734 310 735 /* Send the message. */ 736 buf = tdata->buf.start; 737 size = tdata->buf.cur - tdata->buf.start; 738 739 if (tr->type == PJSIP_TRANSPORT_UDP) { 740 PJ_LOG(4,(tr->obj_name, "sendto %s:%d, %d bytes, data:\n" 741 "----------- begin msg ------------\n" 742 "%s" 743 "------------ end msg -------------", 744 pj_inet_ntoa(addr->sin_addr), 745 pj_sockaddr_in_get_port(addr), 746 size, buf)); 747 748 status = pj_ioqueue_sendto( tr->key, &tdata->op_key, 749 buf, &size, 0, addr, sizeof(*addr)); 750 } 751 #if PJ_HAS_TCP 752 else { 753 PJ_LOG(4,(tr->obj_name, "sending %d bytes, data:\n" 754 "----------- begin msg ------------\n" 755 "%s" 756 "------------ end msg -------------", 757 size, buf)); 758 759 status = pj_ioqueue_send(tr->key, &tdata->op_key, buf, &size, 0); 760 } 761 #else 762 else { 763 pj_assert(!"Unsupported transport"); 764 status = PJSIP_EUNSUPTRANSPORT; 765 } 766 #endif 767 768 *p_sent = size; 311 /* Save callback data. */ 312 tdata->token = token; 313 tdata->cb = cb; 314 315 /* Add reference counter. */ 316 pjsip_tx_data_add_ref(tdata); 317 318 /* Mark as pending. */ 319 tdata->is_pending = 1; 320 321 /* Send to transport. */ 322 status = (*tr->send_msg)(tr, tdata->buf.start, 323 tdata->buf.cur - tdata->buf.start, 324 &tdata->op_key, 325 addr, tdata, &transport_send_callback); 326 327 if (status != PJ_EPENDING) { 328 tdata->is_pending = 0; 329 pjsip_tx_data_dec_ref(tdata); 330 } 331 769 332 return status; 770 333 } 771 334 772 /* 773 * Send a SIP message using the specified transport, to the address specified 774 * in the outgoing data. 775 */ 776 PJ_DEF(pj_status_t) pjsip_transport_send_msg( pjsip_transport_t *tr, 777 pjsip_tx_data *tdata, 778 const pj_sockaddr_in *addr, 779 pj_ssize_t *sent) 780 { 781 PJ_LOG(5, (tr->obj_name, "pjsip_transport_send_msg(tdata=%s)", tdata->obj_name)); 782 783 return transport_send_msg(tr, tdata, addr, sent ); 784 } 785 786 /////////////////////////////////////////////////////////////////////////////// 335 static void transport_idle_callback(pj_timer_heap_t *timer_heap, 336 struct pj_timer_entry *entry) 337 { 338 pjsip_transport *tp = entry->user_data; 339 pj_assert(tp != NULL); 340 341 PJ_UNUSED_ARG(timer_heap); 342 343 entry->id = PJ_FALSE; 344 pjsip_transport_unregister(tp->tpmgr, tp); 345 } 346 347 /* 348 * Add ref. 349 */ 350 PJ_DEF(pj_status_t) pjsip_transport_add_ref( pjsip_transport *tp ) 351 { 352 PJ_ASSERT_RETURN(tp != NULL, PJ_EINVAL); 353 354 if (pj_atomic_inc_and_get(tp->ref_cnt) == 1) { 355 pj_lock_acquire(tp->tpmgr->lock); 356 /* Verify again. */ 357 if (pj_atomic_get(tp->ref_cnt) == 1) { 358 if (tp->idle_timer.id != PJ_FALSE) { 359 pj_timer_heap_cancel(tp->tpmgr->timer_heap, &tp->idle_timer); 360 tp->idle_timer.id = PJ_FALSE; 361 } 362 } 363 pj_lock_release(tp->tpmgr->lock); 364 } 365 366 return PJ_SUCCESS; 367 } 368 369 /* 370 * Dec ref. 371 */ 372 PJ_DEF(pj_status_t) pjsip_transport_dec_ref( pjsip_transport *tp ) 373 { 374 PJ_ASSERT_RETURN(tp != NULL, PJ_EINVAL); 375 376 pj_assert(pj_atomic_get(tp->ref_cnt) > 0); 377 378 if (pj_atomic_dec_and_get(tp->ref_cnt) == 0) { 379 pj_lock_acquire(tp->tpmgr->lock); 380 /* Verify again. */ 381 if (pj_atomic_get(tp->ref_cnt) == 0) { 382 pj_time_val delay = { PJSIP_TRANSPORT_IDLE_TIME, 0 }; 383 384 pj_assert(tp->idle_timer.id == 0); 385 tp->idle_timer.id = PJ_TRUE; 386 pj_timer_heap_schedule(tp->tpmgr->timer_heap, &tp->idle_timer, &delay); 387 } 388 pj_lock_release(tp->tpmgr->lock); 389 } 390 391 return PJ_SUCCESS; 392 } 393 394 395 /** 396 * Register a transport. 397 */ 398 PJ_DEF(pj_status_t) pjsip_transport_register( pjsip_tpmgr *mgr, 399 pjsip_transport *tp ) 400 { 401 transport_key key; 402 403 /* Init. */ 404 tp->tpmgr = mgr; 405 pj_memset(&tp->idle_timer, 0, sizeof(tp->idle_timer)); 406 tp->idle_timer.user_data = tp; 407 tp->idle_timer.cb = &transport_idle_callback; 408 409 /* 410 * Register to hash table. 411 */ 412 key.type = (pj_uint8_t)tp->type; 413 key.zero = 0; 414 key.addr = pj_ntohl(tp->rem_addr.sin_addr.s_addr); 415 key.port = pj_ntohs(tp->rem_addr.sin_port); 416 417 pj_lock_acquire(mgr->lock); 418 pj_hash_set(tp->pool, mgr->table, &key, sizeof(key), tp); 419 pj_lock_release(mgr->lock); 420 421 return PJ_SUCCESS; 422 } 423 424 425 /** 426 * Unregister transport. 427 */ 428 PJ_DEF(pj_status_t) pjsip_transport_unregister( pjsip_tpmgr *mgr, 429 pjsip_transport *tp) 430 { 431 transport_key key; 432 433 PJ_ASSERT_RETURN(pj_atomic_get(tp->ref_cnt) == 0, PJSIP_EBUSY); 434 435 pj_lock_acquire(tp->lock); 436 pj_lock_acquire(mgr->lock); 437 438 /* 439 * Unregister timer, if any. 440 */ 441 pj_assert(tp->idle_timer.id == PJ_FALSE); 442 if (tp->idle_timer.id != PJ_FALSE) { 443 pj_timer_heap_cancel(mgr->timer_heap, &tp->idle_timer); 444 tp->idle_timer.id = PJ_FALSE; 445 } 446 447 /* 448 * Unregister from hash table. 449 */ 450 key.type = (pj_uint8_t)tp->type; 451 key.zero = 0; 452 key.addr = pj_ntohl(tp->rem_addr.sin_addr.s_addr); 453 key.port = pj_ntohs(tp->rem_addr.sin_port); 454 455 pj_hash_set(tp->pool, mgr->table, &key, sizeof(key), NULL); 456 457 pj_lock_release(mgr->lock); 458 459 /* Destroy. */ 460 return tp->destroy(tp); 461 } 462 463 464 465 /***************************************************************************** 466 * 467 * TRANSPORT FACTORY 468 * 469 *****************************************************************************/ 470 471 472 PJ_DEF(pj_status_t) pjsip_tpmgr_register_tpfactory( pjsip_tpmgr *mgr, 473 pjsip_tpfactory *tpf) 474 { 475 pjsip_tpfactory *p; 476 pj_status_t status; 477 478 pj_lock_acquire(mgr->lock); 479 480 /* Check that no factory with the same type has been registered. */ 481 status = PJ_SUCCESS; 482 for (p=mgr->factory_list.next; p!=&mgr->factory_list; p=p->next) { 483 if (p->type == tpf->type) { 484 status = PJSIP_ETYPEEXISTS; 485 break; 486 } 487 if (p == tpf) { 488 status = PJ_EEXISTS; 489 break; 490 } 491 } 492 493 if (status != PJ_SUCCESS) { 494 pj_lock_release(mgr->lock); 495 return status; 496 } 497 498 pj_list_insert_before(&mgr->factory_list, tpf); 499 500 pj_lock_release(mgr->lock); 501 502 return PJ_SUCCESS; 503 } 504 505 506 /** 507 * Unregister factory. 508 */ 509 PJ_DEF(pj_status_t) pjsip_tpmgr_unregister_tpfactory( pjsip_tpmgr *mgr, 510 pjsip_tpfactory *tpf) 511 { 512 pj_lock_acquire(mgr->lock); 513 514 pj_assert(pj_list_find_node(&mgr->factory_list, tpf) == tpf); 515 pj_list_erase(tpf); 516 517 pj_lock_release(mgr->lock); 518 519 return PJ_SUCCESS; 520 } 521 522 523 /***************************************************************************** 524 * 525 * TRANSPORT MANAGER 526 * 527 *****************************************************************************/ 787 528 788 529 /* 789 530 * Create a new transport manager. 790 531 */ 791 PJ_DEF(pj_status_t) pjsip_transport_mgr_create( pj_pool_t *pool, 792 pjsip_endpoint * endpt, 793 void (*cb)(pjsip_endpoint*, 794 pjsip_rx_data *), 795 pjsip_transport_mgr **p_mgr) 796 { 797 pjsip_transport_mgr *mgr; 532 PJ_DEF(pj_status_t) pjsip_tpmgr_create( pj_pool_t *pool, 533 pjsip_endpoint *endpt, 534 pj_ioqueue_t *ioqueue, 535 pj_timer_heap_t *timer_heap, 536 void (*cb)(pjsip_endpoint*, 537 pj_status_t, 538 pjsip_rx_data *), 539 pjsip_tpmgr **p_mgr) 540 { 541 pjsip_tpmgr *mgr; 798 542 pj_status_t status; 799 543 800 PJ_LOG(5, (LOG_TRANSPORT_MGR, "pjsip_transport_mgr_create()")); 801 802 mgr = pj_pool_alloc(pool, sizeof(*mgr)); 544 PJ_ASSERT_RETURN(pool && endpt && cb && p_mgr, PJ_EINVAL); 545 546 PJ_LOG(5, (THIS_FILE, "pjsip_tpmgr_create()")); 547 548 mgr = pj_pool_zalloc(pool, sizeof(*mgr)); 803 549 mgr->endpt = endpt; 804 mgr->message_callback = cb; 805 mgr->send_buf_size = DEFAULT_SO_SNDBUF; 806 mgr->recv_buf_size = DEFAULT_SO_RCVBUF; 807 808 mgr->transport_table = pj_hash_create(pool, MGR_HASH_TABLE_SIZE); 809 if (!mgr->transport_table) { 550 mgr->msg_cb = cb; 551 mgr->ioqueue = ioqueue; 552 mgr->timer_heap = timer_heap; 553 pj_list_init(&mgr->factory_list); 554 555 mgr->table = pj_hash_create(pool, PJSIP_TPMGR_HTABLE_SIZE); 556 if (!mgr->table) 810 557 return PJ_ENOMEM; 811 } 812 status = pj_ ioqueue_create(pool, PJSIP_MAX_TRANSPORTS, &mgr->ioqueue);813 if (status != PJ_SUCCESS) {558 559 status = pj_lock_create_recursive_mutex(pool, "tmgr%p", &mgr->lock); 560 if (status != PJ_SUCCESS) 814 561 return status; 815 }816 status = pj_mutex_create_recursive(pool, "tmgr%p", &mgr->mutex);817 if (status != PJ_SUCCESS) {818 pj_ioqueue_destroy(mgr->ioqueue);819 return status;820 }821 pj_gettimeofday(&mgr->next_idle_check);822 mgr->next_idle_check.sec += MGR_IDLE_CHECK_INTERVAL;823 562 824 563 *p_mgr = mgr; 825 return status; 826 } 827 828 /* 564 return PJ_SUCCESS; 565 } 566 567 /* 568 * pjsip_tpmgr_destroy() 569 * 829 570 * Destroy transport manager. 830 571 */ 831 PJ_DEF(pj_status_t) pjsip_t ransport_mgr_destroy( pjsip_transport_mgr *mgr )572 PJ_DEF(pj_status_t) pjsip_tpmgr_destroy( pjsip_tpmgr *mgr ) 832 573 { 833 574 pj_hash_iterator_t itr_val; 834 575 pj_hash_iterator_t *itr; 835 576 836 PJ_LOG(5, ( LOG_TRANSPORT_MGR, "pjsip_transport_mgr_destroy()"));837 838 pj_ mutex_lock(mgr->mutex);839 840 itr = pj sip_transport_first(mgr, &itr_val);577 PJ_LOG(5, (THIS_FILE, "pjsip_tpmgr_destroy()")); 578 579 pj_lock_acquire(mgr->lock); 580 581 itr = pj_hash_first(mgr->table, &itr_val); 841 582 while (itr != NULL) { 842 583 pj_hash_iterator_t *next; 843 pjsip_transport _t*transport;584 pjsip_transport *transport; 844 585 845 transport = pj sip_transport_this(mgr, itr);846 847 next = pj sip_transport_next(mgr, itr);586 transport = pj_hash_this(mgr->table, itr); 587 588 next = pj_hash_next(mgr->table, itr); 848 589 849 590 pj_atomic_set(transport->ref_cnt, 0); 850 destroy_transport(mgr, transport);591 pjsip_transport_unregister(mgr, transport); 851 592 852 593 itr = next; … … 854 595 pj_ioqueue_destroy(mgr->ioqueue); 855 596 856 pj_ mutex_unlock(mgr->mutex);597 pj_lock_release(mgr->lock); 857 598 858 599 return PJ_SUCCESS; 859 600 } 860 601 861 /* 862 * Create listener 863 */ 864 static pj_status_t create_listener( pjsip_transport_mgr *mgr, 865 pjsip_transport_type_e type, 866 pj_sock_t sock_hnd, 867 pj_sockaddr_in *local_addr, 868 const pj_sockaddr_in *addr_name) 869 { 870 pjsip_transport_t *tr; 871 struct transport_key *hash_key; 872 const pj_str_t loopback_addr = { "127.0.0.1", 9 }; 873 pj_status_t status; 874 875 if (mgr->send_buf_size != 0) { 876 int opt_val = mgr->send_buf_size; 877 status = pj_sock_setsockopt( sock_hnd, PJ_SOL_SOCKET, 878 PJ_SO_SNDBUF, 879 &opt_val, sizeof(opt_val)); 880 881 if (status != PJ_SUCCESS) { 882 return status; 883 } 884 } 885 886 if (mgr->recv_buf_size != 0) { 887 int opt_val = mgr->recv_buf_size; 888 status = pj_sock_setsockopt( sock_hnd, PJ_SOL_SOCKET, 889 PJ_SO_RCVBUF, 890 &opt_val, sizeof(opt_val)); 891 if (status != PJ_SUCCESS) { 892 return status; 893 } 894 } 895 896 status = create_transport(mgr, type, sock_hnd, local_addr, addr_name, &tr); 897 if (status != PJ_SUCCESS) { 898 pj_sock_close(sock_hnd); 899 return status; 900 } 901 #if PJ_HAS_TCP 902 if (type == PJSIP_TRANSPORT_TCP) { 903 904 status = pj_sock_listen(tr->sock, BACKLOG); 905 if (status != 0) { 906 destroy_transport(mgr, tr); 907 return status; 908 } 909 910 /* Discard immediate connections. */ 911 do { 912 tr->accept_data.addrlen = sizeof(tr->accept_data.local); 913 status = pj_ioqueue_accept(tr->key, &tr->accept_op, 914 &tr->accept_data.sock, 915 &tr->accept_data.local, 916 &tr->accept_data.remote, 917 &tr->accept_data.addrlen); 918 if (status==PJ_SUCCESS) { 919 pj_sock_close(tr->accept_data.sock); 920 } else if (status != PJ_EPENDING) { 921 destroy_transport(mgr, tr); 922 return status; 923 } 924 } while (status==PJ_SUCCESS); 925 926 } else 927 #endif 928 if (type == PJSIP_TRANSPORT_UDP) { 929 pj_ssize_t bytes; 930 931 /* Discard immediate data. */ 932 do { 933 tr->rdata->addr_len = sizeof(tr->rdata->addr); 934 bytes = PJSIP_MAX_PKT_LEN; 935 status = pj_ioqueue_recvfrom( tr->key, &tr->rdata->op_key, 936 tr->rdata->packet, &bytes, 0, 937 &tr->rdata->addr, 938 &tr->rdata->addr_len); 939 if (status == PJ_SUCCESS) { 940 ; 941 } else if (status != PJ_EPENDING) { 942 destroy_transport(mgr, tr); 943 return status; 944 } 945 } while (status == PJ_SUCCESS); 946 } 947 948 pj_atomic_set(tr->ref_cnt, 1); 949 950 /* Listeners normally have no remote address */ 951 pj_memset(&tr->remote_addr, 0, sizeof(tr->remote_addr)); 952 953 /* Set remote address to 127.0.0.1 for UDP socket bound to 127.0.0.1. 954 * See further comments on struct pjsip_transport_t definition. 602 603 /* 604 * pjsip_tpmgr_receive_packet() 605 * 606 * Called by tranports when they receive a new packet. 607 */ 608 PJ_DEF(pj_ssize_t) pjsip_tpmgr_receive_packet( pjsip_tpmgr *mgr, 609 pjsip_rx_data *rdata) 610 { 611 pjsip_transport *tr = rdata->tp_info.transport; 612 pj_str_t s; 613 614 char *current_pkt; 615 pj_size_t remaining_len; 616 pj_size_t total_processed = 0; 617 618 /* Check size. */ 619 pj_assert(rdata->pkt_info.len > 0); 620 if (rdata->pkt_info.len <= 0) 621 return -1; 622 623 current_pkt = rdata->pkt_info.packet; 624 remaining_len = rdata->pkt_info.len; 625 626 /* Must NULL terminate buffer. This is the requirement of the 627 * parser etc. 955 628 */ 956 if (type == PJSIP_TRANSPORT_UDP && 957 local_addr->sin_addr.s_addr == pj_inet_addr(&loopback_addr).s_addr) 958 { 959 pj_str_t localaddr = pj_str("127.0.0.1"); 960 pj_sockaddr_in_set_str_addr( &tr->remote_addr, &localaddr); 961 } 962 hash_key = pj_pool_alloc(tr->pool, sizeof(transport_key)); 963 init_key_from_transport(hash_key, tr); 964 965 pj_mutex_lock(mgr->mutex); 966 pj_hash_set(tr->pool, mgr->transport_table, 967 hash_key, sizeof(transport_key), tr); 968 pj_mutex_unlock(mgr->mutex); 969 970 PJ_LOG(4,(tr->obj_name, "Listening at %s %s:%d", 971 get_type_name(tr->type), 972 pj_inet_ntoa(tr->local_addr.sin_addr), 973 pj_sockaddr_in_get_port(&tr->local_addr))); 974 PJ_LOG(4,(tr->obj_name, "Listener public address is at %s %s:%d", 975 get_type_name(tr->type), 976 pj_inet_ntoa(tr->addr_name.sin_addr), 977 pj_sockaddr_in_get_port(&tr->addr_name))); 978 return PJ_SUCCESS; 979 } 980 981 /* 982 * Create listener. 983 */ 984 PJ_DEF(pj_status_t) pjsip_create_listener( pjsip_transport_mgr *mgr, 985 pjsip_transport_type_e type, 986 pj_sockaddr_in *local_addr, 987 const pj_sockaddr_in *addr_name) 988 { 989 pj_sock_t sock_hnd; 990 pj_status_t status; 991 992 PJ_LOG(5, (LOG_TRANSPORT_MGR, "pjsip_create_listener(type=%d)", type)); 993 994 status = create_socket(type, local_addr, &sock_hnd); 995 if (status != PJ_SUCCESS) { 996 return status; 997 } 998 999 return create_listener(mgr, type, sock_hnd, local_addr, addr_name); 1000 } 1001 1002 /* 1003 * Create UDP listener. 1004 */ 1005 PJ_DEF(pj_status_t) pjsip_create_udp_listener( pjsip_transport_mgr *mgr, 1006 pj_sock_t sock, 1007 const pj_sockaddr_in *addr_name) 1008 { 1009 pj_sockaddr_in local_addr; 1010 pj_status_t status; 1011 int addrlen = sizeof(local_addr); 1012 1013 status = pj_sock_getsockname(sock, (pj_sockaddr_t*)&local_addr, &addrlen); 1014 if (status != PJ_SUCCESS) 1015 return status; 1016 1017 return create_listener(mgr, PJSIP_TRANSPORT_UDP, sock, 1018 &local_addr, addr_name); 1019 } 1020 1021 /* 1022 * Find transport to be used to send message to remote destination. If no 1023 * suitable transport is found, a new one will be created. 1024 */ 1025 PJ_DEF(void) pjsip_transport_get( pjsip_transport_mgr *mgr, 1026 pj_pool_t *pool, 1027 pjsip_transport_type_e type, 1028 const pj_sockaddr_in *remote, 1029 void *token, 1030 pjsip_transport_completion_callback *cb) 1031 { 1032 transport_key search_key, *hash_key; 1033 pjsip_transport_t *tr; 1034 pj_sockaddr_in local; 1035 pj_sock_t sock_hnd; 1036 pj_status_t status; 1037 struct transport_callback *cb_rec; 1038 1039 PJ_LOG(5, (LOG_TRANSPORT_MGR, "pjsip_transport_get()")); 1040 1041 /* Create the callback record. 1042 */ 1043 cb_rec = pj_pool_calloc(pool, 1, sizeof(*cb_rec)); 1044 cb_rec->token = token; 1045 cb_rec->cb = cb; 1046 1047 /* Create key for hash table look-up. 1048 * The key creation is different for TCP and UDP. 1049 */ 1050 #if PJ_HAS_TCP 1051 if (type==PJSIP_TRANSPORT_TCP) { 1052 init_tcp_key(&search_key, type, remote); 1053 } else 1054 #endif 1055 if (type==PJSIP_TRANSPORT_UDP) { 1056 init_udp_key(&search_key, type, remote); 1057 } 1058 1059 /* Start lock the manager. */ 1060 pj_mutex_lock(mgr->mutex); 1061 1062 /* Lookup the transport in the hash table. */ 1063 tr = pj_hash_get(mgr->transport_table, &search_key, sizeof(transport_key)); 1064 1065 if (tr) { 1066 /* Transport found. If the transport is still busy (i.e. connecting 1067 * is in progress), then just register the callback. Otherwise 1068 * report via the callback if callback is specified. 629 current_pkt[remaining_len] = '\0'; 630 631 /* Process all message fragments. */ 632 while (total_processed < remaining_len) { 633 634 pjsip_msg *msg; 635 pj_size_t msg_fragment_size = 0; 636 637 /* Initialize default fragment size. */ 638 msg_fragment_size = remaining_len; 639 640 /* Null terminate packet. */ 641 642 /* Clear and init msg_info in rdata. 643 * Endpoint might inspect the values there when we call the callback 644 * to report some errors. 1069 645 */ 1070 pj_mutex_unlock(mgr->mutex); 1071 pj_mutex_lock(tr->tr_mutex); 1072 1073 if (tr->flag & PJSIP_TRANSPORT_IOQUEUE_BUSY) { 1074 /* Transport is busy. Just register the callback. */ 1075 pj_list_insert_before(&tr->cb_list, cb_rec); 1076 1077 } else { 1078 /* Transport is ready. Call callback now. 1079 */ 1080 (*cb_rec->cb)(tr, cb_rec->token, PJ_SUCCESS); 1081 } 1082 pj_mutex_unlock(tr->tr_mutex); 1083 1084 return; 1085 } 1086 1087 1088 /* Transport not found. Create new one. */ 1089 pj_memset(&local, 0, sizeof(local)); 1090 local.sin_family = PJ_AF_INET; 1091 status = create_socket(type, &local, &sock_hnd); 1092 if (status != PJ_SUCCESS) { 1093 pj_mutex_unlock(mgr->mutex); 1094 (*cb_rec->cb)(NULL, cb_rec->token, status); 1095 return; 1096 } 1097 status = create_transport(mgr, type, sock_hnd, &local, NULL, &tr); 1098 if (status != PJ_SUCCESS) { 1099 pj_mutex_unlock(mgr->mutex); 1100 (*cb_rec->cb)(NULL, cb_rec->token, status); 1101 return; 1102 } 1103 1104 #if PJ_HAS_TCP 1105 if (type == PJSIP_TRANSPORT_TCP) { 1106 pj_memcpy(&tr->remote_addr, remote, sizeof(pj_sockaddr_in)); 1107 status = pj_ioqueue_connect(tr->key, &tr->remote_addr, 1108 sizeof(pj_sockaddr_in)); 1109 pj_assert(status != 0); 1110 if (status != PJ_EPENDING) { 1111 PJ_TODO(HANDLE_IMMEDIATE_CONNECT); 1112 destroy_transport(mgr, tr); 1113 pj_mutex_unlock(mgr->mutex); 1114 (*cb_rec->cb)(NULL, cb_rec->token, status); 1115 return; 1116 } 1117 } else 1118 #endif 1119 if (type == PJSIP_TRANSPORT_UDP) { 1120 pj_ssize_t size; 1121 1122 do { 1123 tr->rdata->addr_len = sizeof(tr->rdata->addr); 1124 size = PJSIP_MAX_PKT_LEN; 1125 status = pj_ioqueue_recvfrom( tr->key, &tr->rdata->op_key, 1126 tr->rdata->packet, &size, 0, 1127 &tr->rdata->addr, 1128 &tr->rdata->addr_len); 1129 if (status == PJ_SUCCESS) 1130 ; 1131 else if (status != PJ_EPENDING) { 1132 destroy_transport(mgr, tr); 1133 pj_mutex_unlock(mgr->mutex); 1134 (*cb_rec->cb)(NULL, cb_rec->token, status); 1135 return; 1136 } 1137 1138 /* Bug here. 1139 * If data is immediately available, although not likely, it will 1140 * be dropped because we don't expect to have data right after 1141 * the socket is created, do we ?! 1142 */ 1143 PJ_TODO(FIXED_BUG_ON_IMMEDIATE_TRANSPORT_DATA); 1144 1145 } while (status == PJ_SUCCESS); 1146 1147 //Bug: cb will never be called! 1148 // Must force status to PJ_SUCCESS; 1149 //status = PJ_IOQUEUE_PENDING; 1150 1151 status = PJ_SUCCESS; 1152 1153 } else { 1154 pj_mutex_unlock(mgr->mutex); 1155 (*cb_rec->cb)(NULL, cb_rec->token, PJSIP_EUNSUPTRANSPORT); 1156 return; 1157 } 1158 1159 pj_assert(status==PJ_EPENDING || status==PJ_SUCCESS); 1160 pj_mutex_lock(tr->tr_mutex); 1161 hash_key = pj_pool_alloc(tr->pool, sizeof(transport_key)); 1162 pj_memcpy(hash_key, &search_key, sizeof(transport_key)); 1163 pj_hash_set(tr->pool, mgr->transport_table, 1164 hash_key, sizeof(transport_key), tr); 1165 if (status == PJ_SUCCESS) { 1166 pj_mutex_unlock(tr->tr_mutex); 1167 pj_mutex_unlock(mgr->mutex); 1168 (*cb_rec->cb)(tr, cb_rec->token, PJ_SUCCESS); 1169 } else { 1170 pj_list_insert_before(&tr->cb_list, cb_rec); 1171 pj_mutex_unlock(tr->tr_mutex); 1172 pj_mutex_unlock(mgr->mutex); 1173 } 1174 1175 } 1176 1177 #if PJ_HAS_TCP 1178 /* 1179 * Handle completion of asynchronous accept() operation. 1180 * This function is called by handle_events() function. 1181 */ 1182 static void handle_new_connection( pjsip_transport_mgr *mgr, 1183 pjsip_transport_t *listener, 1184 pj_status_t status ) 1185 { 1186 pjsip_transport_t *tr; 1187 transport_key *hash_key; 1188 pj_ssize_t size; 1189 1190 pj_assert (listener->type == PJSIP_TRANSPORT_TCP); 1191 1192 if (status != PJ_SUCCESS) { 1193 PJSIP_ENDPT_LOG_ERROR((mgr->endpt, listener->obj_name, status, 1194 "Error in accept() completion")); 1195 goto on_return; 1196 } 1197 1198 PJ_LOG(4,(listener->obj_name, "incoming tcp connection from %s:%d", 1199 pj_inet_ntoa(listener->accept_data.remote.sin_addr), 1200 pj_sockaddr_in_get_port(&listener->accept_data.remote))); 1201 1202 status = create_transport(mgr, listener->type, 1203 listener->accept_data.sock, 1204 &listener->accept_data.local, 1205 NULL, &tr); 1206 if (status != PJ_SUCCESS) { 1207 PJSIP_ENDPT_LOG_ERROR((mgr->endpt, listener->obj_name, status, 1208 "Error in creating new incoming TCP")); 1209 goto on_return; 1210 } 1211 1212 /* 1213 tr->rdata->addr_len = sizeof(tr->rdata->addr); 1214 status = pj_ioqueue_recvfrom( mgr->ioqueue, tr->key, 1215 tr->rdata->packet, PJSIP_MAX_PKT_LEN, 1216 &tr->rdata->addr, 1217 &tr->rdata->addr_len); 1218 */ 1219 tr->rdata->addr = listener->accept_data.remote; 1220 tr->rdata->addr_len = listener->accept_data.addrlen; 1221 1222 size = PJSIP_MAX_PKT_LEN; 1223 status = pj_ioqueue_recv(tr->key, &tr->rdata->op_key, 1224 tr->rdata->packet, &size, 0); 1225 if (status != PJ_EPENDING) { 1226 PJSIP_ENDPT_LOG_ERROR((mgr->endpt, listener->obj_name, status, 1227 "Error in receiving data")); 1228 PJ_TODO(IMMEDIATE_DATA); 1229 destroy_transport(mgr, tr); 1230 goto on_return; 1231 } 1232 1233 pj_memcpy(&tr->remote_addr, &listener->accept_data.remote, 1234 listener->accept_data.addrlen); 1235 hash_key = pj_pool_alloc(tr->pool, sizeof(transport_key)); 1236 init_key_from_transport(hash_key, tr); 1237 1238 pj_mutex_lock(mgr->mutex); 1239 pj_hash_set(tr->pool, mgr->transport_table, hash_key, 1240 sizeof(transport_key), tr); 1241 pj_mutex_unlock(mgr->mutex); 1242 1243 on_return: 1244 /* Re-initiate asynchronous accept() */ 1245 listener->accept_data.addrlen = sizeof(listener->accept_data.local); 1246 status = pj_ioqueue_accept(listener->key, &listener->accept_op, 1247 &listener->accept_data.sock, 1248 &listener->accept_data.local, 1249 &listener->accept_data.remote, 1250 &listener->accept_data.addrlen); 1251 if (status != PJ_EPENDING) { 1252 PJSIP_ENDPT_LOG_ERROR((mgr->endpt, listener->obj_name, status, 1253 "Error in receiving data")); 1254 PJ_TODO(IMMEDIATE_ACCEPT); 1255 return; 1256 } 1257 } 1258 1259 /* 1260 * Handle completion of asynchronous connect() function. 1261 * This function is called by the handle_events() function. 1262 */ 1263 static void handle_connect_completion( pjsip_transport_mgr *mgr, 1264 pjsip_transport_t *tr, 1265 pj_status_t status ) 1266 { 1267 struct transport_callback new_list; 1268 struct transport_callback *cb_rec; 1269 pj_ssize_t recv_size; 1270 1271 PJ_UNUSED_ARG(mgr); 1272 1273 /* On connect completion, we must call all registered callbacks in 1274 * the transport. 1275 */ 1276 1277 /* Initialize new list. */ 1278 pj_list_init(&new_list); 1279 1280 /* Hold transport's mutex. We don't want other thread to register a 1281 * callback while we're dealing with it. 1282 */ 1283 pj_mutex_lock(tr->tr_mutex); 1284 1285 /* Copy callback list to new list so that we can call the callbacks 1286 * without holding the mutex. 1287 */ 1288 pj_list_merge_last(&new_list, &tr->cb_list); 1289 1290 /* Clear transport's busy flag. */ 1291 tr->flag &= ~PJSIP_TRANSPORT_IOQUEUE_BUSY; 1292 1293 /* If success, update local address. 1294 * Local address is only available after connect() has returned. 1295 */ 1296 if (status == PJ_SUCCESS) { 1297 int addrlen = sizeof(tr->local_addr); 1298 1299 status = pj_sock_getsockname(tr->sock, 1300 (pj_sockaddr_t*)&tr->local_addr, 1301 &addrlen); 1302 if (status == PJ_SUCCESS) { 1303 pj_memcpy(&tr->addr_name, &tr->local_addr, sizeof(tr->addr_name)); 1304 } 1305 } 1306 1307 /* Unlock mutex. */ 1308 pj_mutex_unlock(tr->tr_mutex); 1309 1310 /* Call all registered callbacks. */ 1311 cb_rec = new_list.next; 1312 while (cb_rec != &new_list) { 1313 struct transport_callback *next; 1314 next = cb_rec->next; 1315 (*cb_rec->cb)(tr, cb_rec->token, status); 1316 cb_rec = next; 1317 } 1318 1319 /* Success? */ 1320 if (status != PJ_SUCCESS) { 1321 destroy_transport(mgr, tr); 1322 PJ_TODO(WTF); 1323 return; 1324 } 1325 1326 /* Initiate read operation to socket. */ 1327 recv_size = PJSIP_MAX_PKT_LEN; 1328 status = pj_ioqueue_recv( tr->key, &tr->rdata->op_key, tr->rdata->packet, 1329 &recv_size, 0); 1330 if (status != PJ_EPENDING) { 1331 destroy_transport(mgr, tr); 1332 PJ_TODO(IMMEDIATE_DATA); 1333 return; 1334 } 1335 } 1336 #endif /* PJ_HAS_TCP */ 1337 1338 /* 1339 * Handle incoming data. 1340 * This function is called when the transport manager receives 'notification' 1341 * from the I/O Queue that the receive operation has completed. 1342 * This function will then attempt to parse the message, and hands over the 1343 * message to the endpoint. 1344 */ 1345 static void handle_received_data( pjsip_transport_mgr *mgr, 1346 pjsip_transport_t *tr, 1347 pj_ssize_t size ) 1348 { 1349 pjsip_msg *msg; 1350 pjsip_rx_data *rdata = tr->rdata; 1351 pj_pool_t *rdata_pool; 1352 pjsip_hdr *hdr; 1353 pj_str_t s; 1354 char *src_addr; 1355 int src_port; 1356 pj_size_t msg_fragment_size = 0; 1357 1358 /* Check size. */ 1359 if (size < 1) { 1360 if (tr->type != PJSIP_TRANSPORT_UDP) { 1361 /* zero bytes indicates transport has been closed for TCP. 1362 * But alas, we can't destroy it now since transactions may still 1363 * have reference to it. In that case, just do nothing, the 1364 * transaction will receive error when it tries to send anything. 1365 * But alas!! UAC transactions wont send anything!!. 1366 * So this is a bug! 1367 */ 1368 if (pj_atomic_get(tr->ref_cnt)==0) { 1369 PJ_LOG(4,(tr->obj_name, "connection closed")); 1370 destroy_transport(mgr, tr); 1371 } else { 1372 PJ_TODO(HANDLE_TCP_TRANSPORT_CLOSED); 1373 //PJ_TODO(SIGNAL_TRANSACTIONS_ON_TRANSPORT_CLOSED); 1374 } 1375 return; 1376 } else { 1377 /* On Windows machines, UDP recv() will return zero upon receiving 1378 * ICMP port unreachable message. 1379 */ 1380 PJ_LOG(4,(tr->obj_name, "Ignored zero length UDP packet (port unreachable?)")); 1381 goto on_return; 1382 } 1383 } 1384 1385 /* Save received time. */ 1386 pj_gettimeofday(&rdata->timestamp); 1387 1388 /* Update length. */ 1389 rdata->len += size; 1390 1391 /* Null terminate packet, this is the requirement of the parser. */ 1392 rdata->packet[rdata->len] = '\0'; 1393 1394 /* Get source address and port for logging purpose. */ 1395 src_addr = pj_inet_ntoa(rdata->addr.sin_addr); 1396 src_port = pj_sockaddr_in_get_port(&rdata->addr); 1397 1398 /* Print the whole data to the log. */ 1399 PJ_LOG(4,(tr->obj_name, "%d bytes recvfrom %s:%d:\n" 1400 "----------- begin msg ------------\n" 1401 "%s" 1402 "------------ end msg -------------", 1403 rdata->len, src_addr, src_port, rdata->packet)); 1404 1405 1406 /* Process all message fragments. */ 1407 while (rdata->len > 0) { 1408 1409 msg_fragment_size = rdata->len; 1410 #if PJ_HAS_TCP 646 pj_memset(&rdata->msg_info, 0, sizeof(rdata->msg_info)); 647 pj_list_init(&rdata->msg_info.parse_err); 648 rdata->msg_info.msg_buf = current_pkt; 649 rdata->msg_info.len = remaining_len; 650 1411 651 /* For TCP transport, check if the whole message has been received. */ 1412 if ( tr->type != PJSIP_TRANSPORT_UDP) {652 if ((tr->flag & PJSIP_TRANSPORT_DATAGRAM) == 0) { 1413 653 pj_status_t msg_status; 1414 msg_status = pjsip_find_msg( rdata->packet, rdata->len, PJ_FALSE,654 msg_status = pjsip_find_msg(current_pkt, remaining_len, PJ_FALSE, 1415 655 &msg_fragment_size); 1416 656 if (msg_status != PJ_SUCCESS) { 1417 if (rdata->len == PJSIP_MAX_PKT_LEN) { 1418 PJSIP_ENDPT_LOG_ERROR((mgr->endpt, tr->obj_name, 1419 PJSIP_EOVERFLOW, 1420 "Buffer discarded for %s:%d", 1421 src_addr, src_port)); 1422 goto on_return; 657 if (remaining_len == PJSIP_MAX_PKT_LEN) { 658 mgr->msg_cb(mgr->endpt, PJSIP_EOVERFLOW, rdata); 659 /* Exhaust all data. */ 660 return rdata->pkt_info.len; 1423 661 } else { 1424 goto tcp_read_packet; 662 /* Not enough data in packet. */ 663 return total_processed; 1425 664 } 1426 665 } 1427 666 } 1428 #endif 1429 1430 /* Clear parser error report */ 1431 pj_list_init(&rdata->parse_err); 667 668 /* Update msg_info. */ 669 rdata->msg_info.len = msg_fragment_size; 1432 670 1433 671 /* Parse the message. */ 1434 PJ_LOG(5,(tr->obj_name, "Parsing %d bytes from %s:%d", msg_fragment_size, 1435 src_addr, src_port)); 1436 1437 msg = pjsip_parse_rdata( rdata->packet, msg_fragment_size, rdata); 672 rdata->msg_info.msg = msg = 673 pjsip_parse_rdata( current_pkt, msg_fragment_size, rdata); 1438 674 if (msg == NULL) { 1439 PJ_LOG(3,(tr->obj_name, "Bad message (%d bytes from %s:%d)", msg_fragment_size, 1440 src_addr, src_port)); 675 mgr->msg_cb(mgr->endpt, PJSIP_EINVALIDMSG, rdata); 1441 676 goto finish_process_fragment; 1442 677 } 1443 678 1444 679 /* Perform basic header checking. */ 1445 if (rdata->call_id.ptr == NULL || rdata->from == NULL || 1446 rdata->to == NULL || rdata->via == NULL || rdata->cseq == NULL) 680 if (rdata->msg_info.call_id.ptr == NULL || 681 rdata->msg_info.from == NULL || 682 rdata->msg_info.to == NULL || 683 rdata->msg_info.via == NULL || 684 rdata->msg_info.cseq == NULL) 1447 685 { 1448 PJ_LOG(3,(tr->obj_name, "Bad message from %s:%d: missing some header", 1449 src_addr, src_port)); 686 mgr->msg_cb(mgr->endpt, PJSIP_EMISSINGHDR, rdata); 1450 687 goto finish_process_fragment; 1451 688 } 1452 689 1453 /* If message is received from address that's different from thesent-by,690 /* If message is received from address that's different from sent-by, 1454 691 * MUST add received parameter to the via. 1455 * In our case, we add Via receive param for EVERY received message,1456 * because it saves us from resolving the host HERE in case sent-by is in1457 * FQDN format. And it doesn't hurt either.1458 692 */ 1459 s = pj_str(src_addr); 1460 pj_strdup(rdata->pool, &rdata->via->recvd_param, &s); 693 s = pj_str(pj_inet_ntoa(rdata->pkt_info.addr.sin_addr)); 694 if (pj_strcmp(&s, &rdata->msg_info.via->sent_by.host) != 0) { 695 pj_strdup(rdata->tp_info.pool, 696 &rdata->msg_info.via->recvd_param, &s); 697 } 1461 698 1462 699 /* RFC 3581: 1463 700 * If message contains "rport" param, put the received port there. 1464 701 */ 1465 if (rdata->via->rport_param == 0) { 1466 rdata->via->rport_param = pj_sockaddr_in_get_port(&rdata->addr); 702 if (rdata->msg_info.via->rport_param == 0) { 703 rdata->msg_info.via->rport_param = 704 pj_ntohs(rdata->pkt_info.addr.sin_port); 1467 705 } 1468 706 … … 1470 708 */ 1471 709 if (msg->type == PJSIP_RESPONSE_MSG) { 1472 hdr = (pjsip_hdr*)rdata->via->next; 1473 if (hdr != &rdata->msg->hdr) { 710 pjsip_hdr *hdr; 711 hdr = (pjsip_hdr*)rdata->msg_info.via->next; 712 if (hdr != &msg->hdr) { 1474 713 hdr = pjsip_msg_find_hdr(msg, PJSIP_H_VIA, hdr); 1475 714 if (hdr) { 1476 PJ_LOG(3,(tr->obj_name, "Bad message from %s:%d: " 1477 "multiple Via in response message", 1478 src_addr, src_port)); 715 mgr->msg_cb(mgr->endpt, PJSIP_EMULTIPLEVIA, rdata); 1479 716 goto finish_process_fragment; 1480 717 } … … 1483 720 1484 721 /* Call the transport manager's upstream message callback. 1485 */ 1486 (*mgr->message_callback)(mgr->endpt, rdata); 722 */ 723 mgr->msg_cb(mgr->endpt, PJ_SUCCESS, rdata); 724 1487 725 1488 726 finish_process_fragment: 1489 rdata->len -= msg_fragment_size; 1490 if (rdata->len > 0) { 1491 pj_memmove(rdata->packet, rdata->packet+msg_fragment_size, rdata->len); 1492 PJ_LOG(4,(tr->obj_name, "Processing next fragment, size=%d bytes", rdata->len)); 1493 } 1494 1495 } /* while (rdata->len > 0) */ 1496 1497 on_return: 1498 /* Reset the pool and rdata */ 1499 rdata_pool = rdata->pool; 1500 pj_pool_reset(rdata_pool); 1501 rdata = pj_pool_alloc( rdata_pool, sizeof(*rdata) ); 1502 rdata->len = 0; 1503 rdata->transport = tr; 1504 rdata->pool = rdata_pool; 1505 tr->rdata = rdata; 1506 1507 /* Read the next packet. */ 1508 rdata->addr_len = sizeof(rdata->addr); 1509 if (tr->type == PJSIP_TRANSPORT_UDP) { 1510 pj_ssize_t size = PJSIP_MAX_PKT_LEN; 1511 pj_ioqueue_recvfrom(tr->key, &tr->rdata->op_key, 1512 tr->rdata->packet, &size, 0, 1513 &rdata->addr, &rdata->addr_len); 1514 PJ_TODO(HANDLE_IMMEDIATE_DATA); 1515 } 1516 1517 #if PJ_HAS_TCP 1518 /* The next 'if' should have been 'else if', but we need to put the 1519 label inside the '#if PJ_HAS_TCP' block to avoid 'unreferenced label' warning. 727 total_processed += msg_fragment_size; 728 current_pkt += msg_fragment_size; 729 remaining_len -= msg_fragment_size; 730 731 } /* while (rdata->pkt_info.len > 0) */ 732 733 734 return total_processed; 735 } 736 737 738 /* 739 * pjsip_tpmgr_alloc_transport() 740 * 741 * Get transport suitable to communicate to remote. Create a new one 742 * if necessary. 743 */ 744 PJ_DEF(pj_status_t) pjsip_tpmgr_alloc_transport( pjsip_tpmgr *mgr, 745 pjsip_transport_type_e type, 746 const pj_sockaddr_in *remote, 747 pjsip_transport **p_transport) 748 { 749 transport_key key; 750 pjsip_transport *transport; 751 pjsip_tpfactory *factory; 752 pj_status_t status; 753 754 pj_lock_acquire(mgr->lock); 755 756 /* First try to get exact destination. */ 757 key.type = (pj_uint8_t)type; 758 key.zero = 0; 759 key.addr = pj_ntohl(remote->sin_addr.s_addr); 760 key.port = pj_ntohs(remote->sin_port); 761 762 transport = pj_hash_get(mgr->table, &key, sizeof(key)); 763 if (transport != NULL) { 764 unsigned flag = pjsip_transport_get_flag_from_type(type); 765 766 /* For datagram transports, try lookup with zero address. */ 767 if (flag & PJSIP_TRANSPORT_DATAGRAM) { 768 key.addr = 0; 769 key.port = 0; 770 771 transport = pj_hash_get(mgr->table, &key, sizeof(key)); 772 } 773 } 774 775 if (transport != NULL) { 776 /* 777 * Transport found! 778 */ 779 pjsip_transport_add_ref(transport); 780 pj_lock_release(mgr->lock); 781 *p_transport = transport; 782 return PJ_SUCCESS; 783 } 784 785 /* 786 * Transport not found! 787 * Find factory that can create such transport. 1520 788 */ 1521 tcp_read_packet: 1522 if (tr->type == PJSIP_TRANSPORT_TCP) { 1523 pj_ssize_t size = PJSIP_MAX_PKT_LEN - tr->rdata->len; 1524 pj_ioqueue_recv( tr->key, &tr->rdata->op_key, 1525 tr->rdata->packet + tr->rdata->len, 1526 &size, 0); 1527 PJ_TODO(HANDLE_IMMEDIATE_DATA_1); 1528 } 1529 #endif 1530 } 1531 1532 static void transport_mgr_on_idle( pjsip_transport_mgr *mgr ) 1533 { 1534 pj_time_val now; 789 factory = mgr->factory_list.next; 790 while (factory != &mgr->factory_list) { 791 if (factory->type == type) 792 break; 793 factory = factory->next; 794 } 795 796 if (factory == &mgr->factory_list) { 797 /* No factory can create the transport! */ 798 pj_lock_release(mgr->lock); 799 return PJSIP_EUNSUPTRANSPORT; 800 } 801 802 /* Request factory to create transport. */ 803 status = factory->create_transport(factory, mgr, mgr->endpt, 804 mgr->ioqueue, remote, p_transport); 805 806 pj_lock_release(mgr->lock); 807 return status; 808 } 809 810 /** 811 * Dump transport info. 812 */ 813 PJ_DEF(void) pjsip_tpmgr_dump_transports(pjsip_tpmgr *mgr) 814 { 815 #if PJ_LOG_MAX_LEVEL >= 3 1535 816 pj_hash_iterator_t itr_val; 1536 817 pj_hash_iterator_t *itr; 1537 818 1538 1539 /* Get time for comparing transport's close time. */ 1540 pj_gettimeofday(&now); 1541 if (now.sec < mgr->next_idle_check.sec) { 1542 return; 1543 } 1544 1545 /* Acquire transport manager's lock. */ 1546 pj_mutex_lock(mgr->mutex); 1547 1548 /* Update next idle check. */ 1549 mgr->next_idle_check.sec += MGR_IDLE_CHECK_INTERVAL; 1550 1551 /* Iterate all transports, and close transports that are not used for 1552 some periods. 1553 */ 1554 itr = pjsip_transport_first(mgr, &itr_val); 1555 while (itr != NULL) { 1556 pj_hash_iterator_t *next; 1557 pjsip_transport_t *transport; 1558 1559 transport = pjsip_transport_this(mgr, itr); 1560 1561 next = pjsip_transport_next(mgr, itr); 1562 1563 if (pj_atomic_get(transport->ref_cnt)==0 && 1564 PJ_TIME_VAL_LTE(transport->close_time, now)) 1565 { 1566 destroy_transport(mgr, transport); 1567 } 1568 1569 itr = next; 1570 } 1571 1572 /* Release transport manager's lock. */ 1573 pj_mutex_unlock(mgr->mutex); 1574 } 1575 1576 static void on_ioqueue_read(pj_ioqueue_key_t *key, 1577 pj_ioqueue_op_key_t *op_key, 1578 pj_ssize_t bytes_read) 1579 { 1580 pjsip_transport_t *t; 1581 t = pj_ioqueue_get_user_data(key); 1582 1583 handle_received_data( t->mgr, t, bytes_read ); 1584 } 1585 1586 static void on_ioqueue_write(pj_ioqueue_key_t *key, 1587 pj_ioqueue_op_key_t *op_key, 1588 pj_ssize_t bytes_sent) 1589 { 1590 PJ_UNUSED_ARG(key); 1591 PJ_UNUSED_ARG(bytes_sent); 1592 1593 /* Completion of write operation. 1594 * Do nothing. 1595 */ 1596 } 1597 1598 static void on_ioqueue_accept(pj_ioqueue_key_t *key, 1599 pj_ioqueue_op_key_t *op_key, 1600 pj_sock_t newsock, 1601 int status) 1602 { 1603 #if PJ_HAS_TCP 1604 pjsip_transport_t *t; 1605 t = pj_ioqueue_get_user_data(key); 1606 1607 handle_new_connection( t->mgr, t, status ); 1608 #else 1609 PJ_UNUSED_ARG(key); 1610 PJ_UNUSED_ARG(status); 819 pj_lock_acquire(mgr->lock); 820 821 itr = pj_hash_first(mgr->table, &itr_val); 822 if (itr) { 823 PJ_LOG(3, (THIS_FILE, " Dumping transports:")); 824 825 do { 826 char src_addr[128], dst_addr[128]; 827 int src_port, dst_port; 828 pjsip_transport *t; 829 830 t = pj_hash_this(mgr->table, itr); 831 pj_native_strcpy(src_addr, pj_inet_ntoa(t->local_addr.sin_addr)); 832 src_port = pj_ntohs(t->local_addr.sin_port); 833 834 pj_native_strcpy(dst_addr, pj_inet_ntoa(t->rem_addr.sin_addr)); 835 dst_port = pj_ntohs(t->rem_addr.sin_port); 836 837 PJ_LOG(3, (THIS_FILE, " %s %s %s:%d --> %s:%d (refcnt=%d)", 838 t->type_name, 839 t->obj_name, 840 src_addr, src_port, 841 dst_addr, dst_port, 842 pj_atomic_get(t->ref_cnt))); 843 844 itr = pj_hash_next(mgr->table, itr); 845 } while (itr); 846 } 847 848 pj_lock_release(mgr->lock); 1611 849 #endif 1612 850 } 1613 851 1614 static void on_ioqueue_connect(pj_ioqueue_key_t *key, int status)1615 {1616 #if PJ_HAS_TCP1617 pjsip_transport_t *t;1618 t = pj_ioqueue_get_user_data(key);1619 1620 handle_connect_completion( t->mgr, t, status);1621 #else1622 PJ_UNUSED_ARG(key);1623 PJ_UNUSED_ARG(status);1624 #endif1625 }1626 1627 1628 /*1629 * Poll for events.1630 */1631 PJ_DEF(int) pjsip_transport_mgr_handle_events( pjsip_transport_mgr *mgr,1632 const pj_time_val *req_timeout )1633 {1634 int event_count;1635 int break_loop;1636 int result;1637 pj_time_val timeout;1638 1639 PJ_LOG(5, (LOG_TRANSPORT_MGR, "pjsip_transport_mgr_handle_events()"));1640 1641 event_count = 0;1642 break_loop = 0;1643 timeout = *req_timeout;1644 do {1645 result = pj_ioqueue_poll( mgr->ioqueue, &timeout);1646 if (result == 1) {1647 ++event_count;1648 1649 /* Break the loop. */1650 //if (timeout.msec==0 && timeout.sec==0) {1651 break_loop = 1;1652 //}1653 1654 } else {1655 /* On idle, cleanup transport. */1656 transport_mgr_on_idle(mgr);1657 1658 break_loop = 1;1659 }1660 timeout.sec = timeout.msec = 0;1661 } while (!break_loop);1662 1663 return event_count;1664 }1665 1666 1667 PJ_DEF(pj_hash_iterator_t*) pjsip_transport_first( pjsip_transport_mgr *mgr,1668 pj_hash_iterator_t *it )1669 {1670 return pj_hash_first(mgr->transport_table, it);1671 }1672 1673 PJ_DEF(pj_hash_iterator_t*) pjsip_transport_next( pjsip_transport_mgr *mgr,1674 pj_hash_iterator_t *itr )1675 {1676 return pj_hash_next(mgr->transport_table, itr);1677 }1678 1679 PJ_DEF(pjsip_transport_t*) pjsip_transport_this( pjsip_transport_mgr *mgr,1680 pj_hash_iterator_t *itr )1681 {1682 return pj_hash_this(mgr->transport_table, itr);1683 }
Note: See TracChangeset
for help on using the changeset viewer.