Changeset 2845


Ignore:
Timestamp:
Jul 29, 2009 12:19:25 PM (15 years ago)
Author:
bennylp
Message:

Ticket #924: Loop media transport now allows more than one streams to receive the reflected packets

  • This ticket allows the same loop media transport instance to be attached to more than one streams, and allow application to control which stream(s) receives the reflected packets.
Location:
pjproject/trunk/pjmedia
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjmedia/include/pjmedia/transport_loop.h

    r2394 r2845  
    3636 * @{ 
    3737 * 
    38  * This is the loopback media transport, where packet sent to this transport 
    39  * will immediately be sent back to the callback. 
     38 * This is the loopback media transport, where packets sent to this transport 
     39 * will be sent back to the streams attached to this transport. Unlike the 
     40 * other PJMEDIA transports, the loop transport may be attached to multiple 
     41 * streams (in other words, application should specify the same loop transport 
     42 * instance when calling #pjmedia_stream_create()). Any RTP or RTCP packets 
     43 * sent by one stream to this transport by default will be sent back to all  
     44 * streams that are attached to this transport, including to the stream that 
     45 * sends the packet. Application may individually select which stream to 
     46 * receive packets by calling #pjmedia_transport_loop_disable_rx(). 
    4047 */ 
    4148 
     
    5562 
    5663 
     64/** 
     65 * Set this stream as the receiver of incoming packets. 
     66 */ 
     67PJ_DECL(pj_status_t) pjmedia_transport_loop_disable_rx(pjmedia_transport *tp, 
     68                                                       void *user, 
     69                                                       pj_bool_t disabled); 
    5770 
    5871 
  • pjproject/trunk/pjmedia/src/pjmedia/transport_loop.c

    r2506 r2845  
    1919 */ 
    2020#include <pjmedia/transport_loop.h> 
     21#include <pj/array.h> 
    2122#include <pj/assert.h> 
    2223#include <pj/errno.h> 
     
    2829 
    2930 
    30 struct transport_loop 
    31 { 
    32     pjmedia_transport   base;           /**< Base transport.                */ 
    33  
    34     pj_pool_t          *pool;           /**< Memory pool                    */ 
     31struct user 
     32{ 
     33    pj_bool_t           rx_disabled;    /**< Doesn't want to receive pkt?   */ 
    3534    void               *user_data;      /**< Only valid when attached       */ 
    36     pj_bool_t           attached;       /**< Has attachment?                */ 
    37     pj_sockaddr         rem_rtp_addr;   /**< Remote RTP address             */ 
    38     pj_sockaddr         rem_rtcp_addr;  /**< Remote RTCP address            */ 
    39     int                 addr_len;       /**< Length of addresses.           */ 
    4035    void  (*rtp_cb)(    void*,          /**< To report incoming RTP.        */ 
    4136                        void*, 
     
    4439                        void*, 
    4540                        pj_ssize_t); 
     41}; 
     42 
     43struct transport_loop 
     44{ 
     45    pjmedia_transport   base;           /**< Base transport.                */ 
     46 
     47    pj_pool_t          *pool;           /**< Memory pool                    */ 
     48    unsigned            user_cnt;       /**< Number of attachments          */ 
     49    struct user         users[4];       /**< Array of users.                */ 
    4650 
    4751    unsigned            tx_drop_pct;    /**< Percent of tx pkts to drop.    */ 
     
    149153 
    150154 
     155PJ_DEF(pj_status_t) pjmedia_transport_loop_disable_rx( pjmedia_transport *tp, 
     156                                                       void *user, 
     157                                                       pj_bool_t disabled) 
     158{ 
     159    struct transport_loop *loop = (struct transport_loop*) tp; 
     160    unsigned i; 
     161 
     162    for (i=0; i<loop->user_cnt; ++i) { 
     163        if (loop->users[i].user_data == user) { 
     164            loop->users[i].rx_disabled = disabled; 
     165            return PJ_SUCCESS; 
     166        } 
     167    } 
     168    pj_assert(!"Invalid stream user"); 
     169    return PJ_ENOTFOUND; 
     170} 
     171 
    151172/** 
    152173 * Close loopback transport. 
     
    194215{ 
    195216    struct transport_loop *loop = (struct transport_loop*) tp; 
     217    unsigned i; 
    196218    const pj_sockaddr *rtcp_addr; 
    197219 
     
    199221    PJ_ASSERT_RETURN(tp && rem_addr && addr_len, PJ_EINVAL); 
    200222 
    201     /* Must not be "attached" to existing application */ 
    202     PJ_ASSERT_RETURN(!loop->attached, PJ_EINVALIDOP); 
     223    /* Must not be "attached" to same user */ 
     224    for (i=0; i<loop->user_cnt; ++i) { 
     225        PJ_ASSERT_RETURN(loop->users[i].user_data != user_data, 
     226                         PJ_EINVALIDOP); 
     227    } 
     228    PJ_ASSERT_RETURN(loop->user_cnt != PJ_ARRAY_SIZE(loop->users),  
     229                     PJ_ETOOMANY); 
    203230 
    204231    PJ_UNUSED_ARG(rem_rtcp); 
     
    207234    /* "Attach" the application: */ 
    208235 
    209     /* Save the callbacks */ 
    210     loop->rtp_cb = rtp_cb; 
    211     loop->rtcp_cb = rtcp_cb; 
    212     loop->user_data = user_data; 
    213  
    214     /* Save address length */ 
    215     loop->addr_len = addr_len; 
    216  
    217     /* Last, mark transport as attached */ 
    218     loop->attached = PJ_TRUE; 
     236    /* Save the new user */ 
     237    loop->users[loop->user_cnt].rtp_cb = rtp_cb; 
     238    loop->users[loop->user_cnt].rtcp_cb = rtcp_cb; 
     239    loop->users[loop->user_cnt].user_data = user_data; 
     240    ++loop->user_cnt; 
    219241 
    220242    return PJ_SUCCESS; 
     
    227249{ 
    228250    struct transport_loop *loop = (struct transport_loop*) tp; 
     251    unsigned i; 
    229252 
    230253    pj_assert(tp); 
    231254 
    232     if (loop->attached) { 
    233         /* User data is unreferenced on Release build */ 
    234         PJ_UNUSED_ARG(user_data); 
    235  
    236         /* As additional checking, check if the same user data is specified */ 
    237         pj_assert(user_data == loop->user_data); 
    238  
    239         /* First, mark transport as unattached */ 
    240         loop->attached = PJ_FALSE; 
    241  
    242         /* Clear up application infos from transport */ 
    243         loop->rtp_cb = NULL; 
    244         loop->rtcp_cb = NULL; 
    245         loop->user_data = NULL; 
    246     } 
     255    for (i=0; i<loop->user_cnt; ++i) { 
     256        if (loop->users[i].user_data == user_data) 
     257            break; 
     258    } 
     259    PJ_ASSERT_ON_FAIL(i != loop->user_cnt, return); 
     260 
     261    /* Remove this user */ 
     262    pj_array_erase(loop->users, sizeof(loop->users[0]), 
     263                   loop->user_cnt, i); 
     264    --loop->user_cnt; 
    247265} 
    248266 
     
    254272{ 
    255273    struct transport_loop *loop = (struct transport_loop*)tp; 
    256     void (*cb)(void*,void*,pj_ssize_t); 
    257     void *user_data; 
    258  
    259     /* Must be attached */ 
    260     PJ_ASSERT_RETURN(loop->attached, PJ_EINVALIDOP); 
     274    unsigned i; 
    261275 
    262276    /* Simulate packet lost on TX direction */ 
     
    270284    } 
    271285 
    272     cb = loop->rtp_cb; 
    273     user_data = loop->user_data; 
    274  
    275286    /* Simulate packet lost on RX direction */ 
    276287    if (loop->rx_drop_pct) { 
     
    283294    } 
    284295 
    285     if (loop->attached && cb) 
    286         (*cb)(user_data, (void*)pkt, size); 
     296    /* Distribute to users */ 
     297    for (i=0; i<loop->user_cnt; ++i) { 
     298        if (!loop->users[i].rx_disabled && loop->users[i].rtp_cb) 
     299            (*loop->users[i].rtp_cb)(loop->users[i].user_data, (void*)pkt,  
     300                                     size); 
     301    } 
    287302 
    288303    return PJ_SUCCESS; 
     
    306321{ 
    307322    struct transport_loop *loop = (struct transport_loop*)tp; 
    308     void (*cb)(void*,void*,pj_ssize_t); 
    309     void *user_data; 
    310  
    311     PJ_ASSERT_RETURN(loop->attached, PJ_EINVALIDOP); 
     323    unsigned i; 
    312324 
    313325    PJ_UNUSED_ARG(addr_len); 
    314326    PJ_UNUSED_ARG(addr); 
    315327 
    316     cb = loop->rtcp_cb; 
    317     user_data = loop->user_data; 
    318  
    319     if (loop->attached && cb) 
    320         (*cb)(user_data, (void*)pkt, size); 
     328    /* Distribute to users */ 
     329    for (i=0; i<loop->user_cnt; ++i) { 
     330        if (!loop->users[i].rx_disabled && loop->users[i].rtcp_cb) 
     331            (*loop->users[i].rtcp_cb)(loop->users[i].user_data, (void*)pkt, 
     332                                      size); 
     333    } 
    321334 
    322335    return PJ_SUCCESS; 
Note: See TracChangeset for help on using the changeset viewer.