Ignore:
Timestamp:
Jun 22, 2006 6:49:45 PM (18 years ago)
Author:
bennylp
Message:

Added better API for media transport, and fixed bugs with pending RTP write operation in UDP media transport

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjmedia/src/pjmedia/transport_udp.c

    r533 r539  
    3131#define RTCP_LEN    600 
    3232 
     33/* Maximum pending write operations */ 
     34#define MAX_PENDING 4 
     35 
     36/* Pending write buffer */ 
     37typedef struct pending_write 
     38{ 
     39    char                buffer[RTP_LEN]; 
     40    pj_ioqueue_op_key_t op_key; 
     41} pending_write; 
     42 
    3343 
    3444struct transport_udp 
     
    3848    pj_pool_t          *pool;           /**< Memory pool                    */ 
    3949    unsigned            options;        /**< Transport options.             */ 
    40     pjmedia_stream     *stream;         /**< Stream user (may be NULL)      */ 
     50    void               *user_data;      /**< Only valid when attached       */ 
     51    pj_bool_t           attached;       /**< Has attachment?                */ 
    4152    pj_sockaddr_in      rem_rtp_addr;   /**< Remote RTP address             */ 
    4253    pj_sockaddr_in      rem_rtcp_addr;  /**< Remote RTCP address            */ 
     
    5263    pj_ioqueue_key_t   *rtp_key;        /**< RTP socket key in ioqueue      */ 
    5364    pj_ioqueue_op_key_t rtp_read_op;    /**< Pending read operation         */ 
    54     pj_ioqueue_op_key_t rtp_write_op;   /**< Pending write operation        */ 
     65    unsigned            rtp_write_op_id;/**< Next write_op to use           */ 
     66    pending_write       rtp_pending_write[MAX_PENDING];  /**< Pending write */ 
    5567    pj_sockaddr_in      rtp_src_addr;   /**< Actual packet src addr.        */ 
    5668    unsigned            rtp_src_cnt;    /**< How many pkt from this addr.   */ 
     
    7688 
    7789static pj_status_t transport_attach(   pjmedia_transport *tp, 
    78                                        pjmedia_stream *strm, 
     90                                       void *user_data, 
    7991                                       const pj_sockaddr_t *rem_addr, 
    8092                                       unsigned addr_len, 
    81                                        void (*rtp_cb)(pjmedia_stream*, 
     93                                       void (*rtp_cb)(void*, 
    8294                                                      const void*, 
    8395                                                      pj_ssize_t), 
    84                                        void (*rtcp_cb)(pjmedia_stream*, 
     96                                       void (*rtcp_cb)(void*, 
    8597                                                       const void*, 
    8698                                                       pj_ssize_t)); 
    8799static void        transport_detach(   pjmedia_transport *tp, 
    88                                        pjmedia_stream *strm); 
     100                                       void *strm); 
    89101static pj_status_t transport_send_rtp( pjmedia_transport *tp, 
    90102                                       const void *pkt, 
     
    114126                                                  pjmedia_transport **p_tp) 
    115127{ 
     128    return pjmedia_transport_udp_create2(endpt, name, NULL, port, options,  
     129                                        p_tp); 
     130} 
     131 
     132/** 
     133 * Create UDP stream transport. 
     134 */ 
     135PJ_DEF(pj_status_t) pjmedia_transport_udp_create2(pjmedia_endpt *endpt, 
     136                                                  const char *name, 
     137                                                  const pj_str_t *addr, 
     138                                                  int port, 
     139                                                  unsigned options, 
     140                                                  pjmedia_transport **p_tp) 
     141{ 
    116142    pjmedia_sock_info si; 
    117143    pj_status_t status; 
     
    131157 
    132158    /* Bind RTP socket */ 
    133     si.rtp_addr_name.sin_family = PJ_AF_INET; 
    134     si.rtp_addr_name.sin_port = pj_htons((pj_uint16_t)port); 
    135  
     159    pj_sockaddr_in_init(&si.rtp_addr_name, addr, (pj_uint16_t)port); 
    136160    status = pj_sock_bind(si.rtp_sock, &si.rtp_addr_name,  
    137161                          sizeof(si.rtp_addr_name)); 
     
    146170 
    147171    /* Bind RTCP socket */ 
    148     si.rtcp_addr_name.sin_family = PJ_AF_INET; 
    149     si.rtcp_addr_name.sin_port = pj_htons((pj_uint16_t)(port+1)); 
    150  
     172    pj_sockaddr_in_init(&si.rtcp_addr_name, addr, (pj_uint16_t)(port+1)); 
    151173    status = pj_sock_bind(si.rtcp_sock, &si.rtcp_addr_name, 
    152174                          sizeof(si.rtcp_addr_name)); 
     
    182204    pj_ioqueue_callback rtp_cb, rtcp_cb; 
    183205    pj_ssize_t size; 
     206    unsigned i; 
    184207    pj_status_t status; 
    185208 
     
    224247     
    225248    pj_ioqueue_op_key_init(&tp->rtp_read_op, sizeof(tp->rtp_read_op)); 
    226     pj_ioqueue_op_key_init(&tp->rtcp_write_op, sizeof(tp->rtcp_write_op)); 
     249    for (i=0; i<PJ_ARRAY_SIZE(tp->rtp_pending_write); ++i) 
     250        pj_ioqueue_op_key_init(&tp->rtp_pending_write[i].op_key,  
     251                               sizeof(tp->rtp_pending_write[i].op_key)); 
    227252 
    228253    /* Kick of pending RTP read from the ioqueue */ 
     
    271296 * Get media socket info. 
    272297 */ 
    273 PJ_DEF(pj_status_t) pjmedia_transport_udp_get_sock_info(pjmedia_transport *tp, 
    274                                                         pjmedia_sock_info *inf) 
     298PJ_DEF(pj_status_t)  
     299pjmedia_transport_udp_get_info( pjmedia_transport *tp, 
     300                                pjmedia_transport_udp_info *inf) 
    275301{ 
    276302    struct transport_udp *udp = (struct transport_udp*)tp; 
    277303    PJ_ASSERT_RETURN(tp && inf, PJ_EINVAL); 
    278304 
    279     inf->rtp_sock = udp->rtp_sock; 
    280     inf->rtp_addr_name = udp->rtp_addr_name; 
    281     inf->rtcp_sock = udp->rtcp_sock; 
    282     inf->rtcp_addr_name = udp->rtcp_addr_name; 
     305    inf->skinfo.rtp_sock = udp->rtp_sock; 
     306    inf->skinfo.rtp_addr_name = udp->rtp_addr_name; 
     307    inf->skinfo.rtcp_sock = udp->rtcp_sock; 
     308    inf->skinfo.rtcp_addr_name = udp->rtcp_addr_name; 
    283309 
    284310    return PJ_SUCCESS; 
     
    297323 
    298324    /* Must not close while stream is using this */ 
    299     PJ_ASSERT_RETURN(udp->stream == NULL, PJ_EINVALIDOP); 
     325    PJ_ASSERT_RETURN(!udp->attached, PJ_EINVALIDOP); 
    300326     
    301327 
     
    335361 
    336362    do { 
    337         void (*cb)(pjmedia_stream*,const void*,pj_ssize_t); 
    338         pjmedia_stream *stream; 
     363        void (*cb)(void*,const void*,pj_ssize_t); 
     364        void *user_data; 
    339365 
    340366        cb = udp->rtp_cb; 
    341         stream = udp->stream; 
    342  
    343         if (bytes_read > 0 && cb && stream) 
    344             (*cb)(stream, udp->rtp_pkt, bytes_read); 
     367        user_data = udp->user_data; 
     368 
     369        if (udp->attached && cb) 
     370            (*cb)(user_data, udp->rtp_pkt, bytes_read); 
    345371 
    346372        /* See if source address of RTP packet is different than the  
     
    406432 
    407433    do { 
    408         void (*cb)(pjmedia_stream*,const void*,pj_ssize_t); 
    409         pjmedia_stream *stream; 
     434        void (*cb)(void*,const void*,pj_ssize_t); 
     435        void *user_data; 
    410436 
    411437        cb = udp->rtcp_cb; 
    412         stream = udp->stream; 
    413  
    414         if (bytes_read > 0 && cb && stream) 
    415             (*cb)(stream, udp->rtcp_pkt, bytes_read); 
     438        user_data = udp->user_data; 
     439 
     440        if (udp->attached && cb) 
     441            (*cb)(user_data, udp->rtcp_pkt, bytes_read); 
    416442 
    417443        bytes_read = sizeof(udp->rtcp_pkt); 
     
    425451/* Called by stream to initialize the transport */ 
    426452static pj_status_t transport_attach(   pjmedia_transport *tp, 
    427                                        pjmedia_stream *strm, 
     453                                       void *user_data, 
    428454                                       const pj_sockaddr_t *rem_addr, 
    429455                                       unsigned addr_len, 
    430                                        void (*rtp_cb)(pjmedia_stream*, 
     456                                       void (*rtp_cb)(void*, 
    431457                                                      const void*, 
    432458                                                      pj_ssize_t), 
    433                                        void (*rtcp_cb)(pjmedia_stream*, 
     459                                       void (*rtcp_cb)(void*, 
    434460                                                       const void*, 
    435461                                                       pj_ssize_t)) 
     
    438464 
    439465    /* Validate arguments */ 
    440     PJ_ASSERT_RETURN(tp && strm && rem_addr && addr_len, PJ_EINVAL); 
     466    PJ_ASSERT_RETURN(tp && rem_addr && addr_len, PJ_EINVAL); 
    441467 
    442468    /* Must not be "attached" to existing stream */ 
    443     PJ_ASSERT_RETURN(udp->stream == NULL, PJ_EINVALIDOP); 
     469    PJ_ASSERT_RETURN(!udp->attached, PJ_EINVALIDOP); 
    444470 
    445471    /* "Attach" the stream: */ 
     
    456482    udp->rtp_cb = rtp_cb; 
    457483    udp->rtcp_cb = rtcp_cb; 
    458  
    459     /* Last, save the stream to mark that we have a "client" */ 
    460     udp->stream = strm; 
     484    udp->user_data = user_data; 
     485 
     486    /* Last, mark transport as attached */ 
     487    udp->attached = PJ_TRUE; 
    461488 
    462489    return PJ_SUCCESS; 
     
    466493/* Called by stream when it no longer needs the transport */ 
    467494static void transport_detach( pjmedia_transport *tp, 
    468                               pjmedia_stream *strm) 
     495                              void *user_data) 
    469496{ 
    470497    struct transport_udp *udp = (struct transport_udp*) tp; 
    471498 
    472     pj_assert(tp && strm); 
    473  
    474     PJ_UNUSED_ARG(strm); 
     499    pj_assert(tp); 
     500 
     501    /* User data is unreferenced on Release build */ 
     502    PJ_UNUSED_ARG(user_data); 
     503 
     504    /* As additional checking, check if the same user data is specified */ 
     505    pj_assert(user_data == udp->user_data); 
     506 
     507    /* First, mark stream as unattached */ 
     508    udp->attached = PJ_FALSE; 
    475509 
    476510    /* Clear up stream infos from transport */ 
    477     udp->stream = NULL; 
    478511    udp->rtp_cb = NULL; 
    479512    udp->rtcp_cb = NULL; 
     513    udp->user_data = NULL; 
    480514} 
    481515 
     
    488522    struct transport_udp *udp = (struct transport_udp*)tp; 
    489523    pj_ssize_t sent; 
     524    unsigned id; 
     525    struct pending_write *pw; 
    490526    pj_status_t status; 
    491527 
    492     PJ_ASSERT_RETURN(udp->stream, PJ_EINVALIDOP); 
     528    /* Must be attached */ 
     529    PJ_ASSERT_RETURN(udp->attached, PJ_EINVALIDOP); 
     530 
     531    /* Check that the size is supported */ 
     532    PJ_ASSERT_RETURN(size <= RTP_LEN, PJ_ETOOBIG); 
     533 
     534    id = udp->rtp_write_op_id; 
     535    pw = &udp->rtp_pending_write[id]; 
     536 
     537    /* We need to copy packet to our buffer because when the 
     538     * operation is pending, caller might write something else 
     539     * to the original buffer. 
     540     */ 
     541    pj_memcpy(pw->buffer, pkt, size); 
    493542 
    494543    sent = size; 
    495     status = pj_ioqueue_sendto( udp->rtp_key, &udp->rtp_write_op, 
    496                                 pkt, &sent, 0, 
    497                                 &udp->rem_rtp_addr, sizeof(pj_sockaddr_in)); 
     544    status = pj_ioqueue_sendto( udp->rtp_key,  
     545                                &udp->rtp_pending_write[id].op_key, 
     546                                pw->buffer, &sent, 0, 
     547                                &udp->rem_rtp_addr,  
     548                                sizeof(pj_sockaddr_in)); 
     549 
     550    udp->rtp_write_op_id = (udp->rtp_write_op_id + 1) % 
     551                           PJ_ARRAY_SIZE(udp->rtp_pending_write); 
    498552 
    499553    if (status==PJ_SUCCESS || status==PJ_EPENDING) 
     
    512566    pj_status_t status; 
    513567 
    514     PJ_ASSERT_RETURN(udp->stream, PJ_EINVALIDOP); 
     568    PJ_ASSERT_RETURN(udp->attached, PJ_EINVALIDOP); 
    515569 
    516570    sent = size; 
Note: See TracChangeset for help on using the changeset viewer.