Changeset 452 for pjproject/trunk/pjmedia/src/pjmedia/stream.c
- Timestamp:
- May 17, 2006 5:17:39 PM (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/trunk/pjmedia/src/pjmedia/stream.c
r450 r452 86 86 void *user_data; /**< User data. */ 87 87 88 pjmedia_transport *transport; /**< Stream transport. */ 89 88 90 pjmedia_codec *codec; /**< Codec instance being used. */ 89 91 pjmedia_codec_param codec_param; /**< Codec param. */ … … 92 94 pjmedia_jbuf *jb; /**< Jitter buffer. */ 93 95 94 pjmedia_sock_info skinfo; /**< Transport info. */95 pj_sockaddr_in rem_rtp_addr; /**< Remote RTP address. */96 pj_sockaddr_in rem_rtcp_addr; /**< Remote RTCP address. */97 98 99 96 pjmedia_rtcp_session rtcp; /**< RTCP for incoming RTP. */ 100 97 101 pj_ioqueue_key_t *rtp_key; /**< RTP ioqueue key. */102 pj_ioqueue_op_key_t rtp_op_key; /**< The pending read op key. */103 pj_sockaddr_in rtp_src_addr; /**< addr of src pkt from remote*/104 unsigned rtp_src_cnt; /**< if different, # of pkt rcv */105 int rtp_addrlen; /**< Address length. */106 107 pj_ioqueue_key_t *rtcp_key; /**< RTCP ioqueue key. */108 pj_ioqueue_op_key_t rtcp_op_key; /**< The pending read op key. */109 pj_size_t rtcp_pkt_size; /**< Size of RTCP packet buf. */110 char rtcp_pkt[512]; /**< RTCP packet buffer. */111 98 pj_uint32_t rtcp_last_tx; /**< RTCP tx time in timestamp */ 112 99 pj_uint32_t rtcp_interval; /**< Interval, in timestamp. */ 113 int rtcp_addrlen; /**< Address length. */114 100 115 101 /* RFC 2833 DTMF transmission queue: */ … … 424 410 425 411 pjmedia_rtcp_pkt *rtcp_pkt; 426 pj_ssize_t size;427 412 int len; 428 pj_status_t status;429 413 430 414 pjmedia_rtcp_build_rtcp(&stream->rtcp, &rtcp_pkt, &len); 431 size = len; 432 status = pj_sock_sendto(stream->skinfo.rtcp_sock, rtcp_pkt, &size, 0, 433 &stream->rem_rtcp_addr, 434 sizeof(stream->rem_rtcp_addr)); 435 #if 0 436 if (status != PJ_SUCCESS) { 437 char errmsg[PJ_ERR_MSG_SIZE]; 438 439 pj_strerror(status, errmsg, sizeof(errmsg)); 440 PJ_LOG(4,(port->info.name.ptr, "Error sending RTCP: %s [%d]", 441 errmsg, status)); 442 } 443 #endif 415 416 (*stream->transport->op->send_rtcp)(stream->transport, 417 rtcp_pkt, len); 444 418 445 419 stream->rtcp_last_tx = timestamp; … … 587 561 /* Send. */ 588 562 sent = frame_out.size+sizeof(pjmedia_rtp_hdr); 589 status = pj_sock_sendto(stream->skinfo.rtp_sock, channel->out_pkt, 590 &sent, 0, &stream->rem_rtp_addr, 591 sizeof(stream->rem_rtp_addr)); 592 if (status != PJ_SUCCESS) 593 return status; 563 564 (*stream->transport->op->send_rtp)(stream->transport, 565 channel->out_pkt, sent); 566 594 567 595 568 /* Update stat */ … … 683 656 684 657 /* 685 * This callback is called by ioqueue frameworkon receipt of packets658 * This callback is called by stream transport on receipt of packets 686 659 * in the RTP socket. 687 660 */ 688 static void on_rx_rtp( pj _ioqueue_key_t *key,689 pj_ioqueue_op_key_t *op_key, 661 static void on_rx_rtp( pjmedia_stream *stream, 662 const void *pkt, 690 663 pj_ssize_t bytes_read) 691 664 692 665 { 693 pjmedia_stream *stream = pj_ioqueue_get_user_data(key);694 666 pjmedia_channel *channel = stream->dec; 667 const pjmedia_rtp_hdr *hdr; 668 const void *payload; 669 unsigned payloadlen; 670 pjmedia_rtp_status seq_st; 695 671 pj_status_t status; 696 672 697 698 PJ_UNUSED_ARG(op_key); 699 700 701 /* 702 * Loop while we have packet. 673 674 /* Update RTP and RTCP session. */ 675 status = pjmedia_rtp_decode_rtp(&channel->rtp, pkt, bytes_read, 676 &hdr, &payload, &payloadlen); 677 if (status != PJ_SUCCESS) { 678 LOGERR_((stream->port.info.name.ptr, "RTP decode error", status)); 679 return; 680 } 681 682 683 /* Inform RTCP session */ 684 pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq), 685 pj_ntohl(hdr->ts), payloadlen); 686 687 /* Handle incoming DTMF. */ 688 if (hdr->pt == stream->rx_event_pt) { 689 handle_incoming_dtmf(stream, payload, payloadlen); 690 return; 691 } 692 693 694 /* Update RTP session (also checks if RTP session can accept 695 * the incoming packet. 703 696 */ 704 do { 705 const pjmedia_rtp_hdr *hdr; 706 const void *payload; 707 unsigned payloadlen; 708 pjmedia_rtp_status seq_st; 709 710 /* Go straight to read next packet if bytes_read == 0. 697 pjmedia_rtp_session_update(&channel->rtp, hdr, &seq_st); 698 if (seq_st.status.value) { 699 TRC_ ((stream->port.info.name.ptr, 700 "RTP status: badpt=%d, badssrc=%d, dup=%d, " 701 "outorder=%d, probation=%d, restart=%d", 702 seq_st.status.flag.badpt, 703 seq_st.status.flag.badssrc, 704 seq_st.status.flag.dup, 705 seq_st.status.flag.outorder, 706 seq_st.status.flag.probation, 707 seq_st.status.flag.restart)); 708 709 if (seq_st.status.flag.badpt) { 710 PJ_LOG(4,(stream->port.info.name.ptr, 711 "Bad RTP pt %d (expecting %d)", 712 hdr->pt, channel->rtp.out_pt)); 713 } 714 } 715 716 /* Skip bad RTP packet */ 717 if (seq_st.status.flag.bad) 718 return; 719 720 721 /* Put "good" packet to jitter buffer, or reset the jitter buffer 722 * when RTP session is restarted. 723 */ 724 pj_mutex_lock( stream->jb_mutex ); 725 if (seq_st.status.flag.restart) { 726 status = pjmedia_jbuf_reset(stream->jb); 727 PJ_LOG(4,(stream->port.info.name.ptr, "Jitter buffer reset")); 728 729 } else { 730 /* 731 * Packets may contain more than one frames, while the jitter 732 * buffer can only take one frame per "put" operation. So we need 733 * to ask the codec to "parse" the payload into multiple frames. 711 734 */ 712 if (bytes_read == 0) 713 goto read_next_packet; 714 715 if (bytes_read < 0) 716 goto read_next_packet; 717 718 /* Update RTP and RTCP session. */ 719 status = pjmedia_rtp_decode_rtp(&channel->rtp, 720 channel->in_pkt, bytes_read, 721 &hdr, &payload, &payloadlen); 735 enum { MAX = 16 }; 736 pj_timestamp ts; 737 unsigned i, count = MAX; 738 unsigned samples_per_frame; 739 pjmedia_frame frames[MAX]; 740 741 /* Get the timestamp of the first sample */ 742 ts.u64 = pj_ntohl(hdr->ts); 743 744 /* Parse the payload. */ 745 status = (*stream->codec->op->parse)(stream->codec, 746 (void*)payload, 747 payloadlen, 748 &ts, 749 &count, 750 frames); 722 751 if (status != PJ_SUCCESS) { 723 LOGERR_((stream->port.info.name.ptr, "RTP decode error", status)); 724 goto read_next_packet; 752 LOGERR_((stream->port.info.name.ptr, 753 "Codec parse() error", 754 status)); 755 count = 0; 725 756 } 726 757 727 728 /* Inform RTCP session */ 729 pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq), 730 pj_ntohl(hdr->ts), payloadlen); 731 732 /* Handle incoming DTMF. */ 733 if (hdr->pt == stream->rx_event_pt) { 734 handle_incoming_dtmf(stream, payload, payloadlen); 735 goto read_next_packet; 758 /* Put each frame to jitter buffer. */ 759 samples_per_frame = stream->codec_param.info.frm_ptime * 760 stream->codec_param.info.clock_rate * 761 stream->codec_param.info.channel_cnt / 762 1000; 763 764 for (i=0; i<count; ++i) { 765 unsigned ext_seq; 766 767 ext_seq = (unsigned)(frames[i].timestamp.u64 / 768 samples_per_frame); 769 pjmedia_jbuf_put_frame(stream->jb, frames[i].buf, 770 frames[i].size, ext_seq); 771 736 772 } 737 738 739 /* Update RTP session (also checks if RTP session can accept 740 * the incoming packet. 741 */ 742 pjmedia_rtp_session_update(&channel->rtp, hdr, &seq_st); 743 if (seq_st.status.value) { 744 TRC_ ((stream->port.info.name.ptr, 745 "RTP status: badpt=%d, badssrc=%d, dup=%d, " 746 "outorder=%d, probation=%d, restart=%d", 747 seq_st.status.flag.badpt, 748 seq_st.status.flag.badssrc, 749 seq_st.status.flag.dup, 750 seq_st.status.flag.outorder, 751 seq_st.status.flag.probation, 752 seq_st.status.flag.restart)); 753 754 if (seq_st.status.flag.badpt) { 755 PJ_LOG(4,(stream->port.info.name.ptr, 756 "Bad RTP pt %d (expecting %d)", 757 hdr->pt, channel->rtp.out_pt)); 758 } 759 } 760 761 /* Skip bad RTP packet */ 762 if (seq_st.status.flag.bad) 763 goto read_next_packet; 764 765 766 /* See if source address of RTP packet is different than the 767 * configured address. 768 */ 769 if ((stream->rem_rtp_addr.sin_addr.s_addr != 770 stream->rtp_src_addr.sin_addr.s_addr) || 771 (stream->rem_rtp_addr.sin_port != stream->rtp_src_addr.sin_port)) 772 { 773 stream->rtp_src_cnt++; 774 775 if (stream->rtp_src_cnt >= PJMEDIA_RTP_NAT_PROBATION_CNT) { 776 777 stream->rem_rtp_addr = stream->rtp_src_addr; 778 stream->rtp_src_cnt = 0; 779 780 PJ_LOG(4,(stream->port.info.name.ptr, 781 "Remote RTP address switched to %s:%d", 782 pj_inet_ntoa(stream->rtp_src_addr.sin_addr), 783 pj_ntohs(stream->rtp_src_addr.sin_port))); 784 } 785 } 786 787 788 789 /* Put "good" packet to jitter buffer, or reset the jitter buffer 790 * when RTP session is restarted. 791 */ 792 pj_mutex_lock( stream->jb_mutex ); 793 if (seq_st.status.flag.restart) { 794 status = pjmedia_jbuf_reset(stream->jb); 795 PJ_LOG(4,(stream->port.info.name.ptr, "Jitter buffer reset")); 796 797 } else { 798 /* 799 * Packets may contain more than one frames, while the jitter 800 * buffer can only take one frame per "put" operation. So we need 801 * to ask the codec to "parse" the payload into multiple frames. 802 */ 803 enum { MAX = 16 }; 804 pj_timestamp ts; 805 unsigned i, count = MAX; 806 unsigned samples_per_frame; 807 pjmedia_frame frames[MAX]; 808 809 /* Get the timestamp of the first sample */ 810 ts.u64 = pj_ntohl(hdr->ts); 811 812 /* Parse the payload. */ 813 status = (*stream->codec->op->parse)(stream->codec, 814 (void*)payload, 815 payloadlen, 816 &ts, 817 &count, 818 frames); 819 if (status != PJ_SUCCESS) { 820 LOGERR_((stream->port.info.name.ptr, 821 "Codec parse() error", 822 status)); 823 count = 0; 824 } 825 826 /* Put each frame to jitter buffer. */ 827 samples_per_frame = stream->codec_param.info.frm_ptime * 828 stream->codec_param.info.clock_rate * 829 stream->codec_param.info.channel_cnt / 830 1000; 831 832 for (i=0; i<count; ++i) { 833 unsigned ext_seq; 834 835 ext_seq = (unsigned)(frames[i].timestamp.u64 / 836 samples_per_frame); 837 pjmedia_jbuf_put_frame(stream->jb, frames[i].buf, 838 frames[i].size, ext_seq); 839 840 } 841 } 842 pj_mutex_unlock( stream->jb_mutex ); 843 844 845 /* Check if now is the time to transmit RTCP SR/RR report. 846 * We only do this when stream direction is "decoding only", 847 * because otherwise check_tx_rtcp() will be handled by put_frame() 848 */ 849 if (stream->dir == PJMEDIA_DIR_DECODING) { 850 check_tx_rtcp(stream, pj_ntohl(hdr->ts)); 851 } 852 853 if (status != 0) { 854 LOGERR_((stream->port.info.name.ptr, "Jitter buffer put() error", 855 status)); 856 goto read_next_packet; 857 } 858 859 860 read_next_packet: 861 bytes_read = channel->in_pkt_size; 862 stream->rtp_addrlen = sizeof(stream->rtp_src_addr); 863 status = pj_ioqueue_recvfrom( stream->rtp_key, 864 &stream->rtp_op_key, 865 channel->in_pkt, 866 &bytes_read, 0, 867 &stream->rtp_src_addr, 868 &stream->rtp_addrlen); 869 870 if (status != PJ_SUCCESS) { 871 bytes_read = -status; 872 } 873 874 } while (status == PJ_SUCCESS || 875 status == PJ_STATUS_FROM_OS(OSERR_ECONNRESET)); 876 877 if (status != PJ_SUCCESS && status != PJ_EPENDING) { 878 char errmsg[PJ_ERR_MSG_SIZE]; 879 880 pj_strerror(status, errmsg, sizeof(errmsg)); 881 PJ_LOG(4,(stream->port.info.name.ptr, 882 "Error reading RTP packet: %s [status=%d]. " 883 "RTP stream thread quitting!", 884 errmsg, status)); 885 } 886 } 887 888 889 /* 890 * This callback is called by ioqueue framework on receipt of packets 773 } 774 pj_mutex_unlock( stream->jb_mutex ); 775 776 777 /* Check if now is the time to transmit RTCP SR/RR report. 778 * We only do this when stream direction is "decoding only", 779 * because otherwise check_tx_rtcp() will be handled by put_frame() 780 */ 781 if (stream->dir == PJMEDIA_DIR_DECODING) { 782 check_tx_rtcp(stream, pj_ntohl(hdr->ts)); 783 } 784 785 if (status != 0) { 786 LOGERR_((stream->port.info.name.ptr, "Jitter buffer put() error", 787 status)); 788 return; 789 } 790 } 791 792 793 /* 794 * This callback is called by stream transport on receipt of packets 891 795 * in the RTCP socket. 892 796 */ 893 static void on_rx_rtcp( pj _ioqueue_key_t *key,894 pj_ioqueue_op_key_t *op_key,797 static void on_rx_rtcp( pjmedia_stream *stream, 798 const void *pkt, 895 799 pj_ssize_t bytes_read) 896 800 { 897 pjmedia_stream *stream = pj_ioqueue_get_user_data(key); 898 pj_status_t status; 899 900 PJ_UNUSED_ARG(op_key); 901 902 do { 903 if (bytes_read > 0) { 904 pjmedia_rtcp_rx_rtcp(&stream->rtcp, stream->rtcp_pkt, 905 bytes_read); 906 } 907 908 bytes_read = stream->rtcp_pkt_size; 909 stream->rtcp_addrlen = sizeof(stream->rem_rtcp_addr); 910 status = pj_ioqueue_recvfrom( stream->rtcp_key, 911 &stream->rtcp_op_key, 912 stream->rtcp_pkt, 913 &bytes_read, 0, 914 &stream->rem_rtcp_addr, 915 &stream->rtcp_addrlen); 916 917 } while (status == PJ_SUCCESS); 918 919 if (status != PJ_SUCCESS && status != PJ_EPENDING) { 920 char errmsg[PJ_ERR_MSG_SIZE]; 921 922 pj_strerror(status, errmsg, sizeof(errmsg)); 923 PJ_LOG(4,(stream->port.info.name.ptr, 924 "Error reading RTCP packet: %s [status=%d]", 925 errmsg, status)); 926 } 927 801 pjmedia_rtcp_rx_rtcp(&stream->rtcp, pkt, bytes_read); 928 802 } 929 803 … … 994 868 pj_pool_t *pool, 995 869 const pjmedia_stream_info *info, 870 pjmedia_transport *tp, 996 871 void *user_data, 997 872 pjmedia_stream **p_stream) … … 999 874 { 1000 875 pjmedia_stream *stream; 1001 pj_ioqueue_callback ioqueue_cb;1002 pj_uint16_t rtcp_port;1003 876 unsigned jb_init, jb_max, jb_min_pre, jb_max_pre; 1004 877 pj_status_t status; … … 1037 910 stream->dir = info->dir; 1038 911 stream->user_data = user_data; 1039 stream->skinfo = info->sock_info;1040 stream->rem_rtp_addr = info->rem_addr;1041 rtcp_port = (pj_uint16_t) (pj_ntohs(info->rem_addr.sin_port)+1);1042 stream->rem_rtcp_addr = stream->rem_rtp_addr;1043 stream->rem_rtcp_addr.sin_port = pj_htons(rtcp_port);1044 912 stream->rtcp_interval = (PJMEDIA_RTCP_INTERVAL + (pj_rand() % 8000)) * 1045 913 info->fmt.clock_rate / 1000; … … 1048 916 stream->rx_event_pt = info->rx_event_pt ? info->rx_event_pt : -1; 1049 917 stream->last_dtmf = -1; 918 919 /* Attach transport */ 920 status = (*tp->op->attach)(tp, stream, &info->rem_addr, 921 sizeof(info->rem_addr), &on_rx_rtp, 922 &on_rx_rtcp); 923 if (status != PJ_SUCCESS) 924 goto err_cleanup; 925 926 stream->transport = tp; 1050 927 1051 928 … … 1161 1038 goto err_cleanup; 1162 1039 1163 /* Register RTP socket to ioqueue */ 1164 pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb)); 1165 ioqueue_cb.on_read_complete = &on_rx_rtp; 1166 1167 status = pj_ioqueue_register_sock( pool, 1168 pjmedia_endpt_get_ioqueue(endpt), 1169 stream->skinfo.rtp_sock, 1170 stream, &ioqueue_cb, &stream->rtp_key); 1171 if (status != PJ_SUCCESS) 1172 goto err_cleanup; 1173 1174 /* Init pending operation key. */ 1175 pj_ioqueue_op_key_init(&stream->rtp_op_key, sizeof(stream->rtp_op_key)); 1176 1177 /* Bootstrap the first recvfrom() operation. */ 1178 on_rx_rtp( stream->rtp_key, &stream->rtp_op_key, 0); 1179 1180 1181 /* Register RTCP socket to ioqueue. */ 1182 if (stream->skinfo.rtcp_sock != PJ_INVALID_SOCKET) { 1183 pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb)); 1184 ioqueue_cb.on_read_complete = &on_rx_rtcp; 1185 1186 status = pj_ioqueue_register_sock( pool, 1187 pjmedia_endpt_get_ioqueue(endpt), 1188 stream->skinfo.rtcp_sock, 1189 stream, &ioqueue_cb, 1190 &stream->rtcp_key); 1191 if (status != PJ_SUCCESS) 1192 goto err_cleanup; 1193 } 1194 1195 /* Init pending operation key. */ 1196 pj_ioqueue_op_key_init(&stream->rtcp_op_key, sizeof(stream->rtcp_op_key)); 1197 1198 stream->rtcp_pkt_size = sizeof(stream->rtcp_pkt); 1199 1200 /* Bootstrap the first recvfrom() operation. */ 1201 on_rx_rtcp( stream->rtcp_key, &stream->rtcp_op_key, 0); 1040 1202 1041 1203 1042 /* Success! */ … … 1228 1067 1229 1068 1230 /* Unregister from ioqueue. */ 1231 if (stream->rtp_key) { 1232 pj_ioqueue_unregister(stream->rtp_key); 1233 stream->rtp_key = NULL; 1234 } 1235 if (stream->rtcp_key) { 1236 pj_ioqueue_unregister(stream->rtcp_key); 1237 stream->rtcp_key = NULL; 1069 /* Detach from transport */ 1070 if (stream->transport) { 1071 (*stream->transport->op->detach)(stream->transport, stream); 1072 stream->transport = NULL; 1238 1073 } 1239 1074 … … 1266 1101 *p_port = &stream->port; 1267 1102 return PJ_SUCCESS; 1103 } 1104 1105 1106 /* 1107 * Get the transport object 1108 */ 1109 PJ_DEF(pjmedia_transport*) pjmedia_stream_get_transport(pjmedia_stream *st) 1110 { 1111 return st->transport; 1268 1112 } 1269 1113
Note: See TracChangeset
for help on using the changeset viewer.