Changeset 5987 for pjproject/trunk/pjnath/src/pjnath/turn_sock.c
- Timestamp:
- May 14, 2019 9:31:39 AM (5 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjnath/src/pjnath/turn_sock.c
r5983 r5987 39 39 #define INIT 0x1FFFFFFF 40 40 41 enum { 42 DATACONN_STATE_NULL, 43 DATACONN_STATE_INITSOCK, 44 DATACONN_STATE_CONN_BINDING, 45 DATACONN_STATE_READY, 46 }; 47 48 /* This structure describe data connection of TURN TCP allocations 49 * (RFC 6062). 50 */ 51 typedef struct tcp_data_conn_t 52 { 53 pj_pool_t *pool; 54 55 pj_uint32_t id; /* Connection ID. */ 56 int state; /* Connection state. */ 57 pj_sockaddr peer_addr; /* Peer address (mapped). */ 58 unsigned peer_addr_len; 59 60 pj_activesock_t *asock; /* Active socket. */ 61 pj_ioqueue_op_key_t send_key; 62 63 pj_turn_sock *turn_sock; /* TURN socket parent. */ 64 } tcp_data_conn_t; 65 66 41 67 struct pj_turn_sock 42 68 { … … 60 86 pj_activesock_t *active_sock; 61 87 pj_ioqueue_op_key_t send_key; 88 89 /* Data connection, when peer_conn_type==PJ_TURN_TP_TCP (RFC 6062) */ 90 unsigned data_conn_cnt; 91 tcp_data_conn_t data_conn[PJ_TURN_MAX_TCP_CONN_CNT]; 62 92 }; 63 93 … … 83 113 pj_turn_state_t old_state, 84 114 pj_turn_state_t new_state); 115 static void turn_on_connection_attempt(pj_turn_session *sess, 116 pj_uint32_t conn_id, 117 const pj_sockaddr_t *peer_addr, 118 unsigned addr_len); 119 static void turn_on_connection_bind_status(pj_turn_session *sess, 120 pj_status_t status, 121 pj_uint32_t conn_id, 122 const pj_sockaddr_t *peer_addr, 123 unsigned addr_len); 85 124 86 125 static pj_bool_t on_data_read(pj_activesock_t *asock, … … 92 131 pj_status_t status); 93 132 94 133 static pj_bool_t dataconn_on_data_read(pj_activesock_t *asock, 134 void *data, 135 pj_size_t size, 136 pj_status_t status, 137 pj_size_t *remainder); 138 static pj_bool_t dataconn_on_connect_complete(pj_activesock_t *asock, 139 pj_status_t status); 140 static void dataconn_cleanup(tcp_data_conn_t *conn); 95 141 96 142 static void turn_sock_on_destroy(void *comp); … … 194 240 sess_cb.on_rx_data = &turn_on_rx_data; 195 241 sess_cb.on_state = &turn_on_state; 242 sess_cb.on_connection_attempt = &turn_on_connection_attempt; 243 sess_cb.on_connection_bind_status = &turn_on_connection_bind_status; 196 244 status = pj_turn_session_create(cfg, pool->obj_name, af, conn_type, 197 245 turn_sock->grp_lock, &sess_cb, 0, … … 225 273 static void destroy(pj_turn_sock *turn_sock) 226 274 { 275 unsigned i; 276 227 277 PJ_LOG(4,(turn_sock->obj_name, "TURN socket destroy request, ref_cnt=%d", 228 278 pj_grp_lock_get_ref(turn_sock->grp_lock))); … … 239 289 if (turn_sock->active_sock) 240 290 pj_activesock_close(turn_sock->active_sock); 291 292 for (i=0; i < PJ_TURN_MAX_TCP_CONN_CNT; ++i) { 293 dataconn_cleanup(&turn_sock->data_conn[i]); 294 } 295 turn_sock->data_conn_cnt = 0; 296 241 297 pj_grp_lock_dec_ref(turn_sock->grp_lock); 242 298 pj_grp_lock_release(turn_sock->grp_lock); … … 668 724 pj_turn_session_get_user_data(sess); 669 725 pj_ssize_t len = pkt_len; 670 pj_status_t status ;726 pj_status_t status = PJ_SUCCESS; 671 727 672 728 if (turn_sock == NULL || turn_sock->is_destroying) { … … 681 737 &turn_sock->send_key, pkt, &len, 0, 682 738 dst_addr, dst_addr_len); 739 } else if (turn_sock->alloc_param.peer_conn_type == PJ_TURN_TP_TCP) { 740 pj_turn_session_info info; 741 pj_turn_session_get_info(turn_sock->sess, &info); 742 if (pj_sockaddr_cmp(&info.server, dst_addr) == 0) { 743 /* Destination address is TURN server */ 744 status = pj_activesock_send(turn_sock->active_sock, 745 &turn_sock->send_key, pkt, &len, 0); 746 } else { 747 /* Destination address is peer, lookup data connection */ 748 unsigned i; 749 750 status = PJ_ENOTFOUND; 751 for (i=0; i < PJ_TURN_MAX_TCP_CONN_CNT; ++i) { 752 tcp_data_conn_t *conn = &turn_sock->data_conn[i]; 753 if (conn->state < DATACONN_STATE_CONN_BINDING) 754 continue; 755 if (pj_sockaddr_cmp(&conn->peer_addr, dst_addr) == 0) { 756 status = pj_activesock_send(conn->asock, 757 &conn->send_key, 758 pkt, &len, 0); 759 break; 760 } 761 } 762 } 683 763 } else { 684 764 status = pj_activesock_send(turn_sock->active_sock, 685 765 &turn_sock->send_key, pkt, &len, 0); 686 766 } 767 687 768 if (status != PJ_SUCCESS && status != PJ_EPENDING) { 688 769 show_err(turn_sock, "socket send()", status); … … 721 802 if (turn_sock == NULL || turn_sock->is_destroying) { 722 803 /* We've been destroyed */ 804 return; 805 } 806 807 if (turn_sock->alloc_param.peer_conn_type != PJ_TURN_TP_UDP) { 808 /* Data traffic for RFC 6062 is not via TURN session */ 723 809 return; 724 810 } … … 937 1023 938 1024 1025 static void dataconn_cleanup(tcp_data_conn_t *conn) 1026 { 1027 if (conn->asock) 1028 pj_activesock_close(conn->asock); 1029 1030 pj_pool_safe_release(&conn->pool); 1031 1032 pj_bzero(conn, sizeof(conn)); 1033 } 1034 1035 static pj_bool_t dataconn_on_data_read(pj_activesock_t *asock, 1036 void *data, 1037 pj_size_t size, 1038 pj_status_t status, 1039 pj_size_t *remainder) 1040 { 1041 tcp_data_conn_t *conn = (tcp_data_conn_t*) 1042 pj_activesock_get_user_data(asock); 1043 pj_turn_sock *turn_sock = conn->turn_sock; 1044 1045 pj_grp_lock_acquire(turn_sock->grp_lock); 1046 1047 if (size == 0 && status != PJ_SUCCESS) { 1048 /* Connection gone, release data connection */ 1049 dataconn_cleanup(conn); 1050 --turn_sock->data_conn_cnt; 1051 pj_grp_lock_release(turn_sock->grp_lock); 1052 return PJ_FALSE; 1053 } 1054 1055 if (conn->state == DATACONN_STATE_READY) { 1056 /* Application data */ 1057 if (turn_sock->cb.on_rx_data) { 1058 (*turn_sock->cb.on_rx_data)(turn_sock, data, size, 1059 &conn->peer_addr, 1060 conn->peer_addr_len); 1061 } 1062 } else if (conn->state == DATACONN_STATE_CONN_BINDING) { 1063 /* Waiting for ConnectionBind response */ 1064 pj_bool_t is_stun; 1065 pj_turn_session_on_rx_pkt_param prm; 1066 1067 /* Ignore if this is not a STUN message */ 1068 is_stun = ((((pj_uint8_t*)data)[0] & 0xC0) == 0); 1069 if (!is_stun) 1070 goto on_return; 1071 1072 pj_bzero(&prm, sizeof(prm)); 1073 prm.pkt = data; 1074 prm.pkt_len = size; 1075 prm.src_addr = &conn->peer_addr; 1076 prm.src_addr_len = conn->peer_addr_len; 1077 pj_turn_session_on_rx_pkt2(conn->turn_sock->sess, &prm); 1078 /* Got remainder? */ 1079 if (prm.parsed_len < size) { 1080 *remainder = size - prm.parsed_len; 1081 if (prm.parsed_len) { 1082 pj_memmove(data, (pj_uint8_t*)data+prm.parsed_len, 1083 *remainder); 1084 } 1085 } 1086 } 1087 1088 on_return: 1089 pj_grp_lock_release(turn_sock->grp_lock); 1090 return PJ_TRUE; 1091 } 1092 1093 static pj_bool_t dataconn_on_connect_complete(pj_activesock_t *asock, 1094 pj_status_t status) 1095 { 1096 tcp_data_conn_t *conn = (tcp_data_conn_t*) 1097 pj_activesock_get_user_data(asock); 1098 pj_turn_sock *turn_sock = conn->turn_sock; 1099 1100 pj_grp_lock_acquire(turn_sock->grp_lock); 1101 1102 if (status == PJ_SUCCESS) { 1103 status = pj_activesock_start_read(asock, turn_sock->pool, 1104 turn_sock->setting.max_pkt_size, 0); 1105 } 1106 if (status == PJ_SUCCESS) { 1107 conn->state = DATACONN_STATE_CONN_BINDING; 1108 status = pj_turn_session_connection_bind(turn_sock->sess, 1109 conn->pool, 1110 conn->id, 1111 &conn->peer_addr, 1112 conn->peer_addr_len); 1113 } 1114 if (status != PJ_SUCCESS) { 1115 dataconn_cleanup(conn); 1116 --turn_sock->data_conn_cnt; 1117 pj_grp_lock_release(turn_sock->grp_lock); 1118 return PJ_FALSE; 1119 } 1120 1121 pj_grp_lock_release(turn_sock->grp_lock); 1122 return PJ_TRUE; 1123 } 1124 1125 1126 static void turn_on_connection_attempt(pj_turn_session *sess, 1127 pj_uint32_t conn_id, 1128 const pj_sockaddr_t *peer_addr, 1129 unsigned addr_len) 1130 { 1131 pj_turn_sock *turn_sock = (pj_turn_sock*) 1132 pj_turn_session_get_user_data(sess); 1133 pj_pool_t *pool; 1134 tcp_data_conn_t *new_conn; 1135 pj_turn_session_info info; 1136 pj_sock_t sock = PJ_INVALID_SOCKET; 1137 pj_activesock_cfg asock_cfg; 1138 pj_activesock_cb asock_cb; 1139 pj_sockaddr bound_addr, *cfg_bind_addr; 1140 pj_uint16_t max_bind_retry; 1141 char addrtxt[PJ_INET6_ADDRSTRLEN+8]; 1142 pj_status_t status; 1143 unsigned i; 1144 1145 PJ_ASSERT_ON_FAIL(turn_sock->conn_type == PJ_TURN_TP_TCP && 1146 turn_sock->alloc_param.peer_conn_type == PJ_TURN_TP_TCP, 1147 return); 1148 1149 PJ_LOG(5,(turn_sock->pool->obj_name, "Connection attempt from peer %s", 1150 pj_sockaddr_print(&peer_addr, addrtxt, sizeof(addrtxt), 3))); 1151 1152 if (turn_sock == NULL) { 1153 /* We've been destroyed */ 1154 return; 1155 } 1156 1157 pj_grp_lock_acquire(turn_sock->grp_lock); 1158 1159 if (turn_sock->data_conn_cnt == PJ_TURN_MAX_TCP_CONN_CNT) { 1160 /* Data connection has reached limit */ 1161 pj_grp_lock_release(turn_sock->grp_lock); 1162 return; 1163 } 1164 1165 /* Check if app wants to accept this connection */ 1166 status = PJ_SUCCESS; 1167 if (turn_sock->cb.on_connection_attempt) { 1168 status = (*turn_sock->cb.on_connection_attempt)(turn_sock, conn_id, 1169 peer_addr, addr_len); 1170 } 1171 /* App rejects it */ 1172 if (status != PJ_SUCCESS) { 1173 pj_perror(4, turn_sock->pool->obj_name, status, 1174 "Rejected connection attempt from peer %s", 1175 pj_sockaddr_print(peer_addr, addrtxt, sizeof(addrtxt), 3)); 1176 pj_grp_lock_release(turn_sock->grp_lock); 1177 return; 1178 } 1179 1180 /* Find free data connection slot */ 1181 for (i=0; i < PJ_TURN_MAX_TCP_CONN_CNT; ++i) { 1182 if (turn_sock->data_conn[i].state == DATACONN_STATE_NULL) 1183 break; 1184 } 1185 pj_assert(i < turn_sock->data_conn_cnt); 1186 ++turn_sock->data_conn_cnt; 1187 1188 /* Init new data connection */ 1189 new_conn = &turn_sock->data_conn[i]; 1190 pj_bzero(new_conn, sizeof(*new_conn)); 1191 pool = pj_pool_create(turn_sock->cfg.pf, "dataconn", 128, 128, NULL); 1192 new_conn->pool = pool; 1193 new_conn->id = conn_id; 1194 new_conn->turn_sock = turn_sock; 1195 pj_sockaddr_cp(&new_conn->peer_addr, peer_addr); 1196 new_conn->peer_addr_len = addr_len; 1197 pj_ioqueue_op_key_init(&new_conn->send_key, sizeof(new_conn->send_key)); 1198 new_conn->state = DATACONN_STATE_INITSOCK; 1199 1200 /* Init socket */ 1201 status = pj_sock_socket(turn_sock->af, pj_SOCK_STREAM(), 0, &sock); 1202 if (status != PJ_SUCCESS) 1203 goto on_return; 1204 1205 /* Bind socket */ 1206 cfg_bind_addr = &turn_sock->setting.bound_addr; 1207 max_bind_retry = MAX_BIND_RETRY; 1208 if (turn_sock->setting.port_range && 1209 turn_sock->setting.port_range < max_bind_retry) 1210 { 1211 max_bind_retry = turn_sock->setting.port_range; 1212 } 1213 pj_sockaddr_init(turn_sock->af, &bound_addr, NULL, 0); 1214 if (cfg_bind_addr->addr.sa_family == pj_AF_INET() || 1215 cfg_bind_addr->addr.sa_family == pj_AF_INET6()) 1216 { 1217 pj_sockaddr_cp(&bound_addr, cfg_bind_addr); 1218 } 1219 status = pj_sock_bind_random(sock, &bound_addr, 1220 turn_sock->setting.port_range, 1221 max_bind_retry); 1222 if (status != PJ_SUCCESS) 1223 goto on_return; 1224 1225 /* Apply socket buffer size */ 1226 if (turn_sock->setting.so_rcvbuf_size > 0) { 1227 unsigned sobuf_size = turn_sock->setting.so_rcvbuf_size; 1228 status = pj_sock_setsockopt_sobuf(sock, pj_SO_RCVBUF(), PJ_TRUE, 1229 &sobuf_size); 1230 if (status != PJ_SUCCESS) { 1231 pj_perror(3, turn_sock->obj_name, status, 1232 "Failed setting SO_RCVBUF"); 1233 } else { 1234 if (sobuf_size < turn_sock->setting.so_rcvbuf_size) { 1235 PJ_LOG(4, (turn_sock->obj_name, 1236 "Warning! Cannot set SO_RCVBUF as configured," 1237 " now=%d, configured=%d", sobuf_size, 1238 turn_sock->setting.so_rcvbuf_size)); 1239 } else { 1240 PJ_LOG(5, (turn_sock->obj_name, "SO_RCVBUF set to %d", 1241 sobuf_size)); 1242 } 1243 } 1244 } 1245 if (turn_sock->setting.so_sndbuf_size > 0) { 1246 unsigned sobuf_size = turn_sock->setting.so_sndbuf_size; 1247 status = pj_sock_setsockopt_sobuf(sock, pj_SO_SNDBUF(), PJ_TRUE, 1248 &sobuf_size); 1249 if (status != PJ_SUCCESS) { 1250 pj_perror(3, turn_sock->obj_name, status, 1251 "Failed setting SO_SNDBUF"); 1252 } else { 1253 if (sobuf_size < turn_sock->setting.so_sndbuf_size) { 1254 PJ_LOG(4, (turn_sock->obj_name, 1255 "Warning! Cannot set SO_SNDBUF as configured," 1256 " now=%d, configured=%d", sobuf_size, 1257 turn_sock->setting.so_sndbuf_size)); 1258 } else { 1259 PJ_LOG(5, (turn_sock->obj_name, "SO_SNDBUF set to %d", 1260 sobuf_size)); 1261 } 1262 } 1263 } 1264 1265 /* Create active socket */ 1266 pj_activesock_cfg_default(&asock_cfg); 1267 asock_cfg.grp_lock = turn_sock->grp_lock; 1268 1269 pj_bzero(&asock_cb, sizeof(asock_cb)); 1270 asock_cb.on_data_read = &dataconn_on_data_read; 1271 asock_cb.on_connect_complete = &dataconn_on_connect_complete; 1272 status = pj_activesock_create(pool, sock, 1273 pj_SOCK_STREAM(), &asock_cfg, 1274 turn_sock->cfg.ioqueue, &asock_cb, 1275 new_conn, &new_conn->asock); 1276 if (status != PJ_SUCCESS) 1277 goto on_return; 1278 1279 /* Connect to TURN server for data connection */ 1280 pj_turn_session_get_info(turn_sock->sess, &info); 1281 status = pj_activesock_start_connect(new_conn->asock, 1282 pool, 1283 &info.server, 1284 pj_sockaddr_get_len(&info.server)); 1285 if (status == PJ_SUCCESS) { 1286 dataconn_on_connect_complete(new_conn->asock, PJ_SUCCESS); 1287 pj_grp_lock_release(turn_sock->grp_lock); 1288 return; 1289 } 1290 1291 on_return: 1292 if (status == PJ_EPENDING) { 1293 PJ_LOG(5,(pool->obj_name, 1294 "Accepting connection from peer %s", 1295 pj_sockaddr_print(peer_addr, addrtxt, sizeof(addrtxt), 3))); 1296 } else { 1297 /* not PJ_SUCCESS */ 1298 pj_perror(4, pool->obj_name, status, 1299 "Failed in accepting connection from peer %s", 1300 pj_sockaddr_print(peer_addr, addrtxt, sizeof(addrtxt), 3)); 1301 1302 if (!new_conn->asock && sock != PJ_INVALID_SOCKET) 1303 pj_sock_close(sock); 1304 1305 dataconn_cleanup(new_conn); 1306 --turn_sock->data_conn_cnt; 1307 1308 /* Notify app for failure */ 1309 if (turn_sock->cb.on_connection_status) { 1310 (*turn_sock->cb.on_connection_status)(turn_sock, status, conn_id, 1311 peer_addr, addr_len); 1312 } 1313 } 1314 pj_grp_lock_release(turn_sock->grp_lock); 1315 } 1316 1317 static void turn_on_connection_bind_status(pj_turn_session *sess, 1318 pj_status_t status, 1319 pj_uint32_t conn_id, 1320 const pj_sockaddr_t *peer_addr, 1321 unsigned addr_len) 1322 { 1323 pj_turn_sock *turn_sock = (pj_turn_sock*) 1324 pj_turn_session_get_user_data(sess); 1325 tcp_data_conn_t *conn = NULL; 1326 unsigned i; 1327 1328 pj_grp_lock_acquire(turn_sock->grp_lock); 1329 1330 for (i=0; i < PJ_TURN_MAX_TCP_CONN_CNT; ++i) { 1331 tcp_data_conn_t *c = &turn_sock->data_conn[i]; 1332 if (c->id == conn_id && 1333 pj_sockaddr_cmp(peer_addr, &c->peer_addr) == 0) 1334 { 1335 conn = c; 1336 break; 1337 } 1338 } 1339 if (!conn) { 1340 PJ_LOG(5,(turn_sock->pool->obj_name, 1341 "Warning: stray connection bind event")); 1342 pj_grp_lock_release(turn_sock->grp_lock); 1343 return; 1344 } 1345 1346 if (status == PJ_SUCCESS) { 1347 conn->state = DATACONN_STATE_READY; 1348 } else { 1349 dataconn_cleanup(conn); 1350 --turn_sock->data_conn_cnt; 1351 } 1352 1353 pj_grp_lock_release(turn_sock->grp_lock); 1354 1355 if (turn_sock->cb.on_connection_status) { 1356 (*turn_sock->cb.on_connection_status)(turn_sock, status, conn_id, 1357 peer_addr, addr_len); 1358 } 1359 }
Note: See TracChangeset
for help on using the changeset viewer.