Ignore:
Timestamp:
Mar 22, 2006 11:59:11 AM (18 years ago)
Author:
bennylp
Message:

Redesign RTP/RTCP stuffs so that stream does not create thread implicitly. Changed pjmedia_endpt_create() API.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjmedia/src/pjmedia/stream.c

    r327 r350  
    2727#include <pj/compat/socket.h> 
    2828#include <pj/errno.h> 
     29#include <pj/ioqueue.h> 
    2930#include <pj/log.h> 
    3031#include <pj/os.h> 
     
    7980struct pjmedia_stream 
    8081{ 
     82    pjmedia_endpt           *endpt;         /**< Media endpoint.            */ 
     83    pjmedia_codec_mgr       *codec_mgr;     /**< Codec manager instance.    */ 
     84 
    8185    pjmedia_port             port;          /**< Port interface.            */ 
    8286    pjmedia_channel         *enc;           /**< Encoding channel.          */ 
     
    8791    void                    *user_data;     /**< User data.                 */ 
    8892 
    89     pjmedia_codec_mgr       *codec_mgr;     /**< Codec manager instance.    */ 
    9093    pjmedia_codec           *codec;         /**< Codec instance being used. */ 
    9194    pj_size_t                frame_size;    /**< Size of encoded frame.     */ 
     
    97100    pj_sockaddr_in           rem_rtcp_addr; /**< Remote RTCP address.       */ 
    98101 
    99     pj_sockaddr_in           rem_src_rtp;   /**< addr of src pkt from remote*/ 
    100     unsigned                 rem_src_cnt;   /**< if different, # of pkt rcv */ 
    101102 
    102103    pjmedia_rtcp_session     rtcp;          /**< RTCP for incoming RTP.     */ 
    103104 
    104     pj_bool_t                quit_flag;     /**< To signal thread exit.     */ 
    105     pj_thread_t             *thread;        /**< Jitter buffer's thread.    */ 
     105    pj_ioqueue_key_t        *rtp_key;       /**< RTP ioqueue key.           */ 
     106    pj_ioqueue_op_key_t      rtp_op_key;    /**< The pending read op key.   */ 
     107    pj_sockaddr_in           rtp_src_addr;  /**< addr of src pkt from remote*/ 
     108    unsigned                 rtp_src_cnt;   /**< if different, # of pkt rcv */ 
     109    int                      rtp_addrlen;   /**< Address length.            */ 
     110 
     111    pj_ioqueue_key_t        *rtcp_key;      /**< RTCP ioqueue key.          */ 
     112    pj_ioqueue_op_key_t      rtcp_op_key;   /**< The pending read op key.   */ 
     113 
    106114 
    107115    /* RFC 2833 DTMF transmission queue: */ 
     
    152160    pj_status_t status; 
    153161    struct pjmedia_frame frame_in, frame_out; 
    154  
    155     /* Do nothing if we're quitting. */ 
    156     if (stream->quit_flag) { 
    157         frame->type = PJMEDIA_FRAME_TYPE_NONE; 
    158         return PJ_SUCCESS; 
    159     } 
    160162 
    161163    /* Lock jitter buffer mutex */ 
     
    272274    pj_ssize_t sent; 
    273275 
    274     /* Check if stream is quitting. */ 
    275     if (stream->quit_flag) 
    276         return -1; 
    277  
    278276    /* Number of samples in the frame */ 
    279277    ts_len = frame->size / 2; 
     
    439437 
    440438/* 
    441  * This thread will poll the socket for incoming packets, and put 
    442  * the packets to jitter buffer. 
    443  */ 
    444 static int PJ_THREAD_FUNC jitter_buffer_thread (void*arg) 
    445 { 
    446     pjmedia_stream *stream = arg; 
     439 * This callback is called by ioqueue framework on receipt of packets 
     440 * in the RTP socket.  
     441 */ 
     442static void on_rx_rtp( pj_ioqueue_key_t *key,  
     443                       pj_ioqueue_op_key_t *op_key,  
     444                       pj_ssize_t bytes_read) 
     445 
     446{ 
     447    pjmedia_stream *stream = pj_ioqueue_get_user_data(key); 
    447448    pjmedia_channel *channel = stream->dec; 
    448  
    449  
    450     while (!stream->quit_flag) { 
    451         pj_ssize_t len; 
     449    pj_status_t status; 
     450 
     451     
     452    PJ_UNUSED_ARG(op_key); 
     453 
     454 
     455    /* 
     456     * Loop while we have packet. 
     457     */ 
     458    do { 
    452459        const pjmedia_rtp_hdr *hdr; 
    453460        const void *payload; 
    454461        unsigned payloadlen; 
    455         int addrlen; 
    456         int status; 
    457  
    458         /* Wait for packet. */ 
    459         pj_fd_set_t fds; 
    460         pj_time_val timeout; 
    461  
    462         PJ_FD_ZERO (&fds); 
    463         PJ_FD_SET (stream->skinfo.rtp_sock, &fds); 
    464         timeout.sec = 0; 
    465         timeout.msec = 1; 
    466  
    467         /* Wait with timeout. */ 
    468         status = pj_sock_select(FD_SETSIZE, &fds, NULL, NULL, &timeout); 
    469         if (status < 0) { 
    470             TRACE_((THIS_FILE, "Jitter buffer select() error",  
    471                     pj_get_netos_error())); 
    472             pj_thread_sleep(500); 
    473             continue; 
    474         } else if (status == 0) 
    475             continue; 
    476  
    477         /* Get packet from socket. */ 
    478         len = channel->in_pkt_size; 
    479         addrlen = sizeof(stream->rem_src_rtp); 
    480         status = pj_sock_recvfrom(stream->skinfo.rtp_sock,  
    481                                   channel->in_pkt, &len, 0, 
    482                                   &stream->rem_src_rtp, &addrlen); 
    483         if (len < 1 || status != PJ_SUCCESS) { 
    484             if (pj_get_netos_error() == PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) { 
    485                 /* On Win2K SP2 (or above) and WinXP, recv() will get  
    486                  * WSAECONNRESET when the sending side receives ICMP port  
    487                  * unreachable. 
    488                  */ 
    489                 continue; 
    490             } 
    491             pj_thread_sleep(1); 
    492             continue; 
    493         } 
    494  
    495         if (channel->paused) 
    496             continue; 
     462 
     463        /* Go straight to read next packet if bytes_read == 0. 
     464         */ 
     465        if (bytes_read == 0) 
     466            goto read_next_packet; 
     467 
    497468 
    498469        /* Update RTP and RTCP session. */ 
    499         status = pjmedia_rtp_decode_rtp(&channel->rtp, channel->in_pkt, len,  
    500                                    &hdr, &payload, &payloadlen); 
     470        status = pjmedia_rtp_decode_rtp(&channel->rtp,  
     471                                        channel->in_pkt, bytes_read,  
     472                                        &hdr, &payload, &payloadlen); 
    501473        if (status != PJ_SUCCESS) { 
    502474            TRACE_((THIS_FILE, "RTP decode error", status)); 
    503             continue; 
     475            goto read_next_packet; 
    504476        } 
     477 
    505478 
    506479        /* Handle incoming DTMF. */ 
    507480        if (hdr->pt == stream->rx_event_pt) { 
    508481            handle_incoming_dtmf(stream, payload, payloadlen); 
    509             continue; 
     482            goto read_next_packet; 
    510483        } 
    511484 
     485 
     486        /* Update RTP session (also checks if RTP session can accept 
     487         * the incoming packet. 
     488         */ 
    512489        status = pjmedia_rtp_session_update(&channel->rtp, hdr); 
    513490        if (status != 0 &&  
     
    519496            PJ_LOG(4,(THIS_FILE,"RTP packet detail: pt=%d, seq=%d", 
    520497                      hdr->pt, pj_ntohs(hdr->seq))); 
    521             continue; 
     498            goto read_next_packet; 
    522499        } 
    523         pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq), pj_ntohl(hdr->ts)); 
     500 
     501 
     502        /* Update the RTCP session. */ 
     503        pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq),  
     504                            pj_ntohl(hdr->ts)); 
     505 
    524506 
    525507        /* Update stat */ 
    526508        stream->stat.dec.pkt++; 
    527         stream->stat.dec.bytes += len; 
     509        stream->stat.dec.bytes += bytes_read; 
     510 
    528511 
    529512        /* See if source address of RTP packet is different than the  
     
    531514         */ 
    532515        if ((stream->rem_rtp_addr.sin_addr.s_addr !=  
    533              stream->rem_src_rtp.sin_addr.s_addr) || 
    534             (stream->rem_rtp_addr.sin_port != stream->rem_src_rtp.sin_port)) 
     516             stream->rtp_src_addr.sin_addr.s_addr) || 
     517            (stream->rem_rtp_addr.sin_port != stream->rtp_src_addr.sin_port)) 
    535518        { 
    536             stream->rem_src_cnt++; 
    537  
    538             if (stream->rem_src_cnt >= PJMEDIA_RTP_NAT_PROBATION_CNT) { 
     519            stream->rtp_src_cnt++; 
     520 
     521            if (stream->rtp_src_cnt >= PJMEDIA_RTP_NAT_PROBATION_CNT) { 
    539522             
    540                 stream->rem_rtp_addr = stream->rem_src_rtp; 
    541                 stream->rem_src_cnt = 0; 
     523                stream->rem_rtp_addr = stream->rtp_src_addr; 
     524                stream->rtp_src_cnt = 0; 
    542525 
    543526                PJ_LOG(4,(THIS_FILE,"Remote RTP address switched to %s:%d", 
    544                           pj_inet_ntoa(stream->rem_src_rtp.sin_addr), 
    545                           pj_ntohs(stream->rem_src_rtp.sin_port))); 
     527                          pj_inet_ntoa(stream->rtp_src_addr.sin_addr), 
     528                          pj_ntohs(stream->rtp_src_addr.sin_port))); 
    546529            } 
    547530        } 
    548531 
     532 
    549533        /* Put to jitter buffer. */ 
    550534        pj_mutex_lock( stream->jb_mutex ); 
    551         status = pjmedia_jbuf_put_frame(stream->jb, payload, payloadlen, pj_ntohs(hdr->seq)); 
     535        status = pjmedia_jbuf_put_frame(stream->jb, payload, payloadlen,  
     536                                        pj_ntohs(hdr->seq)); 
    552537        pj_mutex_unlock( stream->jb_mutex ); 
    553538 
    554539        if (status != 0) { 
    555540            TRACE_((THIS_FILE, "Jitter buffer put() error", status)); 
    556             continue; 
     541            goto read_next_packet; 
    557542        } 
    558     } 
    559  
    560     return 0; 
     543 
     544read_next_packet: 
     545        bytes_read = channel->in_pkt_size; 
     546        stream->rtp_addrlen = sizeof(stream->rtp_src_addr); 
     547        status = pj_ioqueue_recvfrom( stream->rtp_key, 
     548                                      &stream->rtp_op_key, 
     549                                      channel->in_pkt,  
     550                                      &bytes_read, 0, 
     551                                      &stream->rtp_src_addr,  
     552                                      &stream->rtp_addrlen); 
     553 
     554    } while (status == PJ_SUCCESS); 
     555 
     556    if (status != PJ_SUCCESS && status != PJ_EPENDING) { 
     557        char errmsg[PJ_ERR_MSG_SIZE]; 
     558 
     559        pj_strerror(status, errmsg, sizeof(errmsg)); 
     560        PJ_LOG(4,(THIS_FILE, "Error reading RTP packet: %s [status=%d]", 
     561                             errmsg, status)); 
     562    } 
     563} 
     564 
     565 
     566/* 
     567 * This callback is called by ioqueue framework on receipt of packets 
     568 * in the RTCP socket.  
     569 */ 
     570static void on_rx_rtcp( pj_ioqueue_key_t *key,  
     571                        pj_ioqueue_op_key_t *op_key,  
     572                        pj_ssize_t bytes_read) 
     573{ 
     574    PJ_UNUSED_ARG(key); 
     575    PJ_UNUSED_ARG(op_key); 
     576    PJ_UNUSED_ARG(bytes_read); 
    561577} 
    562578 
     
    642658    pjmedia_stream *stream; 
    643659    pjmedia_codec_param codec_param; 
     660    pj_ioqueue_callback ioqueue_cb; 
    644661    pj_status_t status; 
    645662 
     
    668685 
    669686    /* Init stream: */ 
    670     
     687    stream->endpt = endpt; 
     688    stream->codec_mgr = pjmedia_endpt_get_codec_mgr(endpt); 
    671689    stream->dir = info->dir; 
    672690    stream->user_data = user_data; 
    673     stream->codec_mgr = pjmedia_endpt_get_codec_mgr(endpt); 
    674691    stream->skinfo = info->sock_info; 
    675692    stream->rem_rtp_addr = info->rem_addr; 
     
    733750 
    734751 
    735     /*  Create jitter buffer thread: */ 
    736  
    737     status = pj_thread_create(pool, "decode",  
    738                               &jitter_buffer_thread, stream, 
    739                               0, PJ_THREAD_SUSPENDED, &stream->thread); 
    740     if (status != PJ_SUCCESS) 
    741         goto err_cleanup; 
    742  
    743  
    744752    /* Create decoder channel: */ 
    745753 
     
    757765        goto err_cleanup; 
    758766 
    759     /* Resume jitter buffer thread. */ 
    760     status = pj_thread_resume( stream->thread ); 
     767    /*  Register RTP socket to ioqueue */ 
     768    pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb)); 
     769    ioqueue_cb.on_read_complete = &on_rx_rtp; 
     770 
     771    status = pj_ioqueue_register_sock( pool,  
     772                                       pjmedia_endpt_get_ioqueue(endpt),  
     773                                       stream->skinfo.rtp_sock, 
     774                                       stream, &ioqueue_cb, &stream->rtp_key); 
    761775    if (status != PJ_SUCCESS) 
    762776        goto err_cleanup; 
     777 
     778    /* Init pending operation key. */ 
     779    pj_ioqueue_op_key_init(&stream->rtp_op_key, sizeof(stream->rtp_op_key)); 
     780 
     781    /* Bootstrap the first recvfrom() operation. */ 
     782    on_rx_rtp( stream->rtp_key, &stream->rtp_op_key, 0); 
     783 
     784 
     785    /* Register RTCP socket to ioqueue. */ 
     786    if (stream->skinfo.rtcp_sock != PJ_INVALID_SOCKET) { 
     787        pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb)); 
     788        ioqueue_cb.on_read_complete = &on_rx_rtcp; 
     789 
     790        status = pj_ioqueue_register_sock( pool,  
     791                                           pjmedia_endpt_get_ioqueue(endpt), 
     792                                           stream->skinfo.rtcp_sock, 
     793                                           stream, &ioqueue_cb,  
     794                                           &stream->rtcp_key); 
     795        if (status != PJ_SUCCESS) 
     796            goto err_cleanup; 
     797    } 
     798 
     799    /* Init pending operation key. */ 
     800    pj_ioqueue_op_key_init(&stream->rtcp_op_key, sizeof(stream->rtcp_op_key)); 
     801 
     802    /* Bootstrap the first recvfrom() operation. */ 
     803    on_rx_rtcp( stream->rtcp_key, &stream->rtcp_op_key, 0); 
    763804 
    764805    /* Success! */ 
     
    781822    PJ_ASSERT_RETURN(stream != NULL, PJ_EINVAL); 
    782823 
    783     /* Signal threads to quit. */ 
    784  
    785     stream->quit_flag = 1; 
    786  
    787  
    788     /* Close encoding sound stream. */ 
    789      
    790     /* 
    791     if (stream->enc && stream->enc->snd_stream) { 
    792  
    793         pjmedia_snd_stream_stop(stream->enc->snd_stream); 
    794         pjmedia_snd_stream_close(stream->enc->snd_stream); 
    795         stream->enc->snd_stream = NULL; 
    796  
    797     } 
    798     */ 
    799  
    800     /* Close decoding sound stream. */ 
    801  
    802     /* 
    803     if (stream->dec && stream->dec->snd_stream) { 
    804  
    805         pjmedia_snd_stream_stop(stream->dec->snd_stream); 
    806         pjmedia_snd_stream_close(stream->dec->snd_stream); 
    807         stream->dec->snd_stream = NULL; 
    808  
    809     } 
    810     */ 
    811  
    812     /* Wait for jitter buffer thread to quit: */ 
    813  
    814     if (stream->thread) { 
    815         pj_thread_join(stream->thread); 
    816         pj_thread_destroy(stream->thread); 
    817         stream->thread = NULL; 
     824 
     825    /* This function may be called when stream is partly initialized. */ 
     826    if (stream->jb_mutex) 
     827        pj_mutex_lock(stream->jb_mutex); 
     828 
     829 
     830    /* Unregister from ioqueue. */ 
     831    if (stream->rtp_key) { 
     832        pj_ioqueue_unregister(stream->rtp_key); 
     833        stream->rtp_key = NULL; 
     834    } 
     835    if (stream->rtcp_key) { 
     836        pj_ioqueue_unregister(stream->rtcp_key); 
     837        stream->rtcp_key = NULL; 
    818838    } 
    819839 
Note: See TracChangeset for help on using the changeset viewer.