Ignore:
Timestamp:
May 14, 2019 9:31:39 AM (3 years ago)
Author:
nanang
Message:

Close #2197: Support TURN extensions for TCP allocations (RFC 6062).

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjnath/src/pjnath/turn_sock.c

    r5983 r5987  
    3939#define INIT    0x1FFFFFFF 
    4040 
     41enum { 
     42    DATACONN_STATE_NULL, 
     43    DATACONN_STATE_INITSOCK, 
     44    DATACONN_STATE_CONN_BINDING, 
     45    DATACONN_STATE_READY, 
     46}; 
     47 
     48/* This structure describe data connection of TURN TCP allocations 
     49 * (RFC 6062). 
     50 */ 
     51typedef struct tcp_data_conn_t 
     52{ 
     53    pj_pool_t            *pool; 
     54 
     55    pj_uint32_t          id;            /* Connection ID.               */ 
     56    int                  state;         /* Connection state.            */ 
     57    pj_sockaddr          peer_addr;     /* Peer address (mapped).       */ 
     58    unsigned             peer_addr_len; 
     59 
     60    pj_activesock_t     *asock;         /* Active socket.               */ 
     61    pj_ioqueue_op_key_t  send_key; 
     62 
     63    pj_turn_sock        *turn_sock;     /* TURN socket parent.          */ 
     64} tcp_data_conn_t; 
     65 
     66 
    4167struct pj_turn_sock 
    4268{ 
     
    6086    pj_activesock_t     *active_sock; 
    6187    pj_ioqueue_op_key_t  send_key; 
     88 
     89    /* Data connection, when peer_conn_type==PJ_TURN_TP_TCP (RFC 6062) */ 
     90    unsigned             data_conn_cnt; 
     91    tcp_data_conn_t      data_conn[PJ_TURN_MAX_TCP_CONN_CNT]; 
    6292}; 
    6393 
     
    83113                          pj_turn_state_t old_state, 
    84114                          pj_turn_state_t new_state); 
     115static void turn_on_connection_attempt(pj_turn_session *sess, 
     116                                       pj_uint32_t conn_id, 
     117                                       const pj_sockaddr_t *peer_addr, 
     118                                       unsigned addr_len); 
     119static void turn_on_connection_bind_status(pj_turn_session *sess, 
     120                                           pj_status_t status, 
     121                                           pj_uint32_t conn_id, 
     122                                           const pj_sockaddr_t *peer_addr, 
     123                                           unsigned addr_len); 
    85124 
    86125static pj_bool_t on_data_read(pj_activesock_t *asock, 
     
    92131                                     pj_status_t status); 
    93132 
    94  
     133static pj_bool_t dataconn_on_data_read(pj_activesock_t *asock, 
     134                                       void *data, 
     135                                       pj_size_t size, 
     136                                       pj_status_t status, 
     137                                       pj_size_t *remainder); 
     138static pj_bool_t dataconn_on_connect_complete(pj_activesock_t *asock, 
     139                                              pj_status_t status); 
     140static void dataconn_cleanup(tcp_data_conn_t *conn); 
    95141 
    96142static void turn_sock_on_destroy(void *comp); 
     
    194240    sess_cb.on_rx_data = &turn_on_rx_data; 
    195241    sess_cb.on_state = &turn_on_state; 
     242    sess_cb.on_connection_attempt = &turn_on_connection_attempt; 
     243    sess_cb.on_connection_bind_status = &turn_on_connection_bind_status; 
    196244    status = pj_turn_session_create(cfg, pool->obj_name, af, conn_type, 
    197245                                    turn_sock->grp_lock, &sess_cb, 0, 
     
    225273static void destroy(pj_turn_sock *turn_sock) 
    226274{ 
     275    unsigned i; 
     276 
    227277    PJ_LOG(4,(turn_sock->obj_name, "TURN socket destroy request, ref_cnt=%d", 
    228278              pj_grp_lock_get_ref(turn_sock->grp_lock))); 
     
    239289    if (turn_sock->active_sock) 
    240290        pj_activesock_close(turn_sock->active_sock); 
     291 
     292    for (i=0; i < PJ_TURN_MAX_TCP_CONN_CNT; ++i) { 
     293        dataconn_cleanup(&turn_sock->data_conn[i]); 
     294    } 
     295    turn_sock->data_conn_cnt = 0; 
     296 
    241297    pj_grp_lock_dec_ref(turn_sock->grp_lock); 
    242298    pj_grp_lock_release(turn_sock->grp_lock); 
     
    668724                              pj_turn_session_get_user_data(sess); 
    669725    pj_ssize_t len = pkt_len; 
    670     pj_status_t status; 
     726    pj_status_t status = PJ_SUCCESS; 
    671727 
    672728    if (turn_sock == NULL || turn_sock->is_destroying) { 
     
    681737                                      &turn_sock->send_key, pkt, &len, 0, 
    682738                                      dst_addr, dst_addr_len); 
     739    } else if (turn_sock->alloc_param.peer_conn_type == PJ_TURN_TP_TCP) { 
     740        pj_turn_session_info info; 
     741        pj_turn_session_get_info(turn_sock->sess, &info); 
     742        if (pj_sockaddr_cmp(&info.server, dst_addr) == 0) { 
     743            /* Destination address is TURN server */ 
     744            status = pj_activesock_send(turn_sock->active_sock, 
     745                                        &turn_sock->send_key, pkt, &len, 0); 
     746        } else { 
     747            /* Destination address is peer, lookup data connection */ 
     748            unsigned i; 
     749 
     750            status = PJ_ENOTFOUND; 
     751            for (i=0; i < PJ_TURN_MAX_TCP_CONN_CNT; ++i) { 
     752                tcp_data_conn_t *conn = &turn_sock->data_conn[i]; 
     753                if (conn->state < DATACONN_STATE_CONN_BINDING) 
     754                    continue; 
     755                if (pj_sockaddr_cmp(&conn->peer_addr, dst_addr) == 0) { 
     756                    status = pj_activesock_send(conn->asock, 
     757                                                &conn->send_key, 
     758                                                pkt, &len, 0); 
     759                    break; 
     760                } 
     761            } 
     762        } 
    683763    } else { 
    684764        status = pj_activesock_send(turn_sock->active_sock, 
    685765                                    &turn_sock->send_key, pkt, &len, 0); 
    686766    } 
     767 
    687768    if (status != PJ_SUCCESS && status != PJ_EPENDING) { 
    688769        show_err(turn_sock, "socket send()", status); 
     
    721802    if (turn_sock == NULL || turn_sock->is_destroying) { 
    722803        /* We've been destroyed */ 
     804        return; 
     805    } 
     806 
     807    if (turn_sock->alloc_param.peer_conn_type != PJ_TURN_TP_UDP) { 
     808        /* Data traffic for RFC 6062 is not via TURN session */ 
    723809        return; 
    724810    } 
     
    9371023 
    9381024 
     1025static void dataconn_cleanup(tcp_data_conn_t *conn) 
     1026{ 
     1027    if (conn->asock) 
     1028        pj_activesock_close(conn->asock); 
     1029 
     1030    pj_pool_safe_release(&conn->pool); 
     1031 
     1032    pj_bzero(conn, sizeof(conn)); 
     1033} 
     1034 
     1035static pj_bool_t dataconn_on_data_read(pj_activesock_t *asock, 
     1036                                       void *data, 
     1037                                       pj_size_t size, 
     1038                                       pj_status_t status, 
     1039                                       pj_size_t *remainder) 
     1040{ 
     1041    tcp_data_conn_t *conn = (tcp_data_conn_t*) 
     1042                            pj_activesock_get_user_data(asock); 
     1043    pj_turn_sock *turn_sock = conn->turn_sock; 
     1044 
     1045    pj_grp_lock_acquire(turn_sock->grp_lock); 
     1046 
     1047    if (size == 0 && status != PJ_SUCCESS) { 
     1048        /* Connection gone, release data connection */ 
     1049        dataconn_cleanup(conn); 
     1050        --turn_sock->data_conn_cnt; 
     1051        pj_grp_lock_release(turn_sock->grp_lock); 
     1052        return PJ_FALSE; 
     1053    } 
     1054 
     1055    if (conn->state == DATACONN_STATE_READY) { 
     1056        /* Application data */ 
     1057        if (turn_sock->cb.on_rx_data) { 
     1058            (*turn_sock->cb.on_rx_data)(turn_sock, data, size, 
     1059                                        &conn->peer_addr, 
     1060                                        conn->peer_addr_len); 
     1061        } 
     1062    } else if (conn->state == DATACONN_STATE_CONN_BINDING) { 
     1063        /* Waiting for ConnectionBind response */ 
     1064        pj_bool_t is_stun; 
     1065        pj_turn_session_on_rx_pkt_param prm; 
     1066         
     1067        /* Ignore if this is not a STUN message */ 
     1068        is_stun = ((((pj_uint8_t*)data)[0] & 0xC0) == 0); 
     1069        if (!is_stun) 
     1070            goto on_return; 
     1071 
     1072        pj_bzero(&prm, sizeof(prm)); 
     1073        prm.pkt = data; 
     1074        prm.pkt_len = size; 
     1075        prm.src_addr = &conn->peer_addr; 
     1076        prm.src_addr_len = conn->peer_addr_len; 
     1077        pj_turn_session_on_rx_pkt2(conn->turn_sock->sess, &prm); 
     1078        /* Got remainder? */ 
     1079        if (prm.parsed_len < size) { 
     1080            *remainder = size - prm.parsed_len; 
     1081            if (prm.parsed_len) { 
     1082                pj_memmove(data, (pj_uint8_t*)data+prm.parsed_len, 
     1083                           *remainder); 
     1084            } 
     1085        } 
     1086    } 
     1087 
     1088on_return: 
     1089    pj_grp_lock_release(turn_sock->grp_lock); 
     1090    return PJ_TRUE; 
     1091} 
     1092 
     1093static pj_bool_t dataconn_on_connect_complete(pj_activesock_t *asock, 
     1094                                              pj_status_t status) 
     1095{ 
     1096    tcp_data_conn_t *conn = (tcp_data_conn_t*) 
     1097                            pj_activesock_get_user_data(asock); 
     1098    pj_turn_sock *turn_sock = conn->turn_sock; 
     1099 
     1100    pj_grp_lock_acquire(turn_sock->grp_lock); 
     1101 
     1102    if (status == PJ_SUCCESS) { 
     1103        status = pj_activesock_start_read(asock, turn_sock->pool, 
     1104                                          turn_sock->setting.max_pkt_size, 0); 
     1105    } 
     1106    if (status == PJ_SUCCESS) { 
     1107        conn->state = DATACONN_STATE_CONN_BINDING; 
     1108        status = pj_turn_session_connection_bind(turn_sock->sess, 
     1109                                                 conn->pool, 
     1110                                                 conn->id, 
     1111                                                 &conn->peer_addr, 
     1112                                                 conn->peer_addr_len); 
     1113    } 
     1114    if (status != PJ_SUCCESS) { 
     1115        dataconn_cleanup(conn); 
     1116        --turn_sock->data_conn_cnt; 
     1117        pj_grp_lock_release(turn_sock->grp_lock); 
     1118        return PJ_FALSE; 
     1119    } 
     1120 
     1121    pj_grp_lock_release(turn_sock->grp_lock); 
     1122    return PJ_TRUE; 
     1123} 
     1124 
     1125 
     1126static void turn_on_connection_attempt(pj_turn_session *sess, 
     1127                                       pj_uint32_t conn_id, 
     1128                                       const pj_sockaddr_t *peer_addr, 
     1129                                       unsigned addr_len) 
     1130{ 
     1131    pj_turn_sock *turn_sock = (pj_turn_sock*)  
     1132                              pj_turn_session_get_user_data(sess); 
     1133    pj_pool_t *pool; 
     1134    tcp_data_conn_t *new_conn; 
     1135    pj_turn_session_info info; 
     1136    pj_sock_t sock = PJ_INVALID_SOCKET; 
     1137    pj_activesock_cfg asock_cfg; 
     1138    pj_activesock_cb asock_cb; 
     1139    pj_sockaddr bound_addr, *cfg_bind_addr; 
     1140    pj_uint16_t max_bind_retry; 
     1141    char addrtxt[PJ_INET6_ADDRSTRLEN+8]; 
     1142    pj_status_t status; 
     1143    unsigned i; 
     1144 
     1145    PJ_ASSERT_ON_FAIL(turn_sock->conn_type == PJ_TURN_TP_TCP && 
     1146                      turn_sock->alloc_param.peer_conn_type == PJ_TURN_TP_TCP, 
     1147                      return); 
     1148 
     1149    PJ_LOG(5,(turn_sock->pool->obj_name, "Connection attempt from peer %s", 
     1150              pj_sockaddr_print(&peer_addr, addrtxt, sizeof(addrtxt), 3))); 
     1151 
     1152    if (turn_sock == NULL) { 
     1153        /* We've been destroyed */ 
     1154        return; 
     1155    } 
     1156 
     1157    pj_grp_lock_acquire(turn_sock->grp_lock); 
     1158 
     1159    if (turn_sock->data_conn_cnt == PJ_TURN_MAX_TCP_CONN_CNT) { 
     1160        /* Data connection has reached limit */ 
     1161        pj_grp_lock_release(turn_sock->grp_lock); 
     1162        return; 
     1163    } 
     1164 
     1165    /* Check if app wants to accept this connection */ 
     1166    status = PJ_SUCCESS; 
     1167    if (turn_sock->cb.on_connection_attempt) { 
     1168        status = (*turn_sock->cb.on_connection_attempt)(turn_sock, conn_id, 
     1169                                                        peer_addr, addr_len); 
     1170    } 
     1171    /* App rejects it */ 
     1172    if (status != PJ_SUCCESS) { 
     1173        pj_perror(4, turn_sock->pool->obj_name, status, 
     1174                  "Rejected connection attempt from peer %s", 
     1175                  pj_sockaddr_print(peer_addr, addrtxt, sizeof(addrtxt), 3)); 
     1176        pj_grp_lock_release(turn_sock->grp_lock); 
     1177        return; 
     1178    } 
     1179 
     1180    /* Find free data connection slot */ 
     1181    for (i=0; i < PJ_TURN_MAX_TCP_CONN_CNT; ++i) { 
     1182        if (turn_sock->data_conn[i].state == DATACONN_STATE_NULL) 
     1183            break; 
     1184    } 
     1185    pj_assert(i < turn_sock->data_conn_cnt); 
     1186    ++turn_sock->data_conn_cnt; 
     1187 
     1188    /* Init new data connection */ 
     1189    new_conn = &turn_sock->data_conn[i]; 
     1190    pj_bzero(new_conn, sizeof(*new_conn)); 
     1191    pool = pj_pool_create(turn_sock->cfg.pf, "dataconn", 128, 128, NULL); 
     1192    new_conn->pool = pool; 
     1193    new_conn->id = conn_id; 
     1194    new_conn->turn_sock = turn_sock; 
     1195    pj_sockaddr_cp(&new_conn->peer_addr, peer_addr); 
     1196    new_conn->peer_addr_len = addr_len; 
     1197    pj_ioqueue_op_key_init(&new_conn->send_key, sizeof(new_conn->send_key)); 
     1198    new_conn->state = DATACONN_STATE_INITSOCK; 
     1199 
     1200    /* Init socket */ 
     1201    status = pj_sock_socket(turn_sock->af, pj_SOCK_STREAM(), 0, &sock); 
     1202    if (status != PJ_SUCCESS) 
     1203        goto on_return; 
     1204 
     1205    /* Bind socket */ 
     1206    cfg_bind_addr = &turn_sock->setting.bound_addr; 
     1207    max_bind_retry = MAX_BIND_RETRY; 
     1208    if (turn_sock->setting.port_range && 
     1209        turn_sock->setting.port_range < max_bind_retry) 
     1210    { 
     1211        max_bind_retry = turn_sock->setting.port_range; 
     1212    } 
     1213    pj_sockaddr_init(turn_sock->af, &bound_addr, NULL, 0); 
     1214    if (cfg_bind_addr->addr.sa_family == pj_AF_INET() || 
     1215        cfg_bind_addr->addr.sa_family == pj_AF_INET6()) 
     1216    { 
     1217        pj_sockaddr_cp(&bound_addr, cfg_bind_addr); 
     1218    } 
     1219    status = pj_sock_bind_random(sock, &bound_addr, 
     1220                                 turn_sock->setting.port_range, 
     1221                                 max_bind_retry); 
     1222    if (status != PJ_SUCCESS) 
     1223        goto on_return; 
     1224 
     1225    /* Apply socket buffer size */ 
     1226    if (turn_sock->setting.so_rcvbuf_size > 0) { 
     1227        unsigned sobuf_size = turn_sock->setting.so_rcvbuf_size; 
     1228        status = pj_sock_setsockopt_sobuf(sock, pj_SO_RCVBUF(), PJ_TRUE, 
     1229                                          &sobuf_size); 
     1230        if (status != PJ_SUCCESS) { 
     1231            pj_perror(3, turn_sock->obj_name, status, 
     1232                      "Failed setting SO_RCVBUF"); 
     1233        } else { 
     1234            if (sobuf_size < turn_sock->setting.so_rcvbuf_size) { 
     1235                PJ_LOG(4, (turn_sock->obj_name, 
     1236                           "Warning! Cannot set SO_RCVBUF as configured," 
     1237                           " now=%d, configured=%d", sobuf_size, 
     1238                           turn_sock->setting.so_rcvbuf_size)); 
     1239            } else { 
     1240                PJ_LOG(5, (turn_sock->obj_name, "SO_RCVBUF set to %d", 
     1241                           sobuf_size)); 
     1242            } 
     1243        } 
     1244    } 
     1245    if (turn_sock->setting.so_sndbuf_size > 0) { 
     1246        unsigned sobuf_size = turn_sock->setting.so_sndbuf_size; 
     1247        status = pj_sock_setsockopt_sobuf(sock, pj_SO_SNDBUF(), PJ_TRUE, 
     1248                                          &sobuf_size); 
     1249        if (status != PJ_SUCCESS) { 
     1250            pj_perror(3, turn_sock->obj_name, status, 
     1251                      "Failed setting SO_SNDBUF"); 
     1252        } else { 
     1253            if (sobuf_size < turn_sock->setting.so_sndbuf_size) { 
     1254                PJ_LOG(4, (turn_sock->obj_name, 
     1255                           "Warning! Cannot set SO_SNDBUF as configured," 
     1256                           " now=%d, configured=%d", sobuf_size, 
     1257                           turn_sock->setting.so_sndbuf_size)); 
     1258            } else { 
     1259                PJ_LOG(5, (turn_sock->obj_name, "SO_SNDBUF set to %d", 
     1260                           sobuf_size)); 
     1261            } 
     1262        } 
     1263    } 
     1264 
     1265    /* Create active socket */ 
     1266    pj_activesock_cfg_default(&asock_cfg); 
     1267    asock_cfg.grp_lock = turn_sock->grp_lock; 
     1268 
     1269    pj_bzero(&asock_cb, sizeof(asock_cb)); 
     1270    asock_cb.on_data_read = &dataconn_on_data_read; 
     1271    asock_cb.on_connect_complete = &dataconn_on_connect_complete; 
     1272    status = pj_activesock_create(pool, sock, 
     1273                                  pj_SOCK_STREAM(), &asock_cfg, 
     1274                                  turn_sock->cfg.ioqueue, &asock_cb, 
     1275                                  new_conn, &new_conn->asock); 
     1276    if (status != PJ_SUCCESS) 
     1277        goto on_return; 
     1278 
     1279    /* Connect to TURN server for data connection */ 
     1280    pj_turn_session_get_info(turn_sock->sess, &info); 
     1281    status = pj_activesock_start_connect(new_conn->asock, 
     1282                                         pool, 
     1283                                         &info.server, 
     1284                                         pj_sockaddr_get_len(&info.server)); 
     1285    if (status == PJ_SUCCESS) { 
     1286        dataconn_on_connect_complete(new_conn->asock, PJ_SUCCESS); 
     1287        pj_grp_lock_release(turn_sock->grp_lock); 
     1288        return; 
     1289    } 
     1290 
     1291on_return: 
     1292    if (status == PJ_EPENDING) { 
     1293        PJ_LOG(5,(pool->obj_name, 
     1294                  "Accepting connection from peer %s", 
     1295                  pj_sockaddr_print(peer_addr, addrtxt, sizeof(addrtxt), 3))); 
     1296    } else { 
     1297        /* not PJ_SUCCESS */ 
     1298        pj_perror(4, pool->obj_name, status, 
     1299                  "Failed in accepting connection from peer %s", 
     1300                  pj_sockaddr_print(peer_addr, addrtxt, sizeof(addrtxt), 3)); 
     1301 
     1302        if (!new_conn->asock && sock != PJ_INVALID_SOCKET) 
     1303            pj_sock_close(sock);     
     1304 
     1305        dataconn_cleanup(new_conn); 
     1306        --turn_sock->data_conn_cnt; 
     1307 
     1308        /* Notify app for failure */ 
     1309        if (turn_sock->cb.on_connection_status) { 
     1310            (*turn_sock->cb.on_connection_status)(turn_sock, status, conn_id, 
     1311                                                  peer_addr, addr_len); 
     1312        } 
     1313    } 
     1314    pj_grp_lock_release(turn_sock->grp_lock); 
     1315} 
     1316 
     1317static void turn_on_connection_bind_status(pj_turn_session *sess, 
     1318                                           pj_status_t status, 
     1319                                           pj_uint32_t conn_id, 
     1320                                           const pj_sockaddr_t *peer_addr, 
     1321                                           unsigned addr_len) 
     1322{ 
     1323    pj_turn_sock *turn_sock = (pj_turn_sock*)  
     1324                              pj_turn_session_get_user_data(sess); 
     1325    tcp_data_conn_t *conn = NULL; 
     1326    unsigned i; 
     1327 
     1328    pj_grp_lock_acquire(turn_sock->grp_lock); 
     1329 
     1330    for (i=0; i < PJ_TURN_MAX_TCP_CONN_CNT; ++i) { 
     1331        tcp_data_conn_t *c = &turn_sock->data_conn[i]; 
     1332        if (c->id == conn_id && 
     1333            pj_sockaddr_cmp(peer_addr, &c->peer_addr) == 0) 
     1334        { 
     1335            conn = c; 
     1336            break; 
     1337        } 
     1338    } 
     1339    if (!conn) { 
     1340        PJ_LOG(5,(turn_sock->pool->obj_name, 
     1341                  "Warning: stray connection bind event")); 
     1342        pj_grp_lock_release(turn_sock->grp_lock); 
     1343        return; 
     1344    } 
     1345 
     1346    if (status == PJ_SUCCESS) { 
     1347        conn->state = DATACONN_STATE_READY; 
     1348    } else { 
     1349        dataconn_cleanup(conn); 
     1350        --turn_sock->data_conn_cnt; 
     1351    } 
     1352 
     1353    pj_grp_lock_release(turn_sock->grp_lock); 
     1354 
     1355    if (turn_sock->cb.on_connection_status) { 
     1356        (*turn_sock->cb.on_connection_status)(turn_sock, status, conn_id, 
     1357                                              peer_addr, addr_len); 
     1358    } 
     1359} 
Note: See TracChangeset for help on using the changeset viewer.