Changeset 159 for pjproject/trunk/pjmedia/src/pjmedia/stream.c
- Timestamp:
- Feb 8, 2006 10:43:39 PM (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjmedia/src/pjmedia/stream.c
r121 r159 32 32 33 33 34 #define THISFILE "stream.c" 35 #define ERRLEVEL 1 36 37 #define PJ_MAX_FRAME_DURATION_MS 200 38 #define PJ_MAX_BUFFER_SIZE_MS 2000 39 #define PJ_MAX_MTU 1500 34 #define THIS_FILE "stream.c" 35 #define ERRLEVEL 1 36 #define TRACE_(expr) PJ_LOG(3,expr) 37 38 #define PJMEDIA_MAX_FRAME_DURATION_MS 200 39 #define PJMEDIA_MAX_BUFFER_SIZE_MS 2000 40 #define PJMEDIA_MAX_MTU 1500 40 41 41 42 struct jb_frame … … 49 50 #define pj_fifobuf_free(fifo, buf) free(buf) 50 51 51 enum stream_state 52 { 53 STREAM_STOPPED, 54 STREAM_STARTED, 52 53 /** 54 * Media channel. 55 */ 56 struct pjmedia_channel 57 { 58 pjmedia_stream *stream; /**< Parent stream. */ 59 pjmedia_dir dir; /**< Channel direction. */ 60 unsigned pt; /**< Payload type. */ 61 pj_bool_t paused; /**< Paused?. */ 62 pj_snd_stream_info snd_info; /**< Sound stream param. */ 63 pj_snd_stream *snd_stream; /**< Sound stream. */ 64 unsigned in_pkt_size; /**< Size of input buffer. */ 65 void *in_pkt; /**< Input buffer. */ 66 unsigned out_pkt_size; /**< Size of output buffer. */ 67 void *out_pkt; /**< Output buffer. */ 68 unsigned pcm_buf_size; /**< Size of PCM buffer. */ 69 void *pcm_buf; /**< PCM buffer. */ 70 pj_rtp_session rtp; /**< RTP session. */ 55 71 }; 56 72 57 struct pj_media_stream_t 58 { 59 pj_media_dir_t dir; 60 int pt; 61 int state; 62 pj_media_stream_stat stat; 63 pj_media_stream_t *peer; 64 pj_snd_stream_info snd_info; 65 pj_snd_stream *snd_stream; 66 pj_mutex_t *mutex; 67 unsigned in_pkt_size; 68 void *in_pkt; 69 unsigned out_pkt_size; 70 void *out_pkt; 71 unsigned pcm_buf_size; 72 void *pcm_buf; 73 //pj_fifobuf_t fifobuf; 74 pj_codec_mgr *codec_mgr; 75 pj_codec *codec; 76 pj_rtp_session rtp; 77 pj_rtcp_session *rtcp; 78 pj_jitter_buffer *jb; 79 pj_sock_t rtp_sock; 80 pj_sock_t rtcp_sock; 81 pj_sockaddr_in dst_addr; 82 pj_thread_t *transport_thread; 83 int thread_quit_flag; 73 74 /** 75 * This structure describes media stream. 76 * A media stream is bidirectional media transmission between two endpoints. 77 * It consists of two channels, i.e. encoding and decoding channels. 78 * A media stream corresponds to a single "m=" line in a SDP session 79 * description. 80 */ 81 struct pjmedia_stream 82 { 83 pjmedia_channel *enc; /**< Encoding channel. */ 84 pjmedia_channel *dec; /**< Decoding channel. */ 85 86 pjmedia_dir dir; /**< Stream direction. */ 87 pjmedia_stream_stat stat; /**< Stream statistics. */ 88 89 pjmedia_codec_mgr *codec_mgr; /**< Codec manager instance. */ 90 pjmedia_codec *codec; /**< Codec instance being used. */ 91 92 pj_mutex_t *jb_mutex; 93 pj_jitter_buffer jb; /**< Jitter buffer. */ 94 95 pj_sock_t rtp_sock; /**< RTP socket. */ 96 pj_sock_t rtcp_sock; /**< RTCP socket. */ 97 pj_sockaddr_in dst_addr; /**< Destination RTP address. */ 98 99 pj_rtcp_session rtcp; /**< RTCP for incoming RTP. */ 100 101 pj_bool_t quit_flag; /**< To signal thread exit. */ 102 pj_thread_t *thread; /**< Jitter buffer's thread. */ 84 103 }; 85 104 86 105 106 107 /* 108 * play_callback() 109 * 110 * This callback is called by sound device's player thread when it 111 * needs to feed the player with some frames. 112 */ 87 113 static pj_status_t play_callback(/* in */ void *user_data, 88 114 /* in */ pj_uint32_t timestamp, … … 90 116 /*inout*/ unsigned size) 91 117 { 92 pj_media_stream_t *channel = user_data; 118 pjmedia_channel *channel = user_data; 119 pjmedia_stream *stream = channel->stream; 93 120 struct jb_frame *jb_frame; 94 121 void *p; 95 122 pj_uint32_t extseq; 96 123 pj_status_t status; 97 struct pj _audio_frame frame_in, frame_out;124 struct pjmedia_frame frame_in, frame_out; 98 125 99 126 PJ_UNUSED_ARG(timestamp); 100 127 101 /* Lock mutex */ 102 pj_mutex_lock (channel->mutex); 103 104 if (!channel->codec) { 105 pj_mutex_unlock (channel->mutex); 128 /* Do nothing if we're quitting. */ 129 if (stream->quit_flag) 106 130 return -1; 107 } 131 132 /* Lock jitter buffer mutex */ 133 pj_mutex_lock( stream->jb_mutex ); 108 134 109 135 /* Get frame from jitter buffer. */ 110 status = pj_jb_get (channel->jb, &extseq, &p); 136 status = pj_jb_get(&stream->jb, &extseq, &p); 137 138 /* Unlock jitter buffer mutex. */ 139 pj_mutex_unlock( stream->jb_mutex ); 140 111 141 jb_frame = p; 112 if (status != 0|| jb_frame == NULL) {142 if (status != PJ_SUCCESS || jb_frame == NULL) { 113 143 pj_memset(frame, 0, size); 114 pj_mutex_unlock(channel->mutex);115 144 return 0; 116 145 } … … 119 148 frame_in.buf = jb_frame->buf; 120 149 frame_in.size = jb_frame->size; 121 frame_in.type = PJ _AUDIO_FRAME_AUDIO; /* ignored */150 frame_in.type = PJMEDIA_FRAME_TYPE_AUDIO; /* ignored */ 122 151 frame_out.buf = channel->pcm_buf; 123 status = channel->codec->op->decode (channel->codec, &frame_in,124 152 status = stream->codec->op->decode( stream->codec, &frame_in, 153 channel->pcm_buf_size, &frame_out); 125 154 if (status != 0) { 126 PJ_LOG(3, (THISFILE, "decode() has return error status %d", 127 status)); 155 TRACE_((THIS_FILE, "decode() has return error status %d", status)); 128 156 129 157 pj_memset(frame, 0, size); 130 158 pj_fifobuf_free (&channel->fifobuf, jb_frame); 131 pj_mutex_unlock(channel->mutex);132 159 return 0; 133 160 } … … 135 162 /* Put in sound buffer. */ 136 163 if (frame_out.size > size) { 137 PJ_LOG(3, (THISFILE, "Sound playout buffer truncated %d bytes",138 164 TRACE_((THIS_FILE, "Sound playout buffer truncated %d bytes", 165 frame_out.size - size)); 139 166 frame_out.size = size; 140 167 } 141 168 142 169 pj_memcpy(frame, frame_out.buf, size); 143 144 170 pj_fifobuf_free (&channel->fifobuf, jb_frame); 145 pj_mutex_unlock(channel->mutex); 171 146 172 return 0; 147 173 } 148 174 175 176 /** 177 * rec_callback() 178 * 179 * This callback is called when the mic device has gathered 180 * enough audio samples. We will encode the audio samples and 181 * send it to remote. 182 */ 149 183 static pj_status_t rec_callback( /* in */ void *user_data, 150 184 /* in */ pj_uint32_t timestamp, … … 152 186 /* in */ unsigned size) 153 187 { 154 pj_media_stream_t *channel = user_data; 188 pjmedia_channel *channel = user_data; 189 pjmedia_stream *stream = channel->stream; 155 190 pj_status_t status = 0; 156 struct pj _audio_frame frame_in, frame_out;191 struct pjmedia_frame frame_in, frame_out; 157 192 int ts_len; 158 193 void *rtphdr; 159 194 int rtphdrlen; 160 195 pj_ssize_t sent; 161 #if 0 162 static FILE *fhnd = NULL; 163 #endif 196 164 197 165 198 PJ_UNUSED_ARG(timestamp); 166 199 167 /* Start locking channel mutex */ 168 pj_mutex_lock (channel->mutex); 169 170 if (!channel->codec) { 171 status = -1; 172 goto on_return; 173 } 200 /* Check if stream is quitting. */ 201 if (stream->quit_flag) 202 return -1; 174 203 175 204 /* Encode. */ 176 frame_in.type = PJ _MEDIA_TYPE_AUDIO;205 frame_in.type = PJMEDIA_TYPE_AUDIO; 177 206 frame_in.buf = (void*)frame; 178 207 frame_in.size = size; 179 208 frame_out.buf = ((char*)channel->out_pkt) + sizeof(pj_rtp_hdr); 180 status = channel->codec->op->encode (channel->codec, &frame_in,181 182 209 status = stream->codec->op->encode( stream->codec, &frame_in, 210 channel->out_pkt_size - sizeof(pj_rtp_hdr), 211 &frame_out); 183 212 if (status != 0) { 184 PJ_LOG(3,(THISFILE, "Codec encode() has returned error status %d",185 186 goto on_return;213 TRACE_((THIS_FILE, "Codec encode() has returned error status %d", 214 status)); 215 return status; 187 216 } 188 217 189 218 /* Encapsulate. */ 190 219 ts_len = size / (channel->snd_info.bits_per_sample / 8); 191 status = pj_rtp_encode_rtp (&channel->rtp, channel->pt, 0, 220 status = pj_rtp_encode_rtp( &channel->rtp, 221 channel->pt, 0, 192 222 frame_out.size, ts_len, 193 223 (const void**)&rtphdr, &rtphdrlen); 194 224 if (status != 0) { 195 PJ_LOG(3,(THISFILE, "RTP encode_rtp() has returned error status %d",196 197 goto on_return;225 TRACE_((THIS_FILE, "RTP encode_rtp() has returned error status %d", 226 status)); 227 return status; 198 228 } 199 229 … … 201 231 /* We don't support RTP with extended header yet. */ 202 232 PJ_TODO(SUPPORT_SENDING_RTP_WITH_EXTENDED_HEADER); 203 PJ_LOG(3,(THISFILE, "Unsupported extended RTP header for transmission"));204 goto on_return;233 TRACE_((THIS_FILE, "Unsupported extended RTP header for transmission")); 234 return 0; 205 235 } 206 236 … … 209 239 /* Send. */ 210 240 sent = frame_out.size+sizeof(pj_rtp_hdr); 211 status = pj_sock_sendto (channel->rtp_sock, channel->out_pkt, &sent, 0,212 &channel->dst_addr, sizeof(channel->dst_addr));213 if (status != PJ_SUCCESS) 214 goto on_return;241 status = pj_sock_sendto(stream->rtp_sock, channel->out_pkt, &sent, 0, 242 &stream->dst_addr, sizeof(stream->dst_addr)); 243 if (status != PJ_SUCCESS) 244 return status; 215 245 216 246 /* Update stat */ 217 channel->stat.pkt_tx++; 218 channel->stat.oct_tx += frame_out.size+sizeof(pj_rtp_hdr); 219 220 #if 0 221 if (fhnd == NULL) { 222 fhnd = fopen("RTP.DAT", "wb"); 223 if (fhnd) { 224 fwrite (channel->out_pkt, frame_out.size+sizeof(pj_rtp_hdr), 1, fhnd); 225 fclose(fhnd); 226 } 227 } 228 #endif 229 230 on_return: 231 pj_mutex_unlock (channel->mutex); 232 return status; 233 } 234 235 236 static int PJ_THREAD_FUNC stream_decoder_transport_thread (void*arg) 237 { 238 pj_media_stream_t *channel = arg; 239 240 while (!channel->thread_quit_flag) { 247 stream->stat.enc.pkt++; 248 stream->stat.enc.bytes += frame_out.size+sizeof(pj_rtp_hdr); 249 250 return 0; 251 } 252 253 254 /* 255 * This thread will poll the socket for incoming packets, and put 256 * the packets to jitter buffer. 257 */ 258 static int PJ_THREAD_FUNC jitter_buffer_thread (void*arg) 259 { 260 pjmedia_stream *stream = arg; 261 pjmedia_channel *channel = stream->dec; 262 263 while (!stream->quit_flag) { 241 264 pj_ssize_t len, size; 242 265 const pj_rtp_hdr *hdr; … … 251 274 252 275 PJ_FD_ZERO (&fds); 253 PJ_FD_SET ( channel->rtp_sock, &fds);276 PJ_FD_SET (stream->rtp_sock, &fds); 254 277 timeout.sec = 0; 255 timeout.msec = 1 00;278 timeout.msec = 1; 256 279 257 280 /* Wait with timeout. */ 258 status = pj_sock_select( channel->rtp_sock, &fds, NULL, NULL, &timeout);281 status = pj_sock_select(stream->rtp_sock, &fds, NULL, NULL, &timeout); 259 282 if (status != 1) 260 283 continue; … … 262 285 /* Get packet from socket. */ 263 286 len = channel->in_pkt_size; 264 status = pj_sock_recv (channel->rtp_sock, channel->in_pkt, &len, 0);287 status = pj_sock_recv(stream->rtp_sock, channel->in_pkt, &len, 0); 265 288 if (len < 1 || status != PJ_SUCCESS) { 266 289 if (pj_get_netos_error() == PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) { 267 /* On Win2K SP2 (or above) and WinXP, recv() will get WSAECONNRESET 268 when the sending side receives ICMP port unreachable. 290 /* On Win2K SP2 (or above) and WinXP, recv() will get 291 * WSAECONNRESET when the sending side receives ICMP port 292 * unreachable. 269 293 */ 270 294 continue; 271 295 } 272 //pj_perror(THISFILE, "Error receiving packet from socket (len=%d)", len);273 296 pj_thread_sleep(1); 274 297 continue; 275 298 } 276 299 277 if (channel-> state != STREAM_STARTED)300 if (channel->paused) 278 301 continue; 279 302 280 if (channel->thread_quit_flag)281 break;282 283 /* Start locking the channel. */284 pj_mutex_lock (channel->mutex);285 286 303 /* Update RTP and RTCP session. */ 287 status = pj_rtp_decode_rtp (&channel->rtp, channel->in_pkt, len, &hdr, &payload, &payloadlen); 288 if (status != 0) { 289 pj_mutex_unlock (channel->mutex); 290 PJ_LOG(4,(THISFILE, "RTP decode_rtp() has returned error status %d", status)); 304 status = pj_rtp_decode_rtp(&channel->rtp, channel->in_pkt, len, 305 &hdr, &payload, &payloadlen); 306 if (status != PJ_SUCCESS) { 307 TRACE_((THIS_FILE, "RTP decode_rtp() has returned error status %d", 308 status)); 291 309 continue; 292 310 } 293 status = pj_rtp_session_update (&channel->rtp, hdr); 294 if (status != 0 && status != PJ_RTP_ERR_SESSION_PROBATION && status != PJ_RTP_ERR_SESSION_RESTARTED) { 295 pj_mutex_unlock (channel->mutex); 296 PJ_LOG(4,(THISFILE, "RTP session_update() has returned error status %d", status)); 311 312 status = pj_rtp_session_update(&channel->rtp, hdr); 313 if (status != 0 && 314 status != PJMEDIA_RTP_ERR_SESSION_PROBATION && 315 status != PJMEDIA_RTP_ERR_SESSION_RESTARTED) 316 { 317 TRACE_((THIS_FILE, 318 "RTP session_update() has returned error status %d", 319 status)); 297 320 continue; 298 321 } 299 pj_rtcp_rx_rtp (channel->rtcp, pj_ntohs(hdr->seq), pj_ntohl(hdr->ts));322 pj_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq), pj_ntohl(hdr->ts)); 300 323 301 324 /* Update stat */ 302 channel->stat.pkt_rx++;303 channel->stat.oct_rx+= len;325 stream->stat.dec.pkt++; 326 stream->stat.dec.bytes += len; 304 327 305 328 /* Copy to FIFO buffer. */ … … 307 330 jb_frame = pj_fifobuf_alloc (&channel->fifobuf, size); 308 331 if (jb_frame == NULL) { 309 pj_mutex_unlock (channel->mutex);310 PJ_LOG(4,(THISFILE, "Unable to allocate %d bytes FIFO buffer",size));332 TRACE_((THIS_FILE, "Unable to allocate %d bytes FIFO buffer", 333 size)); 311 334 continue; 312 335 } … … 318 341 319 342 /* Put to jitter buffer. */ 320 status = pj_jb_put (channel->jb, pj_ntohs(hdr->seq), jb_frame); 343 pj_mutex_lock( stream->jb_mutex ); 344 status = pj_jb_put(&stream->jb, pj_ntohs(hdr->seq), jb_frame); 345 pj_mutex_unlock( stream->jb_mutex ); 346 321 347 if (status != 0) { 322 348 pj_fifobuf_unalloc (&channel->fifobuf, jb_frame); 323 pj_mutex_unlock (channel->mutex); 324 PJ_LOG(4,(THISFILE, "Jitter buffer put() has returned error status %d", status)); 349 350 TRACE_((THIS_FILE, 351 "Jitter buffer put() has returned error status %d", 352 status)); 325 353 continue; 326 354 } 327 328 pj_mutex_unlock (channel->mutex);329 355 } 330 356 … … 332 358 } 333 359 334 static void init_snd_param_from_codec_attr (pj_snd_stream_info *param, 335 const pj_codec_attr *attr) 336 { 337 param->bits_per_sample = attr->pcm_bits_per_sample; 338 param->bytes_per_frame = 2; 339 param->frames_per_packet = attr->sample_rate * attr->ptime / 1000; 340 param->samples_per_frame = 1; 341 param->samples_per_sec = attr->sample_rate; 342 } 343 344 static pj_media_stream_t *create_channel ( pj_pool_t *pool, 345 pj_media_dir_t dir, 346 pj_media_stream_t *peer, 347 pj_codec_id *codec_id, 348 pj_media_stream_create_param *param) 349 { 350 pj_media_stream_t *channel; 351 pj_codec_attr codec_attr; 352 void *ptr; 353 unsigned size; 360 361 /* 362 * Create sound stream parameter from codec attributes. 363 */ 364 static void init_snd_param( pj_snd_stream_info *snd_param, 365 const pjmedia_codec_param *codec_param) 366 { 367 pj_memset(snd_param, 0, sizeof(*snd_param)); 368 369 snd_param->bits_per_sample = codec_param->pcm_bits_per_sample; 370 snd_param->bytes_per_frame = 2; 371 snd_param->frames_per_packet = codec_param->sample_rate * 372 codec_param->ptime / 373 1000; 374 snd_param->samples_per_frame = 1; 375 snd_param->samples_per_sec = codec_param->sample_rate; 376 } 377 378 379 /* 380 * Create media channel. 381 */ 382 static pj_status_t create_channel( pj_pool_t *pool, 383 pjmedia_stream *stream, 384 pjmedia_dir dir, 385 const pjmedia_stream_info *param, 386 const pjmedia_codec_param *codec_param, 387 pjmedia_channel **p_channel) 388 { 389 pjmedia_channel *channel; 354 390 pj_status_t status; 355 391 356 392 /* Allocate memory for channel descriptor */ 357 size = sizeof(pj_media_stream_t); 358 channel = pj_pool_calloc(pool, 1, size); 359 if (!channel) { 360 PJ_LOG(1,(THISFILE, "Unable to allocate %u bytes channel descriptor", 361 size)); 362 return NULL; 363 } 364 393 394 channel = pj_pool_zalloc(pool, sizeof(pjmedia_channel)); 395 PJ_ASSERT_RETURN(channel != NULL, PJ_ENOMEM); 396 397 /* Init channel info. */ 398 399 channel->stream = stream; 365 400 channel->dir = dir; 366 channel->pt = codec_id->pt; 367 channel->peer = peer; 368 channel->codec_mgr = pj_med_mgr_get_codec_mgr (param->mediamgr); 369 channel->rtp_sock = param->rtp_sock; 370 channel->rtcp_sock = param->rtcp_sock; 371 channel->dst_addr = *param->remote_addr; 372 channel->state = STREAM_STOPPED; 373 374 /* Create mutex for the channel. */ 375 status = pj_mutex_create_simple(pool, NULL, &channel->mutex); 376 if (status != PJ_SUCCESS) 377 goto err_cleanup; 378 379 /* Create and initialize codec, only if peer is not present. 380 We only use one codec instance for both encoder and decoder. 381 */ 382 if (peer && peer->codec) { 383 channel->codec = peer->codec; 384 status = channel->codec->factory->op->default_attr(channel->codec->factory, codec_id, 385 &codec_attr); 386 if (status != 0) { 387 goto err_cleanup; 388 } 389 390 } else { 391 channel->codec = pj_codec_mgr_alloc_codec(channel->codec_mgr, codec_id); 392 if (channel->codec == NULL) { 393 goto err_cleanup; 394 } 395 396 status = channel->codec->factory->op->default_attr(channel->codec->factory, codec_id, 397 &codec_attr); 398 if (status != 0) { 399 goto err_cleanup; 400 } 401 402 codec_attr.pt = codec_id->pt; 403 status = channel->codec->op->open(channel->codec, &codec_attr); 404 if (status != 0) { 405 goto err_cleanup; 406 } 407 } 401 channel->paused = 1; 402 channel->pt = param->fmt.pt; 408 403 409 404 /* Allocate buffer for incoming packet. */ 410 channel->in_pkt_size = PJ_MAX_MTU; 411 channel->in_pkt = pj_pool_alloc(pool, channel->in_pkt_size); 412 if (!channel->in_pkt) { 413 PJ_LOG(1, (THISFILE, "Unable to allocate %u bytes incoming packet buffer", 414 channel->in_pkt_size)); 415 goto err_cleanup; 416 } 417 405 406 channel->in_pkt_size = PJMEDIA_MAX_MTU; 407 channel->in_pkt = pj_pool_alloc( pool, channel->in_pkt_size ); 408 PJ_ASSERT_RETURN(channel->in_pkt != NULL, PJ_ENOMEM); 409 410 418 411 /* Allocate buffer for outgoing packet. */ 412 419 413 channel->out_pkt_size = sizeof(pj_rtp_hdr) + 420 codec_attr.avg_bps / 8 * PJ_MAX_FRAME_DURATION_MS / 1000; 421 if (channel->out_pkt_size > PJ_MAX_MTU) 422 channel->out_pkt_size = PJ_MAX_MTU; 414 codec_param->avg_bps/8 * 415 PJMEDIA_MAX_FRAME_DURATION_MS / 416 1000; 417 418 if (channel->out_pkt_size > PJMEDIA_MAX_MTU) 419 channel->out_pkt_size = PJMEDIA_MAX_MTU; 420 423 421 channel->out_pkt = pj_pool_alloc(pool, channel->out_pkt_size); 424 if (!channel->out_pkt) { 425 PJ_LOG(1, (THISFILE, "Unable to allocate %u bytes encoding buffer", 426 channel->out_pkt_size)); 427 goto err_cleanup; 428 } 429 430 /* Allocate buffer for decoding to PCM */ 431 channel->pcm_buf_size = codec_attr.sample_rate * 432 codec_attr.pcm_bits_per_sample / 8 * 433 PJ_MAX_FRAME_DURATION_MS / 1000; 422 PJ_ASSERT_RETURN(channel->out_pkt != NULL, PJ_ENOMEM); 423 424 425 /* Allocate buffer for decoding to PCM: */ 426 427 channel->pcm_buf_size = codec_param->sample_rate * 428 codec_param->pcm_bits_per_sample / 8 * 429 PJMEDIA_MAX_FRAME_DURATION_MS / 1000; 434 430 channel->pcm_buf = pj_pool_alloc (pool, channel->pcm_buf_size); 435 if (!channel->pcm_buf) { 436 PJ_LOG(1, (THISFILE, "Unable to allocate %u bytes PCM buffer", 437 channel->pcm_buf_size)); 438 goto err_cleanup; 439 } 440 441 /* Allocate buffer for frames put in jitter buffer. */ 442 size = codec_attr.avg_bps / 8 * PJ_MAX_BUFFER_SIZE_MS / 1000; 443 ptr = pj_pool_alloc(pool, size); 444 if (!ptr) { 445 PJ_LOG(1, (THISFILE, "Unable to allocate %u bytes jitter buffer", 446 channel->pcm_buf_size)); 447 goto err_cleanup; 448 } 449 //pj_fifobuf_init (&channel->fifobuf, ptr, size); 431 PJ_ASSERT_RETURN(channel->pcm_buf != NULL, PJ_ENOMEM); 432 433 434 /* Create RTP and RTCP sessions: */ 435 436 status = pj_rtp_session_init(&channel->rtp, param->fmt.pt, 437 param->ssrc); 438 if (status != PJ_SUCCESS) 439 return status; 450 440 451 441 /* Create and initialize sound device */ 452 init_snd_param_from_codec_attr (&channel->snd_info, &codec_attr); 453 454 if (dir == PJ_MEDIA_DIR_ENCODING) 442 443 init_snd_param(&channel->snd_info, codec_param); 444 445 if (dir == PJMEDIA_DIR_ENCODING) 455 446 channel->snd_stream = pj_snd_open_recorder(-1, &channel->snd_info, 456 447 &rec_callback, channel); … … 460 451 461 452 if (!channel->snd_stream) 462 goto err_cleanup; 463 464 /* Create RTP and RTCP sessions. */ 465 if (pj_rtp_session_init(&channel->rtp, codec_id->pt, param->ssrc) != 0) { 466 PJ_LOG(1, (THISFILE, "RTP session initialization error")); 467 goto err_cleanup; 468 } 469 470 /* For decoder, create RTCP session, jitter buffer, and transport thread. */ 471 if (dir == PJ_MEDIA_DIR_DECODING) { 472 channel->rtcp = pj_pool_calloc(pool, 1, sizeof(pj_rtcp_session)); 473 if (!channel->rtcp) { 474 PJ_LOG(1, (THISFILE, "Unable to allocate RTCP session")); 475 goto err_cleanup; 476 } 477 478 pj_rtcp_init(channel->rtcp, param->ssrc); 479 480 channel->jb = pj_pool_calloc(pool, 1, sizeof(pj_jitter_buffer)); 481 if (!channel->jb) { 482 PJ_LOG(1, (THISFILE, "Unable to allocate jitter buffer descriptor")); 483 goto err_cleanup; 484 } 485 if (pj_jb_init(channel->jb, pool, param->jb_min, param->jb_max, param->jb_maxcnt)) { 486 PJ_LOG(1, (THISFILE, "Unable to allocate jitter buffer")); 487 goto err_cleanup; 488 } 489 490 status = pj_thread_create(pool, "decode", 491 &stream_decoder_transport_thread, channel, 492 0, 0, &channel->transport_thread); 493 if (status != PJ_SUCCESS) { 494 //pj_perror(THISFILE, "Unable to create transport thread"); 495 goto err_cleanup; 496 } 497 } 453 return -1; 454 498 455 499 456 /* Done. */ 500 return channel; 457 *p_channel = channel; 458 return PJ_SUCCESS; 459 } 460 461 462 /* 463 * Create media stream. 464 */ 465 PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt, 466 pj_pool_t *pool, 467 const pjmedia_stream_info *info, 468 pjmedia_stream **p_stream) 469 470 { 471 pjmedia_stream *stream; 472 pjmedia_codec_param codec_param; 473 pj_status_t status; 474 475 PJ_ASSERT_RETURN(pool && info && p_stream, PJ_EINVAL); 476 477 478 /* Allocate the media stream: */ 479 480 stream = pj_pool_zalloc(pool, sizeof(pjmedia_stream)); 481 PJ_ASSERT_RETURN(stream != NULL, PJ_ENOMEM); 482 483 484 /* Init stream: */ 485 486 stream->dir = info->dir; 487 stream->codec_mgr = pjmedia_endpt_get_codec_mgr(endpt); 488 489 /* Create mutex to protect jitter buffer: */ 490 491 status = pj_mutex_create_simple(pool, NULL, &stream->jb_mutex); 492 if (status != PJ_SUCCESS) 493 goto err_cleanup; 494 495 496 /* Create and initialize codec: */ 497 498 status = pjmedia_codec_mgr_alloc_codec( stream->codec_mgr, 499 &info->fmt, &stream->codec); 500 if (status != PJ_SUCCESS) 501 goto err_cleanup; 502 503 504 /* Get default codec param: */ 505 506 status = stream->codec->op->default_attr(stream->codec, &codec_param); 507 if (status != PJ_SUCCESS) 508 goto err_cleanup; 509 510 511 /* Open the codec: */ 512 513 status = stream->codec->op->open(stream->codec, &codec_param); 514 if (status != PJ_SUCCESS) 515 goto err_cleanup; 516 517 518 /* Init RTCP session: */ 519 520 pj_rtcp_init(&stream->rtcp, info->ssrc); 521 522 523 /* Init jitter buffer: */ 524 525 status = pj_jb_init(&stream->jb, pool, 526 info->jb_min, info->jb_max, info->jb_maxcnt); 527 if (status != PJ_SUCCESS) 528 goto err_cleanup; 529 530 531 /* Create jitter buffer thread: */ 532 533 status = pj_thread_create(pool, "decode", 534 &jitter_buffer_thread, stream, 535 0, 0, &stream->thread); 536 if (status != PJ_SUCCESS) 537 goto err_cleanup; 538 539 540 /* Create decoder channel: */ 541 542 status = create_channel( pool, stream, PJMEDIA_DIR_DECODING, info, 543 &codec_param, &stream->dec); 544 if (status != PJ_SUCCESS) 545 goto err_cleanup; 546 547 548 /* Create encoder channel: */ 549 550 status = create_channel( pool, stream, PJMEDIA_DIR_ENCODING, info, 551 &codec_param, &stream->enc); 552 if (status != PJ_SUCCESS) 553 goto err_cleanup; 554 555 556 /* Success! */ 557 *p_stream = stream; 558 return PJ_SUCCESS; 559 501 560 502 561 err_cleanup: 503 pj_media_stream_destroy(channel); 504 return NULL; 505 } 506 507 508 PJ_DEF(pj_status_t) pj_media_stream_create (pj_pool_t *pool, 509 pj_media_stream_t **enc_stream, 510 pj_media_stream_t **dec_stream, 511 pj_media_stream_create_param *param) 512 { 513 *dec_stream = *enc_stream = NULL; 514 515 if (param->dir & PJ_MEDIA_DIR_DECODING) { 516 *dec_stream = 517 create_channel(pool, PJ_MEDIA_DIR_DECODING, NULL, param->codec_id, param); 518 if (!*dec_stream) 519 return -1; 520 } 521 522 if (param->dir & PJ_MEDIA_DIR_ENCODING) { 523 *enc_stream = 524 create_channel(pool, PJ_MEDIA_DIR_ENCODING, *dec_stream, param->codec_id, param); 525 if (!*enc_stream) { 526 if (*dec_stream) { 527 pj_media_stream_destroy(*dec_stream); 528 *dec_stream = NULL; 529 } 530 return -1; 531 } 532 533 if (*dec_stream) { 534 (*dec_stream)->peer = *enc_stream; 535 } 536 } 537 538 return 0; 539 } 540 541 PJ_DEF(pj_status_t) pj_media_stream_start (pj_media_stream_t *channel) 542 { 543 pj_status_t status; 544 545 status = pj_snd_stream_start(channel->snd_stream); 546 547 if (status == 0) 548 channel->state = STREAM_STARTED; 562 pjmedia_stream_destroy(stream); 549 563 return status; 550 564 } 551 565 552 PJ_DEF(pj_status_t) pj_media_stream_get_stat (const pj_media_stream_t *stream, 553 pj_media_stream_stat *stat) 554 { 555 if (stream->dir == PJ_MEDIA_DIR_ENCODING) { 556 pj_memcpy (stat, &stream->stat, sizeof(*stat)); 557 } else { 558 pj_rtcp_pkt *rtcp_pkt; 559 int len; 560 561 pj_memset (stat, 0, sizeof(*stat)); 562 pj_assert (stream->rtcp != 0); 563 pj_rtcp_build_rtcp (stream->rtcp, &rtcp_pkt, &len); 564 565 stat->pkt_rx = stream->stat.pkt_rx; 566 stat->oct_rx = stream->stat.oct_rx; 567 568 PJ_TODO(SUPPORT_JITTER_CALCULATION_FOR_NON_8KHZ_SAMPLE_RATE) 569 stat->jitter = pj_ntohl(rtcp_pkt->rr.jitter) / 8; 570 stat->pkt_lost = (rtcp_pkt->rr.total_lost_2 << 16) + 571 (rtcp_pkt->rr.total_lost_1 << 8) + 572 rtcp_pkt->rr.total_lost_0; 573 } 574 return 0; 575 } 576 577 PJ_DEF(pj_status_t) pj_media_stream_pause (pj_media_stream_t *channel) 578 { 579 PJ_UNUSED_ARG(channel); 580 return -1; 581 } 582 583 PJ_DEF(pj_status_t) pj_media_stream_resume (pj_media_stream_t *channel) 584 { 585 PJ_UNUSED_ARG(channel); 586 return -1; 587 } 588 589 PJ_DEF(pj_status_t) pj_media_stream_destroy (pj_media_stream_t *channel) 590 { 591 channel->thread_quit_flag = 1; 592 593 pj_mutex_lock (channel->mutex); 594 if (channel->peer) 595 pj_mutex_lock (channel->peer->mutex); 596 597 if (channel->jb) { 598 /* No need to deinitialize jitter buffer. */ 599 } 600 if (channel->transport_thread) { 601 pj_thread_join(channel->transport_thread); 602 pj_thread_destroy(channel->transport_thread); 603 channel->transport_thread = NULL; 604 } 605 if (channel->snd_stream != NULL) { 606 pj_mutex_unlock (channel->mutex); 607 pj_snd_stream_stop(channel->snd_stream); 608 pj_mutex_lock (channel->mutex); 609 pj_snd_stream_close(channel->snd_stream); 610 channel->snd_stream = NULL; 611 } 612 if (channel->codec) { 613 channel->codec->op->close(channel->codec); 614 pj_codec_mgr_dealloc_codec(channel->codec_mgr, channel->codec); 615 channel->codec = NULL; 616 } 617 if (channel->peer) { 618 pj_media_stream_t *peer = channel->peer; 619 peer->peer = NULL; 620 peer->codec = NULL; 621 peer->thread_quit_flag = 1; 622 if (peer->transport_thread) { 623 pj_mutex_unlock (peer->mutex); 624 pj_thread_join(peer->transport_thread); 625 pj_mutex_lock (peer->mutex); 626 pj_thread_destroy(peer->transport_thread); 627 peer->transport_thread = NULL; 628 } 629 if (peer->snd_stream) { 630 pj_mutex_unlock (peer->mutex); 631 pj_snd_stream_stop(peer->snd_stream); 632 pj_mutex_lock (peer->mutex); 633 pj_snd_stream_close(peer->snd_stream); 634 peer->snd_stream = NULL; 635 } 636 } 637 638 channel->state = STREAM_STOPPED; 639 640 if (channel->peer) 641 pj_mutex_unlock (channel->peer->mutex); 642 pj_mutex_unlock(channel->mutex); 643 pj_mutex_destroy(channel->mutex); 644 645 return 0; 646 } 647 566 567 /* 568 * Destroy stream. 569 */ 570 PJ_DEF(pj_status_t) pjmedia_stream_destroy( pjmedia_stream *stream ) 571 { 572 573 PJ_ASSERT_RETURN(stream != NULL, PJ_EINVAL); 574 575 /* Signal threads to quit. */ 576 577 stream->quit_flag = 1; 578 579 580 /* Close encoding sound stream. */ 581 582 if (stream->enc && stream->enc->snd_stream) { 583 584 pj_snd_stream_stop(stream->enc->snd_stream); 585 pj_snd_stream_close(stream->enc->snd_stream); 586 stream->enc->snd_stream = NULL; 587 588 } 589 590 /* Close decoding sound stream. */ 591 592 if (stream->dec && stream->dec->snd_stream) { 593 594 pj_snd_stream_stop(stream->dec->snd_stream); 595 pj_snd_stream_close(stream->dec->snd_stream); 596 stream->dec->snd_stream = NULL; 597 598 } 599 600 /* Wait for jitter buffer thread to quit: */ 601 602 if (stream->thread) { 603 pj_thread_join(stream->thread); 604 pj_thread_destroy(stream->thread); 605 stream->thread = NULL; 606 } 607 608 /* Free codec. */ 609 610 if (stream->codec) { 611 stream->codec->op->close(stream->codec); 612 pjmedia_codec_mgr_dealloc_codec(stream->codec_mgr, stream->codec); 613 stream->codec = NULL; 614 } 615 616 /* Free mutex */ 617 618 if (stream->jb_mutex) { 619 pj_mutex_destroy(stream->jb_mutex); 620 stream->jb_mutex = NULL; 621 } 622 623 return PJ_SUCCESS; 624 } 625 626 627 628 /* 629 * Start stream. 630 */ 631 PJ_DEF(pj_status_t) pjmedia_stream_start(pjmedia_stream *stream) 632 { 633 634 PJ_ASSERT_RETURN(stream && stream->enc && stream->dec, PJ_EINVALIDOP); 635 636 if (stream->enc && (stream->dir & PJMEDIA_DIR_ENCODING)) { 637 stream->enc->paused = 0; 638 pj_snd_stream_start(stream->enc->snd_stream); 639 } 640 641 if (stream->dec && (stream->dir & PJMEDIA_DIR_DECODING)) { 642 stream->dec->paused = 0; 643 pj_snd_stream_start(stream->dec->snd_stream); 644 } 645 646 return PJ_SUCCESS; 647 } 648 649 650 /* 651 * Get stream statistics. 652 */ 653 PJ_DEF(pj_status_t) pjmedia_stream_get_stat( const pjmedia_stream *stream, 654 pjmedia_stream_stat *stat) 655 { 656 PJ_ASSERT_RETURN(stream && stat, PJ_EINVAL); 657 658 pj_memcpy(stat, &stream->stat, sizeof(pjmedia_stream_stat)); 659 660 return PJ_SUCCESS; 661 } 662 663 664 /* 665 * Pause stream. 666 */ 667 PJ_DEF(pj_status_t) pjmedia_stream_pause( pjmedia_stream *stream, 668 pjmedia_dir dir) 669 { 670 PJ_ASSERT_RETURN(stream, PJ_EINVAL); 671 672 if ((dir & PJMEDIA_DIR_ENCODING) && stream->enc) 673 stream->enc->paused = 1; 674 675 if ((dir & PJMEDIA_DIR_DECODING) && stream->dec) 676 stream->dec->paused = 1; 677 678 return PJ_SUCCESS; 679 } 680 681 682 /* 683 * Resume stream 684 */ 685 PJ_DEF(pj_status_t) pjmedia_stream_resume( pjmedia_stream *stream, 686 pjmedia_dir dir) 687 { 688 PJ_ASSERT_RETURN(stream, PJ_EINVAL); 689 690 if ((dir & PJMEDIA_DIR_ENCODING) && stream->enc) 691 stream->enc->paused = 1; 692 693 if ((dir & PJMEDIA_DIR_DECODING) && stream->dec) 694 stream->dec->paused = 1; 695 696 return PJ_SUCCESS; 697 } 698
Note: See TracChangeset
for help on using the changeset viewer.