X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Frtpmanager%2Frtpsource.c;h=7079e168a73e0606e979826aa7d97149930db38b;hb=0de3ebc5b26b3e3404386db6e058b42150ad7181;hp=3dd95173f263cbccc171a6e4d09be0bf55e05ea6;hpb=c60038f188a1128d5cb6e981a690e09bba91f2ad;p=platform%2Fupstream%2Fgst-plugins-good.git diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c index 3dd9517..7079e16 100644 --- a/gst/rtpmanager/rtpsource.c +++ b/gst/rtpmanager/rtpsource.c @@ -1,5 +1,7 @@ /* GStreamer * Copyright (C) <2007> Wim Taymans + * Copyright (C) 2015 Kurento (http://kurento.org/) + * @author: Miguel París * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -40,6 +42,9 @@ enum #define DEFAULT_IS_SENDER FALSE #define DEFAULT_SDES NULL #define DEFAULT_PROBATION RTP_DEFAULT_PROBATION +#define DEFAULT_MAX_DROPOUT_TIME 60000 +#define DEFAULT_MAX_MISORDER_TIME 2000 +#define DEFAULT_DISABLE_RTCP FALSE enum { @@ -50,7 +55,10 @@ enum PROP_IS_SENDER, PROP_SDES, PROP_STATS, - PROP_PROBATION + PROP_PROBATION, + PROP_MAX_DROPOUT_TIME, + PROP_MAX_MISORDER_TIME, + PROP_DISABLE_RTCP }; /* GObject vmethods */ @@ -102,7 +110,7 @@ rtp_source_class_init (RTPSourceClass * klass) * The current SDES items of the source. Returns a structure with name * application/x-rtp-source-sdes and may contain the following fields: * - * 'cname' G_TYPE_STRING : The canonical name + * 'cname' G_TYPE_STRING : The canonical name in the form user@host * 'name' G_TYPE_STRING : The user name * 'email' G_TYPE_STRING : The user's electronic mail address * 'phone' G_TYPE_STRING : The user's phone number @@ -110,7 +118,7 @@ rtp_source_class_init (RTPSourceClass * klass) * 'tool' G_TYPE_STRING : The name of application or tool * 'note' G_TYPE_STRING : A notice about the source * - * other fields may be present and these represent private items in + * Other fields may be present and these represent private items in * the SDES where the field name is the prefix. */ g_object_class_install_property (gobject_class, PROP_SDES, @@ -121,25 +129,37 @@ rtp_source_class_init (RTPSourceClass * klass) /** * RTPSource::stats * - * The statistics of the source. This property returns a GstStructure with - * name application/x-rtp-source-stats with the following fields: + * This property returns a GstStructure named application/x-rtp-source-stats with + * fields useful for statistics and diagnostics. * - * "ssrc" G_TYPE_UINT The SSRC of this source - * "internal" G_TYPE_BOOLEAN If this source is a source of the session - * "validated" G_TYPE_BOOLEAN If the source is validated - * "received-bye" G_TYPE_BOOLEAN If we received a BYE from this source - * "is-csrc" G_TYPE_BOOLEAN If this source was found as CSRC - * "is-sender" G_TYPE_BOOLEAN If this source is a sender + * Take note of each respective field's units: + * + * - NTP times are in the appropriate 32-bit or 64-bit fixed-point format + * starting from January 1, 1970 (except for timespans). + * - RTP times are in clock rate units (i.e. clock rate = 1 second) + * starting at a random offset. + * - For fields indicating packet loss, note that late packets are not considered lost, + * and duplicates are not taken into account. Hence, the loss may be negative + * if there are duplicates. + * + * The following fields are always present. + * + * "ssrc" G_TYPE_UINT the SSRC of this source + * "internal" G_TYPE_BOOLEAN this source is a source of the session + * "validated" G_TYPE_BOOLEAN the source is validated + * "received-bye" G_TYPE_BOOLEAN we received a BYE from this source + * "is-csrc" G_TYPE_BOOLEAN this source was found as CSRC + * "is-sender" G_TYPE_BOOLEAN this source is a sender * "seqnum-base" G_TYPE_INT first seqnum if known * "clock-rate" G_TYPE_INT the clock rate of the media * - * The following two fields are only present when known. + * The following fields are only present when known. * * "rtp-from" G_TYPE_STRING where we received the last RTP packet from * "rtcp-from" G_TYPE_STRING where we received the last RTCP packet from * * The following fields make sense for internal sources and will only increase - * when "is-sender" is TRUE: + * when "is-sender" is TRUE. * * "octets-sent" G_TYPE_UINT64 number of bytes we sent * "packets-sent" G_TYPE_UINT64 number of packets we sent @@ -153,15 +173,15 @@ rtp_source_class_init (RTPSourceClass * klass) * Following fields are updated when "is-sender" is TRUE. * * "bitrate" G_TYPE_UINT64 bitrate in bits per second - * "jitter" G_TYPE_UINT estimated jitter + * "jitter" G_TYPE_UINT estimated jitter (in clock rate units) * "packets-lost" G_TYPE_INT estimated amount of packets lost * * The last SR report this source sent. This only updates when "is-sender" is * TRUE. * * "have-sr" G_TYPE_BOOLEAN the source has sent SR - * "sr-ntptime" G_TYPE_UINT64 ntptime of SR - * "sr-rtptime" G_TYPE_UINT rtptime of SR + * "sr-ntptime" G_TYPE_UINT64 NTP time of SR (in NTP Timestamp Format, 32.32 fixed point) + * "sr-rtptime" G_TYPE_UINT RTP time of SR (in clock rate units) * "sr-octet-count" G_TYPE_UINT the number of bytes in the SR * "sr-packet-count" G_TYPE_UINT the number of packets in the SR * @@ -170,30 +190,31 @@ rtp_source_class_init (RTPSourceClass * klass) * These values are only updated when the source is sending. * * "sent-rb" G_TYPE_BOOLEAN we have sent an RB - * "sent-rb-fractionlost" G_TYPE_UINT calculated lost fraction + * "sent-rb-fractionlost" G_TYPE_UINT calculated lost 8-bit fraction * "sent-rb-packetslost" G_TYPE_INT lost packets * "sent-rb-exthighestseq" G_TYPE_UINT last seen seqnum - * "sent-rb-jitter" G_TYPE_UINT jitter - * "sent-rb-lsr" G_TYPE_UINT last SR time - * "sent-rb-dlsr" G_TYPE_UINT delay since last SR + * "sent-rb-jitter" G_TYPE_UINT jitter (in clock rate units) + * "sent-rb-lsr" G_TYPE_UINT last SR time (seconds in NTP Short Format, 16.16 fixed point) + * "sent-rb-dlsr" G_TYPE_UINT delay since last SR (seconds in NTP Short Format, 16.16 fixed point) * * The following fields are only present for non-internal sources and * represents the last RB that this source sent. This is only updated * when the source is receiving data and sending RB blocks. * * "have-rb" G_TYPE_BOOLEAN the source has sent RB - * "rb-fractionlost" G_TYPE_UINT lost fraction + * "rb-fractionlost" G_TYPE_UINT lost 8-bit fraction * "rb-packetslost" G_TYPE_INT lost packets * "rb-exthighestseq" G_TYPE_UINT highest received seqnum - * "rb-jitter" G_TYPE_UINT reception jitter - * "rb-lsr" G_TYPE_UINT last SR time - * "rb-dlsr" G_TYPE_UINT delay since last SR + * "rb-jitter" G_TYPE_UINT reception jitter (in clock rate units) + * "rb-lsr" G_TYPE_UINT last SR time (seconds in NTP Short Format, 16.16 fixed point) + * "rb-dlsr" G_TYPE_UINT delay since last SR (seconds in NTP Short Format, 16.16 fixed point) * - * The round trip of this source. This is calculated from the last RB - * values and the recption time of the last RB packet. Only present for + * The round trip of this source is calculated from the last RB + * values and the reception time of the last RB packet. It is only present for * non-internal sources. * - * "rb-round-trip" G_TYPE_UINT the round trip time in nanoseconds + * "rb-round-trip" G_TYPE_UINT the round-trip time (seconds in NTP Short Format, 16.16 fixed point) + * */ g_object_class_install_property (gobject_class, PROP_STATS, g_param_spec_boxed ("stats", "Stats", @@ -206,6 +227,28 @@ rtp_source_class_init (RTPSourceClass * klass) 0, G_MAXUINT, DEFAULT_PROBATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME, + g_param_spec_uint ("max-dropout-time", "Max dropout time", + "The maximum time (milliseconds) of missing packets tolerated.", + 0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME, + g_param_spec_uint ("max-misorder-time", "Max misorder time", + "The maximum time (milliseconds) of misordered packets tolerated.", + 0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + /** + * RTPSession::disable-rtcp: + * + * Allow disabling the sending of RTCP packets for this source. + */ + g_object_class_install_property (gobject_class, PROP_DISABLE_RTCP, + g_param_spec_boolean ("disable-rtcp", "Disable RTCP", + "Disable sending RTCP packets for this source", + DEFAULT_DISABLE_RTCP, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + GST_DEBUG_CATEGORY_INIT (rtp_source_debug, "rtpsource", 0, "RTP Source"); } @@ -224,6 +267,9 @@ rtp_source_reset (RTPSource * src) src->bye_reason = NULL; src->sent_bye = FALSE; g_hash_table_remove_all (src->reported_in_sr_of); + g_queue_foreach (src->retained_feedback, (GFunc) gst_buffer_unref, NULL); + g_queue_clear (src->retained_feedback); + src->last_rtptime = -1; src->stats.cycles = -1; src->stats.jitter = 0; @@ -240,18 +286,22 @@ rtp_source_reset (RTPSource * src) src->stats.sent_pli_count = 0; src->stats.sent_fir_count = 0; + src->stats.sent_nack_count = 0; + src->stats.recv_nack_count = 0; } static void rtp_source_init (RTPSource * src) { - /* sources are initialy on probation until we receive enough valid RTP + /* sources are initially on probation until we receive enough valid RTP * packets or a valid RTCP packet */ src->validated = FALSE; src->internal = FALSE; src->probation = DEFAULT_PROBATION; src->curr_probation = src->probation; src->closing = FALSE; + src->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME; + src->max_misorder_time = DEFAULT_MAX_MISORDER_TIME; src->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes"); @@ -259,14 +309,18 @@ rtp_source_init (RTPSource * src) src->clock_rate = -1; src->packets = g_queue_new (); src->seqnum_offset = -1; - src->last_rtptime = -1; src->retained_feedback = g_queue_new (); - src->nacks = g_array_new (FALSE, FALSE, sizeof (guint32)); + src->nacks = g_array_new (FALSE, FALSE, sizeof (guint16)); + src->nack_deadlines = g_array_new (FALSE, FALSE, sizeof (GstClockTime)); src->reported_in_sr_of = g_hash_table_new (g_direct_hash, g_direct_equal); + src->last_keyframe_request = GST_CLOCK_TIME_NONE; + rtp_source_reset (src); + + src->pt_set = FALSE; } void @@ -298,6 +352,7 @@ rtp_source_finalize (GObject * object) g_queue_free (src->retained_feedback); g_array_free (src->nacks, TRUE); + g_array_free (src->nack_deadlines, TRUE); if (src->rtp_from) g_object_unref (src->rtp_from); @@ -367,7 +422,9 @@ rtp_source_create_stats (RTPSource * src) "sent-pli-count", G_TYPE_UINT, src->stats.sent_pli_count, "recv-pli-count", G_TYPE_UINT, src->stats.recv_pli_count, "sent-fir-count", G_TYPE_UINT, src->stats.sent_fir_count, - "recv-fir-count", G_TYPE_UINT, src->stats.recv_fir_count, NULL); + "recv-fir-count", G_TYPE_UINT, src->stats.recv_fir_count, + "sent-nack-count", G_TYPE_UINT, src->stats.sent_nack_count, + "recv-nack-count", G_TYPE_UINT, src->stats.recv_nack_count, NULL); /* get the last SR. */ have_sr = rtp_source_get_last_sr (src, &time, &ntptime, &rtptime, @@ -492,6 +549,15 @@ rtp_source_set_property (GObject * object, guint prop_id, case PROP_PROBATION: src->probation = g_value_get_uint (value); break; + case PROP_MAX_DROPOUT_TIME: + src->max_dropout_time = g_value_get_uint (value); + break; + case PROP_MAX_MISORDER_TIME: + src->max_misorder_time = g_value_get_uint (value); + break; + case PROP_DISABLE_RTCP: + src->disable_rtcp = g_value_get_boolean (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -528,6 +594,15 @@ rtp_source_get_property (GObject * object, guint prop_id, case PROP_PROBATION: g_value_set_uint (value, src->probation); break; + case PROP_MAX_DROPOUT_TIME: + g_value_set_uint (value, src->max_dropout_time); + break; + case PROP_MAX_MISORDER_TIME: + g_value_set_uint (value, src->max_misorder_time); + break; + case PROP_DISABLE_RTCP: + g_value_set_boolean (value, src->disable_rtcp); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -865,6 +940,7 @@ get_clock_rate (RTPSource * src, guint8 payload) GST_DEBUG ("got clock-rate %d", clock_rate); src->clock_rate = clock_rate; + gst_rtp_packet_rate_ctx_reset (&src->packet_rate_ctx, clock_rate); } return src->clock_rate; } @@ -992,16 +1068,30 @@ do_bitrate_estimation (RTPSource * src, GstClockTime running_time, } static gboolean -update_receiver_stats (RTPSource * src, RTPPacketInfo * pinfo) +update_receiver_stats (RTPSource * src, RTPPacketInfo * pinfo, + gboolean is_receive) { guint16 seqnr, expected; RTPSourceStats *stats; gint16 delta; + gint32 packet_rate, max_dropout, max_misorder; stats = &src->stats; seqnr = pinfo->seqnum; + packet_rate = + gst_rtp_packet_rate_ctx_update (&src->packet_rate_ctx, pinfo->seqnum, + pinfo->rtptime); + max_dropout = + gst_rtp_packet_rate_ctx_get_max_dropout (&src->packet_rate_ctx, + src->max_dropout_time); + max_misorder = + gst_rtp_packet_rate_ctx_get_max_misorder (&src->packet_rate_ctx, + src->max_misorder_time); + GST_TRACE ("SSRC %08x, packet_rate: %d, max_dropout: %d, max_misorder: %d", + src->ssrc, packet_rate, max_dropout, max_misorder); + if (stats->cycles == -1) { GST_DEBUG ("received first packet"); /* first time we heard of this source */ @@ -1010,85 +1100,94 @@ update_receiver_stats (RTPSource * src, RTPPacketInfo * pinfo) src->curr_probation = src->probation; } - expected = src->stats.max_seq + 1; - delta = gst_rtp_buffer_compare_seqnum (expected, seqnr); + if (is_receive) { + expected = src->stats.max_seq + 1; + delta = gst_rtp_buffer_compare_seqnum (expected, seqnr); + + /* if we are still on probation, check seqnum */ + if (src->curr_probation) { + /* when in probation, we require consecutive seqnums */ + if (delta == 0) { + /* expected packet */ + GST_DEBUG ("probation: seqnr %d == expected %d", seqnr, expected); + src->curr_probation--; + if (seqnr < stats->max_seq) { + /* sequence number wrapped - count another 64K cycle. */ + stats->cycles += RTP_SEQ_MOD; + } + src->stats.max_seq = seqnr; + + if (src->curr_probation == 0) { + GST_DEBUG ("probation done!"); + init_seq (src, seqnr); + } else { + GstBuffer *q; + + GST_DEBUG ("probation %d: queue packet", src->curr_probation); + /* when still in probation, keep packets in a list. */ + g_queue_push_tail (src->packets, pinfo->data); + pinfo->data = NULL; + /* remove packets from queue if there are too many */ + while (g_queue_get_length (src->packets) > RTP_MAX_PROBATION_LEN) { + q = g_queue_pop_head (src->packets); + gst_buffer_unref (q); + } + goto done; + } + } else { + /* unexpected seqnum in probation + * + * There is no need to clean the queue at this point because the + * invalid packets in the queue are not going to be pushed as we are + * still in probation, and some cleanup will be performed at future + * probation attempts anyway if there are too many old packets in the + * queue. + */ + goto probation_seqnum; + } + } else if (delta >= 0 && delta < max_dropout) { + /* Clear bad packets */ + stats->bad_seq = RTP_SEQ_MOD + 1; /* so seq == bad_seq is false */ + g_queue_foreach (src->packets, (GFunc) gst_buffer_unref, NULL); + g_queue_clear (src->packets); - /* if we are still on probation, check seqnum */ - if (src->curr_probation) { - /* when in probation, we require consecutive seqnums */ - if (delta == 0) { - /* expected packet */ - GST_DEBUG ("probation: seqnr %d == expected %d", seqnr, expected); - src->curr_probation--; + /* in order, with permissible gap */ if (seqnr < stats->max_seq) { /* sequence number wrapped - count another 64K cycle. */ stats->cycles += RTP_SEQ_MOD; } - src->stats.max_seq = seqnr; - - if (src->curr_probation == 0) { - GST_DEBUG ("probation done!"); + stats->max_seq = seqnr; + } else if (delta < -max_misorder || delta >= max_dropout) { + /* the sequence number made a very large jump */ + if (seqnr == stats->bad_seq && src->packets->head) { + /* two sequential packets -- assume that the other side + * restarted without telling us so just re-sync + * (i.e., pretend this was the first packet). */ init_seq (src, seqnr); } else { - GstBuffer *q; - - GST_DEBUG ("probation %d: queue packet", src->curr_probation); - /* when still in probation, keep packets in a list. */ + /* unacceptable jump */ + stats->bad_seq = (seqnr + 1) & (RTP_SEQ_MOD - 1); + g_queue_foreach (src->packets, (GFunc) gst_buffer_unref, NULL); + g_queue_clear (src->packets); g_queue_push_tail (src->packets, pinfo->data); pinfo->data = NULL; - /* remove packets from queue if there are too many */ - while (g_queue_get_length (src->packets) > RTP_MAX_PROBATION_LEN) { - q = g_queue_pop_head (src->packets); - gst_buffer_unref (q); - } - goto done; + goto bad_sequence; } - } else { - /* unexpected seqnum in probation */ - goto probation_seqnum; - } - } else if (delta >= 0 && delta < RTP_MAX_DROPOUT) { - /* Clear bad packets */ - stats->bad_seq = RTP_SEQ_MOD + 1; /* so seq == bad_seq is false */ - g_queue_foreach (src->packets, (GFunc) gst_buffer_unref, NULL); - g_queue_clear (src->packets); - - /* in order, with permissible gap */ - if (seqnr < stats->max_seq) { - /* sequence number wrapped - count another 64K cycle. */ - stats->cycles += RTP_SEQ_MOD; - } - stats->max_seq = seqnr; - } else if (delta < -RTP_MAX_MISORDER || delta >= RTP_MAX_DROPOUT) { - /* the sequence number made a very large jump */ - if (seqnr == stats->bad_seq && src->packets->head) { - /* two sequential packets -- assume that the other side - * restarted without telling us so just re-sync - * (i.e., pretend this was the first packet). */ - init_seq (src, seqnr); - } else { - /* unacceptable jump */ - stats->bad_seq = (seqnr + 1) & (RTP_SEQ_MOD - 1); + } else { /* delta < 0 && delta >= -max_misorder */ + /* Clear bad packets */ + stats->bad_seq = RTP_SEQ_MOD + 1; /* so seq == bad_seq is false */ g_queue_foreach (src->packets, (GFunc) gst_buffer_unref, NULL); g_queue_clear (src->packets); - g_queue_push_tail (src->packets, pinfo->data); - pinfo->data = NULL; - goto bad_sequence; + + /* duplicate or reordered packet, will be filtered by jitterbuffer. */ + GST_INFO ("duplicate or reordered packet (seqnr %u, expected %u)", + seqnr, expected); } - } else { /* delta < 0 && delta >= -RTP_MAX_MISORDER */ - /* Clear bad packets */ - stats->bad_seq = RTP_SEQ_MOD + 1; /* so seq == bad_seq is false */ - g_queue_foreach (src->packets, (GFunc) gst_buffer_unref, NULL); - g_queue_clear (src->packets); - - /* duplicate or reordered packet, will be filtered by jitterbuffer. */ - GST_WARNING ("duplicate or reordered packet (seqnr %u, expected %u)", seqnr, - expected); } src->stats.octets_received += pinfo->payload_len; src->stats.bytes_received += pinfo->bytes; - src->stats.packets_received++; + src->stats.packets_received += pinfo->packets; /* for the bitrate estimation */ src->bytes_received += pinfo->payload_len; @@ -1104,12 +1203,16 @@ done: } bad_sequence: { - GST_WARNING ("unacceptable seqnum received"); + GST_WARNING + ("unacceptable seqnum received (seqnr %u, delta %d, packet_rate: %d, max_dropout: %d, max_misorder: %d)", + seqnr, delta, packet_rate, max_dropout, max_misorder); return FALSE; } probation_seqnum: { - GST_WARNING ("probation: seqnr %d != expected %d", seqnr, expected); + GST_WARNING ("probation: seqnr %d != expected %d " + "(SSRC %u curr_probation %i probation %i)", seqnr, expected, src->ssrc, + src->curr_probation, src->probation); src->curr_probation = src->probation; src->stats.max_seq = seqnr; return FALSE; @@ -1121,7 +1224,7 @@ probation_seqnum: * @src: an #RTPSource * @pinfo: an #RTPPacketInfo * - * Let @src handle the incomming RTP packet described in @pinfo. + * Let @src handle the incoming RTP packet described in @pinfo. * * Returns: a #GstFlowReturn. */ @@ -1133,7 +1236,7 @@ rtp_source_process_rtp (RTPSource * src, RTPPacketInfo * pinfo) g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR); g_return_val_if_fail (pinfo != NULL, GST_FLOW_ERROR); - if (!update_receiver_stats (src, pinfo)) + if (!update_receiver_stats (src, pinfo, TRUE)) return GST_FLOW_OK; /* the source that sent the packet must be a sender */ @@ -1179,12 +1282,10 @@ rtp_source_mark_bye (RTPSource * src, const gchar * reason) /** * rtp_source_send_rtp: * @src: an #RTPSource - * @data: an RTP buffer or a list of RTP buffers - * @is_list: if @data is a buffer or list - * @running_time: the running time of @data + * @pinfo: an #RTPPacketInfo * - * Send @data (an RTP buffer or list of buffers) originating from @src. - * This will make @src a sender. This function takes ownership of @data and + * Send data (an RTP buffer or buffer list from @pinfo) originating from @src. + * This will make @src a sender. This function takes ownership of the data and * modifies the SSRC in the RTP packet to that of @src when needed. * * Returns: a #GstFlowReturn. @@ -1203,12 +1304,22 @@ rtp_source_send_rtp (RTPSource * src, RTPPacketInfo * pinfo) /* we are a sender now */ src->is_sender = TRUE; + /* we are also a receiver of our packets */ + if (!update_receiver_stats (src, pinfo, FALSE)) + return GST_FLOW_OK; + + if (src->pt_set && src->pt != pinfo->pt) { + GST_WARNING ("Changing pt from %u to %u for SSRC %u", src->pt, pinfo->pt, + src->ssrc); + } + + src->pt = pinfo->pt; + src->pt_set = TRUE; + /* update stats for the SR */ src->stats.packets_sent += pinfo->packets; src->stats.octets_sent += pinfo->payload_len; src->bytes_sent += pinfo->payload_len; - /* we are also a receiver of our packets */ - update_receiver_stats (src, pinfo); running_time = pinfo->running_time; @@ -1262,10 +1373,10 @@ no_callback: * rtp_source_process_sr: * @src: an #RTPSource * @time: time of packet arrival - * @ntptime: the NTP time in 32.32 fixed point - * @rtptime: the RTP time + * @ntptime: the NTP time (in NTP Timestamp Format, 32.32 fixed point) + * @rtptime: the RTP time (in clock rate units) * @packet_count: the packet count - * @octet_count: the octect count + * @octet_count: the octet count * * Update the sender report in @src. */ @@ -1309,11 +1420,13 @@ rtp_source_process_sr (RTPSource * src, GstClockTime time, guint64 ntptime, * @src: an #RTPSource * @ntpnstime: the current time in nanoseconds since 1970 * @fractionlost: fraction lost since last SR/RR - * @packetslost: the cumululative number of packets lost + * @packetslost: the cumulative number of packets lost * @exthighestseq: the extended last sequence number received - * @jitter: the interarrival jitter - * @lsr: the last SR packet from this source - * @dlsr: the delay since last SR packet + * @jitter: the interarrival jitter (in clock rate units) + * @lsr: the time of the last SR packet on this source + * (in NTP Short Format, 16.16 fixed point) + * @dlsr: the delay since the last SR packet + * (in NTP Short Format, 16.16 fixed point) * * Update the report block in @src. */ @@ -1369,17 +1482,17 @@ rtp_source_process_rb (RTPSource * src, guint64 ntpnstime, * rtp_source_get_new_sr: * @src: an #RTPSource * @ntpnstime: the current time in nanoseconds since 1970 - * @running_time: the current running_time of the pipeline. - * @ntptime: the NTP time in 32.32 fixed point - * @rtptime: the RTP time corresponding to @ntptime + * @running_time: the current running_time of the pipeline + * @ntptime: the NTP time (in NTP Timestamp Format, 32.32 fixed point) + * @rtptime: the RTP time corresponding to @ntptime (in clock rate units) * @packet_count: the packet count - * @octet_count: the octect count + * @octet_count: the octet count * * Get new values to put into a new SR report from this source. * * @running_time and @ntpnstime are captured at the same time and represent the * running time of the pipeline clock and the absolute current system time in - * nanoseconds respectively. Together with the last running_time and rtp timestamp + * nanoseconds respectively. Together with the last running_time and RTP timestamp * we have observed in the source, we can generate @ntptime and @rtptime for an SR * packet. @ntptime is basically the fixed point representation of @ntpnstime * and @rtptime the associated RTP timestamp. @@ -1408,6 +1521,12 @@ rtp_source_get_new_sr (RTPSource * src, guint64 ntpnstime, GST_DEBUG ("last_rtime %" GST_TIME_FORMAT ", last_rtptime %" G_GUINT64_FORMAT, GST_TIME_ARGS (src->last_rtime), t_rtp); + if (src->clock_rate == -1 && src->pt_set) { + GST_INFO ("no clock-rate, getting for pt %u and SSRC %u", src->pt, + src->ssrc); + get_clock_rate (src, src->pt); + } + if (src->clock_rate != -1) { /* get the diff between the clock running_time and the buffer running_time. * This is the elapsed time, as measured against the pipeline clock, between @@ -1416,21 +1535,20 @@ rtp_source_get_new_sr (RTPSource * src, guint64 ntpnstime, * We need to apply this diff to the RTP timestamp to get the RTP timestamp * for the given ntpnstime. */ diff = GST_CLOCK_DIFF (src->last_rtime, running_time); + GST_DEBUG ("running_time %" GST_TIME_FORMAT ", diff %" GST_STIME_FORMAT, + GST_TIME_ARGS (running_time), GST_STIME_ARGS (diff)); /* now translate the diff to RTP time, handle positive and negative cases. * If there is no diff, we already set rtptime correctly above. */ if (diff > 0) { - GST_DEBUG ("running_time %" GST_TIME_FORMAT ", diff %" GST_TIME_FORMAT, - GST_TIME_ARGS (running_time), GST_TIME_ARGS (diff)); t_rtp += gst_util_uint64_scale_int (diff, src->clock_rate, GST_SECOND); } else { diff = -diff; - GST_DEBUG ("running_time %" GST_TIME_FORMAT ", diff -%" GST_TIME_FORMAT, - GST_TIME_ARGS (running_time), GST_TIME_ARGS (diff)); t_rtp -= gst_util_uint64_scale_int (diff, src->clock_rate, GST_SECOND); } } else { - GST_WARNING ("no clock-rate, cannot interpolate rtp time"); + GST_WARNING ("no clock-rate, cannot interpolate rtp time for SSRC %u", + src->ssrc); } /* convert the NTP time in nanoseconds to 32.32 fixed point */ @@ -1457,11 +1575,13 @@ rtp_source_get_new_sr (RTPSource * src, guint64 ntpnstime, * @src: an #RTPSource * @time: the current time of the system clock * @fractionlost: fraction lost since last SR/RR - * @packetslost: the cumululative number of packets lost + * @packetslost: the cumulative number of packets lost * @exthighestseq: the extended last sequence number received - * @jitter: the interarrival jitter - * @lsr: the last SR packet from this source - * @dlsr: the delay since last SR packet + * @jitter: the interarrival jitter (in clock rate units) + * @lsr: the time of the last SR packet on this source + * (in NTP Short Format, 16.16 fixed point) + * @dlsr: the delay since the last SR packet + * (in NTP Short Format, 16.16 fixed point) * * Get new values to put into a new report block from this source. * @@ -1547,10 +1667,10 @@ rtp_source_get_new_rb (RTPSource * src, GstClockTime time, * rtp_source_get_last_sr: * @src: an #RTPSource * @time: time of packet arrival - * @ntptime: the NTP time in 32.32 fixed point - * @rtptime: the RTP time + * @ntptime: the NTP time (in NTP Timestamp Format, 32.32 fixed point) + * @rtptime: the RTP time (in clock rate units) * @packet_count: the packet count - * @octet_count: the octect count + * @octet_count: the octet count * * Get the values of the last sender report as set with rtp_source_process_sr(). * @@ -1586,12 +1706,15 @@ rtp_source_get_last_sr (RTPSource * src, GstClockTime * time, guint64 * ntptime, * rtp_source_get_last_rb: * @src: an #RTPSource * @fractionlost: fraction lost since last SR/RR - * @packetslost: the cumululative number of packets lost + * @packetslost: the cumulative number of packets lost * @exthighestseq: the extended last sequence number received - * @jitter: the interarrival jitter - * @lsr: the last SR packet from this source - * @dlsr: the delay since last SR packet - * @round_trip: the round trip time + * @jitter: the interarrival jitter (in clock rate units) + * @lsr: the time of the last SR packet on this source + * (in NTP Short Format, 16.16 fixed point) + * @dlsr: the delay since the last SR packet + * (in NTP Short Format, 16.16 fixed point) + * @round_trip: the round-trip time + * (in NTP Short Format, 16.16 fixed point) * * Get the values of the last RB report set with rtp_source_process_rb(). * @@ -1735,17 +1858,33 @@ rtp_source_add_conflicting_address (RTPSource * src, */ void rtp_source_timeout (RTPSource * src, GstClockTime current_time, - GstClockTime feedback_retention_window) + GstClockTime running_time, GstClockTime feedback_retention_window) { GstRTCPPacket *pkt; + GstClockTime max_pts_window; + guint pruned = 0; src->conflicting_addresses = timeout_conflicting_addresses (src->conflicting_addresses, current_time); + if (feedback_retention_window == GST_CLOCK_TIME_NONE || + running_time < feedback_retention_window) { + return; + } + + max_pts_window = running_time - feedback_retention_window; + /* Time out AVPF packets that are older than the desired length */ - while ((pkt = g_queue_peek_tail (src->retained_feedback)) && - GST_BUFFER_TIMESTAMP (pkt) < feedback_retention_window) - gst_buffer_unref (g_queue_pop_tail (src->retained_feedback)); + while ((pkt = g_queue_peek_head (src->retained_feedback)) && + GST_BUFFER_PTS (pkt) < max_pts_window) { + gst_buffer_unref (g_queue_pop_head (src->retained_feedback)); + pruned++; + } + + GST_LOG_OBJECT (src, + "%u RTCP packets pruned with PTS less than %" GST_TIME_FORMAT + ", queue len: %u", pruned, GST_TIME_ARGS (max_pts_window), + g_queue_get_length (src->retained_feedback)); } static gint @@ -1754,7 +1893,16 @@ compare_buffers (gconstpointer a, gconstpointer b, gpointer user_data) const GstBuffer *bufa = a; const GstBuffer *bufb = b; - return GST_BUFFER_TIMESTAMP (bufa) - GST_BUFFER_TIMESTAMP (bufb); + g_return_val_if_fail (GST_BUFFER_PTS (bufa) != GST_CLOCK_TIME_NONE, -1); + g_return_val_if_fail (GST_BUFFER_PTS (bufb) != GST_CLOCK_TIME_NONE, 1); + + if (GST_BUFFER_PTS (bufa) < GST_BUFFER_PTS (bufb)) { + return -1; + } else if (GST_BUFFER_PTS (bufa) > GST_BUFFER_PTS (bufb)) { + return 1; + } + + return 0; } void @@ -1763,12 +1911,17 @@ rtp_source_retain_rtcp_packet (RTPSource * src, GstRTCPPacket * packet, { GstBuffer *buffer; + g_return_if_fail (running_time != GST_CLOCK_TIME_NONE); + buffer = gst_buffer_copy_region (packet->rtcp->buffer, GST_BUFFER_COPY_MEMORY, packet->offset, (gst_rtcp_packet_get_length (packet) + 1) * 4); - GST_BUFFER_TIMESTAMP (buffer) = running_time; + GST_BUFFER_PTS (buffer) = running_time; g_queue_insert_sorted (src->retained_feedback, buffer, compare_buffers, NULL); + + GST_LOG_OBJECT (src, "RTCP packet retained with PTS: %" GST_TIME_FORMAT, + GST_TIME_ARGS (running_time)); } gboolean @@ -1781,53 +1934,54 @@ rtp_source_has_retained (RTPSource * src, GCompareFunc func, gconstpointer data) } /** + * rtp_source_register_nack: * @src: The #RTPSource * @seqnum: a seqnum + * @deadline: the deadline before which RTX is still possible * * Register that @seqnum has not been received from @src. */ void -rtp_source_register_nack (RTPSource * src, guint16 seqnum) +rtp_source_register_nack (RTPSource * src, guint16 seqnum, + GstClockTime deadline) { - guint i, len; - guint32 dword = seqnum << 16; - gint diff = 16; + gint i; + guint len; + gint diff = -1; + guint16 tseq; len = src->nacks->len; - for (i = 0; i < len; i++) { - guint32 tdword; - guint16 tseq; + for (i = len - 1; i >= 0; i--) { + tseq = g_array_index (src->nacks, guint16, i); + diff = gst_rtp_buffer_compare_seqnum (tseq, seqnum); - tdword = g_array_index (src->nacks, guint32, i); - tseq = tdword >> 16; + GST_TRACE ("[%u] %u %u diff %i len %u", i, tseq, seqnum, diff, len); - diff = gst_rtp_buffer_compare_seqnum (tseq, seqnum); - if (diff < 16) + if (diff >= 0) break; } - /* we already have this seqnum */ - if (diff == 0) - return; - /* it comes before the recorded seqnum, FIXME, we could merge it - * if not to far away */ - if (diff < 0) { - GST_DEBUG ("insert NACK #%u at %u", seqnum, i); - g_array_insert_val (src->nacks, i, dword); - } else if (diff < 16) { - /* we can merge it */ - dword = g_array_index (src->nacks, guint32, i); - dword |= 1 << (diff - 1); - GST_DEBUG ("merge NACK #%u at %u with NACK #%u -> 0x%08x", seqnum, i, - dword >> 16, dword); - g_array_index (src->nacks, guint32, i) = dword; + + if (diff == 0) { + GST_DEBUG ("update NACK #%u deadline to %" GST_TIME_FORMAT, seqnum, + GST_TIME_ARGS (deadline)); + g_array_index (src->nack_deadlines, GstClockTime, i) = deadline; + } else if (i == len - 1) { + GST_DEBUG ("append NACK #%u with deadline %" GST_TIME_FORMAT, seqnum, + GST_TIME_ARGS (deadline)); + g_array_append_val (src->nacks, seqnum); + g_array_append_val (src->nack_deadlines, deadline); } else { - GST_DEBUG ("append NACK #%u", seqnum); - g_array_append_val (src->nacks, dword); + GST_DEBUG ("insert NACK #%u with deadline %" GST_TIME_FORMAT, seqnum, + GST_TIME_ARGS (deadline)); + g_array_insert_val (src->nacks, i + 1, seqnum); + g_array_insert_val (src->nack_deadlines, i + 1, deadline); } + src->send_nack = TRUE; } /** + * rtp_source_get_nacks: * @src: The #RTPSource * @n_nacks: result number of nacks * @@ -1835,18 +1989,51 @@ rtp_source_register_nack (RTPSource * src, guint16 seqnum) * * Returns: an array of @n_nacks seqnum values. */ -guint32 * +guint16 * rtp_source_get_nacks (RTPSource * src, guint * n_nacks) { if (n_nacks) *n_nacks = src->nacks->len; - return (guint32 *) src->nacks->data; + return (guint16 *) src->nacks->data; +} + +/** + * rtp_source_get_nacks: + * @src: The #RTPSource + * @n_nacks: result number of nacks + * + * Get the registered NACKS deadlines. + * + * Returns: an array of @n_nacks deadline values. + */ +GstClockTime * +rtp_source_get_nack_deadlines (RTPSource * src, guint * n_nacks) +{ + if (n_nacks) + *n_nacks = src->nack_deadlines->len; + + return (GstClockTime *) src->nack_deadlines->data; } +/** + * rtp_source_clear_nacks: + * @src: The #RTPSource + * @n_nacks: number of nacks + * + * Remove @n_nacks oldest NACKS form array. + */ void -rtp_source_clear_nacks (RTPSource * src) +rtp_source_clear_nacks (RTPSource * src, guint n_nacks) { - g_array_set_size (src->nacks, 0); - src->send_nack = FALSE; + g_return_if_fail (n_nacks <= src->nacks->len); + + if (src->nacks->len == n_nacks) { + g_array_set_size (src->nacks, 0); + g_array_set_size (src->nack_deadlines, 0); + src->send_nack = FALSE; + } else { + g_array_remove_range (src->nacks, 0, n_nacks); + g_array_remove_range (src->nack_deadlines, 0, n_nacks); + } }