Ignore:
Timestamp:
Sep 23, 2019 7:25:41 AM (5 years ago)
Author:
ming
Message:

Fixed #2229: Limitations in ICE data sending

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjnath/src/pjnath/ice_strans.c

    r6048 r6071  
    128128                            const pj_sockaddr_t *peer_addr, 
    129129                            unsigned addr_len); 
     130static pj_bool_t turn_on_data_sent(pj_turn_sock *turn_sock, 
     131                                   pj_ssize_t sent); 
    130132static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, 
    131133                          pj_turn_state_t new_state); 
     
    134136 
    135137/* Forward decls */ 
     138static pj_bool_t on_data_sent(pj_ice_strans *ice_st, pj_ssize_t sent); 
    136139static void ice_st_on_destroy(void *obj); 
    137140static void destroy_ice_st(pj_ice_strans *ice_st); 
     
    179182 
    180183 
     184/* Pending send buffer */ 
     185typedef struct pending_send 
     186{ 
     187    void               *buffer; 
     188    unsigned            comp_id; 
     189    pj_size_t           data_len; 
     190    pj_sockaddr         dst_addr; 
     191    int                 dst_addr_len; 
     192} pending_send; 
     193 
    181194/** 
    182195 * This structure represents the ICE stream transport. 
     
    185198{ 
    186199    char                    *obj_name;  /**< Log ID.                    */ 
     200    pj_pool_factory         *pf;        /**< Pool factory.              */ 
    187201    pj_pool_t               *pool;      /**< Pool used by this object.  */ 
    188202    void                    *user_data; /**< Application data.          */ 
     
    198212    pj_ice_strans_comp     **comp;      /**< Components array.          */ 
    199213 
     214    pj_pool_t               *buf_pool;  /**< Pool for buffers.          */ 
     215    unsigned                 num_buf;   /**< Number of buffers.         */ 
     216    unsigned                 buf_idx;   /**< Index of buffer.           */ 
     217    unsigned                 empty_idx; /**< Index of empty buffer.     */ 
     218    unsigned                 buf_size;  /**< Buffer size.               */ 
     219    pending_send            *send_buf;  /**< Send buffers.              */ 
     220    pj_bool_t                is_pending;/**< Any pending send?          */ 
     221 
    200222    pj_timer_entry           ka_timer;  /**< STUN keep-alive timer.     */ 
    201223 
    202224    pj_bool_t                destroy_req;/**< Destroy has been called?  */ 
    203225    pj_bool_t                cb_called; /**< Init error callback called?*/ 
     226    pj_bool_t                call_send_cb;/**< Need to call send cb?    */ 
    204227}; 
    205228 
     
    242265    pj_ice_strans_turn_cfg_default(&cfg->turn); 
    243266    pj_ice_sess_options_default(&cfg->opt); 
     267 
     268    cfg->num_send_buf = 4; 
    244269} 
    245270 
     
    368393    pj_bzero(&turn_sock_cb, sizeof(turn_sock_cb)); 
    369394    turn_sock_cb.on_rx_data = &turn_on_rx_data; 
     395    turn_sock_cb.on_data_sent = &turn_on_data_sent; 
    370396    turn_sock_cb.on_state = &turn_on_state; 
    371397 
     
    786812} 
    787813 
     814static pj_status_t alloc_send_buf(pj_ice_strans *ice_st, unsigned buf_size) 
     815{ 
     816    if (buf_size > ice_st->buf_size) { 
     817        unsigned i; 
     818         
     819        if (ice_st->is_pending) { 
     820            /* The current buffer is insufficient, but still currently used.*/ 
     821            return PJ_EBUSY; 
     822        } 
     823 
     824        pj_pool_safe_release(&ice_st->buf_pool); 
     825 
     826        ice_st->buf_pool = pj_pool_create(ice_st->pf, "ice_buf", 
     827                               (buf_size + sizeof(pending_send)) * 
     828                               ice_st->num_buf, 512, NULL); 
     829        if (!ice_st->buf_pool) 
     830            return PJ_ENOMEM; 
     831 
     832        ice_st->buf_size = buf_size; 
     833        ice_st->send_buf = pj_pool_calloc(ice_st->buf_pool, ice_st->num_buf, 
     834                                          sizeof(pending_send)); 
     835        for (i = 0; i < ice_st->num_buf; i++) { 
     836            ice_st->send_buf[i].buffer = pj_pool_alloc(ice_st->buf_pool, 
     837                                                       buf_size); 
     838        } 
     839        ice_st->buf_idx = ice_st->empty_idx = 0; 
     840    } 
     841     
     842    return PJ_SUCCESS; 
     843} 
    788844 
    789845/* 
     
    816872    ice_st = PJ_POOL_ZALLOC_T(pool, pj_ice_strans); 
    817873    ice_st->pool = pool; 
     874    ice_st->pf = cfg->stun_cfg.pf; 
    818875    ice_st->obj_name = pool->obj_name; 
    819876    ice_st->user_data = user_data; 
     
    827884    if (status != PJ_SUCCESS) { 
    828885        pj_pool_release(pool); 
     886        pj_log_pop_indent(); 
     887        return status; 
     888    } 
     889 
     890    /* Allocate send buffer */ 
     891    ice_st->num_buf = cfg->num_send_buf; 
     892    status = alloc_send_buf(ice_st, cfg->send_buf_size); 
     893    if (status != PJ_SUCCESS) { 
     894        destroy_ice_st(ice_st); 
    829895        pj_log_pop_indent(); 
    830896        return status; 
     
    902968 
    903969    /* Done */ 
    904     pj_pool_release(ice_st->pool); 
     970    pj_pool_safe_release(&ice_st->buf_pool); 
     971    pj_pool_safe_release(&ice_st->pool); 
    905972} 
    906973 
     
    14641531     
    14651532    /* Protect with group lock, since this may cause race condition with 
    1466      * pj_ice_strans_sendto(). 
     1533     * pj_ice_strans_sendto2(). 
    14671534     * See ticket #1877. 
    14681535     */ 
     
    14811548} 
    14821549 
     1550static pj_status_t use_buffer( pj_ice_strans *ice_st, 
     1551                               unsigned comp_id, 
     1552                               const void *data, 
     1553                               pj_size_t data_len, 
     1554                               const pj_sockaddr_t *dst_addr, 
     1555                               int dst_addr_len, 
     1556                               void **buffer ) 
     1557{ 
     1558    unsigned idx; 
     1559    pj_status_t status; 
     1560 
     1561    /* Allocate send buffer, if necessary. */ 
     1562    status = alloc_send_buf(ice_st, data_len); 
     1563    if (status != PJ_SUCCESS) 
     1564        return status; 
     1565     
     1566    if (ice_st->is_pending && ice_st->empty_idx == ice_st->buf_idx) { 
     1567        /* We don't use buffer or there's no more empty buffer. */ 
     1568        return PJ_EBUSY; 
     1569    } 
     1570 
     1571    idx = ice_st->empty_idx; 
     1572    ice_st->empty_idx = (ice_st->empty_idx + 1) % ice_st->num_buf; 
     1573    ice_st->send_buf[idx].comp_id = comp_id; 
     1574    ice_st->send_buf[idx].data_len = data_len; 
     1575    pj_assert(ice_st->buf_size >= data_len); 
     1576    pj_memcpy(ice_st->send_buf[idx].buffer, data, data_len); 
     1577    pj_sockaddr_cp(&ice_st->send_buf[idx].dst_addr, dst_addr); 
     1578    ice_st->send_buf[idx].dst_addr_len = dst_addr_len; 
     1579    *buffer = ice_st->send_buf[idx].buffer; 
     1580     
     1581    if (ice_st->is_pending) { 
     1582        /* We'll continue later since there's still a pending send. */ 
     1583        return PJ_EPENDING; 
     1584    } 
     1585     
     1586    ice_st->is_pending = PJ_TRUE; 
     1587    ice_st->buf_idx = idx; 
     1588 
     1589    return PJ_SUCCESS; 
     1590} 
     1591 
    14831592/* 
    14841593 * Application wants to send outgoing packet. 
    14851594 */ 
    1486 PJ_DEF(pj_status_t) pj_ice_strans_sendto( pj_ice_strans *ice_st, 
    1487                                           unsigned comp_id, 
    1488                                           const void *data, 
    1489                                           pj_size_t data_len, 
    1490                                           const pj_sockaddr_t *dst_addr, 
    1491                                           int dst_addr_len) 
     1595static pj_status_t send_data(pj_ice_strans *ice_st, 
     1596                             unsigned comp_id, 
     1597                             const void *data, 
     1598                             pj_size_t data_len, 
     1599                             const pj_sockaddr_t *dst_addr, 
     1600                             int dst_addr_len, 
     1601                             pj_bool_t use_buf, 
     1602                             pj_bool_t call_cb) 
    14921603{ 
    14931604    pj_ice_strans_comp *comp; 
    14941605    pj_ice_sess_cand *def_cand; 
     1606    void *buf = (void *)data; 
    14951607    pj_status_t status; 
    14961608 
     
    15011613 
    15021614    /* Check that default candidate for the component exists */ 
    1503     if (comp->default_cand >= comp->cand_cnt) 
    1504         return PJ_EINVALIDOP; 
     1615    if (comp->default_cand >= comp->cand_cnt) { 
     1616        status = PJ_EINVALIDOP; 
     1617        goto on_return; 
     1618    } 
    15051619 
    15061620    /* Protect with group lock, since this may cause race condition with 
     
    15091623     */ 
    15101624    pj_grp_lock_acquire(ice_st->grp_lock); 
     1625 
     1626    if (use_buf && ice_st->num_buf > 0) { 
     1627        status = use_buffer(ice_st, comp_id, data, data_len, dst_addr, 
     1628                            dst_addr_len, &buf); 
     1629 
     1630        if (status == PJ_EPENDING || status != PJ_SUCCESS) { 
     1631            pj_grp_lock_release(ice_st->grp_lock); 
     1632            return status; 
     1633        } 
     1634    } 
    15111635 
    15121636    /* If ICE is available, send data with ICE, otherwise send with the 
     
    15171641     */ 
    15181642    if (ice_st->ice && ice_st->state == PJ_ICE_STRANS_STATE_RUNNING) { 
    1519         status = pj_ice_sess_send_data(ice_st->ice, comp_id, data, data_len); 
     1643        status = pj_ice_sess_send_data(ice_st->ice, comp_id, buf, data_len); 
    15201644         
    15211645        pj_grp_lock_release(ice_st->grp_lock); 
    15221646         
    1523         return status; 
     1647        goto on_return; 
    15241648    }  
    1525      
     1649 
    15261650    pj_grp_lock_release(ice_st->grp_lock); 
    15271651 
     
    15421666            if (comp->turn[tp_idx].sock == NULL) { 
    15431667                /* TURN socket error */ 
    1544                 return PJ_EINVALIDOP; 
     1668                status = PJ_EINVALIDOP; 
     1669                goto on_return; 
    15451670            } 
    15461671 
     
    15561681 
    15571682            status = pj_turn_sock_sendto(comp->turn[tp_idx].sock, 
    1558                                          (const pj_uint8_t*)data, 
     1683                                         (const pj_uint8_t*)buf, 
    15591684                                         (unsigned)data_len, 
    15601685                                         dst_addr, dst_addr_len); 
    1561             return (status==PJ_SUCCESS||status==PJ_EPENDING) ? 
    1562                     PJ_SUCCESS : status; 
     1686            goto on_return; 
    15631687        } else { 
    15641688            const pj_sockaddr_t *dest_addr; 
     
    15731697                                                    dst_addr); 
    15741698                    if (status != PJ_SUCCESS) 
    1575                         return status; 
     1699                        goto on_return; 
    15761700 
    15771701                    pj_sockaddr_cp(&comp->dst_addr, dst_addr); 
     
    15861710            } 
    15871711 
    1588             status = pj_stun_sock_sendto(comp->stun[tp_idx].sock, NULL, data, 
     1712            status = pj_stun_sock_sendto(comp->stun[tp_idx].sock, NULL, buf, 
    15891713                                         (unsigned)data_len, 0, dest_addr, 
    15901714                                         dest_addr_len); 
    1591             return (status==PJ_SUCCESS||status==PJ_EPENDING) ? 
    1592                     PJ_SUCCESS : status; 
     1715            goto on_return; 
    15931716        } 
    15941717 
    15951718    } else 
    1596         return PJ_EINVALIDOP; 
    1597 } 
     1719        status = PJ_EINVALIDOP; 
     1720 
     1721on_return: 
     1722    /* We continue later in on_data_sent() callback. */ 
     1723    if (status == PJ_EPENDING) 
     1724        return status; 
     1725 
     1726    if (call_cb) { 
     1727        on_data_sent(ice_st, (status == PJ_SUCCESS? data_len: -status)); 
     1728    } else { 
     1729        pj_grp_lock_acquire(ice_st->grp_lock); 
     1730        if (ice_st->num_buf > 0) { 
     1731            ice_st->buf_idx = (ice_st->buf_idx + 1) % ice_st->num_buf; 
     1732            pj_assert (ice_st->buf_idx == ice_st->empty_idx); 
     1733        } 
     1734        ice_st->is_pending = PJ_FALSE; 
     1735        pj_grp_lock_release(ice_st->grp_lock); 
     1736    } 
     1737 
     1738    return status; 
     1739} 
     1740 
     1741 
     1742#if !DEPRECATED_FOR_TICKET_2229 
     1743/* 
     1744 * Application wants to send outgoing packet. 
     1745 */ 
     1746PJ_DEF(pj_status_t) pj_ice_strans_sendto( pj_ice_strans *ice_st, 
     1747                                          unsigned comp_id, 
     1748                                          const void *data, 
     1749                                          pj_size_t data_len, 
     1750                                          const pj_sockaddr_t *dst_addr, 
     1751                                          int dst_addr_len) 
     1752{ 
     1753    pj_status_t status; 
     1754 
     1755    PJ_LOG(1, (ice_st->obj_name, "pj_ice_strans_sendto() is deprecated. " 
     1756                                 "Application is recommended to use " 
     1757                                 "pj_ice_strans_sendto2() instead.")); 
     1758    status = send_data(ice_st, comp_id, data, data_len, dst_addr, 
     1759                       dst_addr_len, PJ_TRUE, PJ_FALSE); 
     1760    if (status == PJ_EPENDING) 
     1761        status = PJ_SUCCESS; 
     1762     
     1763    return status; 
     1764} 
     1765#endif 
     1766 
     1767 
     1768/* 
     1769 * Application wants to send outgoing packet. 
     1770 */ 
     1771PJ_DEF(pj_status_t) pj_ice_strans_sendto2(pj_ice_strans *ice_st, 
     1772                                          unsigned comp_id, 
     1773                                          const void *data, 
     1774                                          pj_size_t data_len, 
     1775                                          const pj_sockaddr_t *dst_addr, 
     1776                                          int dst_addr_len) 
     1777{ 
     1778    ice_st->call_send_cb = PJ_TRUE; 
     1779    return send_data(ice_st, comp_id, data, data_len, dst_addr, 
     1780                     dst_addr_len, PJ_TRUE, PJ_FALSE); 
     1781} 
     1782 
    15981783 
    15991784/* 
     
    17061891    pj_ice_strans_comp *comp; 
    17071892    pj_status_t status; 
     1893    void *buf = (void *)pkt; 
     1894    pj_bool_t use_buf = PJ_FALSE; 
    17081895#if defined(ENABLE_TRACE) && (ENABLE_TRACE != 0) 
    17091896    char daddr[PJ_INET6_ADDRSTRLEN]; 
     
    17131900 
    17141901    PJ_ASSERT_RETURN(comp_id && comp_id <= ice_st->comp_cnt, PJ_EINVAL); 
     1902 
     1903    pj_grp_lock_acquire(ice_st->grp_lock); 
     1904    if (ice_st->num_buf > 0 && 
     1905        ice_st->send_buf[ice_st->buf_idx].buffer != pkt) 
     1906    { 
     1907        use_buf = PJ_TRUE; 
     1908        status = use_buffer(ice_st, comp_id, pkt, size, dst_addr, 
     1909                            dst_addr_len, &buf); 
     1910        if (status == PJ_EPENDING || status != PJ_SUCCESS) { 
     1911            pj_grp_lock_release(ice_st->grp_lock); 
     1912            return status; 
     1913        } 
     1914    } 
     1915    pj_grp_lock_release(ice_st->grp_lock); 
    17151916 
    17161917    comp = ice_st->comp[comp_id-1]; 
     
    17261927        if (comp->turn[tp_idx].sock) { 
    17271928            status = pj_turn_sock_sendto(comp->turn[tp_idx].sock, 
    1728                                          (const pj_uint8_t*)pkt, 
     1929                                         (const pj_uint8_t*)buf, 
    17291930                                         (unsigned)size, 
    17301931                                         dst_addr, dst_addr_len); 
     
    17421943                status = pj_sockaddr_synthesize(pj_AF_INET6(), 
    17431944                                                &comp->synth_addr, dst_addr); 
    1744                 if (status != PJ_SUCCESS) 
    1745                     return status; 
     1945                if (status != PJ_SUCCESS) { 
     1946                    goto on_return; 
     1947                } 
    17461948             
    17471949                pj_sockaddr_cp(&comp->dst_addr, dst_addr); 
     
    17561958 
    17571959        status = pj_stun_sock_sendto(comp->stun[tp_idx].sock, NULL, 
    1758                                      pkt, (unsigned)size, 0, 
     1960                                     buf, (unsigned)size, 0, 
    17591961                                     dest_addr, dest_addr_len); 
    17601962    } else { 
     
    17631965    } 
    17641966 
    1765     return (status==PJ_SUCCESS||status==PJ_EPENDING) ? PJ_SUCCESS : status; 
     1967on_return: 
     1968    if (use_buf && status != PJ_EPENDING) { 
     1969        pj_grp_lock_acquire(ice_st->grp_lock); 
     1970        if (ice_st->num_buf > 0) { 
     1971            ice_st->buf_idx = (ice_st->buf_idx + 1) % ice_st->num_buf; 
     1972            pj_assert(ice_st->buf_idx == ice_st->empty_idx); 
     1973        } 
     1974        ice_st->is_pending = PJ_FALSE; 
     1975        pj_grp_lock_release(ice_st->grp_lock); 
     1976    } 
     1977 
     1978    return status; 
    17661979} 
    17671980 
     
    17841997                                 src_addr, src_addr_len); 
    17851998    } 
     1999} 
     2000 
     2001/* Notifification when asynchronous send operation via STUN/TURN 
     2002 * has completed. 
     2003 */ 
     2004static pj_bool_t on_data_sent(pj_ice_strans *ice_st, pj_ssize_t sent) 
     2005{ 
     2006    if (ice_st->destroy_req || !ice_st->is_pending) 
     2007        return PJ_TRUE; 
     2008 
     2009    if (ice_st->call_send_cb && ice_st->cb.on_data_sent) { 
     2010        (*ice_st->cb.on_data_sent)(ice_st, sent); 
     2011    } 
     2012 
     2013    pj_grp_lock_acquire(ice_st->grp_lock); 
     2014 
     2015    if (ice_st->num_buf > 0) 
     2016        ice_st->buf_idx = (ice_st->buf_idx + 1) % ice_st->num_buf; 
     2017     
     2018    if (ice_st->num_buf > 0 && ice_st->buf_idx != ice_st->empty_idx) { 
     2019        /* There's still more pending send. Send it one by one. */ 
     2020        pending_send *ps = &ice_st->send_buf[ice_st->buf_idx]; 
     2021 
     2022        pj_grp_lock_release(ice_st->grp_lock); 
     2023        send_data(ice_st, ps->comp_id, ps->buffer, ps->data_len, 
     2024                  &ps->dst_addr, ps->dst_addr_len, PJ_FALSE, PJ_TRUE); 
     2025    } else { 
     2026        ice_st->is_pending = PJ_FALSE; 
     2027        pj_grp_lock_release(ice_st->grp_lock); 
     2028    } 
     2029 
     2030    return PJ_TRUE; 
    17862031} 
    17872032 
     
    18452090                                   pj_ssize_t sent) 
    18462091{ 
    1847     PJ_UNUSED_ARG(stun_sock); 
     2092    sock_user_data *data; 
     2093 
    18482094    PJ_UNUSED_ARG(send_key); 
    1849     PJ_UNUSED_ARG(sent); 
    1850     return PJ_TRUE; 
     2095 
     2096    data = (sock_user_data *)pj_stun_sock_get_user_data(stun_sock); 
     2097    if (!data || !data->comp || !data->comp->ice_st) return PJ_TRUE; 
     2098 
     2099    return on_data_sent(data->comp->ice_st, sent); 
    18512100} 
    18522101 
     
    21062355} 
    21072356 
     2357/* Notifification when asynchronous send operation to the TURN socket 
     2358 * has completed. 
     2359 */ 
     2360static pj_bool_t turn_on_data_sent(pj_turn_sock *turn_sock, 
     2361                                   pj_ssize_t sent) 
     2362{ 
     2363    sock_user_data *data; 
     2364 
     2365    data = (sock_user_data *)pj_turn_sock_get_user_data(turn_sock); 
     2366    if (!data || !data->comp || !data->comp->ice_st) return PJ_TRUE; 
     2367 
     2368    return on_data_sent(data->comp->ice_st, sent); 
     2369} 
    21082370 
    21092371/* Callback when TURN client state has changed */ 
Note: See TracChangeset for help on using the changeset viewer.