Changeset 6071 for pjproject/trunk/pjnath/src/pjnath/ice_strans.c
- Timestamp:
- Sep 23, 2019 7:25:41 AM (5 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjnath/src/pjnath/ice_strans.c
r6048 r6071 128 128 const pj_sockaddr_t *peer_addr, 129 129 unsigned addr_len); 130 static pj_bool_t turn_on_data_sent(pj_turn_sock *turn_sock, 131 pj_ssize_t sent); 130 132 static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, 131 133 pj_turn_state_t new_state); … … 134 136 135 137 /* Forward decls */ 138 static pj_bool_t on_data_sent(pj_ice_strans *ice_st, pj_ssize_t sent); 136 139 static void ice_st_on_destroy(void *obj); 137 140 static void destroy_ice_st(pj_ice_strans *ice_st); … … 179 182 180 183 184 /* Pending send buffer */ 185 typedef struct pending_send 186 { 187 void *buffer; 188 unsigned comp_id; 189 pj_size_t data_len; 190 pj_sockaddr dst_addr; 191 int dst_addr_len; 192 } pending_send; 193 181 194 /** 182 195 * This structure represents the ICE stream transport. … … 185 198 { 186 199 char *obj_name; /**< Log ID. */ 200 pj_pool_factory *pf; /**< Pool factory. */ 187 201 pj_pool_t *pool; /**< Pool used by this object. */ 188 202 void *user_data; /**< Application data. */ … … 198 212 pj_ice_strans_comp **comp; /**< Components array. */ 199 213 214 pj_pool_t *buf_pool; /**< Pool for buffers. */ 215 unsigned num_buf; /**< Number of buffers. */ 216 unsigned buf_idx; /**< Index of buffer. */ 217 unsigned empty_idx; /**< Index of empty buffer. */ 218 unsigned buf_size; /**< Buffer size. */ 219 pending_send *send_buf; /**< Send buffers. */ 220 pj_bool_t is_pending;/**< Any pending send? */ 221 200 222 pj_timer_entry ka_timer; /**< STUN keep-alive timer. */ 201 223 202 224 pj_bool_t destroy_req;/**< Destroy has been called? */ 203 225 pj_bool_t cb_called; /**< Init error callback called?*/ 226 pj_bool_t call_send_cb;/**< Need to call send cb? */ 204 227 }; 205 228 … … 242 265 pj_ice_strans_turn_cfg_default(&cfg->turn); 243 266 pj_ice_sess_options_default(&cfg->opt); 267 268 cfg->num_send_buf = 4; 244 269 } 245 270 … … 368 393 pj_bzero(&turn_sock_cb, sizeof(turn_sock_cb)); 369 394 turn_sock_cb.on_rx_data = &turn_on_rx_data; 395 turn_sock_cb.on_data_sent = &turn_on_data_sent; 370 396 turn_sock_cb.on_state = &turn_on_state; 371 397 … … 786 812 } 787 813 814 static pj_status_t alloc_send_buf(pj_ice_strans *ice_st, unsigned buf_size) 815 { 816 if (buf_size > ice_st->buf_size) { 817 unsigned i; 818 819 if (ice_st->is_pending) { 820 /* The current buffer is insufficient, but still currently used.*/ 821 return PJ_EBUSY; 822 } 823 824 pj_pool_safe_release(&ice_st->buf_pool); 825 826 ice_st->buf_pool = pj_pool_create(ice_st->pf, "ice_buf", 827 (buf_size + sizeof(pending_send)) * 828 ice_st->num_buf, 512, NULL); 829 if (!ice_st->buf_pool) 830 return PJ_ENOMEM; 831 832 ice_st->buf_size = buf_size; 833 ice_st->send_buf = pj_pool_calloc(ice_st->buf_pool, ice_st->num_buf, 834 sizeof(pending_send)); 835 for (i = 0; i < ice_st->num_buf; i++) { 836 ice_st->send_buf[i].buffer = pj_pool_alloc(ice_st->buf_pool, 837 buf_size); 838 } 839 ice_st->buf_idx = ice_st->empty_idx = 0; 840 } 841 842 return PJ_SUCCESS; 843 } 788 844 789 845 /* … … 816 872 ice_st = PJ_POOL_ZALLOC_T(pool, pj_ice_strans); 817 873 ice_st->pool = pool; 874 ice_st->pf = cfg->stun_cfg.pf; 818 875 ice_st->obj_name = pool->obj_name; 819 876 ice_st->user_data = user_data; … … 827 884 if (status != PJ_SUCCESS) { 828 885 pj_pool_release(pool); 886 pj_log_pop_indent(); 887 return status; 888 } 889 890 /* Allocate send buffer */ 891 ice_st->num_buf = cfg->num_send_buf; 892 status = alloc_send_buf(ice_st, cfg->send_buf_size); 893 if (status != PJ_SUCCESS) { 894 destroy_ice_st(ice_st); 829 895 pj_log_pop_indent(); 830 896 return status; … … 902 968 903 969 /* Done */ 904 pj_pool_release(ice_st->pool); 970 pj_pool_safe_release(&ice_st->buf_pool); 971 pj_pool_safe_release(&ice_st->pool); 905 972 } 906 973 … … 1464 1531 1465 1532 /* Protect with group lock, since this may cause race condition with 1466 * pj_ice_strans_sendto ().1533 * pj_ice_strans_sendto2(). 1467 1534 * See ticket #1877. 1468 1535 */ … … 1481 1548 } 1482 1549 1550 static pj_status_t use_buffer( pj_ice_strans *ice_st, 1551 unsigned comp_id, 1552 const void *data, 1553 pj_size_t data_len, 1554 const pj_sockaddr_t *dst_addr, 1555 int dst_addr_len, 1556 void **buffer ) 1557 { 1558 unsigned idx; 1559 pj_status_t status; 1560 1561 /* Allocate send buffer, if necessary. */ 1562 status = alloc_send_buf(ice_st, data_len); 1563 if (status != PJ_SUCCESS) 1564 return status; 1565 1566 if (ice_st->is_pending && ice_st->empty_idx == ice_st->buf_idx) { 1567 /* We don't use buffer or there's no more empty buffer. */ 1568 return PJ_EBUSY; 1569 } 1570 1571 idx = ice_st->empty_idx; 1572 ice_st->empty_idx = (ice_st->empty_idx + 1) % ice_st->num_buf; 1573 ice_st->send_buf[idx].comp_id = comp_id; 1574 ice_st->send_buf[idx].data_len = data_len; 1575 pj_assert(ice_st->buf_size >= data_len); 1576 pj_memcpy(ice_st->send_buf[idx].buffer, data, data_len); 1577 pj_sockaddr_cp(&ice_st->send_buf[idx].dst_addr, dst_addr); 1578 ice_st->send_buf[idx].dst_addr_len = dst_addr_len; 1579 *buffer = ice_st->send_buf[idx].buffer; 1580 1581 if (ice_st->is_pending) { 1582 /* We'll continue later since there's still a pending send. */ 1583 return PJ_EPENDING; 1584 } 1585 1586 ice_st->is_pending = PJ_TRUE; 1587 ice_st->buf_idx = idx; 1588 1589 return PJ_SUCCESS; 1590 } 1591 1483 1592 /* 1484 1593 * Application wants to send outgoing packet. 1485 1594 */ 1486 PJ_DEF(pj_status_t) pj_ice_strans_sendto( pj_ice_strans *ice_st, 1487 unsigned comp_id, 1488 const void *data, 1489 pj_size_t data_len, 1490 const pj_sockaddr_t *dst_addr, 1491 int dst_addr_len) 1595 static pj_status_t send_data(pj_ice_strans *ice_st, 1596 unsigned comp_id, 1597 const void *data, 1598 pj_size_t data_len, 1599 const pj_sockaddr_t *dst_addr, 1600 int dst_addr_len, 1601 pj_bool_t use_buf, 1602 pj_bool_t call_cb) 1492 1603 { 1493 1604 pj_ice_strans_comp *comp; 1494 1605 pj_ice_sess_cand *def_cand; 1606 void *buf = (void *)data; 1495 1607 pj_status_t status; 1496 1608 … … 1501 1613 1502 1614 /* Check that default candidate for the component exists */ 1503 if (comp->default_cand >= comp->cand_cnt) 1504 return PJ_EINVALIDOP; 1615 if (comp->default_cand >= comp->cand_cnt) { 1616 status = PJ_EINVALIDOP; 1617 goto on_return; 1618 } 1505 1619 1506 1620 /* Protect with group lock, since this may cause race condition with … … 1509 1623 */ 1510 1624 pj_grp_lock_acquire(ice_st->grp_lock); 1625 1626 if (use_buf && ice_st->num_buf > 0) { 1627 status = use_buffer(ice_st, comp_id, data, data_len, dst_addr, 1628 dst_addr_len, &buf); 1629 1630 if (status == PJ_EPENDING || status != PJ_SUCCESS) { 1631 pj_grp_lock_release(ice_st->grp_lock); 1632 return status; 1633 } 1634 } 1511 1635 1512 1636 /* If ICE is available, send data with ICE, otherwise send with the … … 1517 1641 */ 1518 1642 if (ice_st->ice && ice_st->state == PJ_ICE_STRANS_STATE_RUNNING) { 1519 status = pj_ice_sess_send_data(ice_st->ice, comp_id, data, data_len);1643 status = pj_ice_sess_send_data(ice_st->ice, comp_id, buf, data_len); 1520 1644 1521 1645 pj_grp_lock_release(ice_st->grp_lock); 1522 1646 1523 return status;1647 goto on_return; 1524 1648 } 1525 1649 1526 1650 pj_grp_lock_release(ice_st->grp_lock); 1527 1651 … … 1542 1666 if (comp->turn[tp_idx].sock == NULL) { 1543 1667 /* TURN socket error */ 1544 return PJ_EINVALIDOP; 1668 status = PJ_EINVALIDOP; 1669 goto on_return; 1545 1670 } 1546 1671 … … 1556 1681 1557 1682 status = pj_turn_sock_sendto(comp->turn[tp_idx].sock, 1558 (const pj_uint8_t*) data,1683 (const pj_uint8_t*)buf, 1559 1684 (unsigned)data_len, 1560 1685 dst_addr, dst_addr_len); 1561 return (status==PJ_SUCCESS||status==PJ_EPENDING) ? 1562 PJ_SUCCESS : status; 1686 goto on_return; 1563 1687 } else { 1564 1688 const pj_sockaddr_t *dest_addr; … … 1573 1697 dst_addr); 1574 1698 if (status != PJ_SUCCESS) 1575 return status;1699 goto on_return; 1576 1700 1577 1701 pj_sockaddr_cp(&comp->dst_addr, dst_addr); … … 1586 1710 } 1587 1711 1588 status = pj_stun_sock_sendto(comp->stun[tp_idx].sock, NULL, data,1712 status = pj_stun_sock_sendto(comp->stun[tp_idx].sock, NULL, buf, 1589 1713 (unsigned)data_len, 0, dest_addr, 1590 1714 dest_addr_len); 1591 return (status==PJ_SUCCESS||status==PJ_EPENDING) ? 1592 PJ_SUCCESS : status; 1715 goto on_return; 1593 1716 } 1594 1717 1595 1718 } else 1596 return PJ_EINVALIDOP; 1597 } 1719 status = PJ_EINVALIDOP; 1720 1721 on_return: 1722 /* We continue later in on_data_sent() callback. */ 1723 if (status == PJ_EPENDING) 1724 return status; 1725 1726 if (call_cb) { 1727 on_data_sent(ice_st, (status == PJ_SUCCESS? data_len: -status)); 1728 } else { 1729 pj_grp_lock_acquire(ice_st->grp_lock); 1730 if (ice_st->num_buf > 0) { 1731 ice_st->buf_idx = (ice_st->buf_idx + 1) % ice_st->num_buf; 1732 pj_assert (ice_st->buf_idx == ice_st->empty_idx); 1733 } 1734 ice_st->is_pending = PJ_FALSE; 1735 pj_grp_lock_release(ice_st->grp_lock); 1736 } 1737 1738 return status; 1739 } 1740 1741 1742 #if !DEPRECATED_FOR_TICKET_2229 1743 /* 1744 * Application wants to send outgoing packet. 1745 */ 1746 PJ_DEF(pj_status_t) pj_ice_strans_sendto( pj_ice_strans *ice_st, 1747 unsigned comp_id, 1748 const void *data, 1749 pj_size_t data_len, 1750 const pj_sockaddr_t *dst_addr, 1751 int dst_addr_len) 1752 { 1753 pj_status_t status; 1754 1755 PJ_LOG(1, (ice_st->obj_name, "pj_ice_strans_sendto() is deprecated. " 1756 "Application is recommended to use " 1757 "pj_ice_strans_sendto2() instead.")); 1758 status = send_data(ice_st, comp_id, data, data_len, dst_addr, 1759 dst_addr_len, PJ_TRUE, PJ_FALSE); 1760 if (status == PJ_EPENDING) 1761 status = PJ_SUCCESS; 1762 1763 return status; 1764 } 1765 #endif 1766 1767 1768 /* 1769 * Application wants to send outgoing packet. 1770 */ 1771 PJ_DEF(pj_status_t) pj_ice_strans_sendto2(pj_ice_strans *ice_st, 1772 unsigned comp_id, 1773 const void *data, 1774 pj_size_t data_len, 1775 const pj_sockaddr_t *dst_addr, 1776 int dst_addr_len) 1777 { 1778 ice_st->call_send_cb = PJ_TRUE; 1779 return send_data(ice_st, comp_id, data, data_len, dst_addr, 1780 dst_addr_len, PJ_TRUE, PJ_FALSE); 1781 } 1782 1598 1783 1599 1784 /* … … 1706 1891 pj_ice_strans_comp *comp; 1707 1892 pj_status_t status; 1893 void *buf = (void *)pkt; 1894 pj_bool_t use_buf = PJ_FALSE; 1708 1895 #if defined(ENABLE_TRACE) && (ENABLE_TRACE != 0) 1709 1896 char daddr[PJ_INET6_ADDRSTRLEN]; … … 1713 1900 1714 1901 PJ_ASSERT_RETURN(comp_id && comp_id <= ice_st->comp_cnt, PJ_EINVAL); 1902 1903 pj_grp_lock_acquire(ice_st->grp_lock); 1904 if (ice_st->num_buf > 0 && 1905 ice_st->send_buf[ice_st->buf_idx].buffer != pkt) 1906 { 1907 use_buf = PJ_TRUE; 1908 status = use_buffer(ice_st, comp_id, pkt, size, dst_addr, 1909 dst_addr_len, &buf); 1910 if (status == PJ_EPENDING || status != PJ_SUCCESS) { 1911 pj_grp_lock_release(ice_st->grp_lock); 1912 return status; 1913 } 1914 } 1915 pj_grp_lock_release(ice_st->grp_lock); 1715 1916 1716 1917 comp = ice_st->comp[comp_id-1]; … … 1726 1927 if (comp->turn[tp_idx].sock) { 1727 1928 status = pj_turn_sock_sendto(comp->turn[tp_idx].sock, 1728 (const pj_uint8_t*) pkt,1929 (const pj_uint8_t*)buf, 1729 1930 (unsigned)size, 1730 1931 dst_addr, dst_addr_len); … … 1742 1943 status = pj_sockaddr_synthesize(pj_AF_INET6(), 1743 1944 &comp->synth_addr, dst_addr); 1744 if (status != PJ_SUCCESS) 1745 return status; 1945 if (status != PJ_SUCCESS) { 1946 goto on_return; 1947 } 1746 1948 1747 1949 pj_sockaddr_cp(&comp->dst_addr, dst_addr); … … 1756 1958 1757 1959 status = pj_stun_sock_sendto(comp->stun[tp_idx].sock, NULL, 1758 pkt, (unsigned)size, 0,1960 buf, (unsigned)size, 0, 1759 1961 dest_addr, dest_addr_len); 1760 1962 } else { … … 1763 1965 } 1764 1966 1765 return (status==PJ_SUCCESS||status==PJ_EPENDING) ? PJ_SUCCESS : status; 1967 on_return: 1968 if (use_buf && status != PJ_EPENDING) { 1969 pj_grp_lock_acquire(ice_st->grp_lock); 1970 if (ice_st->num_buf > 0) { 1971 ice_st->buf_idx = (ice_st->buf_idx + 1) % ice_st->num_buf; 1972 pj_assert(ice_st->buf_idx == ice_st->empty_idx); 1973 } 1974 ice_st->is_pending = PJ_FALSE; 1975 pj_grp_lock_release(ice_st->grp_lock); 1976 } 1977 1978 return status; 1766 1979 } 1767 1980 … … 1784 1997 src_addr, src_addr_len); 1785 1998 } 1999 } 2000 2001 /* Notifification when asynchronous send operation via STUN/TURN 2002 * has completed. 2003 */ 2004 static pj_bool_t on_data_sent(pj_ice_strans *ice_st, pj_ssize_t sent) 2005 { 2006 if (ice_st->destroy_req || !ice_st->is_pending) 2007 return PJ_TRUE; 2008 2009 if (ice_st->call_send_cb && ice_st->cb.on_data_sent) { 2010 (*ice_st->cb.on_data_sent)(ice_st, sent); 2011 } 2012 2013 pj_grp_lock_acquire(ice_st->grp_lock); 2014 2015 if (ice_st->num_buf > 0) 2016 ice_st->buf_idx = (ice_st->buf_idx + 1) % ice_st->num_buf; 2017 2018 if (ice_st->num_buf > 0 && ice_st->buf_idx != ice_st->empty_idx) { 2019 /* There's still more pending send. Send it one by one. */ 2020 pending_send *ps = &ice_st->send_buf[ice_st->buf_idx]; 2021 2022 pj_grp_lock_release(ice_st->grp_lock); 2023 send_data(ice_st, ps->comp_id, ps->buffer, ps->data_len, 2024 &ps->dst_addr, ps->dst_addr_len, PJ_FALSE, PJ_TRUE); 2025 } else { 2026 ice_st->is_pending = PJ_FALSE; 2027 pj_grp_lock_release(ice_st->grp_lock); 2028 } 2029 2030 return PJ_TRUE; 1786 2031 } 1787 2032 … … 1845 2090 pj_ssize_t sent) 1846 2091 { 1847 PJ_UNUSED_ARG(stun_sock); 2092 sock_user_data *data; 2093 1848 2094 PJ_UNUSED_ARG(send_key); 1849 PJ_UNUSED_ARG(sent); 1850 return PJ_TRUE; 2095 2096 data = (sock_user_data *)pj_stun_sock_get_user_data(stun_sock); 2097 if (!data || !data->comp || !data->comp->ice_st) return PJ_TRUE; 2098 2099 return on_data_sent(data->comp->ice_st, sent); 1851 2100 } 1852 2101 … … 2106 2355 } 2107 2356 2357 /* Notifification when asynchronous send operation to the TURN socket 2358 * has completed. 2359 */ 2360 static pj_bool_t turn_on_data_sent(pj_turn_sock *turn_sock, 2361 pj_ssize_t sent) 2362 { 2363 sock_user_data *data; 2364 2365 data = (sock_user_data *)pj_turn_sock_get_user_data(turn_sock); 2366 if (!data || !data->comp || !data->comp->ice_st) return PJ_TRUE; 2367 2368 return on_data_sent(data->comp->ice_st, sent); 2369 } 2108 2370 2109 2371 /* Callback when TURN client state has changed */
Note: See TracChangeset
for help on using the changeset viewer.