Ignore:
Timestamp:
Jun 6, 2008 2:47:10 PM (16 years ago)
Author:
bennylp
Message:

Major major modifications related to ticket #485 (support for TURN-07):

  • Added STUN socket transport pj_stun_sock
  • Integration of TURN-07 to ICE
  • Major refactoring in ICE stream transport to make it simpler
  • Major modification (i.e. API change) in almost everywhere else
  • Much more elaborate STUN, TURN, and ICE tests in pjnath-test
File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjnath/src/pjnath/turn_sock.c

    r1914 r1988  
    1818 */ 
    1919#include <pjnath/turn_sock.h> 
     20#include <pj/activesock.h> 
    2021#include <pj/assert.h> 
    2122#include <pj/errno.h> 
     
    5152    int                  af; 
    5253    pj_turn_tp_type      conn_type; 
    53     pj_sock_t            sock; 
    54     pj_ioqueue_key_t    *key; 
    55     pj_ioqueue_op_key_t  read_key; 
     54    pj_activesock_t     *active_sock; 
    5655    pj_ioqueue_op_key_t  send_key; 
    57     pj_uint8_t           pkt[PJ_TURN_MAX_PKT_LEN]; 
    5856}; 
    5957 
     
    7270                                  unsigned ch_num); 
    7371static void turn_on_rx_data(pj_turn_session *sess, 
    74                             const pj_uint8_t *pkt, 
     72                            void *pkt, 
    7573                            unsigned pkt_len, 
    7674                            const pj_sockaddr_t *peer_addr, 
     
    7977                          pj_turn_state_t old_state, 
    8078                          pj_turn_state_t new_state); 
    81 static void on_read_complete(pj_ioqueue_key_t *key,  
    82                              pj_ioqueue_op_key_t *op_key,  
    83                              pj_ssize_t bytes_read); 
    84 static void on_connect_complete(pj_ioqueue_key_t *key,  
    85                                 pj_status_t status); 
     79 
     80static pj_bool_t on_data_read(pj_activesock_t *asock, 
     81                              void *data, 
     82                              pj_size_t size, 
     83                              pj_status_t status, 
     84                              pj_size_t *remainder); 
     85static pj_bool_t on_connect_complete(pj_activesock_t *asock, 
     86                                     pj_status_t status); 
     87 
    8688 
    8789 
     
    159161    sess_cb.on_state = &turn_on_state; 
    160162    status = pj_turn_session_create(cfg, pool->obj_name, af, conn_type, 
    161                                     &sess_cb, turn_sock, 0, &turn_sock->sess); 
     163                                    &sess_cb, 0, turn_sock, &turn_sock->sess); 
    162164    if (status != PJ_SUCCESS) { 
    163165        destroy(turn_sock); 
     
    188190    } 
    189191 
    190     if (turn_sock->key) { 
    191         pj_ioqueue_unregister(turn_sock->key); 
    192         turn_sock->key = NULL; 
    193         turn_sock->sock = 0; 
    194     } else if (turn_sock->sock) { 
    195         pj_sock_close(turn_sock->sock); 
    196         turn_sock->sock = 0; 
     192    if (turn_sock->active_sock) { 
     193        pj_activesock_close(turn_sock->active_sock); 
     194        turn_sock->active_sock = NULL; 
    197195    } 
    198196 
     
    272270{ 
    273271    show_err(turn_sock, title, status); 
    274     pj_turn_session_destroy(turn_sock->sess); 
     272    if (turn_sock->sess) 
     273        pj_turn_session_destroy(turn_sock->sess); 
    275274} 
    276275 
     
    281280                                               void *user_data) 
    282281{ 
     282    PJ_ASSERT_RETURN(turn_sock, PJ_EINVAL); 
    283283    turn_sock->user_data = user_data; 
    284284    return PJ_SUCCESS; 
     
    290290PJ_DEF(void*) pj_turn_sock_get_user_data(pj_turn_sock *turn_sock) 
    291291{ 
     292    PJ_ASSERT_RETURN(turn_sock, NULL); 
    292293    return turn_sock->user_data; 
    293294} 
     
    297298 */ 
    298299PJ_DEF(pj_status_t) pj_turn_sock_get_info(pj_turn_sock *turn_sock, 
    299                                          pj_turn_session_info *info) 
     300                                          pj_turn_session_info *info) 
    300301{ 
    301302    PJ_ASSERT_RETURN(turn_sock && info, PJ_EINVAL); 
     
    310311} 
    311312 
     313/** 
     314 * Lock the TURN socket. Application may need to call this function to 
     315 * synchronize access to other objects to avoid deadlock. 
     316 */ 
     317PJ_DEF(pj_status_t) pj_turn_sock_lock(pj_turn_sock *turn_sock) 
     318{ 
     319    return pj_lock_acquire(turn_sock->lock); 
     320} 
     321 
     322/** 
     323 * Unlock the TURN socket. 
     324 */ 
     325PJ_DEF(pj_status_t) pj_turn_sock_unlock(pj_turn_sock *turn_sock) 
     326{ 
     327    return pj_lock_release(turn_sock->lock); 
     328} 
     329 
     330/* 
     331 * Set STUN message logging for this TURN session.  
     332 */ 
     333PJ_DEF(void) pj_turn_sock_set_log( pj_turn_sock *turn_sock, 
     334                                   unsigned flags) 
     335{ 
     336    pj_turn_session_set_log(turn_sock->sess, flags); 
     337} 
     338 
    312339/* 
    313340 * Initialize. 
    314341 */ 
    315 PJ_DEF(pj_status_t) pj_turn_sock_init(pj_turn_sock *turn_sock, 
    316                                       const pj_str_t *domain, 
    317                                       int default_port, 
    318                                       pj_dns_resolver *resolver, 
    319                                       const pj_stun_auth_cred *cred, 
    320                                       const pj_turn_alloc_param *param) 
     342PJ_DEF(pj_status_t) pj_turn_sock_alloc(pj_turn_sock *turn_sock, 
     343                                       const pj_str_t *domain, 
     344                                       int default_port, 
     345                                       pj_dns_resolver *resolver, 
     346                                       const pj_stun_auth_cred *cred, 
     347                                       const pj_turn_alloc_param *param) 
    321348{ 
    322349    pj_status_t status; 
     
    393420 * Notification when outgoing TCP socket has been connected. 
    394421 */ 
    395 static void on_connect_complete(pj_ioqueue_key_t *key,  
    396                                 pj_status_t status) 
     422static pj_bool_t on_connect_complete(pj_activesock_t *asock, 
     423                                     pj_status_t status) 
    397424{ 
    398425    pj_turn_sock *turn_sock; 
    399426 
    400     turn_sock = (pj_turn_sock*) pj_ioqueue_get_user_data(key); 
     427    turn_sock = (pj_turn_sock*) pj_activesock_get_user_data(asock); 
    401428 
    402429    if (status != PJ_SUCCESS) { 
    403430        sess_fail(turn_sock, "TCP connect() error", status); 
    404         return; 
     431        return PJ_FALSE; 
    405432    } 
    406433 
     
    410437 
    411438    /* Kick start pending read operation */ 
    412     pj_ioqueue_op_key_init(&turn_sock->read_key, sizeof(turn_sock->read_key)); 
    413     on_read_complete(turn_sock->key, &turn_sock->read_key, INIT); 
     439    status = pj_activesock_start_read(asock, turn_sock->pool,  
     440                                      PJ_TURN_MAX_PKT_LEN, 0); 
    414441 
    415442    /* Init send_key */ 
     
    420447    if (status != PJ_SUCCESS) { 
    421448        sess_fail(turn_sock, "Error sending ALLOCATE", status); 
    422         return; 
    423     } 
     449        return PJ_FALSE; 
     450    } 
     451 
     452    return PJ_TRUE; 
    424453} 
    425454 
     
    427456 * Notification from ioqueue when incoming UDP packet is received. 
    428457 */ 
    429 static void on_read_complete(pj_ioqueue_key_t *key,  
    430                              pj_ioqueue_op_key_t *op_key,  
    431                              pj_ssize_t bytes_read) 
    432 { 
    433     enum { MAX_RETRY = 10 }; 
     458static pj_bool_t on_data_read(pj_activesock_t *asock, 
     459                              void *data, 
     460                              pj_size_t size, 
     461                              pj_status_t status, 
     462                              pj_size_t *remainder) 
     463{ 
    434464    pj_turn_sock *turn_sock; 
    435     int retry = 0; 
    436     pj_status_t status; 
    437  
    438     turn_sock = (pj_turn_sock*) pj_ioqueue_get_user_data(key); 
     465    pj_bool_t ret = PJ_TRUE; 
     466 
     467    turn_sock = (pj_turn_sock*) pj_activesock_get_user_data(asock); 
    439468    pj_lock_acquire(turn_sock->lock); 
    440469 
    441     do { 
    442         if (bytes_read == INIT) { 
    443             /* Special instruction to initialize pending read() */ 
    444         } else if (bytes_read > 0 && turn_sock->sess) { 
    445             /* Report incoming packet to TURN session */ 
    446             pj_turn_session_on_rx_pkt(turn_sock->sess, turn_sock->pkt,  
    447                                       bytes_read,  
    448                                       turn_sock->conn_type == PJ_TURN_TP_UDP); 
    449         } else if (bytes_read <= 0 && turn_sock->conn_type != PJ_TURN_TP_UDP) { 
    450             sess_fail(turn_sock, "TCP connection closed", -bytes_read); 
    451             goto on_return; 
    452         } 
    453  
    454         /* Read next packet */ 
    455         bytes_read = sizeof(turn_sock->pkt); 
    456         status = pj_ioqueue_recv(turn_sock->key, op_key, 
    457                                  turn_sock->pkt, &bytes_read, 0); 
    458  
    459         if (status != PJ_EPENDING && status != PJ_SUCCESS) { 
    460             char errmsg[PJ_ERR_MSG_SIZE]; 
    461  
    462             pj_strerror(status, errmsg, sizeof(errmsg)); 
    463             sess_fail(turn_sock, "Socket recv() error", status); 
    464             goto on_return; 
    465         } 
    466  
    467     } while (status != PJ_EPENDING && status != PJ_ECANCELLED && 
    468              ++retry < MAX_RETRY); 
     470    if (status == PJ_SUCCESS && turn_sock->sess) { 
     471        /* Report incoming packet to TURN session */ 
     472        PJ_TODO(REPORT_PARSED_LEN); 
     473        pj_turn_session_on_rx_pkt(turn_sock->sess, data,  size); 
     474    } else if (status != PJ_SUCCESS &&  
     475               turn_sock->conn_type != PJ_TURN_TP_UDP)  
     476    { 
     477        sess_fail(turn_sock, "TCP connection closed", status); 
     478        ret = PJ_FALSE; 
     479        goto on_return; 
     480    } 
    469481 
    470482on_return: 
    471483    pj_lock_release(turn_sock->lock); 
     484 
     485    return ret; 
    472486} 
    473487 
     
    483497{ 
    484498    pj_turn_sock *turn_sock = (pj_turn_sock*)  
    485                            pj_turn_session_get_user_data(sess); 
     499                              pj_turn_session_get_user_data(sess); 
    486500    pj_ssize_t len = pkt_len; 
    487501    pj_status_t status; 
     
    496510    PJ_UNUSED_ARG(dst_addr_len); 
    497511 
    498     status = pj_ioqueue_send(turn_sock->key, &turn_sock->send_key,  
    499                              pkt, &len, 0); 
     512    status = pj_activesock_send(turn_sock->active_sock, &turn_sock->send_key, 
     513                                pkt, &len, 0); 
    500514    if (status != PJ_SUCCESS && status != PJ_EPENDING) { 
    501515        show_err(turn_sock, "socket send()", status); 
     
    525539 */ 
    526540static void turn_on_rx_data(pj_turn_session *sess, 
    527                             const pj_uint8_t *pkt, 
     541                            void *pkt, 
    528542                            unsigned pkt_len, 
    529543                            const pj_sockaddr_t *peer_addr, 
     
    560574    } 
    561575 
    562     if (new_state == PJ_TURN_STATE_RESOLVED) { 
     576    /* Notify app first */ 
     577    if (turn_sock->cb.on_state) { 
     578        (*turn_sock->cb.on_state)(turn_sock, old_state, new_state); 
     579    } 
     580 
     581    /* Make sure user hasn't destroyed us in the callback */ 
     582    if (turn_sock->sess && new_state == PJ_TURN_STATE_RESOLVED) { 
     583        pj_turn_session_info info; 
     584        pj_turn_session_get_info(turn_sock->sess, &info); 
     585        new_state = info.state; 
     586    } 
     587 
     588    if (turn_sock->sess && new_state == PJ_TURN_STATE_RESOLVED) { 
    563589        /* 
    564590         * Once server has been resolved, initiate outgoing TCP 
     
    568594        char addrtxt[PJ_INET6_ADDRSTRLEN+8]; 
    569595        int sock_type; 
    570         pj_ioqueue_callback ioq_cb; 
     596        pj_sock_t sock; 
     597        pj_activesock_cb asock_cb; 
    571598 
    572599        /* Close existing connection, if any. This happens when 
     
    574601         * connection or ALLOCATE request failed. 
    575602         */ 
    576         if (turn_sock->key) { 
    577             pj_ioqueue_unregister(turn_sock->key); 
    578             turn_sock->key = NULL; 
    579             turn_sock->sock = 0; 
    580         } else if (turn_sock->sock) { 
    581             pj_sock_close(turn_sock->sock); 
    582             turn_sock->sock = 0; 
     603        if (turn_sock->active_sock) { 
     604            pj_activesock_close(turn_sock->active_sock); 
     605            turn_sock->active_sock = NULL; 
    583606        } 
    584607 
     
    592615 
    593616        /* Init socket */ 
    594         status = pj_sock_socket(turn_sock->af, sock_type, 0,  
    595                                 &turn_sock->sock); 
     617        status = pj_sock_socket(turn_sock->af, sock_type, 0, &sock); 
    596618        if (status != PJ_SUCCESS) { 
    597619            pj_turn_sock_destroy(turn_sock); 
     
    599621        } 
    600622 
    601         /* Register to ioqeuue */ 
    602         pj_bzero(&ioq_cb, sizeof(ioq_cb)); 
    603         ioq_cb.on_read_complete = &on_read_complete; 
    604         ioq_cb.on_connect_complete = &on_connect_complete; 
    605         status = pj_ioqueue_register_sock(turn_sock->pool, turn_sock->cfg.ioqueue,  
    606                                           turn_sock->sock, turn_sock,  
    607                                           &ioq_cb, &turn_sock->key); 
     623        /* Create active socket */ 
     624        pj_bzero(&asock_cb, sizeof(asock_cb)); 
     625        asock_cb.on_data_read = &on_data_read; 
     626        asock_cb.on_connect_complete = &on_connect_complete; 
     627        status = pj_activesock_create(turn_sock->pool, sock, 
     628                                      sock_type, NULL, 
     629                                      turn_sock->cfg.ioqueue, &asock_cb,  
     630                                      turn_sock, 
     631                                      &turn_sock->active_sock); 
    608632        if (status != PJ_SUCCESS) { 
    609633            pj_turn_sock_destroy(turn_sock); 
     
    617641 
    618642        /* Initiate non-blocking connect */ 
    619         status = pj_ioqueue_connect(turn_sock->key, &info.server, 
    620                                     pj_sockaddr_get_len(&info.server)); 
     643        status=pj_activesock_start_connect(turn_sock->active_sock,  
     644                                           turn_sock->pool, 
     645                                           &info.server,  
     646                                           pj_sockaddr_get_len(&info.server)); 
    621647        if (status == PJ_SUCCESS) { 
    622             on_connect_complete(turn_sock->key, PJ_SUCCESS); 
     648            on_connect_complete(turn_sock->active_sock, PJ_SUCCESS); 
    623649        } else if (status != PJ_EPENDING) { 
    624650            pj_turn_sock_destroy(turn_sock); 
     
    631657    } 
    632658 
    633     if (turn_sock->cb.on_state) { 
    634         (*turn_sock->cb.on_state)(turn_sock, old_state, new_state); 
    635     } 
    636  
    637659    if (new_state >= PJ_TURN_STATE_DESTROYING && turn_sock->sess) { 
    638660        pj_time_val delay = {0, 0}; 
Note: See TracChangeset for help on using the changeset viewer.