Ignore:
Timestamp:
Feb 8, 2006 10:43:39 PM (18 years ago)
Author:
bennylp
Message:

Finished new pjmedia rewrite

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/trunk/pjmedia/src/pjmedia/stream.c

    r121 r159  
    3232 
    3333 
    34 #define THISFILE    "stream.c" 
    35 #define ERRLEVEL    1 
    36  
    37 #define PJ_MAX_FRAME_DURATION_MS    200 
    38 #define PJ_MAX_BUFFER_SIZE_MS       2000 
    39 #define PJ_MAX_MTU                  1500 
     34#define THIS_FILE                       "stream.c" 
     35#define ERRLEVEL                        1 
     36#define TRACE_(expr)                    PJ_LOG(3,expr) 
     37 
     38#define PJMEDIA_MAX_FRAME_DURATION_MS   200 
     39#define PJMEDIA_MAX_BUFFER_SIZE_MS      2000 
     40#define PJMEDIA_MAX_MTU                 1500 
    4041 
    4142struct jb_frame 
     
    4950#define pj_fifobuf_free(fifo, buf)      free(buf) 
    5051 
    51 enum stream_state 
    52 { 
    53     STREAM_STOPPED, 
    54     STREAM_STARTED, 
     52 
     53/** 
     54 * Media channel. 
     55 */ 
     56struct pjmedia_channel 
     57{ 
     58    pjmedia_stream         *stream;         /**< Parent stream.             */ 
     59    pjmedia_dir             dir;            /**< Channel direction.         */ 
     60    unsigned                pt;             /**< Payload type.              */ 
     61    pj_bool_t               paused;         /**< Paused?.                   */ 
     62    pj_snd_stream_info      snd_info;       /**< Sound stream param.        */ 
     63    pj_snd_stream          *snd_stream;     /**< Sound stream.              */ 
     64    unsigned                in_pkt_size;    /**< Size of input buffer.      */ 
     65    void                   *in_pkt;         /**< Input buffer.              */ 
     66    unsigned                out_pkt_size;   /**< Size of output buffer.     */ 
     67    void                   *out_pkt;        /**< Output buffer.             */ 
     68    unsigned                pcm_buf_size;   /**< Size of PCM buffer.        */ 
     69    void                   *pcm_buf;        /**< PCM buffer.                */ 
     70    pj_rtp_session          rtp;            /**< RTP session.               */ 
    5571}; 
    5672 
    57 struct pj_media_stream_t 
    58 { 
    59     pj_media_dir_t          dir; 
    60     int                     pt; 
    61     int                     state; 
    62     pj_media_stream_stat    stat; 
    63     pj_media_stream_t      *peer; 
    64     pj_snd_stream_info      snd_info; 
    65     pj_snd_stream          *snd_stream; 
    66     pj_mutex_t             *mutex; 
    67     unsigned                in_pkt_size; 
    68     void                   *in_pkt; 
    69     unsigned                out_pkt_size; 
    70     void                   *out_pkt; 
    71     unsigned                pcm_buf_size; 
    72     void                   *pcm_buf; 
    73     //pj_fifobuf_t          fifobuf; 
    74     pj_codec_mgr           *codec_mgr; 
    75     pj_codec               *codec; 
    76     pj_rtp_session          rtp; 
    77     pj_rtcp_session        *rtcp; 
    78     pj_jitter_buffer       *jb; 
    79     pj_sock_t               rtp_sock; 
    80     pj_sock_t               rtcp_sock; 
    81     pj_sockaddr_in          dst_addr; 
    82     pj_thread_t            *transport_thread; 
    83     int                     thread_quit_flag; 
     73 
     74/** 
     75 * This structure describes media stream. 
     76 * A media stream is bidirectional media transmission between two endpoints. 
     77 * It consists of two channels, i.e. encoding and decoding channels. 
     78 * A media stream corresponds to a single "m=" line in a SDP session 
     79 * description. 
     80 */ 
     81struct pjmedia_stream 
     82{ 
     83    pjmedia_channel         *enc;           /**< Encoding channel.          */ 
     84    pjmedia_channel         *dec;           /**< Decoding channel.          */ 
     85 
     86    pjmedia_dir              dir;           /**< Stream direction.          */ 
     87    pjmedia_stream_stat      stat;          /**< Stream statistics.         */ 
     88 
     89    pjmedia_codec_mgr       *codec_mgr;     /**< Codec manager instance.    */ 
     90    pjmedia_codec           *codec;         /**< Codec instance being used. */ 
     91 
     92    pj_mutex_t              *jb_mutex; 
     93    pj_jitter_buffer         jb;            /**< Jitter buffer.             */ 
     94 
     95    pj_sock_t                rtp_sock;      /**< RTP socket.                */ 
     96    pj_sock_t                rtcp_sock;     /**< RTCP socket.               */ 
     97    pj_sockaddr_in           dst_addr;      /**< Destination RTP address.   */ 
     98 
     99    pj_rtcp_session          rtcp;          /**< RTCP for incoming RTP.     */ 
     100 
     101    pj_bool_t                quit_flag;     /**< To signal thread exit.     */ 
     102    pj_thread_t             *thread;        /**< Jitter buffer's thread.    */ 
    84103}; 
    85104 
    86105 
     106 
     107/* 
     108 * play_callback() 
     109 * 
     110 * This callback is called by sound device's player thread when it 
     111 * needs to feed the player with some frames. 
     112 */ 
    87113static pj_status_t play_callback(/* in */   void *user_data, 
    88114                                 /* in */   pj_uint32_t timestamp, 
     
    90116                                 /*inout*/  unsigned size) 
    91117{ 
    92     pj_media_stream_t *channel = user_data; 
     118    pjmedia_channel *channel = user_data; 
     119    pjmedia_stream *stream = channel->stream; 
    93120    struct jb_frame *jb_frame; 
    94121    void *p; 
    95122    pj_uint32_t extseq; 
    96123    pj_status_t status; 
    97     struct pj_audio_frame frame_in, frame_out; 
     124    struct pjmedia_frame frame_in, frame_out; 
    98125 
    99126    PJ_UNUSED_ARG(timestamp); 
    100127 
    101     /* Lock mutex */ 
    102     pj_mutex_lock (channel->mutex); 
    103  
    104     if (!channel->codec) { 
    105         pj_mutex_unlock (channel->mutex); 
     128    /* Do nothing if we're quitting. */ 
     129    if (stream->quit_flag) 
    106130        return -1; 
    107     } 
     131 
     132    /* Lock jitter buffer mutex */ 
     133    pj_mutex_lock( stream->jb_mutex ); 
    108134 
    109135    /* Get frame from jitter buffer. */ 
    110     status = pj_jb_get (channel->jb, &extseq, &p); 
     136    status = pj_jb_get(&stream->jb, &extseq, &p); 
     137 
     138    /* Unlock jitter buffer mutex. */ 
     139    pj_mutex_unlock( stream->jb_mutex ); 
     140 
    111141    jb_frame = p; 
    112     if (status != 0 || jb_frame == NULL) { 
     142    if (status != PJ_SUCCESS || jb_frame == NULL) { 
    113143        pj_memset(frame, 0, size); 
    114         pj_mutex_unlock(channel->mutex); 
    115144        return 0; 
    116145    } 
     
    119148    frame_in.buf = jb_frame->buf; 
    120149    frame_in.size = jb_frame->size; 
    121     frame_in.type = PJ_AUDIO_FRAME_AUDIO;  /* ignored */ 
     150    frame_in.type = PJMEDIA_FRAME_TYPE_AUDIO;  /* ignored */ 
    122151    frame_out.buf = channel->pcm_buf; 
    123     status = channel->codec->op->decode (channel->codec, &frame_in, 
    124                                          channel->pcm_buf_size, &frame_out); 
     152    status = stream->codec->op->decode( stream->codec, &frame_in, 
     153                                        channel->pcm_buf_size, &frame_out); 
    125154    if (status != 0) { 
    126         PJ_LOG(3, (THISFILE, "decode() has return error status %d",  
    127                           status)); 
     155        TRACE_((THIS_FILE, "decode() has return error status %d", status)); 
    128156 
    129157        pj_memset(frame, 0, size); 
    130158        pj_fifobuf_free (&channel->fifobuf, jb_frame); 
    131         pj_mutex_unlock(channel->mutex); 
    132159        return 0; 
    133160    } 
     
    135162    /* Put in sound buffer. */ 
    136163    if (frame_out.size > size) { 
    137         PJ_LOG(3, (THISFILE, "Sound playout buffer truncated %d bytes",  
    138                           frame_out.size - size)); 
     164        TRACE_((THIS_FILE, "Sound playout buffer truncated %d bytes",  
     165                frame_out.size - size)); 
    139166        frame_out.size = size; 
    140167    } 
    141168 
    142169    pj_memcpy(frame, frame_out.buf, size); 
    143  
    144170    pj_fifobuf_free (&channel->fifobuf, jb_frame); 
    145     pj_mutex_unlock(channel->mutex); 
     171 
    146172    return 0; 
    147173} 
    148174 
     175 
     176/** 
     177 * rec_callback() 
     178 * 
     179 * This callback is called when the mic device has gathered 
     180 * enough audio samples. We will encode the audio samples and 
     181 * send it to remote. 
     182 */ 
    149183static pj_status_t rec_callback( /* in */ void *user_data, 
    150184                                 /* in */ pj_uint32_t timestamp, 
     
    152186                                 /* in */ unsigned size) 
    153187{ 
    154     pj_media_stream_t *channel = user_data; 
     188    pjmedia_channel *channel = user_data; 
     189    pjmedia_stream *stream = channel->stream; 
    155190    pj_status_t status = 0; 
    156     struct pj_audio_frame frame_in, frame_out; 
     191    struct pjmedia_frame frame_in, frame_out; 
    157192    int ts_len; 
    158193    void *rtphdr; 
    159194    int rtphdrlen; 
    160195    pj_ssize_t sent; 
    161 #if 0 
    162     static FILE *fhnd = NULL; 
    163 #endif 
     196 
    164197 
    165198    PJ_UNUSED_ARG(timestamp); 
    166199 
    167     /* Start locking channel mutex */ 
    168     pj_mutex_lock (channel->mutex); 
    169  
    170     if (!channel->codec) { 
    171         status = -1; 
    172         goto on_return; 
    173     } 
     200    /* Check if stream is quitting. */ 
     201    if (stream->quit_flag) 
     202        return -1; 
    174203 
    175204    /* Encode. */ 
    176     frame_in.type = PJ_MEDIA_TYPE_AUDIO; 
     205    frame_in.type = PJMEDIA_TYPE_AUDIO; 
    177206    frame_in.buf = (void*)frame; 
    178207    frame_in.size = size; 
    179208    frame_out.buf = ((char*)channel->out_pkt) + sizeof(pj_rtp_hdr); 
    180     status = channel->codec->op->encode (channel->codec, &frame_in,  
    181                                          channel->out_pkt_size - sizeof(pj_rtp_hdr),  
    182                                          &frame_out); 
     209    status = stream->codec->op->encode( stream->codec, &frame_in,  
     210                                        channel->out_pkt_size - sizeof(pj_rtp_hdr),  
     211                                        &frame_out); 
    183212    if (status != 0) { 
    184         PJ_LOG(3,(THISFILE, "Codec encode() has returned error status %d",  
    185                              status)); 
    186         goto on_return; 
     213        TRACE_((THIS_FILE, "Codec encode() has returned error status %d",  
     214                status)); 
     215        return status; 
    187216    } 
    188217 
    189218    /* Encapsulate. */ 
    190219    ts_len = size / (channel->snd_info.bits_per_sample / 8); 
    191     status = pj_rtp_encode_rtp (&channel->rtp, channel->pt, 0,  
     220    status = pj_rtp_encode_rtp( &channel->rtp,  
     221                                channel->pt, 0,  
    192222                                frame_out.size, ts_len,  
    193223                                (const void**)&rtphdr, &rtphdrlen); 
    194224    if (status != 0) { 
    195         PJ_LOG(3,(THISFILE, "RTP encode_rtp() has returned error status %d",  
    196                             status)); 
    197         goto on_return; 
     225        TRACE_((THIS_FILE, "RTP encode_rtp() has returned error status %d",  
     226                           status)); 
     227        return status; 
    198228    } 
    199229 
     
    201231        /* We don't support RTP with extended header yet. */ 
    202232        PJ_TODO(SUPPORT_SENDING_RTP_WITH_EXTENDED_HEADER); 
    203         PJ_LOG(3,(THISFILE, "Unsupported extended RTP header for transmission")); 
    204         goto on_return; 
     233        TRACE_((THIS_FILE, "Unsupported extended RTP header for transmission")); 
     234        return 0; 
    205235    } 
    206236 
     
    209239    /* Send. */ 
    210240    sent = frame_out.size+sizeof(pj_rtp_hdr); 
    211     status = pj_sock_sendto (channel->rtp_sock, channel->out_pkt, &sent, 0,  
    212                            &channel->dst_addr, sizeof(channel->dst_addr)); 
    213     if (status != PJ_SUCCESS) 
    214         goto on_return; 
     241    status = pj_sock_sendto(stream->rtp_sock, channel->out_pkt, &sent, 0,  
     242                            &stream->dst_addr, sizeof(stream->dst_addr)); 
     243    if (status != PJ_SUCCESS) 
     244        return status; 
    215245 
    216246    /* Update stat */ 
    217     channel->stat.pkt_tx++; 
    218     channel->stat.oct_tx += frame_out.size+sizeof(pj_rtp_hdr); 
    219  
    220 #if 0 
    221     if (fhnd == NULL) { 
    222         fhnd = fopen("RTP.DAT", "wb"); 
    223         if (fhnd) { 
    224             fwrite (channel->out_pkt, frame_out.size+sizeof(pj_rtp_hdr), 1, fhnd); 
    225             fclose(fhnd); 
    226         } 
    227     } 
    228 #endif 
    229  
    230 on_return: 
    231     pj_mutex_unlock (channel->mutex); 
    232     return status; 
    233 } 
    234  
    235  
    236 static int PJ_THREAD_FUNC stream_decoder_transport_thread (void*arg) 
    237 { 
    238     pj_media_stream_t *channel = arg; 
    239  
    240     while (!channel->thread_quit_flag) { 
     247    stream->stat.enc.pkt++; 
     248    stream->stat.enc.bytes += frame_out.size+sizeof(pj_rtp_hdr); 
     249 
     250    return 0; 
     251} 
     252 
     253 
     254/* 
     255 * This thread will poll the socket for incoming packets, and put 
     256 * the packets to jitter buffer. 
     257 */ 
     258static int PJ_THREAD_FUNC jitter_buffer_thread (void*arg) 
     259{ 
     260    pjmedia_stream *stream = arg; 
     261    pjmedia_channel *channel = stream->dec; 
     262 
     263    while (!stream->quit_flag) { 
    241264        pj_ssize_t len, size; 
    242265        const pj_rtp_hdr *hdr; 
     
    251274 
    252275        PJ_FD_ZERO (&fds); 
    253         PJ_FD_SET (channel->rtp_sock, &fds); 
     276        PJ_FD_SET (stream->rtp_sock, &fds); 
    254277        timeout.sec = 0; 
    255         timeout.msec = 100; 
     278        timeout.msec = 1; 
    256279 
    257280        /* Wait with timeout. */ 
    258         status = pj_sock_select(channel->rtp_sock, &fds, NULL, NULL, &timeout); 
     281        status = pj_sock_select(stream->rtp_sock, &fds, NULL, NULL, &timeout); 
    259282        if (status != 1) 
    260283            continue; 
     
    262285        /* Get packet from socket. */ 
    263286        len = channel->in_pkt_size; 
    264         status = pj_sock_recv (channel->rtp_sock, channel->in_pkt, &len, 0); 
     287        status = pj_sock_recv(stream->rtp_sock, channel->in_pkt, &len, 0); 
    265288        if (len < 1 || status != PJ_SUCCESS) { 
    266289            if (pj_get_netos_error() == PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) { 
    267                 /* On Win2K SP2 (or above) and WinXP, recv() will get WSAECONNRESET 
    268                    when the sending side receives ICMP port unreachable. 
     290                /* On Win2K SP2 (or above) and WinXP, recv() will get  
     291                 * WSAECONNRESET when the sending side receives ICMP port  
     292                 * unreachable. 
    269293                 */ 
    270294                continue; 
    271295            } 
    272             //pj_perror(THISFILE, "Error receiving packet from socket (len=%d)", len); 
    273296            pj_thread_sleep(1); 
    274297            continue; 
    275298        } 
    276299 
    277         if (channel->state != STREAM_STARTED) 
     300        if (channel->paused) 
    278301            continue; 
    279302 
    280         if (channel->thread_quit_flag) 
    281             break; 
    282  
    283         /* Start locking the channel. */ 
    284         pj_mutex_lock (channel->mutex); 
    285  
    286303        /* Update RTP and RTCP session. */ 
    287         status = pj_rtp_decode_rtp (&channel->rtp, channel->in_pkt, len, &hdr, &payload, &payloadlen); 
    288         if (status != 0) { 
    289             pj_mutex_unlock (channel->mutex); 
    290             PJ_LOG(4,(THISFILE, "RTP decode_rtp() has returned error status %d", status)); 
     304        status = pj_rtp_decode_rtp(&channel->rtp, channel->in_pkt, len,  
     305                                   &hdr, &payload, &payloadlen); 
     306        if (status != PJ_SUCCESS) { 
     307            TRACE_((THIS_FILE, "RTP decode_rtp() has returned error status %d", 
     308                    status)); 
    291309            continue; 
    292310        } 
    293         status = pj_rtp_session_update (&channel->rtp, hdr); 
    294         if (status != 0 && status != PJ_RTP_ERR_SESSION_PROBATION && status != PJ_RTP_ERR_SESSION_RESTARTED) { 
    295             pj_mutex_unlock (channel->mutex); 
    296             PJ_LOG(4,(THISFILE, "RTP session_update() has returned error status %d", status)); 
     311 
     312        status = pj_rtp_session_update(&channel->rtp, hdr); 
     313        if (status != 0 &&  
     314            status != PJMEDIA_RTP_ERR_SESSION_PROBATION &&  
     315            status != PJMEDIA_RTP_ERR_SESSION_RESTARTED)  
     316        { 
     317            TRACE_((THIS_FILE,  
     318                    "RTP session_update() has returned error status %d",  
     319                    status)); 
    297320            continue; 
    298321        } 
    299         pj_rtcp_rx_rtp (channel->rtcp, pj_ntohs(hdr->seq), pj_ntohl(hdr->ts)); 
     322        pj_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq), pj_ntohl(hdr->ts)); 
    300323 
    301324        /* Update stat */ 
    302         channel->stat.pkt_rx++; 
    303         channel->stat.oct_rx += len; 
     325        stream->stat.dec.pkt++; 
     326        stream->stat.dec.bytes += len; 
    304327 
    305328        /* Copy to FIFO buffer. */ 
     
    307330        jb_frame = pj_fifobuf_alloc (&channel->fifobuf, size); 
    308331        if (jb_frame == NULL) { 
    309             pj_mutex_unlock (channel->mutex); 
    310             PJ_LOG(4,(THISFILE, "Unable to allocate %d bytes FIFO buffer", size)); 
     332            TRACE_((THIS_FILE, "Unable to allocate %d bytes FIFO buffer",  
     333                    size)); 
    311334            continue; 
    312335        } 
     
    318341 
    319342        /* Put to jitter buffer. */ 
    320         status = pj_jb_put (channel->jb, pj_ntohs(hdr->seq), jb_frame); 
     343        pj_mutex_lock( stream->jb_mutex ); 
     344        status = pj_jb_put(&stream->jb, pj_ntohs(hdr->seq), jb_frame); 
     345        pj_mutex_unlock( stream->jb_mutex ); 
     346 
    321347        if (status != 0) { 
    322348            pj_fifobuf_unalloc (&channel->fifobuf, jb_frame); 
    323             pj_mutex_unlock (channel->mutex); 
    324             PJ_LOG(4,(THISFILE, "Jitter buffer put() has returned error status %d", status)); 
     349             
     350            TRACE_((THIS_FILE,  
     351                    "Jitter buffer put() has returned error status %d",  
     352                    status)); 
    325353            continue; 
    326354        } 
    327  
    328         pj_mutex_unlock (channel->mutex); 
    329355    } 
    330356 
     
    332358} 
    333359 
    334 static void init_snd_param_from_codec_attr (pj_snd_stream_info *param, 
    335                                             const pj_codec_attr *attr) 
    336 { 
    337     param->bits_per_sample = attr->pcm_bits_per_sample; 
    338     param->bytes_per_frame = 2; 
    339     param->frames_per_packet = attr->sample_rate * attr->ptime / 1000; 
    340     param->samples_per_frame = 1; 
    341     param->samples_per_sec = attr->sample_rate; 
    342 } 
    343  
    344 static pj_media_stream_t *create_channel ( pj_pool_t *pool, 
    345                                            pj_media_dir_t dir, 
    346                                            pj_media_stream_t *peer, 
    347                                            pj_codec_id *codec_id, 
    348                                            pj_media_stream_create_param *param) 
    349 { 
    350     pj_media_stream_t *channel; 
    351     pj_codec_attr codec_attr; 
    352     void *ptr; 
    353     unsigned size; 
     360 
     361/* 
     362 * Create sound stream parameter from codec attributes. 
     363 */ 
     364static void init_snd_param( pj_snd_stream_info *snd_param, 
     365                            const pjmedia_codec_param *codec_param) 
     366{ 
     367    pj_memset(snd_param, 0, sizeof(*snd_param)); 
     368 
     369    snd_param->bits_per_sample   = codec_param->pcm_bits_per_sample; 
     370    snd_param->bytes_per_frame   = 2; 
     371    snd_param->frames_per_packet = codec_param->sample_rate *  
     372                                   codec_param->ptime /  
     373                                   1000; 
     374    snd_param->samples_per_frame = 1; 
     375    snd_param->samples_per_sec   = codec_param->sample_rate; 
     376} 
     377 
     378 
     379/* 
     380 * Create media channel. 
     381 */ 
     382static pj_status_t create_channel( pj_pool_t *pool, 
     383                                   pjmedia_stream *stream, 
     384                                   pjmedia_dir dir, 
     385                                   const pjmedia_stream_info *param, 
     386                                   const pjmedia_codec_param *codec_param, 
     387                                   pjmedia_channel **p_channel) 
     388{ 
     389    pjmedia_channel *channel; 
    354390    pj_status_t status; 
    355391     
    356392    /* Allocate memory for channel descriptor */ 
    357     size = sizeof(pj_media_stream_t); 
    358     channel = pj_pool_calloc(pool, 1, size); 
    359     if (!channel) { 
    360         PJ_LOG(1,(THISFILE, "Unable to allocate %u bytes channel descriptor",  
    361                          size)); 
    362         return NULL; 
    363     } 
    364  
     393 
     394    channel = pj_pool_zalloc(pool, sizeof(pjmedia_channel)); 
     395    PJ_ASSERT_RETURN(channel != NULL, PJ_ENOMEM); 
     396 
     397    /* Init channel info. */ 
     398 
     399    channel->stream = stream; 
    365400    channel->dir = dir; 
    366     channel->pt = codec_id->pt; 
    367     channel->peer = peer; 
    368     channel->codec_mgr = pj_med_mgr_get_codec_mgr (param->mediamgr); 
    369     channel->rtp_sock = param->rtp_sock; 
    370     channel->rtcp_sock = param->rtcp_sock; 
    371     channel->dst_addr = *param->remote_addr; 
    372     channel->state = STREAM_STOPPED; 
    373  
    374     /* Create mutex for the channel. */ 
    375     status = pj_mutex_create_simple(pool, NULL, &channel->mutex); 
    376     if (status != PJ_SUCCESS) 
    377         goto err_cleanup; 
    378  
    379     /* Create and initialize codec, only if peer is not present. 
    380        We only use one codec instance for both encoder and decoder. 
    381      */ 
    382     if (peer && peer->codec) { 
    383         channel->codec = peer->codec; 
    384         status = channel->codec->factory->op->default_attr(channel->codec->factory, codec_id,  
    385                                                            &codec_attr); 
    386         if (status != 0) { 
    387             goto err_cleanup; 
    388         } 
    389  
    390     } else { 
    391         channel->codec = pj_codec_mgr_alloc_codec(channel->codec_mgr, codec_id); 
    392         if (channel->codec == NULL) { 
    393             goto err_cleanup; 
    394         } 
    395  
    396         status = channel->codec->factory->op->default_attr(channel->codec->factory, codec_id,  
    397                                                            &codec_attr); 
    398         if (status != 0) { 
    399             goto err_cleanup; 
    400         } 
    401  
    402         codec_attr.pt = codec_id->pt; 
    403         status = channel->codec->op->open(channel->codec, &codec_attr); 
    404         if (status != 0) { 
    405             goto err_cleanup; 
    406         } 
    407     } 
     401    channel->paused = 1; 
     402    channel->pt = param->fmt.pt; 
    408403 
    409404    /* Allocate buffer for incoming packet. */ 
    410     channel->in_pkt_size = PJ_MAX_MTU; 
    411     channel->in_pkt = pj_pool_alloc(pool, channel->in_pkt_size); 
    412     if (!channel->in_pkt) { 
    413         PJ_LOG(1, (THISFILE, "Unable to allocate %u bytes incoming packet buffer",  
    414                           channel->in_pkt_size)); 
    415         goto err_cleanup; 
    416     } 
    417  
     405 
     406    channel->in_pkt_size = PJMEDIA_MAX_MTU; 
     407    channel->in_pkt = pj_pool_alloc( pool, channel->in_pkt_size ); 
     408    PJ_ASSERT_RETURN(channel->in_pkt != NULL, PJ_ENOMEM); 
     409 
     410     
    418411    /* Allocate buffer for outgoing packet. */ 
     412 
    419413    channel->out_pkt_size = sizeof(pj_rtp_hdr) +  
    420                             codec_attr.avg_bps / 8 * PJ_MAX_FRAME_DURATION_MS / 1000; 
    421     if (channel->out_pkt_size > PJ_MAX_MTU) 
    422         channel->out_pkt_size = PJ_MAX_MTU; 
     414                            codec_param->avg_bps/8 *  
     415                            PJMEDIA_MAX_FRAME_DURATION_MS /  
     416                            1000; 
     417 
     418    if (channel->out_pkt_size > PJMEDIA_MAX_MTU) 
     419        channel->out_pkt_size = PJMEDIA_MAX_MTU; 
     420 
    423421    channel->out_pkt = pj_pool_alloc(pool, channel->out_pkt_size); 
    424     if (!channel->out_pkt) { 
    425         PJ_LOG(1, (THISFILE, "Unable to allocate %u bytes encoding buffer",  
    426                           channel->out_pkt_size)); 
    427         goto err_cleanup; 
    428     } 
    429  
    430     /* Allocate buffer for decoding to PCM */ 
    431     channel->pcm_buf_size = codec_attr.sample_rate *  
    432                             codec_attr.pcm_bits_per_sample / 8 * 
    433                             PJ_MAX_FRAME_DURATION_MS / 1000; 
     422    PJ_ASSERT_RETURN(channel->out_pkt != NULL, PJ_ENOMEM); 
     423 
     424 
     425    /* Allocate buffer for decoding to PCM: */ 
     426 
     427    channel->pcm_buf_size = codec_param->sample_rate *  
     428                            codec_param->pcm_bits_per_sample / 8 * 
     429                            PJMEDIA_MAX_FRAME_DURATION_MS / 1000; 
    434430    channel->pcm_buf = pj_pool_alloc (pool, channel->pcm_buf_size); 
    435     if (!channel->pcm_buf) { 
    436         PJ_LOG(1, (THISFILE, "Unable to allocate %u bytes PCM buffer",  
    437                           channel->pcm_buf_size)); 
    438         goto err_cleanup; 
    439     } 
    440  
    441     /* Allocate buffer for frames put in jitter buffer. */ 
    442     size = codec_attr.avg_bps / 8 * PJ_MAX_BUFFER_SIZE_MS / 1000; 
    443     ptr = pj_pool_alloc(pool, size); 
    444     if (!ptr) { 
    445         PJ_LOG(1, (THISFILE, "Unable to allocate %u bytes jitter buffer",  
    446                           channel->pcm_buf_size)); 
    447         goto err_cleanup; 
    448     } 
    449     //pj_fifobuf_init (&channel->fifobuf, ptr, size); 
     431    PJ_ASSERT_RETURN(channel->pcm_buf != NULL, PJ_ENOMEM); 
     432 
     433 
     434    /* Create RTP and RTCP sessions: */ 
     435 
     436    status = pj_rtp_session_init(&channel->rtp, param->fmt.pt,  
     437                                 param->ssrc); 
     438    if (status != PJ_SUCCESS) 
     439        return status; 
    450440 
    451441    /* Create and initialize sound device */ 
    452     init_snd_param_from_codec_attr (&channel->snd_info, &codec_attr); 
    453  
    454     if (dir == PJ_MEDIA_DIR_ENCODING) 
     442 
     443    init_snd_param(&channel->snd_info, codec_param); 
     444 
     445    if (dir == PJMEDIA_DIR_ENCODING) 
    455446        channel->snd_stream = pj_snd_open_recorder(-1, &channel->snd_info,  
    456447                                                   &rec_callback, channel); 
     
    460451 
    461452    if (!channel->snd_stream) 
    462         goto err_cleanup; 
    463  
    464     /* Create RTP and RTCP sessions. */ 
    465     if (pj_rtp_session_init(&channel->rtp, codec_id->pt, param->ssrc) != 0) { 
    466         PJ_LOG(1, (THISFILE, "RTP session initialization error")); 
    467         goto err_cleanup; 
    468     } 
    469  
    470     /* For decoder, create RTCP session, jitter buffer, and transport thread. */ 
    471     if (dir == PJ_MEDIA_DIR_DECODING) { 
    472         channel->rtcp = pj_pool_calloc(pool, 1, sizeof(pj_rtcp_session)); 
    473         if (!channel->rtcp) { 
    474             PJ_LOG(1, (THISFILE, "Unable to allocate RTCP session")); 
    475             goto err_cleanup; 
    476         } 
    477  
    478         pj_rtcp_init(channel->rtcp, param->ssrc); 
    479  
    480         channel->jb = pj_pool_calloc(pool, 1, sizeof(pj_jitter_buffer)); 
    481         if (!channel->jb) { 
    482             PJ_LOG(1, (THISFILE, "Unable to allocate jitter buffer descriptor")); 
    483             goto err_cleanup; 
    484         } 
    485         if (pj_jb_init(channel->jb, pool, param->jb_min, param->jb_max, param->jb_maxcnt)) { 
    486             PJ_LOG(1, (THISFILE, "Unable to allocate jitter buffer")); 
    487             goto err_cleanup; 
    488         } 
    489  
    490         status = pj_thread_create(pool, "decode",  
    491                                   &stream_decoder_transport_thread, channel, 
    492                                   0, 0, &channel->transport_thread); 
    493         if (status != PJ_SUCCESS) { 
    494             //pj_perror(THISFILE, "Unable to create transport thread"); 
    495             goto err_cleanup; 
    496         } 
    497     } 
     453        return -1; 
     454 
    498455 
    499456    /* Done. */ 
    500     return channel; 
     457    *p_channel = channel; 
     458    return PJ_SUCCESS; 
     459} 
     460 
     461 
     462/* 
     463 * Create media stream. 
     464 */ 
     465PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt, 
     466                                           pj_pool_t *pool, 
     467                                           const pjmedia_stream_info *info, 
     468                                           pjmedia_stream **p_stream) 
     469 
     470{ 
     471    pjmedia_stream *stream; 
     472    pjmedia_codec_param codec_param; 
     473    pj_status_t status; 
     474 
     475    PJ_ASSERT_RETURN(pool && info && p_stream, PJ_EINVAL); 
     476 
     477 
     478    /* Allocate the media stream: */ 
     479 
     480    stream = pj_pool_zalloc(pool, sizeof(pjmedia_stream)); 
     481    PJ_ASSERT_RETURN(stream != NULL, PJ_ENOMEM); 
     482 
     483 
     484    /* Init stream: */ 
     485    
     486    stream->dir = info->dir; 
     487    stream->codec_mgr = pjmedia_endpt_get_codec_mgr(endpt); 
     488 
     489    /* Create mutex to protect jitter buffer: */ 
     490 
     491    status = pj_mutex_create_simple(pool, NULL, &stream->jb_mutex); 
     492    if (status != PJ_SUCCESS) 
     493        goto err_cleanup; 
     494 
     495 
     496    /* Create and initialize codec: */ 
     497 
     498    status = pjmedia_codec_mgr_alloc_codec( stream->codec_mgr, 
     499                                            &info->fmt, &stream->codec); 
     500    if (status != PJ_SUCCESS) 
     501        goto err_cleanup; 
     502 
     503 
     504    /* Get default codec param: */ 
     505 
     506    status = stream->codec->op->default_attr(stream->codec, &codec_param); 
     507    if (status != PJ_SUCCESS) 
     508        goto err_cleanup; 
     509 
     510 
     511    /* Open the codec: */ 
     512 
     513    status = stream->codec->op->open(stream->codec, &codec_param); 
     514    if (status != PJ_SUCCESS) 
     515        goto err_cleanup; 
     516 
     517 
     518    /* Init RTCP session: */ 
     519 
     520    pj_rtcp_init(&stream->rtcp, info->ssrc); 
     521 
     522 
     523    /* Init jitter buffer: */ 
     524 
     525    status = pj_jb_init(&stream->jb, pool,  
     526                        info->jb_min, info->jb_max, info->jb_maxcnt); 
     527    if (status != PJ_SUCCESS) 
     528        goto err_cleanup; 
     529 
     530 
     531    /*  Create jitter buffer thread: */ 
     532 
     533    status = pj_thread_create(pool, "decode",  
     534                              &jitter_buffer_thread, stream, 
     535                              0, 0, &stream->thread); 
     536    if (status != PJ_SUCCESS) 
     537        goto err_cleanup; 
     538 
     539 
     540    /* Create decoder channel: */ 
     541 
     542    status = create_channel( pool, stream, PJMEDIA_DIR_DECODING, info, 
     543                             &codec_param, &stream->dec); 
     544    if (status != PJ_SUCCESS) 
     545        goto err_cleanup; 
     546 
     547 
     548    /* Create encoder channel: */ 
     549 
     550    status = create_channel( pool, stream, PJMEDIA_DIR_ENCODING, info, 
     551                             &codec_param, &stream->enc); 
     552    if (status != PJ_SUCCESS) 
     553        goto err_cleanup; 
     554 
     555 
     556    /* Success! */ 
     557    *p_stream = stream; 
     558    return PJ_SUCCESS; 
     559 
    501560 
    502561err_cleanup: 
    503     pj_media_stream_destroy(channel); 
    504     return NULL; 
    505 } 
    506  
    507  
    508 PJ_DEF(pj_status_t) pj_media_stream_create (pj_pool_t *pool, 
    509                                             pj_media_stream_t **enc_stream, 
    510                                             pj_media_stream_t **dec_stream, 
    511                                             pj_media_stream_create_param *param) 
    512 { 
    513     *dec_stream = *enc_stream = NULL; 
    514  
    515     if (param->dir & PJ_MEDIA_DIR_DECODING) { 
    516         *dec_stream =  
    517             create_channel(pool, PJ_MEDIA_DIR_DECODING, NULL, param->codec_id, param); 
    518         if (!*dec_stream) 
    519             return -1; 
    520     } 
    521  
    522     if (param->dir & PJ_MEDIA_DIR_ENCODING) { 
    523         *enc_stream =  
    524             create_channel(pool, PJ_MEDIA_DIR_ENCODING, *dec_stream, param->codec_id, param); 
    525         if (!*enc_stream) { 
    526             if (*dec_stream) { 
    527                 pj_media_stream_destroy(*dec_stream); 
    528                 *dec_stream = NULL; 
    529             } 
    530             return -1; 
    531         } 
    532  
    533         if (*dec_stream) { 
    534             (*dec_stream)->peer = *enc_stream; 
    535         } 
    536     } 
    537  
    538     return 0; 
    539 } 
    540  
    541 PJ_DEF(pj_status_t) pj_media_stream_start (pj_media_stream_t *channel) 
    542 { 
    543     pj_status_t status; 
    544  
    545     status = pj_snd_stream_start(channel->snd_stream); 
    546  
    547     if (status == 0) 
    548         channel->state = STREAM_STARTED; 
     562    pjmedia_stream_destroy(stream); 
    549563    return status; 
    550564} 
    551565 
    552 PJ_DEF(pj_status_t)  pj_media_stream_get_stat (const pj_media_stream_t *stream, 
    553                                                pj_media_stream_stat *stat) 
    554 { 
    555     if (stream->dir == PJ_MEDIA_DIR_ENCODING) { 
    556         pj_memcpy (stat, &stream->stat, sizeof(*stat)); 
    557     } else { 
    558         pj_rtcp_pkt *rtcp_pkt; 
    559         int len; 
    560  
    561         pj_memset (stat, 0, sizeof(*stat)); 
    562         pj_assert (stream->rtcp != 0); 
    563         pj_rtcp_build_rtcp (stream->rtcp, &rtcp_pkt, &len); 
    564  
    565         stat->pkt_rx = stream->stat.pkt_rx; 
    566         stat->oct_rx = stream->stat.oct_rx; 
    567  
    568         PJ_TODO(SUPPORT_JITTER_CALCULATION_FOR_NON_8KHZ_SAMPLE_RATE) 
    569         stat->jitter = pj_ntohl(rtcp_pkt->rr.jitter) / 8; 
    570         stat->pkt_lost = (rtcp_pkt->rr.total_lost_2 << 16) + 
    571                          (rtcp_pkt->rr.total_lost_1 << 8) + 
    572                          rtcp_pkt->rr.total_lost_0; 
    573     } 
    574     return 0; 
    575 } 
    576  
    577 PJ_DEF(pj_status_t) pj_media_stream_pause (pj_media_stream_t *channel) 
    578 { 
    579     PJ_UNUSED_ARG(channel); 
    580     return -1; 
    581 } 
    582  
    583 PJ_DEF(pj_status_t) pj_media_stream_resume (pj_media_stream_t *channel) 
    584 { 
    585     PJ_UNUSED_ARG(channel); 
    586     return -1; 
    587 } 
    588  
    589 PJ_DEF(pj_status_t) pj_media_stream_destroy (pj_media_stream_t *channel) 
    590 { 
    591     channel->thread_quit_flag = 1; 
    592  
    593     pj_mutex_lock (channel->mutex); 
    594     if (channel->peer) 
    595         pj_mutex_lock (channel->peer->mutex); 
    596  
    597     if (channel->jb) { 
    598         /* No need to deinitialize jitter buffer. */ 
    599     } 
    600     if (channel->transport_thread) { 
    601         pj_thread_join(channel->transport_thread); 
    602         pj_thread_destroy(channel->transport_thread); 
    603         channel->transport_thread = NULL; 
    604     } 
    605     if (channel->snd_stream != NULL) { 
    606         pj_mutex_unlock (channel->mutex); 
    607         pj_snd_stream_stop(channel->snd_stream); 
    608         pj_mutex_lock (channel->mutex); 
    609         pj_snd_stream_close(channel->snd_stream); 
    610         channel->snd_stream = NULL; 
    611     } 
    612     if (channel->codec) { 
    613         channel->codec->op->close(channel->codec); 
    614         pj_codec_mgr_dealloc_codec(channel->codec_mgr, channel->codec); 
    615         channel->codec = NULL; 
    616     } 
    617     if (channel->peer) { 
    618         pj_media_stream_t *peer = channel->peer; 
    619         peer->peer = NULL; 
    620         peer->codec = NULL; 
    621         peer->thread_quit_flag = 1; 
    622         if (peer->transport_thread) { 
    623             pj_mutex_unlock (peer->mutex); 
    624             pj_thread_join(peer->transport_thread); 
    625             pj_mutex_lock (peer->mutex); 
    626             pj_thread_destroy(peer->transport_thread); 
    627             peer->transport_thread = NULL; 
    628         } 
    629         if (peer->snd_stream) { 
    630             pj_mutex_unlock (peer->mutex); 
    631             pj_snd_stream_stop(peer->snd_stream); 
    632             pj_mutex_lock (peer->mutex); 
    633             pj_snd_stream_close(peer->snd_stream); 
    634             peer->snd_stream = NULL; 
    635         } 
    636     } 
    637  
    638     channel->state = STREAM_STOPPED; 
    639  
    640     if (channel->peer) 
    641         pj_mutex_unlock (channel->peer->mutex); 
    642     pj_mutex_unlock(channel->mutex); 
    643     pj_mutex_destroy(channel->mutex); 
    644  
    645     return 0; 
    646 } 
    647  
     566 
     567/* 
     568 * Destroy stream. 
     569 */ 
     570PJ_DEF(pj_status_t) pjmedia_stream_destroy( pjmedia_stream *stream ) 
     571{ 
     572 
     573    PJ_ASSERT_RETURN(stream != NULL, PJ_EINVAL); 
     574 
     575    /* Signal threads to quit. */ 
     576 
     577    stream->quit_flag = 1; 
     578 
     579 
     580    /* Close encoding sound stream. */ 
     581     
     582    if (stream->enc && stream->enc->snd_stream) { 
     583 
     584        pj_snd_stream_stop(stream->enc->snd_stream); 
     585        pj_snd_stream_close(stream->enc->snd_stream); 
     586        stream->enc->snd_stream = NULL; 
     587 
     588    } 
     589 
     590    /* Close decoding sound stream. */ 
     591 
     592    if (stream->dec && stream->dec->snd_stream) { 
     593 
     594        pj_snd_stream_stop(stream->dec->snd_stream); 
     595        pj_snd_stream_close(stream->dec->snd_stream); 
     596        stream->dec->snd_stream = NULL; 
     597 
     598    } 
     599 
     600    /* Wait for jitter buffer thread to quit: */ 
     601 
     602    if (stream->thread) { 
     603        pj_thread_join(stream->thread); 
     604        pj_thread_destroy(stream->thread); 
     605        stream->thread = NULL; 
     606    } 
     607 
     608    /* Free codec. */ 
     609 
     610    if (stream->codec) { 
     611        stream->codec->op->close(stream->codec); 
     612        pjmedia_codec_mgr_dealloc_codec(stream->codec_mgr, stream->codec); 
     613        stream->codec = NULL; 
     614    } 
     615 
     616    /* Free mutex */ 
     617     
     618    if (stream->jb_mutex) { 
     619        pj_mutex_destroy(stream->jb_mutex); 
     620        stream->jb_mutex = NULL; 
     621    } 
     622 
     623    return PJ_SUCCESS; 
     624} 
     625 
     626 
     627 
     628/* 
     629 * Start stream. 
     630 */ 
     631PJ_DEF(pj_status_t) pjmedia_stream_start(pjmedia_stream *stream) 
     632{ 
     633 
     634    PJ_ASSERT_RETURN(stream && stream->enc && stream->dec, PJ_EINVALIDOP); 
     635 
     636    if (stream->enc && (stream->dir & PJMEDIA_DIR_ENCODING)) { 
     637        stream->enc->paused = 0; 
     638        pj_snd_stream_start(stream->enc->snd_stream); 
     639    } 
     640 
     641    if (stream->dec && (stream->dir & PJMEDIA_DIR_DECODING)) { 
     642        stream->dec->paused = 0; 
     643        pj_snd_stream_start(stream->dec->snd_stream); 
     644    } 
     645 
     646    return PJ_SUCCESS; 
     647} 
     648 
     649 
     650/* 
     651 * Get stream statistics. 
     652 */ 
     653PJ_DEF(pj_status_t) pjmedia_stream_get_stat( const pjmedia_stream *stream, 
     654                                             pjmedia_stream_stat *stat) 
     655{ 
     656    PJ_ASSERT_RETURN(stream && stat, PJ_EINVAL); 
     657 
     658    pj_memcpy(stat, &stream->stat, sizeof(pjmedia_stream_stat)); 
     659 
     660    return PJ_SUCCESS; 
     661} 
     662 
     663 
     664/* 
     665 * Pause stream. 
     666 */ 
     667PJ_DEF(pj_status_t) pjmedia_stream_pause( pjmedia_stream *stream, 
     668                                          pjmedia_dir dir) 
     669{ 
     670    PJ_ASSERT_RETURN(stream, PJ_EINVAL); 
     671 
     672    if ((dir & PJMEDIA_DIR_ENCODING) && stream->enc) 
     673        stream->enc->paused = 1; 
     674 
     675    if ((dir & PJMEDIA_DIR_DECODING) && stream->dec) 
     676        stream->dec->paused = 1; 
     677 
     678    return PJ_SUCCESS; 
     679} 
     680 
     681 
     682/* 
     683 * Resume stream 
     684 */ 
     685PJ_DEF(pj_status_t) pjmedia_stream_resume( pjmedia_stream *stream, 
     686                                           pjmedia_dir dir) 
     687{ 
     688    PJ_ASSERT_RETURN(stream, PJ_EINVAL); 
     689 
     690    if ((dir & PJMEDIA_DIR_ENCODING) && stream->enc) 
     691        stream->enc->paused = 1; 
     692 
     693    if ((dir & PJMEDIA_DIR_DECODING) && stream->dec) 
     694        stream->dec->paused = 1; 
     695 
     696    return PJ_SUCCESS; 
     697} 
     698 
Note: See TracChangeset for help on using the changeset viewer.