Changeset 350 for pjproject/trunk/pjmedia/src/pjmedia/stream.c
- Timestamp:
- Mar 22, 2006 11:59:11 AM (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjmedia/src/pjmedia/stream.c
r327 r350 27 27 #include <pj/compat/socket.h> 28 28 #include <pj/errno.h> 29 #include <pj/ioqueue.h> 29 30 #include <pj/log.h> 30 31 #include <pj/os.h> … … 79 80 struct pjmedia_stream 80 81 { 82 pjmedia_endpt *endpt; /**< Media endpoint. */ 83 pjmedia_codec_mgr *codec_mgr; /**< Codec manager instance. */ 84 81 85 pjmedia_port port; /**< Port interface. */ 82 86 pjmedia_channel *enc; /**< Encoding channel. */ … … 87 91 void *user_data; /**< User data. */ 88 92 89 pjmedia_codec_mgr *codec_mgr; /**< Codec manager instance. */90 93 pjmedia_codec *codec; /**< Codec instance being used. */ 91 94 pj_size_t frame_size; /**< Size of encoded frame. */ … … 97 100 pj_sockaddr_in rem_rtcp_addr; /**< Remote RTCP address. */ 98 101 99 pj_sockaddr_in rem_src_rtp; /**< addr of src pkt from remote*/100 unsigned rem_src_cnt; /**< if different, # of pkt rcv */101 102 102 103 pjmedia_rtcp_session rtcp; /**< RTCP for incoming RTP. */ 103 104 104 pj_bool_t quit_flag; /**< To signal thread exit. */ 105 pj_thread_t *thread; /**< Jitter buffer's thread. */ 105 pj_ioqueue_key_t *rtp_key; /**< RTP ioqueue key. */ 106 pj_ioqueue_op_key_t rtp_op_key; /**< The pending read op key. */ 107 pj_sockaddr_in rtp_src_addr; /**< addr of src pkt from remote*/ 108 unsigned rtp_src_cnt; /**< if different, # of pkt rcv */ 109 int rtp_addrlen; /**< Address length. */ 110 111 pj_ioqueue_key_t *rtcp_key; /**< RTCP ioqueue key. */ 112 pj_ioqueue_op_key_t rtcp_op_key; /**< The pending read op key. */ 113 106 114 107 115 /* RFC 2833 DTMF transmission queue: */ … … 152 160 pj_status_t status; 153 161 struct pjmedia_frame frame_in, frame_out; 154 155 /* Do nothing if we're quitting. */156 if (stream->quit_flag) {157 frame->type = PJMEDIA_FRAME_TYPE_NONE;158 return PJ_SUCCESS;159 }160 162 161 163 /* Lock jitter buffer mutex */ … … 272 274 pj_ssize_t sent; 273 275 274 /* Check if stream is quitting. */275 if (stream->quit_flag)276 return -1;277 278 276 /* Number of samples in the frame */ 279 277 ts_len = frame->size / 2; … … 439 437 440 438 /* 441 * This thread will poll the socket for incoming packets, and put 442 * the packets to jitter buffer. 443 */ 444 static int PJ_THREAD_FUNC jitter_buffer_thread (void*arg) 445 { 446 pjmedia_stream *stream = arg; 439 * This callback is called by ioqueue framework on receipt of packets 440 * in the RTP socket. 441 */ 442 static void on_rx_rtp( pj_ioqueue_key_t *key, 443 pj_ioqueue_op_key_t *op_key, 444 pj_ssize_t bytes_read) 445 446 { 447 pjmedia_stream *stream = pj_ioqueue_get_user_data(key); 447 448 pjmedia_channel *channel = stream->dec; 448 449 450 while (!stream->quit_flag) { 451 pj_ssize_t len; 449 pj_status_t status; 450 451 452 PJ_UNUSED_ARG(op_key); 453 454 455 /* 456 * Loop while we have packet. 457 */ 458 do { 452 459 const pjmedia_rtp_hdr *hdr; 453 460 const void *payload; 454 461 unsigned payloadlen; 455 int addrlen; 456 int status; 457 458 /* Wait for packet. */ 459 pj_fd_set_t fds; 460 pj_time_val timeout; 461 462 PJ_FD_ZERO (&fds); 463 PJ_FD_SET (stream->skinfo.rtp_sock, &fds); 464 timeout.sec = 0; 465 timeout.msec = 1; 466 467 /* Wait with timeout. */ 468 status = pj_sock_select(FD_SETSIZE, &fds, NULL, NULL, &timeout); 469 if (status < 0) { 470 TRACE_((THIS_FILE, "Jitter buffer select() error", 471 pj_get_netos_error())); 472 pj_thread_sleep(500); 473 continue; 474 } else if (status == 0) 475 continue; 476 477 /* Get packet from socket. */ 478 len = channel->in_pkt_size; 479 addrlen = sizeof(stream->rem_src_rtp); 480 status = pj_sock_recvfrom(stream->skinfo.rtp_sock, 481 channel->in_pkt, &len, 0, 482 &stream->rem_src_rtp, &addrlen); 483 if (len < 1 || status != PJ_SUCCESS) { 484 if (pj_get_netos_error() == PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) { 485 /* On Win2K SP2 (or above) and WinXP, recv() will get 486 * WSAECONNRESET when the sending side receives ICMP port 487 * unreachable. 488 */ 489 continue; 490 } 491 pj_thread_sleep(1); 492 continue; 493 } 494 495 if (channel->paused) 496 continue; 462 463 /* Go straight to read next packet if bytes_read == 0. 464 */ 465 if (bytes_read == 0) 466 goto read_next_packet; 467 497 468 498 469 /* Update RTP and RTCP session. */ 499 status = pjmedia_rtp_decode_rtp(&channel->rtp, channel->in_pkt, len, 500 &hdr, &payload, &payloadlen); 470 status = pjmedia_rtp_decode_rtp(&channel->rtp, 471 channel->in_pkt, bytes_read, 472 &hdr, &payload, &payloadlen); 501 473 if (status != PJ_SUCCESS) { 502 474 TRACE_((THIS_FILE, "RTP decode error", status)); 503 continue;475 goto read_next_packet; 504 476 } 477 505 478 506 479 /* Handle incoming DTMF. */ 507 480 if (hdr->pt == stream->rx_event_pt) { 508 481 handle_incoming_dtmf(stream, payload, payloadlen); 509 continue;482 goto read_next_packet; 510 483 } 511 484 485 486 /* Update RTP session (also checks if RTP session can accept 487 * the incoming packet. 488 */ 512 489 status = pjmedia_rtp_session_update(&channel->rtp, hdr); 513 490 if (status != 0 && … … 519 496 PJ_LOG(4,(THIS_FILE,"RTP packet detail: pt=%d, seq=%d", 520 497 hdr->pt, pj_ntohs(hdr->seq))); 521 continue;498 goto read_next_packet; 522 499 } 523 pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq), pj_ntohl(hdr->ts)); 500 501 502 /* Update the RTCP session. */ 503 pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq), 504 pj_ntohl(hdr->ts)); 505 524 506 525 507 /* Update stat */ 526 508 stream->stat.dec.pkt++; 527 stream->stat.dec.bytes += len; 509 stream->stat.dec.bytes += bytes_read; 510 528 511 529 512 /* See if source address of RTP packet is different than the … … 531 514 */ 532 515 if ((stream->rem_rtp_addr.sin_addr.s_addr != 533 stream->r em_src_rtp.sin_addr.s_addr) ||534 (stream->rem_rtp_addr.sin_port != stream->r em_src_rtp.sin_port))516 stream->rtp_src_addr.sin_addr.s_addr) || 517 (stream->rem_rtp_addr.sin_port != stream->rtp_src_addr.sin_port)) 535 518 { 536 stream->r em_src_cnt++;537 538 if (stream->r em_src_cnt >= PJMEDIA_RTP_NAT_PROBATION_CNT) {519 stream->rtp_src_cnt++; 520 521 if (stream->rtp_src_cnt >= PJMEDIA_RTP_NAT_PROBATION_CNT) { 539 522 540 stream->rem_rtp_addr = stream->r em_src_rtp;541 stream->r em_src_cnt = 0;523 stream->rem_rtp_addr = stream->rtp_src_addr; 524 stream->rtp_src_cnt = 0; 542 525 543 526 PJ_LOG(4,(THIS_FILE,"Remote RTP address switched to %s:%d", 544 pj_inet_ntoa(stream->rem_src_rtp.sin_addr),545 pj_ntohs(stream->r em_src_rtp.sin_port)));527 pj_inet_ntoa(stream->rtp_src_addr.sin_addr), 528 pj_ntohs(stream->rtp_src_addr.sin_port))); 546 529 } 547 530 } 548 531 532 549 533 /* Put to jitter buffer. */ 550 534 pj_mutex_lock( stream->jb_mutex ); 551 status = pjmedia_jbuf_put_frame(stream->jb, payload, payloadlen, pj_ntohs(hdr->seq)); 535 status = pjmedia_jbuf_put_frame(stream->jb, payload, payloadlen, 536 pj_ntohs(hdr->seq)); 552 537 pj_mutex_unlock( stream->jb_mutex ); 553 538 554 539 if (status != 0) { 555 540 TRACE_((THIS_FILE, "Jitter buffer put() error", status)); 556 continue;541 goto read_next_packet; 557 542 } 558 } 559 560 return 0; 543 544 read_next_packet: 545 bytes_read = channel->in_pkt_size; 546 stream->rtp_addrlen = sizeof(stream->rtp_src_addr); 547 status = pj_ioqueue_recvfrom( stream->rtp_key, 548 &stream->rtp_op_key, 549 channel->in_pkt, 550 &bytes_read, 0, 551 &stream->rtp_src_addr, 552 &stream->rtp_addrlen); 553 554 } while (status == PJ_SUCCESS); 555 556 if (status != PJ_SUCCESS && status != PJ_EPENDING) { 557 char errmsg[PJ_ERR_MSG_SIZE]; 558 559 pj_strerror(status, errmsg, sizeof(errmsg)); 560 PJ_LOG(4,(THIS_FILE, "Error reading RTP packet: %s [status=%d]", 561 errmsg, status)); 562 } 563 } 564 565 566 /* 567 * This callback is called by ioqueue framework on receipt of packets 568 * in the RTCP socket. 569 */ 570 static void on_rx_rtcp( pj_ioqueue_key_t *key, 571 pj_ioqueue_op_key_t *op_key, 572 pj_ssize_t bytes_read) 573 { 574 PJ_UNUSED_ARG(key); 575 PJ_UNUSED_ARG(op_key); 576 PJ_UNUSED_ARG(bytes_read); 561 577 } 562 578 … … 642 658 pjmedia_stream *stream; 643 659 pjmedia_codec_param codec_param; 660 pj_ioqueue_callback ioqueue_cb; 644 661 pj_status_t status; 645 662 … … 668 685 669 686 /* Init stream: */ 670 687 stream->endpt = endpt; 688 stream->codec_mgr = pjmedia_endpt_get_codec_mgr(endpt); 671 689 stream->dir = info->dir; 672 690 stream->user_data = user_data; 673 stream->codec_mgr = pjmedia_endpt_get_codec_mgr(endpt);674 691 stream->skinfo = info->sock_info; 675 692 stream->rem_rtp_addr = info->rem_addr; … … 733 750 734 751 735 /* Create jitter buffer thread: */736 737 status = pj_thread_create(pool, "decode",738 &jitter_buffer_thread, stream,739 0, PJ_THREAD_SUSPENDED, &stream->thread);740 if (status != PJ_SUCCESS)741 goto err_cleanup;742 743 744 752 /* Create decoder channel: */ 745 753 … … 757 765 goto err_cleanup; 758 766 759 /* Resume jitter buffer thread. */ 760 status = pj_thread_resume( stream->thread ); 767 /* Register RTP socket to ioqueue */ 768 pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb)); 769 ioqueue_cb.on_read_complete = &on_rx_rtp; 770 771 status = pj_ioqueue_register_sock( pool, 772 pjmedia_endpt_get_ioqueue(endpt), 773 stream->skinfo.rtp_sock, 774 stream, &ioqueue_cb, &stream->rtp_key); 761 775 if (status != PJ_SUCCESS) 762 776 goto err_cleanup; 777 778 /* Init pending operation key. */ 779 pj_ioqueue_op_key_init(&stream->rtp_op_key, sizeof(stream->rtp_op_key)); 780 781 /* Bootstrap the first recvfrom() operation. */ 782 on_rx_rtp( stream->rtp_key, &stream->rtp_op_key, 0); 783 784 785 /* Register RTCP socket to ioqueue. */ 786 if (stream->skinfo.rtcp_sock != PJ_INVALID_SOCKET) { 787 pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb)); 788 ioqueue_cb.on_read_complete = &on_rx_rtcp; 789 790 status = pj_ioqueue_register_sock( pool, 791 pjmedia_endpt_get_ioqueue(endpt), 792 stream->skinfo.rtcp_sock, 793 stream, &ioqueue_cb, 794 &stream->rtcp_key); 795 if (status != PJ_SUCCESS) 796 goto err_cleanup; 797 } 798 799 /* Init pending operation key. */ 800 pj_ioqueue_op_key_init(&stream->rtcp_op_key, sizeof(stream->rtcp_op_key)); 801 802 /* Bootstrap the first recvfrom() operation. */ 803 on_rx_rtcp( stream->rtcp_key, &stream->rtcp_op_key, 0); 763 804 764 805 /* Success! */ … … 781 822 PJ_ASSERT_RETURN(stream != NULL, PJ_EINVAL); 782 823 783 /* Signal threads to quit. */ 784 785 stream->quit_flag = 1; 786 787 788 /* Close encoding sound stream. */ 789 790 /* 791 if (stream->enc && stream->enc->snd_stream) { 792 793 pjmedia_snd_stream_stop(stream->enc->snd_stream); 794 pjmedia_snd_stream_close(stream->enc->snd_stream); 795 stream->enc->snd_stream = NULL; 796 797 } 798 */ 799 800 /* Close decoding sound stream. */ 801 802 /* 803 if (stream->dec && stream->dec->snd_stream) { 804 805 pjmedia_snd_stream_stop(stream->dec->snd_stream); 806 pjmedia_snd_stream_close(stream->dec->snd_stream); 807 stream->dec->snd_stream = NULL; 808 809 } 810 */ 811 812 /* Wait for jitter buffer thread to quit: */ 813 814 if (stream->thread) { 815 pj_thread_join(stream->thread); 816 pj_thread_destroy(stream->thread); 817 stream->thread = NULL; 824 825 /* This function may be called when stream is partly initialized. */ 826 if (stream->jb_mutex) 827 pj_mutex_lock(stream->jb_mutex); 828 829 830 /* Unregister from ioqueue. */ 831 if (stream->rtp_key) { 832 pj_ioqueue_unregister(stream->rtp_key); 833 stream->rtp_key = NULL; 834 } 835 if (stream->rtcp_key) { 836 pj_ioqueue_unregister(stream->rtcp_key); 837 stream->rtcp_key = NULL; 818 838 } 819 839
Note: See TracChangeset
for help on using the changeset viewer.