/* GStreamer
* Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
+ * Copyright (C) 2015 Kurento (http://kurento.org/)
+ * @author: Miguel ParĂs <mparisdiaz@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
#define DEFAULT_IS_SENDER FALSE
#define DEFAULT_SDES NULL
#define DEFAULT_PROBATION RTP_DEFAULT_PROBATION
+#define DEFAULT_MAX_DROPOUT_TIME 60000
+#define DEFAULT_MAX_MISORDER_TIME 2000
+#define DEFAULT_DISABLE_RTCP FALSE
enum
{
PROP_IS_SENDER,
PROP_SDES,
PROP_STATS,
- PROP_PROBATION
+ PROP_PROBATION,
+ PROP_MAX_DROPOUT_TIME,
+ PROP_MAX_MISORDER_TIME,
+ PROP_DISABLE_RTCP
};
/* GObject vmethods */
* The current SDES items of the source. Returns a structure with name
* application/x-rtp-source-sdes and may contain the following fields:
*
- * 'cname' G_TYPE_STRING : The canonical name
+ * 'cname' G_TYPE_STRING : The canonical name in the form user@host
* 'name' G_TYPE_STRING : The user name
* 'email' G_TYPE_STRING : The user's electronic mail address
* 'phone' G_TYPE_STRING : The user's phone number
* These values are only updated when the source is sending.
*
* "sent-rb" G_TYPE_BOOLEAN we have sent an RB
- * "sent-rb-fractionlost" G_TYPE_UINT calculated lost fraction
+ * "sent-rb-fractionlost" G_TYPE_UINT calculated lost 8-bit fraction
* "sent-rb-packetslost" G_TYPE_INT lost packets
* "sent-rb-exthighestseq" G_TYPE_UINT last seen seqnum
* "sent-rb-jitter" G_TYPE_UINT jitter (in clock rate units)
- * "sent-rb-lsr" G_TYPE_UINT last SR time (in NTP Short Format, 16.16 fixed point)
- * "sent-rb-dlsr" G_TYPE_UINT delay since last SR (in NTP Short Format, 16.16 fixed point)
+ * "sent-rb-lsr" G_TYPE_UINT last SR time (seconds in NTP Short Format, 16.16 fixed point)
+ * "sent-rb-dlsr" G_TYPE_UINT delay since last SR (seconds in NTP Short Format, 16.16 fixed point)
*
* The following fields are only present for non-internal sources and
* represents the last RB that this source sent. This is only updated
* when the source is receiving data and sending RB blocks.
*
* "have-rb" G_TYPE_BOOLEAN the source has sent RB
- * "rb-fractionlost" G_TYPE_UINT lost fraction
+ * "rb-fractionlost" G_TYPE_UINT lost 8-bit fraction
* "rb-packetslost" G_TYPE_INT lost packets
* "rb-exthighestseq" G_TYPE_UINT highest received seqnum
* "rb-jitter" G_TYPE_UINT reception jitter (in clock rate units)
- * "rb-lsr" G_TYPE_UINT last SR time (in NTP Short Format, 16.16 fixed point)
- * "rb-dlsr" G_TYPE_UINT delay since last SR (in NTP Short Format, 16.16 fixed point)
+ * "rb-lsr" G_TYPE_UINT last SR time (seconds in NTP Short Format, 16.16 fixed point)
+ * "rb-dlsr" G_TYPE_UINT delay since last SR (seconds in NTP Short Format, 16.16 fixed point)
*
* The round trip of this source is calculated from the last RB
* values and the reception time of the last RB packet. It is only present for
* non-internal sources.
*
- * "rb-round-trip" G_TYPE_UINT the round-trip time (in NTP Short Format, 16.16 fixed point)
+ * "rb-round-trip" G_TYPE_UINT the round-trip time (seconds in NTP Short Format, 16.16 fixed point)
*
*/
g_object_class_install_property (gobject_class, PROP_STATS,
0, G_MAXUINT, DEFAULT_PROBATION,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
+ g_param_spec_uint ("max-dropout-time", "Max dropout time",
+ "The maximum time (milliseconds) of missing packets tolerated.",
+ 0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
+ g_param_spec_uint ("max-misorder-time", "Max misorder time",
+ "The maximum time (milliseconds) of misordered packets tolerated.",
+ 0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
+ * RTPSession::disable-rtcp:
+ *
+ * Allow disabling the sending of RTCP packets for this source.
+ */
+ g_object_class_install_property (gobject_class, PROP_DISABLE_RTCP,
+ g_param_spec_boolean ("disable-rtcp", "Disable RTCP",
+ "Disable sending RTCP packets for this source",
+ DEFAULT_DISABLE_RTCP, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
GST_DEBUG_CATEGORY_INIT (rtp_source_debug, "rtpsource", 0, "RTP Source");
}
src->bye_reason = NULL;
src->sent_bye = FALSE;
g_hash_table_remove_all (src->reported_in_sr_of);
+ g_queue_foreach (src->retained_feedback, (GFunc) gst_buffer_unref, NULL);
+ g_queue_clear (src->retained_feedback);
+ src->last_rtptime = -1;
src->stats.cycles = -1;
src->stats.jitter = 0;
src->stats.sent_pli_count = 0;
src->stats.sent_fir_count = 0;
+ src->stats.sent_nack_count = 0;
+ src->stats.recv_nack_count = 0;
}
static void
src->probation = DEFAULT_PROBATION;
src->curr_probation = src->probation;
src->closing = FALSE;
+ src->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
+ src->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
src->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
src->clock_rate = -1;
src->packets = g_queue_new ();
src->seqnum_offset = -1;
- src->last_rtptime = -1;
src->retained_feedback = g_queue_new ();
src->nacks = g_array_new (FALSE, FALSE, sizeof (guint32));
src->reported_in_sr_of = g_hash_table_new (g_direct_hash, g_direct_equal);
+ src->last_keyframe_request = GST_CLOCK_TIME_NONE;
+
rtp_source_reset (src);
+
+ src->pt_set = FALSE;
}
void
"sent-pli-count", G_TYPE_UINT, src->stats.sent_pli_count,
"recv-pli-count", G_TYPE_UINT, src->stats.recv_pli_count,
"sent-fir-count", G_TYPE_UINT, src->stats.sent_fir_count,
- "recv-fir-count", G_TYPE_UINT, src->stats.recv_fir_count, NULL);
+ "recv-fir-count", G_TYPE_UINT, src->stats.recv_fir_count,
+ "sent-nack-count", G_TYPE_UINT, src->stats.sent_nack_count,
+ "recv-nack-count", G_TYPE_UINT, src->stats.recv_nack_count, NULL);
/* get the last SR. */
have_sr = rtp_source_get_last_sr (src, &time, &ntptime, &rtptime,
case PROP_PROBATION:
src->probation = g_value_get_uint (value);
break;
+ case PROP_MAX_DROPOUT_TIME:
+ src->max_dropout_time = g_value_get_uint (value);
+ break;
+ case PROP_MAX_MISORDER_TIME:
+ src->max_misorder_time = g_value_get_uint (value);
+ break;
+ case PROP_DISABLE_RTCP:
+ src->disable_rtcp = g_value_get_boolean (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
case PROP_PROBATION:
g_value_set_uint (value, src->probation);
break;
+ case PROP_MAX_DROPOUT_TIME:
+ g_value_set_uint (value, src->max_dropout_time);
+ break;
+ case PROP_MAX_MISORDER_TIME:
+ g_value_set_uint (value, src->max_misorder_time);
+ break;
+ case PROP_DISABLE_RTCP:
+ g_value_set_boolean (value, src->disable_rtcp);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
GST_DEBUG ("got clock-rate %d", clock_rate);
src->clock_rate = clock_rate;
+ gst_rtp_packet_rate_ctx_reset (&src->packet_rate_ctx, clock_rate);
}
return src->clock_rate;
}
}
static gboolean
-update_receiver_stats (RTPSource * src, RTPPacketInfo * pinfo)
+update_receiver_stats (RTPSource * src, RTPPacketInfo * pinfo,
+ gboolean is_receive)
{
guint16 seqnr, expected;
RTPSourceStats *stats;
gint16 delta;
+ gint32 packet_rate, max_dropout, max_misorder;
stats = &src->stats;
seqnr = pinfo->seqnum;
+ packet_rate =
+ gst_rtp_packet_rate_ctx_update (&src->packet_rate_ctx, pinfo->seqnum,
+ pinfo->rtptime);
+ max_dropout =
+ gst_rtp_packet_rate_ctx_get_max_dropout (&src->packet_rate_ctx,
+ src->max_dropout_time);
+ max_misorder =
+ gst_rtp_packet_rate_ctx_get_max_misorder (&src->packet_rate_ctx,
+ src->max_misorder_time);
+ GST_TRACE ("SSRC %08x, packet_rate: %d, max_dropout: %d, max_misorder: %d",
+ src->ssrc, packet_rate, max_dropout, max_misorder);
+
if (stats->cycles == -1) {
GST_DEBUG ("received first packet");
/* first time we heard of this source */
src->curr_probation = src->probation;
}
- expected = src->stats.max_seq + 1;
- delta = gst_rtp_buffer_compare_seqnum (expected, seqnr);
+ if (is_receive) {
+ expected = src->stats.max_seq + 1;
+ delta = gst_rtp_buffer_compare_seqnum (expected, seqnr);
+
+ /* if we are still on probation, check seqnum */
+ if (src->curr_probation) {
+ /* when in probation, we require consecutive seqnums */
+ if (delta == 0) {
+ /* expected packet */
+ GST_DEBUG ("probation: seqnr %d == expected %d", seqnr, expected);
+ src->curr_probation--;
+ if (seqnr < stats->max_seq) {
+ /* sequence number wrapped - count another 64K cycle. */
+ stats->cycles += RTP_SEQ_MOD;
+ }
+ src->stats.max_seq = seqnr;
+
+ if (src->curr_probation == 0) {
+ GST_DEBUG ("probation done!");
+ init_seq (src, seqnr);
+ } else {
+ GstBuffer *q;
+
+ GST_DEBUG ("probation %d: queue packet", src->curr_probation);
+ /* when still in probation, keep packets in a list. */
+ g_queue_push_tail (src->packets, pinfo->data);
+ pinfo->data = NULL;
+ /* remove packets from queue if there are too many */
+ while (g_queue_get_length (src->packets) > RTP_MAX_PROBATION_LEN) {
+ q = g_queue_pop_head (src->packets);
+ gst_buffer_unref (q);
+ }
+ goto done;
+ }
+ } else {
+ /* unexpected seqnum in probation */
+ goto probation_seqnum;
+ }
+ } else if (delta >= 0 && delta < max_dropout) {
+ /* Clear bad packets */
+ stats->bad_seq = RTP_SEQ_MOD + 1; /* so seq == bad_seq is false */
+ g_queue_foreach (src->packets, (GFunc) gst_buffer_unref, NULL);
+ g_queue_clear (src->packets);
- /* if we are still on probation, check seqnum */
- if (src->curr_probation) {
- /* when in probation, we require consecutive seqnums */
- if (delta == 0) {
- /* expected packet */
- GST_DEBUG ("probation: seqnr %d == expected %d", seqnr, expected);
- src->curr_probation--;
+ /* in order, with permissible gap */
if (seqnr < stats->max_seq) {
/* sequence number wrapped - count another 64K cycle. */
stats->cycles += RTP_SEQ_MOD;
}
- src->stats.max_seq = seqnr;
-
- if (src->curr_probation == 0) {
- GST_DEBUG ("probation done!");
+ stats->max_seq = seqnr;
+ } else if (delta < -max_misorder || delta >= max_dropout) {
+ /* the sequence number made a very large jump */
+ if (seqnr == stats->bad_seq && src->packets->head) {
+ /* two sequential packets -- assume that the other side
+ * restarted without telling us so just re-sync
+ * (i.e., pretend this was the first packet). */
init_seq (src, seqnr);
} else {
- GstBuffer *q;
-
- GST_DEBUG ("probation %d: queue packet", src->curr_probation);
- /* when still in probation, keep packets in a list. */
+ /* unacceptable jump */
+ stats->bad_seq = (seqnr + 1) & (RTP_SEQ_MOD - 1);
+ g_queue_foreach (src->packets, (GFunc) gst_buffer_unref, NULL);
+ g_queue_clear (src->packets);
g_queue_push_tail (src->packets, pinfo->data);
pinfo->data = NULL;
- /* remove packets from queue if there are too many */
- while (g_queue_get_length (src->packets) > RTP_MAX_PROBATION_LEN) {
- q = g_queue_pop_head (src->packets);
- gst_buffer_unref (q);
- }
- goto done;
+ goto bad_sequence;
}
- } else {
- /* unexpected seqnum in probation */
- goto probation_seqnum;
- }
- } else if (delta >= 0 && delta < RTP_MAX_DROPOUT) {
- /* Clear bad packets */
- stats->bad_seq = RTP_SEQ_MOD + 1; /* so seq == bad_seq is false */
- g_queue_foreach (src->packets, (GFunc) gst_buffer_unref, NULL);
- g_queue_clear (src->packets);
-
- /* in order, with permissible gap */
- if (seqnr < stats->max_seq) {
- /* sequence number wrapped - count another 64K cycle. */
- stats->cycles += RTP_SEQ_MOD;
- }
- stats->max_seq = seqnr;
- } else if (delta < -RTP_MAX_MISORDER || delta >= RTP_MAX_DROPOUT) {
- /* the sequence number made a very large jump */
- if (seqnr == stats->bad_seq && src->packets->head) {
- /* two sequential packets -- assume that the other side
- * restarted without telling us so just re-sync
- * (i.e., pretend this was the first packet). */
- init_seq (src, seqnr);
- } else {
- /* unacceptable jump */
- stats->bad_seq = (seqnr + 1) & (RTP_SEQ_MOD - 1);
+ } else { /* delta < 0 && delta >= -max_misorder */
+ /* Clear bad packets */
+ stats->bad_seq = RTP_SEQ_MOD + 1; /* so seq == bad_seq is false */
g_queue_foreach (src->packets, (GFunc) gst_buffer_unref, NULL);
g_queue_clear (src->packets);
- g_queue_push_tail (src->packets, pinfo->data);
- pinfo->data = NULL;
- goto bad_sequence;
+
+ /* duplicate or reordered packet, will be filtered by jitterbuffer. */
+ GST_INFO ("duplicate or reordered packet (seqnr %u, expected %u)",
+ seqnr, expected);
}
- } else { /* delta < 0 && delta >= -RTP_MAX_MISORDER */
- /* Clear bad packets */
- stats->bad_seq = RTP_SEQ_MOD + 1; /* so seq == bad_seq is false */
- g_queue_foreach (src->packets, (GFunc) gst_buffer_unref, NULL);
- g_queue_clear (src->packets);
-
- /* duplicate or reordered packet, will be filtered by jitterbuffer. */
- GST_WARNING ("duplicate or reordered packet (seqnr %u, expected %u)", seqnr,
- expected);
}
src->stats.octets_received += pinfo->payload_len;
}
bad_sequence:
{
- GST_WARNING ("unacceptable seqnum received");
+ GST_WARNING
+ ("unacceptable seqnum received (seqnr %u, delta %d, packet_rate: %d, max_dropout: %d, max_misorder: %d)",
+ seqnr, delta, packet_rate, max_dropout, max_misorder);
return FALSE;
}
probation_seqnum:
g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR);
g_return_val_if_fail (pinfo != NULL, GST_FLOW_ERROR);
- if (!update_receiver_stats (src, pinfo))
+ if (!update_receiver_stats (src, pinfo, TRUE))
return GST_FLOW_OK;
/* the source that sent the packet must be a sender */
src->is_sender = TRUE;
/* we are also a receiver of our packets */
- if (!update_receiver_stats (src, pinfo))
+ if (!update_receiver_stats (src, pinfo, FALSE))
return GST_FLOW_OK;
+ if (src->pt_set && src->pt != pinfo->pt) {
+ GST_WARNING ("Changing pt from %u to %u for SSRC %u", src->pt, pinfo->pt,
+ src->ssrc);
+ }
+
+ src->pt = pinfo->pt;
+ src->pt_set = TRUE;
+
/* update stats for the SR */
src->stats.packets_sent += pinfo->packets;
src->stats.octets_sent += pinfo->payload_len;
GST_DEBUG ("last_rtime %" GST_TIME_FORMAT ", last_rtptime %"
G_GUINT64_FORMAT, GST_TIME_ARGS (src->last_rtime), t_rtp);
+ if (src->clock_rate == -1 && src->pt_set) {
+ GST_INFO ("no clock-rate, getting for pt %u and SSRC %u", src->pt,
+ src->ssrc);
+ get_clock_rate (src, src->pt);
+ }
+
if (src->clock_rate != -1) {
/* get the diff between the clock running_time and the buffer running_time.
* This is the elapsed time, as measured against the pipeline clock, between
* We need to apply this diff to the RTP timestamp to get the RTP timestamp
* for the given ntpnstime. */
diff = GST_CLOCK_DIFF (src->last_rtime, running_time);
+ GST_DEBUG ("running_time %" GST_TIME_FORMAT ", diff %" GST_STIME_FORMAT,
+ GST_TIME_ARGS (running_time), GST_STIME_ARGS (diff));
/* now translate the diff to RTP time, handle positive and negative cases.
* If there is no diff, we already set rtptime correctly above. */
if (diff > 0) {
- GST_DEBUG ("running_time %" GST_TIME_FORMAT ", diff %" GST_TIME_FORMAT,
- GST_TIME_ARGS (running_time), GST_TIME_ARGS (diff));
t_rtp += gst_util_uint64_scale_int (diff, src->clock_rate, GST_SECOND);
} else {
diff = -diff;
- GST_DEBUG ("running_time %" GST_TIME_FORMAT ", diff -%" GST_TIME_FORMAT,
- GST_TIME_ARGS (running_time), GST_TIME_ARGS (diff));
t_rtp -= gst_util_uint64_scale_int (diff, src->clock_rate, GST_SECOND);
}
} else {
- GST_WARNING ("no clock-rate, cannot interpolate rtp time");
+ GST_WARNING ("no clock-rate, cannot interpolate rtp time for SSRC %u",
+ src->ssrc);
}
/* convert the NTP time in nanoseconds to 32.32 fixed point */
*/
void
rtp_source_timeout (RTPSource * src, GstClockTime current_time,
- GstClockTime feedback_retention_window)
+ GstClockTime running_time, GstClockTime feedback_retention_window)
{
GstRTCPPacket *pkt;
+ GstClockTime max_pts_window;
+ guint pruned = 0;
src->conflicting_addresses =
timeout_conflicting_addresses (src->conflicting_addresses, current_time);
+ if (feedback_retention_window == GST_CLOCK_TIME_NONE ||
+ running_time < feedback_retention_window) {
+ return;
+ }
+
+ max_pts_window = running_time - feedback_retention_window;
+
/* Time out AVPF packets that are older than the desired length */
- while ((pkt = g_queue_peek_tail (src->retained_feedback)) &&
- GST_BUFFER_PTS (pkt) < feedback_retention_window)
- gst_buffer_unref (g_queue_pop_tail (src->retained_feedback));
+ while ((pkt = g_queue_peek_head (src->retained_feedback)) &&
+ GST_BUFFER_PTS (pkt) < max_pts_window) {
+ gst_buffer_unref (g_queue_pop_head (src->retained_feedback));
+ pruned++;
+ }
+
+ GST_LOG_OBJECT (src,
+ "%u RTCP packets pruned with PTS less than %" GST_TIME_FORMAT
+ ", queue len: %u", pruned, GST_TIME_ARGS (max_pts_window),
+ g_queue_get_length (src->retained_feedback));
}
static gint
const GstBuffer *bufa = a;
const GstBuffer *bufb = b;
- return GST_BUFFER_PTS (bufa) - GST_BUFFER_PTS (bufb);
+ g_return_val_if_fail (GST_BUFFER_PTS (bufa) != GST_CLOCK_TIME_NONE, -1);
+ g_return_val_if_fail (GST_BUFFER_PTS (bufb) != GST_CLOCK_TIME_NONE, 1);
+
+ if (GST_BUFFER_PTS (bufa) < GST_BUFFER_PTS (bufb)) {
+ return -1;
+ } else if (GST_BUFFER_PTS (bufa) > GST_BUFFER_PTS (bufb)) {
+ return 1;
+ }
+
+ return 0;
}
void
{
GstBuffer *buffer;
+ g_return_if_fail (running_time != GST_CLOCK_TIME_NONE);
+
buffer = gst_buffer_copy_region (packet->rtcp->buffer, GST_BUFFER_COPY_MEMORY,
packet->offset, (gst_rtcp_packet_get_length (packet) + 1) * 4);
GST_BUFFER_PTS (buffer) = running_time;
g_queue_insert_sorted (src->retained_feedback, buffer, compare_buffers, NULL);
+
+ GST_LOG_OBJECT (src, "RTCP packet retained with PTS: %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (running_time));
}
gboolean