Ignore:
Timestamp:
May 17, 2006 5:17:39 PM (18 years ago)
Author:
bennylp
Message:

Major modification in pjmedia to split stream transport into separate functionality, to allow using custom transports with streams

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjmedia/src/pjmedia/stream.c

    r450 r452  
    8686    void                    *user_data;     /**< User data.                 */ 
    8787 
     88    pjmedia_transport       *transport;     /**< Stream transport.          */ 
     89 
    8890    pjmedia_codec           *codec;         /**< Codec instance being used. */ 
    8991    pjmedia_codec_param      codec_param;   /**< Codec param.               */ 
     
    9294    pjmedia_jbuf            *jb;            /**< Jitter buffer.             */ 
    9395 
    94     pjmedia_sock_info        skinfo;        /**< Transport info.            */ 
    95     pj_sockaddr_in           rem_rtp_addr;  /**< Remote RTP address.        */ 
    96     pj_sockaddr_in           rem_rtcp_addr; /**< Remote RTCP address.       */ 
    97  
    98  
    9996    pjmedia_rtcp_session     rtcp;          /**< RTCP for incoming RTP.     */ 
    10097 
    101     pj_ioqueue_key_t        *rtp_key;       /**< RTP ioqueue key.           */ 
    102     pj_ioqueue_op_key_t      rtp_op_key;    /**< The pending read op key.   */ 
    103     pj_sockaddr_in           rtp_src_addr;  /**< addr of src pkt from remote*/ 
    104     unsigned                 rtp_src_cnt;   /**< if different, # of pkt rcv */ 
    105     int                      rtp_addrlen;   /**< Address length.            */ 
    106  
    107     pj_ioqueue_key_t        *rtcp_key;      /**< RTCP ioqueue key.          */ 
    108     pj_ioqueue_op_key_t      rtcp_op_key;   /**< The pending read op key.   */ 
    109     pj_size_t                rtcp_pkt_size; /**< Size of RTCP packet buf.   */ 
    110     char                     rtcp_pkt[512]; /**< RTCP packet buffer.        */ 
    11198    pj_uint32_t              rtcp_last_tx;  /**< RTCP tx time in timestamp  */ 
    11299    pj_uint32_t              rtcp_interval; /**< Interval, in timestamp.    */ 
    113     int                      rtcp_addrlen;  /**< Address length.            */ 
    114100 
    115101    /* RFC 2833 DTMF transmission queue: */ 
     
    424410         
    425411        pjmedia_rtcp_pkt *rtcp_pkt; 
    426         pj_ssize_t size; 
    427412        int len; 
    428         pj_status_t status; 
    429413 
    430414        pjmedia_rtcp_build_rtcp(&stream->rtcp, &rtcp_pkt, &len); 
    431         size = len; 
    432         status = pj_sock_sendto(stream->skinfo.rtcp_sock, rtcp_pkt, &size, 0, 
    433                                 &stream->rem_rtcp_addr,  
    434                                 sizeof(stream->rem_rtcp_addr)); 
    435 #if 0 
    436         if (status != PJ_SUCCESS) { 
    437             char errmsg[PJ_ERR_MSG_SIZE]; 
    438              
    439             pj_strerror(status, errmsg, sizeof(errmsg)); 
    440             PJ_LOG(4,(port->info.name.ptr, "Error sending RTCP: %s [%d]", 
    441                                  errmsg, status)); 
    442         } 
    443 #endif 
     415 
     416        (*stream->transport->op->send_rtcp)(stream->transport,  
     417                                            rtcp_pkt, len); 
    444418 
    445419        stream->rtcp_last_tx = timestamp; 
     
    587561    /* Send. */ 
    588562    sent = frame_out.size+sizeof(pjmedia_rtp_hdr); 
    589     status = pj_sock_sendto(stream->skinfo.rtp_sock, channel->out_pkt,  
    590                             &sent, 0, &stream->rem_rtp_addr,  
    591                             sizeof(stream->rem_rtp_addr)); 
    592     if (status != PJ_SUCCESS) 
    593         return status; 
     563 
     564    (*stream->transport->op->send_rtp)(stream->transport, 
     565                                       channel->out_pkt, sent); 
     566 
    594567 
    595568    /* Update stat */ 
     
    683656 
    684657/* 
    685  * This callback is called by ioqueue framework on receipt of packets 
     658 * This callback is called by stream transport on receipt of packets 
    686659 * in the RTP socket.  
    687660 */ 
    688 static void on_rx_rtp( pj_ioqueue_key_t *key,  
    689                        pj_ioqueue_op_key_t *op_key,  
     661static void on_rx_rtp( pjmedia_stream *stream,  
     662                       const void *pkt, 
    690663                       pj_ssize_t bytes_read) 
    691664 
    692665{ 
    693     pjmedia_stream *stream = pj_ioqueue_get_user_data(key); 
    694666    pjmedia_channel *channel = stream->dec; 
     667    const pjmedia_rtp_hdr *hdr; 
     668    const void *payload; 
     669    unsigned payloadlen; 
     670    pjmedia_rtp_status seq_st; 
    695671    pj_status_t status; 
    696672 
    697      
    698     PJ_UNUSED_ARG(op_key); 
    699  
    700  
    701     /* 
    702      * Loop while we have packet. 
     673 
     674    /* Update RTP and RTCP session. */ 
     675    status = pjmedia_rtp_decode_rtp(&channel->rtp, pkt, bytes_read, 
     676                                    &hdr, &payload, &payloadlen); 
     677    if (status != PJ_SUCCESS) { 
     678        LOGERR_((stream->port.info.name.ptr, "RTP decode error", status)); 
     679        return; 
     680    } 
     681 
     682 
     683    /* Inform RTCP session */ 
     684    pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq), 
     685                        pj_ntohl(hdr->ts), payloadlen); 
     686 
     687    /* Handle incoming DTMF. */ 
     688    if (hdr->pt == stream->rx_event_pt) { 
     689        handle_incoming_dtmf(stream, payload, payloadlen); 
     690        return; 
     691    } 
     692 
     693 
     694    /* Update RTP session (also checks if RTP session can accept 
     695     * the incoming packet. 
    703696     */ 
    704     do { 
    705         const pjmedia_rtp_hdr *hdr; 
    706         const void *payload; 
    707         unsigned payloadlen; 
    708         pjmedia_rtp_status seq_st; 
    709  
    710         /* Go straight to read next packet if bytes_read == 0. 
     697    pjmedia_rtp_session_update(&channel->rtp, hdr, &seq_st); 
     698    if (seq_st.status.value) { 
     699        TRC_  ((stream->port.info.name.ptr,  
     700                "RTP status: badpt=%d, badssrc=%d, dup=%d, " 
     701                "outorder=%d, probation=%d, restart=%d",  
     702                seq_st.status.flag.badpt, 
     703                seq_st.status.flag.badssrc, 
     704                seq_st.status.flag.dup, 
     705                seq_st.status.flag.outorder, 
     706                seq_st.status.flag.probation, 
     707                seq_st.status.flag.restart)); 
     708 
     709        if (seq_st.status.flag.badpt) { 
     710            PJ_LOG(4,(stream->port.info.name.ptr, 
     711                      "Bad RTP pt %d (expecting %d)", 
     712                      hdr->pt, channel->rtp.out_pt)); 
     713        } 
     714    } 
     715 
     716    /* Skip bad RTP packet */ 
     717    if (seq_st.status.flag.bad) 
     718        return; 
     719 
     720 
     721    /* Put "good" packet to jitter buffer, or reset the jitter buffer 
     722     * when RTP session is restarted. 
     723     */ 
     724    pj_mutex_lock( stream->jb_mutex ); 
     725    if (seq_st.status.flag.restart) { 
     726        status = pjmedia_jbuf_reset(stream->jb); 
     727        PJ_LOG(4,(stream->port.info.name.ptr, "Jitter buffer reset")); 
     728 
     729    } else { 
     730        /* 
     731         * Packets may contain more than one frames, while the jitter 
     732         * buffer can only take one frame per "put" operation. So we need 
     733         * to ask the codec to "parse" the payload into multiple frames. 
    711734         */ 
    712         if (bytes_read == 0) 
    713             goto read_next_packet; 
    714  
    715         if (bytes_read < 0) 
    716             goto read_next_packet; 
    717  
    718         /* Update RTP and RTCP session. */ 
    719         status = pjmedia_rtp_decode_rtp(&channel->rtp,  
    720                                         channel->in_pkt, bytes_read,  
    721                                         &hdr, &payload, &payloadlen); 
     735        enum { MAX = 16 }; 
     736        pj_timestamp ts; 
     737        unsigned i, count = MAX; 
     738        unsigned samples_per_frame; 
     739        pjmedia_frame frames[MAX]; 
     740 
     741        /* Get the timestamp of the first sample */ 
     742        ts.u64 = pj_ntohl(hdr->ts); 
     743 
     744        /* Parse the payload. */ 
     745        status = (*stream->codec->op->parse)(stream->codec, 
     746                                             (void*)payload, 
     747                                             payloadlen, 
     748                                             &ts, 
     749                                             &count, 
     750                                             frames); 
    722751        if (status != PJ_SUCCESS) { 
    723             LOGERR_((stream->port.info.name.ptr, "RTP decode error", status)); 
    724             goto read_next_packet; 
     752            LOGERR_((stream->port.info.name.ptr,  
     753                     "Codec parse() error",  
     754                     status)); 
     755            count = 0; 
    725756        } 
    726757 
    727  
    728         /* Inform RTCP session */ 
    729         pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq), 
    730                             pj_ntohl(hdr->ts), payloadlen); 
    731  
    732         /* Handle incoming DTMF. */ 
    733         if (hdr->pt == stream->rx_event_pt) { 
    734             handle_incoming_dtmf(stream, payload, payloadlen); 
    735             goto read_next_packet; 
     758        /* Put each frame to jitter buffer. */ 
     759        samples_per_frame = stream->codec_param.info.frm_ptime *  
     760                            stream->codec_param.info.clock_rate * 
     761                            stream->codec_param.info.channel_cnt / 
     762                            1000; 
     763                             
     764        for (i=0; i<count; ++i) { 
     765            unsigned ext_seq; 
     766 
     767            ext_seq = (unsigned)(frames[i].timestamp.u64 / 
     768                                 samples_per_frame); 
     769            pjmedia_jbuf_put_frame(stream->jb, frames[i].buf,  
     770                                   frames[i].size, ext_seq); 
     771 
    736772        } 
    737  
    738  
    739         /* Update RTP session (also checks if RTP session can accept 
    740          * the incoming packet. 
    741          */ 
    742         pjmedia_rtp_session_update(&channel->rtp, hdr, &seq_st); 
    743         if (seq_st.status.value) { 
    744             TRC_  ((stream->port.info.name.ptr,  
    745                     "RTP status: badpt=%d, badssrc=%d, dup=%d, " 
    746                     "outorder=%d, probation=%d, restart=%d",  
    747                     seq_st.status.flag.badpt, 
    748                     seq_st.status.flag.badssrc, 
    749                     seq_st.status.flag.dup, 
    750                     seq_st.status.flag.outorder, 
    751                     seq_st.status.flag.probation, 
    752                     seq_st.status.flag.restart)); 
    753  
    754             if (seq_st.status.flag.badpt) { 
    755                 PJ_LOG(4,(stream->port.info.name.ptr, 
    756                           "Bad RTP pt %d (expecting %d)", 
    757                           hdr->pt, channel->rtp.out_pt)); 
    758             } 
    759         } 
    760  
    761         /* Skip bad RTP packet */ 
    762         if (seq_st.status.flag.bad) 
    763             goto read_next_packet; 
    764  
    765  
    766         /* See if source address of RTP packet is different than the  
    767          * configured address. 
    768          */ 
    769         if ((stream->rem_rtp_addr.sin_addr.s_addr !=  
    770              stream->rtp_src_addr.sin_addr.s_addr) || 
    771             (stream->rem_rtp_addr.sin_port != stream->rtp_src_addr.sin_port)) 
    772         { 
    773             stream->rtp_src_cnt++; 
    774  
    775             if (stream->rtp_src_cnt >= PJMEDIA_RTP_NAT_PROBATION_CNT) { 
    776              
    777                 stream->rem_rtp_addr = stream->rtp_src_addr; 
    778                 stream->rtp_src_cnt = 0; 
    779  
    780                 PJ_LOG(4,(stream->port.info.name.ptr, 
    781                           "Remote RTP address switched to %s:%d", 
    782                           pj_inet_ntoa(stream->rtp_src_addr.sin_addr), 
    783                           pj_ntohs(stream->rtp_src_addr.sin_port))); 
    784             } 
    785         } 
    786  
    787  
    788  
    789         /* Put "good" packet to jitter buffer, or reset the jitter buffer 
    790          * when RTP session is restarted. 
    791          */ 
    792         pj_mutex_lock( stream->jb_mutex ); 
    793         if (seq_st.status.flag.restart) { 
    794             status = pjmedia_jbuf_reset(stream->jb); 
    795             PJ_LOG(4,(stream->port.info.name.ptr, "Jitter buffer reset")); 
    796  
    797         } else { 
    798             /* 
    799              * Packets may contain more than one frames, while the jitter 
    800              * buffer can only take one frame per "put" operation. So we need 
    801              * to ask the codec to "parse" the payload into multiple frames. 
    802              */ 
    803             enum { MAX = 16 }; 
    804             pj_timestamp ts; 
    805             unsigned i, count = MAX; 
    806             unsigned samples_per_frame; 
    807             pjmedia_frame frames[MAX]; 
    808  
    809             /* Get the timestamp of the first sample */ 
    810             ts.u64 = pj_ntohl(hdr->ts); 
    811  
    812             /* Parse the payload. */ 
    813             status = (*stream->codec->op->parse)(stream->codec, 
    814                                                  (void*)payload, 
    815                                                  payloadlen, 
    816                                                  &ts, 
    817                                                  &count, 
    818                                                  frames); 
    819             if (status != PJ_SUCCESS) { 
    820                 LOGERR_((stream->port.info.name.ptr,  
    821                          "Codec parse() error",  
    822                          status)); 
    823                 count = 0; 
    824             } 
    825  
    826             /* Put each frame to jitter buffer. */ 
    827             samples_per_frame = stream->codec_param.info.frm_ptime *  
    828                                 stream->codec_param.info.clock_rate * 
    829                                 stream->codec_param.info.channel_cnt / 
    830                                 1000; 
    831                                  
    832             for (i=0; i<count; ++i) { 
    833                 unsigned ext_seq; 
    834  
    835                 ext_seq = (unsigned)(frames[i].timestamp.u64 / 
    836                                      samples_per_frame); 
    837                 pjmedia_jbuf_put_frame(stream->jb, frames[i].buf,  
    838                                        frames[i].size, ext_seq); 
    839  
    840             } 
    841         } 
    842         pj_mutex_unlock( stream->jb_mutex ); 
    843  
    844  
    845         /* Check if now is the time to transmit RTCP SR/RR report. 
    846          * We only do this when stream direction is "decoding only",  
    847          * because otherwise check_tx_rtcp() will be handled by put_frame() 
    848          */ 
    849         if (stream->dir == PJMEDIA_DIR_DECODING) { 
    850             check_tx_rtcp(stream, pj_ntohl(hdr->ts)); 
    851         } 
    852  
    853         if (status != 0) { 
    854             LOGERR_((stream->port.info.name.ptr, "Jitter buffer put() error",  
    855                     status)); 
    856             goto read_next_packet; 
    857         } 
    858  
    859  
    860 read_next_packet: 
    861         bytes_read = channel->in_pkt_size; 
    862         stream->rtp_addrlen = sizeof(stream->rtp_src_addr); 
    863         status = pj_ioqueue_recvfrom( stream->rtp_key, 
    864                                       &stream->rtp_op_key, 
    865                                       channel->in_pkt,  
    866                                       &bytes_read, 0, 
    867                                       &stream->rtp_src_addr,  
    868                                       &stream->rtp_addrlen); 
    869  
    870         if (status != PJ_SUCCESS) { 
    871             bytes_read = -status; 
    872         } 
    873  
    874     } while (status == PJ_SUCCESS || 
    875              status == PJ_STATUS_FROM_OS(OSERR_ECONNRESET)); 
    876  
    877     if (status != PJ_SUCCESS && status != PJ_EPENDING) { 
    878         char errmsg[PJ_ERR_MSG_SIZE]; 
    879  
    880         pj_strerror(status, errmsg, sizeof(errmsg)); 
    881         PJ_LOG(4,(stream->port.info.name.ptr,  
    882                   "Error reading RTP packet: %s [status=%d]. " 
    883                   "RTP stream thread quitting!", 
    884                   errmsg, status)); 
    885     } 
    886 } 
    887  
    888  
    889 /* 
    890  * This callback is called by ioqueue framework on receipt of packets 
     773    } 
     774    pj_mutex_unlock( stream->jb_mutex ); 
     775 
     776 
     777    /* Check if now is the time to transmit RTCP SR/RR report. 
     778     * We only do this when stream direction is "decoding only",  
     779     * because otherwise check_tx_rtcp() will be handled by put_frame() 
     780     */ 
     781    if (stream->dir == PJMEDIA_DIR_DECODING) { 
     782        check_tx_rtcp(stream, pj_ntohl(hdr->ts)); 
     783    } 
     784 
     785    if (status != 0) { 
     786        LOGERR_((stream->port.info.name.ptr, "Jitter buffer put() error",  
     787                status)); 
     788        return; 
     789    } 
     790} 
     791 
     792 
     793/* 
     794 * This callback is called by stream transport on receipt of packets 
    891795 * in the RTCP socket.  
    892796 */ 
    893 static void on_rx_rtcp( pj_ioqueue_key_t *key,  
    894                         pj_ioqueue_op_key_t *op_key,  
     797static void on_rx_rtcp( pjmedia_stream *stream, 
     798                        const void *pkt,  
    895799                        pj_ssize_t bytes_read) 
    896800{ 
    897     pjmedia_stream *stream = pj_ioqueue_get_user_data(key); 
    898     pj_status_t status; 
    899  
    900     PJ_UNUSED_ARG(op_key); 
    901  
    902     do { 
    903         if (bytes_read > 0) { 
    904             pjmedia_rtcp_rx_rtcp(&stream->rtcp, stream->rtcp_pkt,  
    905                                  bytes_read); 
    906         } 
    907  
    908         bytes_read = stream->rtcp_pkt_size; 
    909         stream->rtcp_addrlen = sizeof(stream->rem_rtcp_addr); 
    910         status = pj_ioqueue_recvfrom( stream->rtcp_key, 
    911                                       &stream->rtcp_op_key, 
    912                                       stream->rtcp_pkt, 
    913                                       &bytes_read, 0, 
    914                                       &stream->rem_rtcp_addr, 
    915                                       &stream->rtcp_addrlen); 
    916  
    917     } while (status == PJ_SUCCESS); 
    918  
    919     if (status != PJ_SUCCESS && status != PJ_EPENDING) { 
    920         char errmsg[PJ_ERR_MSG_SIZE]; 
    921  
    922         pj_strerror(status, errmsg, sizeof(errmsg)); 
    923         PJ_LOG(4,(stream->port.info.name.ptr,  
    924                   "Error reading RTCP packet: %s [status=%d]", 
    925                   errmsg, status)); 
    926     } 
    927  
     801    pjmedia_rtcp_rx_rtcp(&stream->rtcp, pkt, bytes_read); 
    928802} 
    929803 
     
    994868                                           pj_pool_t *pool, 
    995869                                           const pjmedia_stream_info *info, 
     870                                           pjmedia_transport *tp, 
    996871                                           void *user_data, 
    997872                                           pjmedia_stream **p_stream) 
     
    999874{ 
    1000875    pjmedia_stream *stream; 
    1001     pj_ioqueue_callback ioqueue_cb; 
    1002     pj_uint16_t rtcp_port; 
    1003876    unsigned jb_init, jb_max, jb_min_pre, jb_max_pre; 
    1004877    pj_status_t status; 
     
    1037910    stream->dir = info->dir; 
    1038911    stream->user_data = user_data; 
    1039     stream->skinfo = info->sock_info; 
    1040     stream->rem_rtp_addr = info->rem_addr; 
    1041     rtcp_port = (pj_uint16_t) (pj_ntohs(info->rem_addr.sin_port)+1); 
    1042     stream->rem_rtcp_addr = stream->rem_rtp_addr; 
    1043     stream->rem_rtcp_addr.sin_port = pj_htons(rtcp_port); 
    1044912    stream->rtcp_interval = (PJMEDIA_RTCP_INTERVAL + (pj_rand() % 8000)) *  
    1045913                            info->fmt.clock_rate / 1000; 
     
    1048916    stream->rx_event_pt = info->rx_event_pt ? info->rx_event_pt : -1; 
    1049917    stream->last_dtmf = -1; 
     918 
     919    /* Attach transport */ 
     920    status = (*tp->op->attach)(tp, stream, &info->rem_addr,  
     921                               sizeof(info->rem_addr), &on_rx_rtp, 
     922                               &on_rx_rtcp); 
     923    if (status != PJ_SUCCESS) 
     924        goto err_cleanup; 
     925 
     926    stream->transport = tp; 
    1050927 
    1051928 
     
    11611038        goto err_cleanup; 
    11621039 
    1163     /*  Register RTP socket to ioqueue */ 
    1164     pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb)); 
    1165     ioqueue_cb.on_read_complete = &on_rx_rtp; 
    1166  
    1167     status = pj_ioqueue_register_sock( pool,  
    1168                                        pjmedia_endpt_get_ioqueue(endpt),  
    1169                                        stream->skinfo.rtp_sock, 
    1170                                        stream, &ioqueue_cb, &stream->rtp_key); 
    1171     if (status != PJ_SUCCESS) 
    1172         goto err_cleanup; 
    1173  
    1174     /* Init pending operation key. */ 
    1175     pj_ioqueue_op_key_init(&stream->rtp_op_key, sizeof(stream->rtp_op_key)); 
    1176  
    1177     /* Bootstrap the first recvfrom() operation. */ 
    1178     on_rx_rtp( stream->rtp_key, &stream->rtp_op_key, 0); 
    1179  
    1180  
    1181     /* Register RTCP socket to ioqueue. */ 
    1182     if (stream->skinfo.rtcp_sock != PJ_INVALID_SOCKET) { 
    1183         pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb)); 
    1184         ioqueue_cb.on_read_complete = &on_rx_rtcp; 
    1185  
    1186         status = pj_ioqueue_register_sock( pool,  
    1187                                            pjmedia_endpt_get_ioqueue(endpt), 
    1188                                            stream->skinfo.rtcp_sock, 
    1189                                            stream, &ioqueue_cb,  
    1190                                            &stream->rtcp_key); 
    1191         if (status != PJ_SUCCESS) 
    1192             goto err_cleanup; 
    1193     } 
    1194  
    1195     /* Init pending operation key. */ 
    1196     pj_ioqueue_op_key_init(&stream->rtcp_op_key, sizeof(stream->rtcp_op_key)); 
    1197  
    1198     stream->rtcp_pkt_size = sizeof(stream->rtcp_pkt); 
    1199  
    1200     /* Bootstrap the first recvfrom() operation. */ 
    1201     on_rx_rtcp( stream->rtcp_key, &stream->rtcp_op_key, 0); 
     1040 
    12021041 
    12031042    /* Success! */ 
     
    12281067 
    12291068 
    1230     /* Unregister from ioqueue. */ 
    1231     if (stream->rtp_key) { 
    1232         pj_ioqueue_unregister(stream->rtp_key); 
    1233         stream->rtp_key = NULL; 
    1234     } 
    1235     if (stream->rtcp_key) { 
    1236         pj_ioqueue_unregister(stream->rtcp_key); 
    1237         stream->rtcp_key = NULL; 
     1069    /* Detach from transport */ 
     1070    if (stream->transport) { 
     1071        (*stream->transport->op->detach)(stream->transport, stream); 
     1072        stream->transport = NULL; 
    12381073    } 
    12391074 
     
    12661101    *p_port = &stream->port; 
    12671102    return PJ_SUCCESS; 
     1103} 
     1104 
     1105 
     1106/* 
     1107 * Get the transport object 
     1108 */ 
     1109PJ_DEF(pjmedia_transport*) pjmedia_stream_get_transport(pjmedia_stream *st) 
     1110{ 
     1111    return st->transport; 
    12681112} 
    12691113 
Note: See TracChangeset for help on using the changeset viewer.