/**
* SECTION:element-rtpjitterbuffer
+ * @title: rtpjitterbuffer
*
* This element reorders and removes duplicate RTP packets as they are received
* from a network source.
* depayloader or other element to create concealment data or some other logic
* to gracefully handle the missing packets.
*
- * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
+ * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incoming
* buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
* buffer.
*
*
* - If seqnum N arrived, all seqnum older than
* N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
- * immediately. This is to request fast feedback for abonormally reorder
+ * immediately. This is to request fast feedback for abnormally reorder
* packets before any of the previous timeouts is triggered.
*
* A late packet triggers the GstRTPRetransmissionRequest custom upstream
*
* This element will automatically be used inside rtpbin.
*
- * <refsect2>
- * <title>Example pipelines</title>
+ * ## Example pipelines
* |[
* gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
* ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
* inserted into the pipeline to smooth out network jitter and to reorder the
* out-of-order RTP packets.
- * </refsect2>
+ *
*/
#ifdef HAVE_CONFIG_H
#include "gstrtpjitterbuffer.h"
#include "rtpjitterbuffer.h"
#include "rtpstats.h"
+#include "rtptimerqueue.h"
#include <gst/glib-compat-private.h>
#define DEFAULT_TS_OFFSET 0
#define DEFAULT_MAX_TS_OFFSET_ADJUSTMENT 0
#define DEFAULT_DO_LOST FALSE
+#define DEFAULT_POST_DROP_MESSAGES FALSE
+#define DEFAULT_DROP_MESSAGES_INTERVAL_MS 200
#define DEFAULT_MODE RTP_JITTER_BUFFER_MODE_SLAVE
#define DEFAULT_PERCENT 0
#define DEFAULT_DO_RETRANSMISSION FALSE
PROP_TS_OFFSET,
PROP_MAX_TS_OFFSET_ADJUSTMENT,
PROP_DO_LOST,
+ PROP_POST_DROP_MESSAGES,
+ PROP_DROP_MESSAGES_INTERVAL,
PROP_MODE,
PROP_PERCENT,
PROP_DO_RETRANSMISSION,
(g_mutex_unlock (&(priv)->jbuf_lock)); \
} G_STMT_END
+#define JBUF_WAIT_QUEUE(priv) G_STMT_START { \
+ GST_DEBUG ("waiting queue"); \
+ (priv)->waiting_queue++; \
+ g_cond_wait (&(priv)->jbuf_queue, &(priv)->jbuf_lock); \
+ (priv)->waiting_queue--; \
+ GST_DEBUG ("waiting queue done"); \
+} G_STMT_END
+#define JBUF_SIGNAL_QUEUE(priv) G_STMT_START { \
+ if (G_UNLIKELY ((priv)->waiting_queue)) { \
+ GST_DEBUG ("signal queue, %d waiters", (priv)->waiting_queue); \
+ g_cond_signal (&(priv)->jbuf_queue); \
+ } \
+} G_STMT_END
+
#define JBUF_WAIT_TIMER(priv) G_STMT_START { \
GST_DEBUG ("waiting timer"); \
(priv)->waiting_timer++; \
#define GST_BUFFER_IS_RETRANSMISSION(buffer) \
GST_BUFFER_FLAG_IS_SET (buffer, GST_RTP_BUFFER_FLAG_RETRANSMISSION)
-typedef struct TimerQueue
+#if !GLIB_CHECK_VERSION(2, 60, 0)
+#define g_queue_clear_full queue_clear_full
+static void
+queue_clear_full (GQueue * queue, GDestroyNotify free_func)
{
- GQueue *timers;
- GHashTable *hashtable;
-} TimerQueue;
+ gpointer data;
+
+ while ((data = g_queue_pop_head (queue)) != NULL)
+ free_func (data);
+}
+#endif
struct _GstRtpJitterBufferPrivate
{
RTPJitterBuffer *jbuf;
GMutex jbuf_lock;
- gboolean waiting_timer;
+ guint waiting_queue;
+ GCond jbuf_queue;
+ guint waiting_timer;
GCond jbuf_timer;
gboolean waiting_event;
GCond jbuf_event;
gint64 ts_offset;
guint64 max_ts_offset_adjustment;
gboolean do_lost;
+ gboolean post_drop_messages;
+ guint drop_messages_interval_ms;
gboolean do_retransmission;
gboolean rtx_next_seqnum;
gint rtx_delay;
GstClockTime last_in_pts;
guint32 next_in_seqnum;
- GArray *timers;
- TimerQueue *rtx_stats_timers;
+ /* "normal" timers */
+ RtpTimerQueue *timers;
+ /* timers used for RTX statistics backlog */
+ RtpTimerQueue *rtx_stats_timers;
/* start and stop ranges */
GstClockTime npt_start;
GstClockTime last_pts;
guint64 last_rtptime;
GstClockTime avg_jitter;
-};
+ /* for dropped packet messages */
+ GstClockTime last_drop_msg_timestamp;
+ /* accumulators; reset every time a drop message is posted */
+ guint num_too_late;
+ guint num_drop_on_latency;
+};
typedef enum
{
- TIMER_TYPE_EXPECTED,
- TIMER_TYPE_LOST,
- TIMER_TYPE_DEADLINE,
- TIMER_TYPE_EOS
-} TimerType;
-
-typedef struct
-{
- guint idx;
- guint16 seqnum;
- guint num;
- TimerType type;
- GstClockTime timeout;
- GstClockTime duration;
- GstClockTime rtx_base;
- GstClockTime rtx_delay;
- GstClockTime rtx_retry;
- GstClockTime rtx_last;
- guint num_rtx_retry;
- guint num_rtx_received;
-} TimerData;
+ REASON_TOO_LATE,
+ REASON_DROP_ON_LATENCY
+} DropMessageReason;
static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink",
static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
-static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
jitterbuffer);
static void update_rtx_stats (GstRtpJitterBuffer * jitterbuffer,
- TimerData * timer, GstClockTime dts, gboolean success);
+ const RtpTimer * timer, GstClockTime dts, gboolean success);
-static TimerQueue *timer_queue_new (void);
-static void timer_queue_free (TimerQueue * queue);
+static GstClockTime get_current_running_time (GstRtpJitterBuffer *
+ jitterbuffer);
static void
gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
+ * GstRtpJitterBuffer:post-drop-messages:
+ *
+ * Post custom messages to the bus when a packet is dropped by the
+ * jitterbuffer due to arriving too late, being already considered lost,
+ * or being dropped due to the drop-on-latency property being enabled.
+ * Message is of type GST_MESSAGE_ELEMENT and contains a GstStructure named
+ * "drop-msg" with the following fields:
+ *
+ * * #guint `seqnum`: Seqnum of dropped packet.
+ * * #guint64 `timestamp`: PTS timestamp of dropped packet.
+ * * #GString `reason`: Reason for dropping the packet.
+ * * #guint `num-too-late`: Number of packets arriving too late since
+ * last drop message.
+ * * #guint `num-drop-on-latency`: Number of packets dropped due to the
+ * drop-on-latency property since last drop message.
+ *
+ * Since: 1.18
+ */
+ g_object_class_install_property (gobject_class, PROP_POST_DROP_MESSAGES,
+ g_param_spec_boolean ("post-drop-messages", "Post drop messages",
+ "Post a custom message to the bus when a packet is dropped by the jitterbuffer",
+ DEFAULT_POST_DROP_MESSAGES,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
+ * GstRtpJitterBuffer:drop-messages-interval:
+ *
+ * Minimal time in milliseconds between posting dropped packet messages, if enabled
+ * by setting property by setting #GstRtpJitterBuffer:post-drop-messages to %TRUE.
+ * If interval is set to 0, every dropped packet will result in a drop message being posted.
+ *
+ * Since: 1.18
+ */
+ g_object_class_install_property (gobject_class, PROP_DROP_MESSAGES_INTERVAL,
+ g_param_spec_uint ("drop-messages-interval",
+ "Drop message interval",
+ "Minimal time between posting dropped packet messages", 0,
+ G_MAXUINT, DEFAULT_DROP_MESSAGES_INTERVAL_MS,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
* GstRtpJitterBuffer:mode:
*
* Control the buffering and timestamping mode used by the jitterbuffer.
-1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
- * GstRtpJitterBuffer::rtx-retry-timeout:
+ * GstRtpJitterBuffer:rtx-retry-timeout:
*
* When no packet has been received after sending a retransmission event
* for this time, retry sending a retransmission event.
"ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
- * GstRtpJitterBuffer::rtx-min-retry-timeout:
+ * GstRtpJitterBuffer:rtx-min-retry-timeout:
*
* The minimum amount of time between retry timeouts. When
* GstRtpJitterBuffer::rtx-retry-timeout is -1, this value ensures a
"(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DEADLINE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
- * GstRtpJitterBuffer::rtx-stats-timeout:
+ * GstRtpJitterBuffer:rtx-stats-timeout:
*
* The time to wait for a retransmitted packet after it has been
* considered lost in order to collect RTX statistics.
g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
g_param_spec_uint ("max-dropout-time", "Max dropout time",
"The maximum time (milliseconds) of missing packets tolerated.",
- 0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
+ 0, G_MAXINT32, DEFAULT_MAX_DROPOUT_TIME,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
* Various jitterbuffer statistics. This property returns a GstStructure
* with name application/x-rtp-jitterbuffer-stats with the following fields:
*
- * <itemizedlist>
- * <listitem>
- * <para>
- * #guint64
- * <classname>"num-pushed"</classname>:
- * the number of packets pushed out.
- * </para>
- * </listitem>
- * <listitem>
- * <para>
- * #guint64
- * <classname>"num-lost"</classname>:
- * the number of packets considered lost.
- * </para>
- * </listitem>
- * <listitem>
- * <para>
- * #guint64
- * <classname>"num-late"</classname>:
- * the number of packets arriving too late.
- * </para>
- * </listitem>
- * <listitem>
- * <para>
- * #guint64
- * <classname>"num-duplicates"</classname>:
- * the number of duplicate packets.
- * </para>
- * </listitem>
- * <listitem>
- * <para>
- * #guint64
- * <classname>"rtx-count"</classname>:
- * the number of retransmissions requested.
- * </para>
- * </listitem>
- * <listitem>
- * <para>
- * #guint64
- * <classname>"rtx-success-count"</classname>:
- * the number of successful retransmissions.
- * </para>
- * </listitem>
- * <listitem>
- * <para>
- * #gdouble
- * <classname>"rtx-per-packet"</classname>:
- * average number of RTX per packet.
- * </para>
- * </listitem>
- * <listitem>
- * <para>
- * #guint64
- * <classname>"rtx-rtt"</classname>:
- * average round trip time per RTX.
- * </para>
- * </listitem>
- * </itemizedlist>
+ * * #guint64 `num-pushed`: the number of packets pushed out.
+ * * #guint64 `num-lost`: the number of packets considered lost.
+ * * #guint64 `num-late`: the number of packets arriving too late.
+ * * #guint64 `num-duplicates`: the number of duplicate packets.
+ * * #guint64 `avg-jitter`: the average jitter in nanoseconds.
+ * * #guint64 `rtx-count`: the number of retransmissions requested.
+ * * #guint64 `rtx-success-count`: the number of successful retransmissions.
+ * * #gdouble `rtx-per-packet`: average number of RTX per packet.
+ * * #guint64 `rtx-rtt`: average round trip time per RTX.
*
* Since: 1.4
*/
gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
- request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
- GST_TYPE_CAPS, 1, G_TYPE_UINT);
+ request_pt_map), NULL, NULL, NULL, GST_TYPE_CAPS, 1, G_TYPE_UINT);
/**
* GstRtpJitterBuffer::handle-sync:
* @buffer: the object which received the signal
gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
- handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
+ handle_sync), NULL, NULL, NULL,
G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
/**
gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
- on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
- G_TYPE_NONE, 0, G_TYPE_NONE);
+ on_npt_stop), NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
/**
* GstRtpJitterBuffer::clear-pt-map:
g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
- g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
+ NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
/**
* GstRtpJitterBuffer::set-active:
g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
- g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
- G_TYPE_UINT64);
+ NULL, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN, G_TYPE_UINT64);
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
GST_DEBUG_CATEGORY_INIT
(rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
+ GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_jitter_buffer_chain_rtcp);
+
+#ifndef TIZEN_FEATURE_GST_UPSTREAM_AVOID_BUILD_BREAK
+ gst_type_mark_as_plugin_api (RTP_TYPE_JITTER_BUFFER_MODE, 0);
+#endif
}
static void
priv->ts_offset = DEFAULT_TS_OFFSET;
priv->max_ts_offset_adjustment = DEFAULT_MAX_TS_OFFSET_ADJUSTMENT;
priv->do_lost = DEFAULT_DO_LOST;
+ priv->post_drop_messages = DEFAULT_POST_DROP_MESSAGES;
+ priv->drop_messages_interval_ms = DEFAULT_DROP_MESSAGES_INTERVAL_MS;
priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
priv->rtx_next_seqnum = DEFAULT_RTX_NEXT_SEQNUM;
priv->rtx_delay = DEFAULT_RTX_DELAY;
priv->last_pts = -1;
priv->last_rtptime = -1;
priv->avg_jitter = 0;
+ priv->last_drop_msg_timestamp = GST_CLOCK_TIME_NONE;
+ priv->num_too_late = 0;
+ priv->num_drop_on_latency = 0;
priv->segment_seqnum = GST_SEQNUM_INVALID;
- priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
- priv->rtx_stats_timers = timer_queue_new ();
+ priv->timers = rtp_timer_queue_new ();
+ priv->rtx_stats_timers = rtp_timer_queue_new ();
priv->jbuf = rtp_jitter_buffer_new ();
g_mutex_init (&priv->jbuf_lock);
+ g_cond_init (&priv->jbuf_queue);
g_cond_init (&priv->jbuf_timer);
g_cond_init (&priv->jbuf_event);
g_cond_init (&priv->jbuf_query);
g_queue_init (&priv->gap_packets);
gst_segment_init (&priv->segment, GST_FORMAT_TIME);
- /* reset skew detection initialy */
+ /* reset skew detection initially */
rtp_jitter_buffer_reset_skew (priv->jbuf);
rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
}
-#define IS_DROPABLE(it) (((it)->type == ITEM_TYPE_BUFFER) || ((it)->type == ITEM_TYPE_LOST))
-
-#define ITEM_TYPE_BUFFER 0
-#define ITEM_TYPE_LOST 1
-#define ITEM_TYPE_EVENT 2
-#define ITEM_TYPE_QUERY 3
-
-static RTPJitterBufferItem *
-alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
- guint seqnum, guint count, guint rtptime)
-{
- RTPJitterBufferItem *item;
-
- item = g_slice_new (RTPJitterBufferItem);
- item->data = data;
- item->next = NULL;
- item->prev = NULL;
- item->type = type;
- item->dts = dts;
- item->pts = pts;
- item->seqnum = seqnum;
- item->count = count;
- item->rtptime = rtptime;
-
- return item;
-}
-
static void
-free_item (RTPJitterBufferItem * item)
-{
- g_return_if_fail (item != NULL);
-
- if (item->data && item->type != ITEM_TYPE_QUERY)
- gst_mini_object_unref (item->data);
- g_slice_free (RTPJitterBufferItem, item);
-}
-
-static void
-free_item_and_retain_events (RTPJitterBufferItem * item, gpointer user_data)
+free_item_and_retain_sticky_events (RTPJitterBufferItem * item,
+ gpointer user_data)
{
GList **l = user_data;
if (item->data && item->type == ITEM_TYPE_EVENT
&& GST_EVENT_IS_STICKY (item->data)) {
*l = g_list_prepend (*l, item->data);
- } else if (item->data && item->type != ITEM_TYPE_QUERY) {
- gst_mini_object_unref (item->data);
+ item->data = NULL;
}
- g_slice_free (RTPJitterBufferItem, item);
+
+ rtp_jitter_buffer_free_item (item);
}
static void
jitterbuffer = GST_RTP_JITTER_BUFFER (object);
priv = jitterbuffer->priv;
- g_array_free (priv->timers, TRUE);
- timer_queue_free (priv->rtx_stats_timers);
+ g_object_unref (priv->timers);
+ g_object_unref (priv->rtx_stats_timers);
g_mutex_clear (&priv->jbuf_lock);
+ g_cond_clear (&priv->jbuf_queue);
g_cond_clear (&priv->jbuf_timer);
g_cond_clear (&priv->jbuf_event);
g_cond_clear (&priv->jbuf_query);
- rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
+ rtp_jitter_buffer_flush (priv->jbuf, NULL, NULL);
g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
g_queue_clear (&priv->gap_packets);
g_object_unref (priv->jbuf);
return caps;
}
+/* g_ascii_string_to_unsigned is available since 2.54. Get rid of this wrapper
+ * when we bump the version in 1.18 */
+#if !GLIB_CHECK_VERSION(2,54,0)
+#define g_ascii_string_to_unsigned _gst_jitter_buffer_ascii_string_to_unsigned
+static gboolean
+_gst_jitter_buffer_ascii_string_to_unsigned (const gchar * str, guint base,
+ guint64 min, guint64 max, guint64 * out_num, GError ** error)
+{
+ gchar *endptr = NULL;
+ *out_num = g_ascii_strtoull (str, &endptr, base);
+ if (errno)
+ return FALSE;
+ if (endptr == str)
+ return FALSE;
+ return TRUE;
+}
+#endif
+
/*
* Must be called with JBUF_LOCK held
*/
if ((mediaclk = gst_structure_get_string (caps_struct, "a-mediaclk"))) {
GST_DEBUG_OBJECT (jitterbuffer, "Got media clock %s", mediaclk);
- if (!g_str_has_prefix (mediaclk, "direct=")
- || sscanf (mediaclk, "direct=%" G_GUINT64_FORMAT, &clock_offset) != 1)
+ if (!g_str_has_prefix (mediaclk, "direct=") ||
+ !g_ascii_string_to_unsigned (&mediaclk[8], 10, 0, G_MAXUINT64,
+ &clock_offset, NULL))
GST_FIXME_OBJECT (jitterbuffer, "Unsupported media clock");
if (strstr (mediaclk, "rate=") != NULL) {
GST_FIXME_OBJECT (jitterbuffer, "Rate property not supported");
/* this unblocks any waiting pops on the src pad task */
JBUF_SIGNAL_EVENT (priv);
JBUF_SIGNAL_QUERY (priv, FALSE);
+ JBUF_SIGNAL_QUEUE (priv);
JBUF_UNLOCK (priv);
}
priv->last_in_pts = 0;
priv->equidistant = 0;
priv->segment_seqnum = GST_SEQNUM_INVALID;
+ priv->last_drop_msg_timestamp = GST_CLOCK_TIME_NONE;
+ priv->num_too_late = 0;
+ priv->num_drop_on_latency = 0;
GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
- rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
+ rtp_jitter_buffer_flush (priv->jbuf, NULL, NULL);
rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
rtp_jitter_buffer_reset_skew (priv->jbuf);
- remove_all_timers (jitterbuffer);
+ rtp_timer_queue_remove_all (priv->timers);
g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
g_queue_clear (&priv->gap_packets);
JBUF_UNLOCK (priv);
/* block until we go to PLAYING */
priv->blocked = TRUE;
priv->timer_running = TRUE;
+ priv->srcresult = GST_FLOW_OK;
priv->timer_thread =
g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
JBUF_UNLOCK (priv);
JBUF_LOCK (priv);
gst_buffer_replace (&priv->last_sr, NULL);
priv->timer_running = FALSE;
+ priv->srcresult = GST_FLOW_FLUSHING;
unschedule_current_timer (jitterbuffer);
JBUF_SIGNAL_TIMER (priv);
JBUF_SIGNAL_QUERY (priv, FALSE);
+ JBUF_SIGNAL_QUEUE (priv);
JBUF_UNLOCK (priv);
g_thread_join (priv->timer_thread);
priv->timer_thread = NULL;
queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- RTPJitterBufferItem *item;
gboolean head;
switch (GST_EVENT_TYPE (event)) {
break;
}
-
GST_DEBUG_OBJECT (jitterbuffer, "adding event");
- item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
- rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
+ head = rtp_jitter_buffer_append_event (priv->jbuf, event);
if (head || priv->eos)
JBUF_SIGNAL_EVENT (priv);
}
/*
- * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
+ * Must be called with JBUF_LOCK held, will release the LOCK when emitting the
* signal. The function returns GST_FLOW_ERROR when a parsing error happened and
* GST_FLOW_FLUSHING when the element is shutting down. On success
* GST_FLOW_OK is returned.
return message;
}
+/* call with jbuf lock held */
+static GstMessage *
+new_drop_message (GstRtpJitterBuffer * jitterbuffer, guint seqnum,
+ GstClockTime timestamp, DropMessageReason reason)
+{
+
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ GstMessage *drop_msg = NULL;
+ GstStructure *s;
+ GstClockTime current_time;
+ GstClockTime time_diff;
+ const gchar *reason_str;
+
+ current_time = get_current_running_time (jitterbuffer);
+ time_diff = current_time - priv->last_drop_msg_timestamp;
+
+ if (reason == REASON_TOO_LATE) {
+ priv->num_too_late++;
+ reason_str = "too-late";
+ } else if (reason == REASON_DROP_ON_LATENCY) {
+ priv->num_drop_on_latency++;
+ reason_str = "drop-on-latency";
+ } else {
+ GST_WARNING_OBJECT (jitterbuffer, "Invalid reason for drop message");
+ return drop_msg;
+ }
+
+ /* Only create new drop_msg if time since last drop_msg is larger that
+ * that the set interval, or if it is the first drop message posted */
+ if ((time_diff >= priv->drop_messages_interval_ms * GST_MSECOND) ||
+ (priv->last_drop_msg_timestamp == GST_CLOCK_TIME_NONE)) {
+
+ s = gst_structure_new ("drop-msg",
+ "seqnum", G_TYPE_UINT, seqnum,
+ "timestamp", GST_TYPE_CLOCK_TIME, timestamp,
+ "reason", G_TYPE_STRING, reason_str,
+ "num-too-late", G_TYPE_UINT, priv->num_too_late,
+ "num-drop-on-latency", G_TYPE_UINT, priv->num_drop_on_latency, NULL);
+
+ priv->last_drop_msg_timestamp = current_time;
+ priv->num_too_late = 0;
+ priv->num_drop_on_latency = 0;
+ drop_msg = gst_message_new_element (GST_OBJECT (jitterbuffer), s);
+ }
+ return drop_msg;
+}
+
+
+static inline GstClockTimeDiff
+timeout_offset (GstRtpJitterBuffer * jitterbuffer)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ return priv->ts_offset + priv->out_offset + priv->latency_ns;
+}
+
+static inline GstClockTime
+get_pts_timeout (const RtpTimer * timer)
+{
+ if (timer->timeout == -1)
+ return -1;
+
+ return timer->timeout - timer->offset;
+}
+
+static void
+update_timer_offsets (GstRtpJitterBuffer * jitterbuffer)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ RtpTimer *test = rtp_timer_queue_peek_earliest (priv->timers);
+ GstClockTimeDiff new_offset = timeout_offset (jitterbuffer);
+
+ while (test) {
+ if (test->type != RTP_TIMER_EXPECTED) {
+ test->timeout = get_pts_timeout (test) + new_offset;
+ test->offset = new_offset;
+ /* as we apply the offset on all timers, the order of timers won't
+ * change and we can skip updating the timer queue */
+ }
+
+ test = rtp_timer_get_next (test);
+ }
+}
+
static void
update_offset (GstRtpJitterBuffer * jitterbuffer)
{
priv->ts_offset += priv->ts_offset_remainder;
priv->ts_offset_remainder = 0;
}
+
+ update_timer_offsets (jitterbuffer);
}
}
return timestamp;
}
-static TimerQueue *
-timer_queue_new (void)
-{
- TimerQueue *queue;
-
- queue = g_slice_new (TimerQueue);
- queue->timers = g_queue_new ();
- queue->hashtable = g_hash_table_new (NULL, NULL);
-
- return queue;
-}
-
-static void
-timer_queue_free (TimerQueue * queue)
-{
- if (!queue)
- return;
-
- g_hash_table_destroy (queue->hashtable);
- g_queue_free_full (queue->timers, g_free);
- g_slice_free (TimerQueue, queue);
-}
-
-static void
-timer_queue_append (TimerQueue * queue, const TimerData * timer,
- GstClockTime timeout, gboolean lost)
-{
- TimerData *copy;
-
- copy = g_memdup (timer, sizeof (*timer));
- copy->timeout = timeout;
- copy->type = lost ? TIMER_TYPE_LOST : TIMER_TYPE_EXPECTED;
- copy->idx = -1;
-
- GST_LOG ("Append rtx-stats timer #%d, %" GST_TIME_FORMAT,
- copy->seqnum, GST_TIME_ARGS (copy->timeout));
- g_queue_push_tail (queue->timers, copy);
- g_hash_table_insert (queue->hashtable, GINT_TO_POINTER (copy->seqnum), copy);
-}
-
-static void
-timer_queue_clear_until (TimerQueue * queue, GstClockTime timeout)
-{
- TimerData *test;
-
- test = g_queue_peek_head (queue->timers);
- while (test && test->timeout < timeout) {
- GST_LOG ("Pop rtx-stats timer #%d, %" GST_TIME_FORMAT " < %"
- GST_TIME_FORMAT, test->seqnum, GST_TIME_ARGS (test->timeout),
- GST_TIME_ARGS (timeout));
- g_hash_table_remove (queue->hashtable, GINT_TO_POINTER (test->seqnum));
- g_free (g_queue_pop_head (queue->timers));
- test = g_queue_peek_head (queue->timers);
- }
-}
-
-static TimerData *
-timer_queue_find (TimerQueue * queue, guint16 seqnum)
-{
- return g_hash_table_lookup (queue->hashtable, GINT_TO_POINTER (seqnum));
-}
-
-static TimerData *
-find_timer (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
-{
- GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- TimerData *timer = NULL;
- gint i, len;
-
- len = priv->timers->len;
- for (i = 0; i < len; i++) {
- TimerData *test = &g_array_index (priv->timers, TimerData, i);
- if (test->seqnum == seqnum) {
- timer = test;
- break;
- }
- }
- return timer;
-}
-
static void
unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
{
}
}
-static GstClockTime
-get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
-{
- GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- GstClockTime test_timeout;
-
- if ((test_timeout = timer->timeout) == -1)
- return -1;
-
- if (timer->type != TIMER_TYPE_EXPECTED) {
- /* add our latency and offset to get output times. */
- test_timeout = apply_offset (jitterbuffer, test_timeout);
- test_timeout += priv->latency_ns;
- }
- return test_timeout;
-}
-
-static void
-recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
-{
- GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
-
- if (priv->clock_id) {
- GstClockTime timeout = get_timeout (jitterbuffer, timer);
-
- GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
- GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
-
- if (timeout == -1 || timeout < priv->timer_timeout)
- unschedule_current_timer (jitterbuffer);
- }
-}
-
-static TimerData *
-add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
- guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
- GstClockTime duration)
-{
- GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- TimerData *timer;
- gint len;
-
- GST_DEBUG_OBJECT (jitterbuffer,
- "add timer %d for seqnum %d to %" GST_TIME_FORMAT ", delay %"
- GST_TIME_FORMAT, type, seqnum, GST_TIME_ARGS (timeout),
- GST_TIME_ARGS (delay));
-
- len = priv->timers->len;
- g_array_set_size (priv->timers, len + 1);
- timer = &g_array_index (priv->timers, TimerData, len);
- timer->idx = len;
- timer->type = type;
- timer->seqnum = seqnum;
- timer->num = num;
- timer->timeout = timeout + delay;
- timer->duration = duration;
- if (type == TIMER_TYPE_EXPECTED) {
- timer->rtx_base = timeout;
- timer->rtx_delay = delay;
- timer->rtx_retry = 0;
- }
- timer->rtx_last = GST_CLOCK_TIME_NONE;
- timer->num_rtx_retry = 0;
- timer->num_rtx_received = 0;
- recalculate_timer (jitterbuffer, timer);
- JBUF_SIGNAL_TIMER (priv);
-
- return timer;
-}
-
static void
-reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
- guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset)
+update_current_timer (GstRtpJitterBuffer * jitterbuffer)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- gboolean seqchange, timechange;
- guint16 oldseq;
- GstClockTime new_timeout;
+ RtpTimer *timer;
- oldseq = timer->seqnum;
- new_timeout = timeout + delay;
- seqchange = oldseq != seqnum;
- timechange = timer->timeout != new_timeout;
+ timer = rtp_timer_queue_peek_earliest (priv->timers);
- if (!seqchange && !timechange) {
- GST_DEBUG_OBJECT (jitterbuffer,
- "No changes in seqnum (%d) and timeout (%" GST_TIME_FORMAT
- "), skipping", oldseq, GST_TIME_ARGS (timer->timeout));
+ /* we never need to wakeup the timer thread when there is no more timers, if
+ * it was waiting on a clock id, it will simply do later and then wait on
+ * the conditions */
+ if (timer == NULL) {
+ GST_DEBUG_OBJECT (jitterbuffer, "no more timers");
return;
}
- GST_DEBUG_OBJECT (jitterbuffer,
- "replace timer %d for seqnum %d->%d timeout %" GST_TIME_FORMAT
- "->%" GST_TIME_FORMAT, timer->type, oldseq, seqnum,
- GST_TIME_ARGS (timer->timeout), GST_TIME_ARGS (new_timeout));
-
- timer->timeout = new_timeout;
- timer->seqnum = seqnum;
- if (reset) {
- GST_DEBUG_OBJECT (jitterbuffer, "reset rtx delay %" GST_TIME_FORMAT
- "->%" GST_TIME_FORMAT, GST_TIME_ARGS (timer->rtx_delay),
- GST_TIME_ARGS (delay));
- timer->rtx_base = timeout;
- timer->rtx_delay = delay;
- timer->rtx_retry = 0;
- }
- if (seqchange) {
- timer->num_rtx_retry = 0;
- timer->num_rtx_received = 0;
- }
-
- if (priv->clock_id) {
- /* we changed the seqnum and there is a timer currently waiting with this
- * seqnum, unschedule it */
- if (seqchange && priv->timer_seqnum == oldseq)
- unschedule_current_timer (jitterbuffer);
- /* we changed the time, check if it is earlier than what we are waiting
- * for and unschedule if so */
- else if (timechange)
- recalculate_timer (jitterbuffer, timer);
- }
-}
-
-static TimerData *
-set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
- guint16 seqnum, GstClockTime timeout)
-{
- TimerData *timer;
-
- /* find the seqnum timer */
- timer = find_timer (jitterbuffer, seqnum);
- if (timer == NULL) {
- timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
- } else {
- reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE);
- }
- return timer;
-}
+ GST_DEBUG_OBJECT (jitterbuffer, "waiting till %" GST_TIME_FORMAT
+ " and earliest timeout is at %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (priv->timer_timeout), GST_TIME_ARGS (timer->timeout));
-static void
-remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
-{
- GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- guint idx;
+ /* wakeup the timer thread in case the timer queue was empty */
+ JBUF_SIGNAL_TIMER (priv);
- if (timer->idx == -1)
+ /* no need to wait if the current wait is earlier or later */
+ if (timer->timeout != -1 && timer->timeout >= priv->timer_timeout)
return;
- if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
- unschedule_current_timer (jitterbuffer);
-
- idx = timer->idx;
- GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
- g_array_remove_index_fast (priv->timers, idx);
- timer->idx = idx;
-
- JBUF_SIGNAL_TIMER (priv);
-}
-
-static void
-remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
-{
- GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
- g_array_set_size (priv->timers, 0);
+ /* for other cases, force a reschedule of the timer thread */
unschedule_current_timer (jitterbuffer);
- JBUF_SIGNAL_TIMER (priv);
}
/* get the extra delay to wait before sending RTX */
GstClockTime delay;
if (priv->rtx_delay == -1) {
+ /* the maximum delay for any RTX-packet is given by the latency, since
+ anything after that is considered lost. For various calulcations,
+ (given large avg_jitter and/or packet_spacing), the resulting delay
+ could exceed the configured latency, ending up issuing an RTX-request
+ that would never arrive in time. To help this we cap the delay
+ for any RTX with the last possible time it could still arrive in time. */
+ GstClockTime delay_max = (priv->latency_ns > priv->avg_rtx_rtt) ?
+ priv->latency_ns - priv->avg_rtx_rtt : priv->latency_ns;
+
if (priv->avg_jitter == 0 && priv->packet_spacing == 0) {
delay = DEFAULT_AUTO_RTX_DELAY;
} else {
* packet spacing is a good margin */
delay = MAX (priv->avg_jitter * 2, priv->packet_spacing / 2);
}
+
+ delay = MIN (delay_max, delay);
} else {
delay = priv->rtx_delay * GST_MSECOND;
}
return delay;
}
-/* Check if packet with seqnum is already considered definitely lost by being
- * part of a "lost timer" for multiple packets */
-static gboolean
-already_lost (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
-{
- GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- gint i, len;
-
- len = priv->timers->len;
- for (i = 0; i < len; i++) {
- TimerData *test = &g_array_index (priv->timers, TimerData, i);
- gint gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
-
- if (test->num > 1 && test->type == TIMER_TYPE_LOST && gap >= 0 &&
- gap < test->num) {
- GST_DEBUG ("seqnum #%d already considered definitely lost (#%d->#%d)",
- seqnum, test->seqnum, (test->seqnum + test->num - 1) & 0xffff);
- return TRUE;
- }
- }
-
- return FALSE;
-}
-
/* we just received a packet with seqnum and dts.
*
* First check for old seqnum that we are still expecting. If the gap with the
static void
update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
GstClockTime dts, GstClockTime pts, gboolean do_next_seqnum,
- gboolean is_rtx, TimerData * timer)
+ gboolean is_rtx, RtpTimer * timer)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ gboolean is_stats_timer = FALSE;
+
+ if (timer && rtp_timer_queue_find (priv->rtx_stats_timers, timer->seqnum))
+ is_stats_timer = TRUE;
- /* go through all timers and unschedule the ones with a large gap */
+ /* schedule immediatly expected timer which exceed the maximum RTX delay
+ * reorder configuration */
if (priv->do_retransmission && priv->rtx_delay_reorder > 0) {
- gint i, len;
- len = priv->timers->len;
- for (i = 0; i < len; i++) {
- TimerData *test = &g_array_index (priv->timers, TimerData, i);
+ RtpTimer *test = rtp_timer_queue_peek_earliest (priv->timers);
+ while (test) {
gint gap;
+ /* filter the timer type to speed up this loop */
+ if (test->type != RTP_TIMER_EXPECTED) {
+ test = rtp_timer_get_next (test);
+ continue;
+ }
+
gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d",
test->type, test->seqnum, seqnum, gap);
- if (gap > priv->rtx_delay_reorder) {
- /* max gap, we exceeded the max reorder distance and we don't expect the
- * missing packet to be this reordered */
- if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
- reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
- }
+ /* if this expected packet have a smaller gap then the configured one,
+ * then earlier timer are not expected to have bigger gap as the timer
+ * queue is ordered */
+ if (gap <= priv->rtx_delay_reorder)
+ break;
+
+ /* max gap, we exceeded the max reorder distance and we don't expect the
+ * missing packet to be this reordered */
+ if (test->num_rtx_retry == 0 && test->type == RTP_TIMER_EXPECTED)
+ rtp_timer_queue_update_timer (priv->timers, test, test->seqnum,
+ -1, 0, 0, FALSE);
+
+ test = rtp_timer_get_next (test);
}
}
do_next_seqnum = do_next_seqnum && priv->packet_spacing > 0
&& priv->do_retransmission && priv->rtx_next_seqnum;
- if (timer && timer->type != TIMER_TYPE_DEADLINE) {
+ if (timer && timer->type != RTP_TIMER_DEADLINE) {
if (timer->num_rtx_retry > 0) {
if (is_rtx) {
update_rtx_stats (jitterbuffer, timer, dts, TRUE);
do_next_seqnum = FALSE;
}
- if (!is_rtx || timer->num_rtx_retry > 1) {
+ if (!is_stats_timer && (!is_rtx || timer->num_rtx_retry > 1)) {
+ RtpTimer *stats_timer = rtp_timer_dup (timer);
/* Store timer in order to record stats when/if the retransmitted
* packet arrives. We should also store timer information if we've
* requested retransmission more than once since we may receive
* several retransmitted packets. For accuracy we should update the
* stats also when the redundant retransmitted packets arrives. */
- timer_queue_append (priv->rtx_stats_timers, timer,
- pts + priv->rtx_stats_timeout * GST_MSECOND, FALSE);
+ stats_timer->timeout = pts + priv->rtx_stats_timeout * GST_MSECOND;
+ stats_timer->type = RTP_TIMER_EXPECTED;
+ rtp_timer_queue_insert (priv->rtx_stats_timers, stats_timer);
}
}
}
GST_TIME_ARGS (expected), GST_TIME_ARGS (delay),
GST_TIME_ARGS (priv->packet_spacing), GST_TIME_ARGS (priv->avg_jitter));
- if (timer) {
- timer->type = TIMER_TYPE_EXPECTED;
- reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
- delay, TRUE);
+ if (timer && !is_stats_timer) {
+ timer->type = RTP_TIMER_EXPECTED;
+ rtp_timer_queue_update_timer (priv->timers, timer, priv->next_in_seqnum,
+ expected, delay, 0, TRUE);
} else {
- add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
+ rtp_timer_queue_set_expected (priv->timers, priv->next_in_seqnum,
expected, delay, priv->packet_spacing);
}
- } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
+ } else if (timer && timer->type != RTP_TIMER_DEADLINE && !is_stats_timer) {
/* if we had a timer, remove it, we don't know when to expect the next
* packet. */
- remove_timer (jitterbuffer, timer);
+ rtp_timer_queue_unschedule (priv->timers, timer);
+ rtp_timer_free (timer);
}
}
}
static void
+insert_lost_event (GstRtpJitterBuffer * jitterbuffer,
+ guint16 seqnum, guint lost_packets, GstClockTime timestamp,
+ GstClockTime duration, guint num_rtx_retry)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ GstEvent *event = NULL;
+ guint next_in_seqnum;
+
+ /* we had a gap and thus we lost some packets. Create an event for this. */
+ if (lost_packets > 1)
+ GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
+ seqnum + lost_packets - 1);
+ else
+ GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
+
+ priv->num_lost += lost_packets;
+ priv->num_rtx_failed += num_rtx_retry;
+
+ next_in_seqnum = (seqnum + lost_packets) & 0xffff;
+
+ /* we now only accept seqnum bigger than this */
+ if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0) {
+ priv->next_in_seqnum = next_in_seqnum;
+ priv->last_in_pts = timestamp;
+ }
+
+ /* Avoid creating events if we don't need it. Note that we still need to create
+ * the lost *ITEM* since it will be used to notify the outgoing thread of
+ * lost items (so that we can set discont flags and such) */
+ if (priv->do_lost) {
+ /* create packet lost event */
+ if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
+ duration = priv->packet_spacing;
+ event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
+ gst_structure_new ("GstRTPPacketLost",
+ "seqnum", G_TYPE_UINT, (guint) seqnum,
+ "timestamp", G_TYPE_UINT64, timestamp,
+ "duration", G_TYPE_UINT64, duration,
+ "retry", G_TYPE_UINT, num_rtx_retry, NULL));
+ }
+ if (rtp_jitter_buffer_append_lost_event (priv->jbuf,
+ event, seqnum, lost_packets))
+ JBUF_SIGNAL_EVENT (priv);
+}
+
+static void
calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
guint16 seqnum, GstClockTime pts, gint gap)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- GstClockTime duration, expected_pts, delay;
- TimerType type;
+ GstClockTime duration, expected_pts;
gboolean equidistant = priv->equidistant > 0;
+ GstClockTime last_in_pts = priv->last_in_pts;
GST_DEBUG_OBJECT (jitterbuffer,
"pts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
- GST_TIME_ARGS (pts), GST_TIME_ARGS (priv->last_in_pts));
+ GST_TIME_ARGS (pts), GST_TIME_ARGS (last_in_pts));
if (pts == GST_CLOCK_TIME_NONE) {
GST_WARNING_OBJECT (jitterbuffer, "Have no PTS");
if (equidistant) {
GstClockTime total_duration;
/* the total duration spanned by the missing packets */
- if (pts >= priv->last_in_pts)
- total_duration = pts - priv->last_in_pts;
+ if (pts >= last_in_pts)
+ total_duration = pts - last_in_pts;
else
total_duration = 0;
GST_TIME_ARGS (priv->latency_ns), lost_packets,
GST_TIME_ARGS (gap_time));
- /* this timer will fire immediately and the lost event will be pushed from
- * the timer thread */
+ /* this multi-lost-packet event will be inserted directly into the packet-queue
+ for immediate processing */
if (lost_packets > 0) {
- add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
- priv->last_in_pts + duration, 0, gap_time);
+ RtpTimer *timer;
+ GstClockTime timestamp =
+ apply_offset (jitterbuffer, last_in_pts + duration);
+ insert_lost_event (jitterbuffer, expected, lost_packets, timestamp,
+ gap_time, 0);
+
+ timer = rtp_timer_queue_find (priv->timers, expected);
+ if (timer && timer->type == RTP_TIMER_EXPECTED) {
+ if (timer->queued)
+ rtp_timer_queue_unschedule (priv->timers, timer);
+ GST_DEBUG_OBJECT (jitterbuffer, "removing timer for seqnum #%u",
+ expected);
+ rtp_timer_free (timer);
+ }
+
expected += lost_packets;
- priv->last_in_pts += gap_time;
+ last_in_pts += gap_time;
}
}
- expected_pts = priv->last_in_pts + duration;
+ expected_pts = last_in_pts + duration;
} else {
/* If we cannot assume equidistant packet spacing, the only thing we now
* for sure is that the missing packets have expected pts not later than
expected_pts = pts;
}
- delay = 0;
-
if (priv->do_retransmission) {
- TimerData *timer = find_timer (jitterbuffer, expected);
-
- type = TIMER_TYPE_EXPECTED;
- delay = get_rtx_delay (priv);
+ RtpTimer *timer = rtp_timer_queue_find (priv->timers, expected);
+ GstClockTime rtx_delay = get_rtx_delay (priv);
/* if we had a timer for the first missing packet, update it. */
- if (timer && timer->type == TIMER_TYPE_EXPECTED) {
+ if (timer && timer->type == RTP_TIMER_EXPECTED) {
GstClockTime timeout = timer->timeout;
+ GstClockTime delay = MAX (rtx_delay, pts - expected_pts);
timer->duration = duration;
if (timeout > (expected_pts + delay) && timer->num_rtx_retry == 0) {
- reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_pts,
- delay, TRUE);
+ rtp_timer_queue_update_timer (priv->timers, timer, timer->seqnum,
+ expected_pts, delay, 0, TRUE);
}
expected++;
expected_pts += duration;
}
- } else {
- type = TIMER_TYPE_LOST;
- }
- while (gst_rtp_buffer_compare_seqnum (expected, seqnum) > 0) {
- add_timer (jitterbuffer, type, expected, 0, expected_pts, delay, duration);
- expected_pts += duration;
- expected++;
+ while (gst_rtp_buffer_compare_seqnum (expected, seqnum) > 0) {
+ /* minimum delay the expected-timer has "waited" is the elapsed time
+ * since expected arrival of the missing packet */
+ GstClockTime delay = MAX (rtx_delay, pts - expected_pts);
+ rtp_timer_queue_set_expected (priv->timers, expected, expected_pts,
+ delay, duration);
+ expected_pts += duration;
+ expected++;
+ }
+ } else {
+ while (gst_rtp_buffer_compare_seqnum (expected, seqnum) > 0) {
+ rtp_timer_queue_set_lost (priv->timers, expected, expected_pts,
+ duration, timeout_offset (jitterbuffer));
+ expected_pts += duration;
+ expected++;
+ }
}
}
priv->avg_jitter = (diff + (15 * priv->avg_jitter)) >> 4;
GST_LOG_OBJECT (jitterbuffer,
- "dtsdiff %" GST_TIME_FORMAT " rtptime %" GST_TIME_FORMAT
- ", clock-rate %d, diff %" GST_TIME_FORMAT ", jitter: %" GST_TIME_FORMAT,
- GST_TIME_ARGS (dtsdiff), GST_TIME_ARGS (rtpdiffns), priv->clock_rate,
- GST_TIME_ARGS (diff), GST_TIME_ARGS (priv->avg_jitter));
+ "dtsdiff %" GST_STIME_FORMAT " rtptime %" GST_STIME_FORMAT
+ ", clock-rate %d, diff %" GST_STIME_FORMAT ", jitter: %" GST_TIME_FORMAT,
+ GST_STIME_ARGS (dtsdiff), GST_STIME_ARGS (rtpdiffns), priv->clock_rate,
+ GST_STIME_ARGS (diff), GST_TIME_ARGS (priv->avg_jitter));
return;
GstFlowReturn ret = GST_FLOW_OK;
GList *events = NULL, *l;
GList *buffers;
- gboolean head;
GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
rtp_jitter_buffer_flush (priv->jbuf,
- (GFunc) free_item_and_retain_events, &events);
+ (GFunc) free_item_and_retain_sticky_events, &events);
rtp_jitter_buffer_reset_skew (priv->jbuf);
- remove_all_timers (jitterbuffer);
+ rtp_timer_queue_remove_all (priv->timers);
priv->discont = TRUE;
priv->last_popped_seqnum = -1;
*/
events = g_list_reverse (events);
for (l = events; l; l = l->next) {
- RTPJitterBufferItem *item;
-
- item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
- rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
+ rtp_jitter_buffer_append_event (priv->jbuf, l->data);
}
g_list_free (events);
{
GstRtpJitterBufferPrivate *priv;
RTPJitterBufferItem *item;
- TimerData *timer;
+ RtpTimer *timer;
priv = jitterbuffer->priv;
if (!item)
return FALSE;
- timer = find_timer (jitterbuffer, item->seqnum);
- if (!timer || timer->type != TIMER_TYPE_DEADLINE)
+ timer = rtp_timer_queue_find (priv->timers, item->seqnum);
+ if (!timer || timer->type != RTP_TIMER_DEADLINE)
return FALSE;
if (rtp_jitter_buffer_can_fast_start (priv->jbuf,
GST_INFO_OBJECT (jitterbuffer, "We found %i consecutive packet, start now",
priv->faststart_min_packets);
timer->timeout = -1;
+ rtp_timer_queue_reschedule (priv->timers, timer);
return TRUE;
}
GstClockTime dts, pts;
guint64 latency_ts;
gboolean head;
+ gboolean duplicate;
gint percent = -1;
guint8 pt;
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
gboolean do_next_seqnum = FALSE;
- RTPJitterBufferItem *item;
GstMessage *msg = NULL;
+ GstMessage *drop_msg = NULL;
gboolean estimated_dts = FALSE;
gint32 packet_rate, max_dropout, max_misorder;
- TimerData *timer = NULL;
+ RtpTimer *timer = NULL;
+ gboolean is_rtx;
jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
rtptime = gst_rtp_buffer_get_timestamp (&rtp);
gst_rtp_buffer_unmap (&rtp);
+ is_rtx = GST_BUFFER_IS_RETRANSMISSION (buffer);
+
/* make sure we have PTS and DTS set */
pts = GST_BUFFER_PTS (buffer);
dts = GST_BUFFER_DTS (buffer);
GST_DEBUG_OBJECT (jitterbuffer,
"Received packet #%d at time %" GST_TIME_FORMAT ", discont %d, rtx %d",
- seqnum, GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer),
- GST_BUFFER_IS_RETRANSMISSION (buffer));
+ seqnum, GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer), is_rtx);
JBUF_LOCK_CHECK (priv, out_flushing);
if (G_UNLIKELY (priv->eos))
goto have_eos;
- if (!GST_BUFFER_IS_RETRANSMISSION (buffer))
+ if (!is_rtx)
calculate_jitter (jitterbuffer, dts, rtptime);
if (priv->seqnum_base != -1) {
expected = priv->next_in_seqnum;
- packet_rate =
- gst_rtp_packet_rate_ctx_update (&priv->packet_rate_ctx, seqnum, rtptime);
+ /* don't update packet-rate based on RTX, as those arrive highly unregularly */
+ if (!is_rtx) {
+ packet_rate = gst_rtp_packet_rate_ctx_update (&priv->packet_rate_ctx,
+ seqnum, rtptime);
+ GST_TRACE_OBJECT (jitterbuffer, "updated packet_rate: %d", packet_rate);
+ }
max_dropout =
gst_rtp_packet_rate_ctx_get_max_dropout (&priv->packet_rate_ctx,
priv->max_dropout_time);
max_misorder =
gst_rtp_packet_rate_ctx_get_max_misorder (&priv->packet_rate_ctx,
priv->max_misorder_time);
- GST_TRACE_OBJECT (jitterbuffer,
- "packet_rate: %d, max_dropout: %d, max_misorder: %d", packet_rate,
+ GST_TRACE_OBJECT (jitterbuffer, "max_dropout: %d, max_misorder: %d",
max_dropout, max_misorder);
+ timer = rtp_timer_queue_find (priv->timers, seqnum);
+ if (is_rtx) {
+ if (G_UNLIKELY (!priv->do_retransmission))
+ goto unsolicited_rtx;
+
+ if (!timer)
+ timer = rtp_timer_queue_find (priv->rtx_stats_timers, seqnum);
+
+ /* If the first buffer is an (old) rtx, e.g. from before a reset, or
+ * already lost, ignore it */
+ if (!timer || expected == -1)
+ goto unsolicited_rtx;
+ }
+
/* now check against our expected seqnum */
if (G_UNLIKELY (expected == -1)) {
GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
/* calculate a pts based on rtptime and arrival time (dts) */
pts =
rtp_jitter_buffer_calculate_pts (priv->jbuf, dts, estimated_dts,
- rtptime, gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)));
+ rtptime, gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)),
+ 0, FALSE);
+
+ if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (pts))) {
+ /* A valid timestamp cannot be calculated, discard packet */
+ goto discard_invalid;
+ }
/* we don't know what the next_in_seqnum should be, wait for the last
* possible moment to push this buffer, maybe we get an earlier seqnum
* while we wait */
- set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, pts);
+ rtp_timer_queue_set_deadline (priv->timers, seqnum, pts,
+ timeout_offset (jitterbuffer));
do_next_seqnum = TRUE;
/* take rtptime and pts to calculate packet spacing */
GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
expected, seqnum, gap);
- if (G_UNLIKELY (gap > 0 && priv->timers->len >= max_dropout)) {
+ if (G_UNLIKELY (gap > 0 &&
+ rtp_timer_queue_length (priv->timers) >= max_dropout)) {
/* If we have timers for more than RTP_MAX_DROPOUT packets
* pending this means that we have a huge gap overall. We can
* reset the jitterbuffer at this point because there's
* sensible with the past data. Just try again from the
* next packet */
GST_WARNING_OBJECT (jitterbuffer, "%d pending timers > %d - resetting",
- priv->timers->len, max_dropout);
- gst_buffer_unref (buffer);
+ rtp_timer_queue_length (priv->timers), max_dropout);
+ g_queue_insert_sorted (&priv->gap_packets, buffer,
+ (GCompareDataFunc) compare_buffer_seqnum, NULL);
return gst_rtp_jitter_buffer_reset (jitterbuffer, pad, parent, seqnum);
}
/* Special handling of large gaps */
- if ((gap != -1 && gap < -max_misorder) || (gap >= max_dropout)) {
+ if (!is_rtx && ((gap != -1 && gap < -max_misorder) || (gap >= max_dropout))) {
gboolean reset = handle_big_gap_buffer (jitterbuffer, buffer, pt, seqnum,
gap, max_dropout, max_misorder);
if (reset) {
/* If we estimated the DTS, don't consider it in the clock skew calculations */
pts =
rtp_jitter_buffer_calculate_pts (priv->jbuf, dts, estimated_dts,
- rtptime, gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)));
+ rtptime, gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)),
+ gap, is_rtx);
+
+ if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (pts))) {
+ /* A valid timestamp cannot be calculated, discard packet */
+ goto discard_invalid;
+ }
if (G_LIKELY (gap == 0)) {
/* packet is expected */
priv->next_in_seqnum = (seqnum + 1) & 0xffff;
}
- timer = find_timer (jitterbuffer, seqnum);
- if (GST_BUFFER_IS_RETRANSMISSION (buffer)) {
- if (!timer)
- timer = timer_queue_find (priv->rtx_stats_timers, seqnum);
- if (timer)
- timer->num_rtx_received++;
+ if (is_rtx)
+ timer->num_rtx_received++;
+
+ /* At 2^15, we would detect a seqnum rollover too early, therefore
+ * limit the queue size. But let's not limit it to a number that is
+ * too small to avoid emptying it needlessly if there is a spurious huge
+ * sequence number, let's allow at least 10k packets in any case. */
+ while (rtp_jitter_buffer_is_full (priv->jbuf) &&
+ priv->srcresult == GST_FLOW_OK) {
+ RtpTimer *timer = rtp_timer_queue_peek_earliest (priv->timers);
+ while (timer) {
+ timer->timeout = -1;
+ if (timer->type == RTP_TIMER_DEADLINE)
+ break;
+ timer = rtp_timer_get_next (timer);
+ }
+
+ update_current_timer (jitterbuffer);
+ JBUF_WAIT_QUEUE (priv);
+ if (priv->srcresult != GST_FLOW_OK)
+ goto out_flushing;
}
/* let's check if this buffer is too late, we can only accept packets with
/* priv->last_popped_seqnum >= seqnum, we're too late. */
if (G_UNLIKELY (gap <= 0)) {
if (priv->do_retransmission) {
- if (GST_BUFFER_IS_RETRANSMISSION (buffer) && timer) {
+ if (is_rtx && timer) {
update_rtx_stats (jitterbuffer, timer, dts, FALSE);
/* Only count the retranmitted packet too late if it has been
* considered lost. If the original packet arrived before the
* retransmitted we just count it as a duplicate. */
- if (timer->type != TIMER_TYPE_LOST)
+ if (timer->type != RTP_TIMER_LOST)
goto rtx_duplicate;
}
}
}
}
- if (already_lost (jitterbuffer, seqnum))
- goto already_lost;
-
/* let's drop oldest packet if the queue is already full and drop-on-latency
* is set. We can only do this when there actually is a latency. When no
* latency is set, we just pump it in the queue and let the other end push it
GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
old_item);
priv->next_seqnum = (old_item->seqnum + old_item->count) & 0xffff;
- free_item (old_item);
+ if (priv->post_drop_messages) {
+ drop_msg =
+ new_drop_message (jitterbuffer, old_item->seqnum, old_item->pts,
+ REASON_DROP_ON_LATENCY);
+ }
+ rtp_jitter_buffer_free_item (old_item);
}
/* we might have removed some head buffers, signal the pushing thread to
* see if it can push now */
* later. The code above always sets dts to pts or the other way around if
* any of those is valid in the buffer, so we know that if we estimated the
* dts that both are unknown */
- if (estimated_dts)
- item =
- alloc_item (buffer, ITEM_TYPE_BUFFER, GST_CLOCK_TIME_NONE,
- pts, seqnum, 1, rtptime);
- else
- item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime);
+ head = rtp_jitter_buffer_append_buffer (priv->jbuf, buffer,
+ estimated_dts ? GST_CLOCK_TIME_NONE : dts, pts, seqnum, rtptime,
+ &duplicate, &percent);
/* now insert the packet into the queue in sorted order. This function returns
* FALSE if a packet with the same seqnum was already in the queue, meaning we
* have a duplicate. */
- if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item, &head,
- &percent))) {
- if (GST_BUFFER_IS_RETRANSMISSION (buffer) && timer)
+ if (G_UNLIKELY (duplicate)) {
+ if (is_rtx && timer)
update_rtx_stats (jitterbuffer, timer, dts, FALSE);
goto duplicate;
}
head = TRUE;
/* update timers */
- update_timers (jitterbuffer, seqnum, dts, pts, do_next_seqnum,
- GST_BUFFER_IS_RETRANSMISSION (buffer), timer);
+ update_timers (jitterbuffer, seqnum, dts, pts, do_next_seqnum, is_rtx, timer);
/* we had an unhandled SR, handle it now */
if (priv->last_sr)
/* signal addition of new buffer when the _loop is waiting. */
if (G_LIKELY (priv->active))
JBUF_SIGNAL_EVENT (priv);
-
- /* let's unschedule and unblock any waiting buffers. We only want to do this
- * when the head buffer changed */
- if (G_UNLIKELY (priv->clock_id)) {
- GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
- unschedule_current_timer (jitterbuffer);
- }
}
GST_DEBUG_OBJECT (jitterbuffer,
msg = check_buffering_percent (jitterbuffer, percent);
finished:
+ update_current_timer (jitterbuffer);
JBUF_UNLOCK (priv);
if (msg)
gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
+ if (drop_msg)
+ gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), drop_msg);
return ret;
GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
" popped, dropping", seqnum, priv->last_popped_seqnum);
priv->num_late++;
- gst_buffer_unref (buffer);
- goto finished;
- }
-already_lost:
- {
- GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as it was already "
- "considered lost", seqnum);
- priv->num_late++;
+ if (priv->post_drop_messages) {
+ drop_msg = new_drop_message (jitterbuffer, seqnum, pts, REASON_TOO_LATE);
+ }
gst_buffer_unref (buffer);
goto finished;
}
GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
seqnum);
priv->num_duplicates++;
- free_item (item);
goto finished;
}
rtx_duplicate:
gst_buffer_unref (buffer);
goto finished;
}
+unsolicited_rtx:
+ {
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "Unsolicited RTX packet #%d detected, dropping", seqnum);
+ gst_buffer_unref (buffer);
+ goto finished;
+ }
+discard_invalid:
+ {
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "cannot calculate a valid pts for #%d (rtx: %d), discard",
+ seqnum, is_rtx);
+ gst_buffer_unref (buffer);
+ goto finished;
+ }
}
/* FIXME: hopefully we can do something more efficient here, especially when
GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
if (estimated != -1 && priv->estimated_eos != estimated) {
- set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
+ rtp_timer_queue_set_eos (priv->timers, estimated,
+ timeout_offset (jitterbuffer));
priv->estimated_eos = estimated;
}
}
if (type == ITEM_TYPE_EVENT && outevent &&
GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
g_assert (priv->eos);
- while (priv->timers->len > 0) {
+ while (rtp_timer_queue_length (priv->timers) > 0) {
/* Stopping timers */
unschedule_current_timer (jitterbuffer);
JBUF_WAIT_TIMER (priv);
JBUF_UNLOCK (priv);
item->data = NULL;
- free_item (item);
+ rtp_jitter_buffer_free_item (item);
if (msg)
gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
priv->num_pushed++;
+ GST_BUFFER_DTS (outbuf) = GST_CLOCK_TIME_NONE;
result = gst_pad_push (priv->srcpad, outbuf);
JBUF_LOCK_CHECK (priv, out_flushing);
GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
seqnum, next_seqnum);
item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
- free_item (item);
+ rtp_jitter_buffer_free_item (item);
result = GST_FLOW_OK;
} else {
/* the chain function has scheduled timers to request retransmission or
"Sequence number GAP detected: expected %d instead of %d (%d missing)",
next_seqnum, seqnum, gap);
/* if we have reached EOS, just keep processing */
- if (priv->eos) {
+ /* Also do the same if we block input because the JB is full */
+ if (priv->eos || rtp_jitter_buffer_is_full (priv->jbuf)) {
result = pop_and_push_next (jitterbuffer, seqnum);
result = GST_FLOW_OK;
} else {
}
static void
-update_rtx_stats (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
+update_rtx_stats (GstRtpJitterBuffer * jitterbuffer, const RtpTimer * timer,
GstClockTime dts, gboolean success)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
/* the timeout for when we expected a packet expired */
static gboolean
-do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
- GstClockTime now)
+do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
+ GstClockTime now, GQueue * events)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstEvent *event;
GstClockTime rtx_retry_period;
GstClockTime rtx_retry_timeout;
GstClock *clock;
+ GstClockTimeDiff offset = 0;
GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive, now %"
GST_TIME_FORMAT, timer->seqnum, GST_TIME_ARGS (now));
"deadline", G_TYPE_UINT, rtx_deadline_ms,
"packet-spacing", G_TYPE_UINT64, priv->packet_spacing,
"avg-rtt", G_TYPE_UINT, avg_rtx_rtt_ms, NULL));
+ g_queue_push_tail (events, event);
GST_DEBUG_OBJECT (jitterbuffer, "Request RTX: %" GST_PTR_FORMAT, event);
priv->num_rtx_requests++;
/* calculate the timeout for the next retransmission attempt */
timer->rtx_retry += rtx_retry_timeout;
- GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %"
+ GST_DEBUG_OBJECT (jitterbuffer, "timer #%i base %" GST_TIME_FORMAT ", delay %"
GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT ", num_retry %u",
- GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay),
- GST_TIME_ARGS (timer->rtx_retry), timer->num_rtx_retry);
+ timer->seqnum, GST_TIME_ARGS (timer->rtx_base),
+ GST_TIME_ARGS (timer->rtx_delay), GST_TIME_ARGS (timer->rtx_retry),
+ timer->num_rtx_retry);
if ((priv->rtx_max_retries != -1
&& timer->num_rtx_retry >= priv->rtx_max_retries)
|| (timer->rtx_retry + timer->rtx_delay > rtx_retry_period)
|| (timer->rtx_base + rtx_retry_period < now)) {
- GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
+ GST_DEBUG_OBJECT (jitterbuffer, "reschedule #%i as LOST timer",
+ timer->seqnum);
/* too many retransmission request, we now convert the timer
* to a lost timer, leave the num_rtx_retry as it is for stats */
- timer->type = TIMER_TYPE_LOST;
+ timer->type = RTP_TIMER_LOST;
timer->rtx_delay = 0;
timer->rtx_retry = 0;
+ offset = timeout_offset (jitterbuffer);
}
- reschedule_timer (jitterbuffer, timer, timer->seqnum,
- timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
-
- JBUF_UNLOCK (priv);
- gst_pad_push_event (priv->sinkpad, event);
- JBUF_LOCK (priv);
+ rtp_timer_queue_update_timer (priv->timers, timer, timer->seqnum,
+ timer->rtx_base + timer->rtx_retry, timer->rtx_delay, offset, FALSE);
return FALSE;
}
/* a packet is lost */
static gboolean
-do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
+do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
GstClockTime now)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- guint seqnum, lost_packets, num_rtx_retry, next_in_seqnum;
- gboolean head;
- GstEvent *event = NULL;
- RTPJitterBufferItem *item;
-
- seqnum = timer->seqnum;
- lost_packets = MAX (timer->num, 1);
- num_rtx_retry = timer->num_rtx_retry;
-
- /* we had a gap and thus we lost some packets. Create an event for this. */
- if (lost_packets > 1)
- GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
- seqnum + lost_packets - 1);
- else
- GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
-
- priv->num_lost += lost_packets;
- priv->num_rtx_failed += num_rtx_retry;
-
- next_in_seqnum = (seqnum + lost_packets) & 0xffff;
-
- /* we now only accept seqnum bigger than this */
- if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0) {
- priv->next_in_seqnum = next_in_seqnum;
- priv->last_in_pts = apply_offset (jitterbuffer, timer->timeout);
- }
+ GstClockTime timestamp;
- /* Avoid creating events if we don't need it. Note that we still need to create
- * the lost *ITEM* since it will be used to notify the outgoing thread of
- * lost items (so that we can set discont flags and such) */
- if (priv->do_lost) {
- GstClockTime duration, timestamp;
- /* create paket lost event */
- timestamp = apply_offset (jitterbuffer, timer->timeout);
- duration = timer->duration;
- if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
- duration = priv->packet_spacing;
- event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
- gst_structure_new ("GstRTPPacketLost",
- "seqnum", G_TYPE_UINT, (guint) seqnum,
- "timestamp", G_TYPE_UINT64, timestamp,
- "duration", G_TYPE_UINT64, duration,
- "retry", G_TYPE_UINT, num_rtx_retry, NULL));
- }
- item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
- if (!rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL))
- /* Duplicate */
- free_item (item);
+ timestamp = apply_offset (jitterbuffer, get_pts_timeout (timer));
+ insert_lost_event (jitterbuffer, timer->seqnum, 1, timestamp,
+ timer->duration, timer->num_rtx_retry);
if (GST_CLOCK_TIME_IS_VALID (timer->rtx_last)) {
/* Store info to update stats if the packet arrives too late */
- timer_queue_append (priv->rtx_stats_timers, timer,
- now + priv->rtx_stats_timeout * GST_MSECOND, TRUE);
+ timer->timeout = now + priv->rtx_stats_timeout * GST_MSECOND;
+ timer->type = RTP_TIMER_LOST;
+ rtp_timer_queue_insert (priv->rtx_stats_timers, timer);
+ } else {
+ rtp_timer_free (timer);
}
- remove_timer (jitterbuffer, timer);
-
- if (head)
- JBUF_SIGNAL_EVENT (priv);
return TRUE;
}
static gboolean
-do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
+do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
GstClockTime now)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
- remove_timer (jitterbuffer, timer);
+ rtp_timer_free (timer);
if (!priv->eos) {
GstEvent *event;
}
static gboolean
-do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
+do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
GstClockTime now)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
* only mess with current ongoing seqnum if still unknown */
if (priv->next_seqnum == -1)
priv->next_seqnum = timer->seqnum;
- remove_timer (jitterbuffer, timer);
+ rtp_timer_free (timer);
JBUF_SIGNAL_EVENT (priv);
return TRUE;
}
static gboolean
-do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
- GstClockTime now)
+do_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
+ GstClockTime now, GQueue * events)
{
gboolean removed = FALSE;
switch (timer->type) {
- case TIMER_TYPE_EXPECTED:
- removed = do_expected_timeout (jitterbuffer, timer, now);
+ case RTP_TIMER_EXPECTED:
+ removed = do_expected_timeout (jitterbuffer, timer, now, events);
break;
- case TIMER_TYPE_LOST:
+ case RTP_TIMER_LOST:
removed = do_lost_timeout (jitterbuffer, timer, now);
break;
- case TIMER_TYPE_DEADLINE:
+ case RTP_TIMER_DEADLINE:
removed = do_deadline_timeout (jitterbuffer, timer, now);
break;
- case TIMER_TYPE_EOS:
+ case RTP_TIMER_EOS:
removed = do_eos_timeout (jitterbuffer, timer, now);
break;
}
return removed;
}
+static void
+push_rtx_events_unlocked (GstRtpJitterBuffer * jitterbuffer, GQueue * events)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ GstEvent *event;
+
+ while ((event = (GstEvent *) g_queue_pop_head (events)))
+ gst_pad_push_event (priv->sinkpad, event);
+}
+
+/* called with JBUF lock
+ *
+ * Pushes all events in @events queue.
+ *
+ * Returns: %TRUE if the timer thread is not longer running
+ */
+static void
+push_rtx_events (GstRtpJitterBuffer * jitterbuffer, GQueue * events)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+
+ if (events->length == 0)
+ return;
+
+ JBUF_UNLOCK (priv);
+ push_rtx_events_unlocked (jitterbuffer, events);
+ JBUF_LOCK (priv);
+}
+
/* called when we need to wait for the next timeout.
*
* We loop over the array of recorded timeouts and wait for the earliest one.
JBUF_LOCK (priv);
while (priv->timer_running) {
- TimerData *timer = NULL;
- GstClockTime timer_timeout = -1;
- gint i, len;
+ RtpTimer *timer = NULL;
+ GQueue events = G_QUEUE_INIT;
+
+ /* don't produce data in paused */
+ while (priv->blocked) {
+ JBUF_WAIT_TIMER (priv);
+ if (!priv->timer_running)
+ goto stopping;
+ }
/* If we have a clock, update "now" now with the very
* latest running time we have. If timers are unscheduled below we
/* Clear expired rtx-stats timers */
if (priv->do_retransmission)
- timer_queue_clear_until (priv->rtx_stats_timers, now);
-
- /* Iterate "normal" timers */
- len = priv->timers->len;
- for (i = 0; i < len;) {
- TimerData *test = &g_array_index (priv->timers, TimerData, i);
- GstClockTime test_timeout = get_timeout (jitterbuffer, test);
- gboolean save_best = FALSE;
+ rtp_timer_queue_remove_until (priv->rtx_stats_timers, now);
- GST_DEBUG_OBJECT (jitterbuffer,
- "%d, %d, %d, %" GST_TIME_FORMAT " diff:%" GST_STIME_FORMAT, i,
- test->type, test->seqnum, GST_TIME_ARGS (test_timeout),
- GST_STIME_ARGS ((gint64) (test_timeout - now)));
-
- /* Weed out anything too late */
- if (test->type == TIMER_TYPE_LOST &&
- (test_timeout == -1 || test_timeout <= now)) {
- GST_DEBUG_OBJECT (jitterbuffer, "Weeding out late entry");
- do_lost_timeout (jitterbuffer, test, now);
- if (!priv->timer_running)
- break;
- /* We don't move the iterator forward since we just removed the current entry,
- * but we update the termination condition */
- len = priv->timers->len;
- } else {
- /* find the smallest timeout */
- if (timer == NULL) {
- save_best = TRUE;
- } else if (timer_timeout == -1) {
- /* we already have an immediate timeout, the new timer must be an
- * immediate timer with smaller seqnum to become the best */
- if (test_timeout == -1
- && (gst_rtp_buffer_compare_seqnum (test->seqnum,
- timer->seqnum) > 0))
- save_best = TRUE;
- } else if (test_timeout == -1) {
- /* first immediate timer */
- save_best = TRUE;
- } else if (test_timeout < timer_timeout) {
- /* earlier timer */
- save_best = TRUE;
- } else if (test_timeout == timer_timeout
- && (gst_rtp_buffer_compare_seqnum (test->seqnum,
- timer->seqnum) > 0)) {
- /* same timer, smaller seqnum */
- save_best = TRUE;
- }
+ /* Iterate expired "normal" timers */
+ while ((timer = rtp_timer_queue_pop_until (priv->timers, now)))
+ do_timeout (jitterbuffer, timer, now, &events);
- if (save_best) {
- GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
- timer = test;
- timer_timeout = test_timeout;
- }
- i++;
- }
- }
- if (timer && !priv->blocked) {
+ timer = rtp_timer_queue_peek_earliest (priv->timers);
+ if (timer) {
GstClock *clock;
GstClockTime sync_time;
GstClockID id;
GstClockReturn ret;
GstClockTimeDiff clock_jitter;
- if (timer_timeout == -1 || timer_timeout <= now || priv->eos) {
- /* We have normally removed all lost timers in the loop above */
- g_assert (timer->type != TIMER_TYPE_LOST);
-
- do_timeout (jitterbuffer, timer, now);
- /* check here, do_timeout could have released the lock */
- if (!priv->timer_running)
- break;
- continue;
- }
+ /* we poped all immediate and due timer, so this should just never
+ * happens */
+ g_assert (GST_CLOCK_TIME_IS_VALID (timer->timeout));
GST_OBJECT_LOCK (jitterbuffer);
clock = GST_ELEMENT_CLOCK (jitterbuffer);
GST_OBJECT_UNLOCK (jitterbuffer);
/* let's just push if there is no clock */
GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
- now = timer_timeout;
+ now = timer->timeout;
+ push_rtx_events (jitterbuffer, &events);
continue;
}
/* prepare for sync against clock */
- sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
+ sync_time = timer->timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
/* add latency of peer to get input time */
sync_time += priv->peer_latency;
- GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
- " with sync time %" GST_TIME_FORMAT,
- GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
+ GST_DEBUG_OBJECT (jitterbuffer, "timer #%i sync to timestamp %"
+ GST_TIME_FORMAT " with sync time %" GST_TIME_FORMAT, timer->seqnum,
+ GST_TIME_ARGS (get_pts_timeout (timer)), GST_TIME_ARGS (sync_time));
/* create an entry for the clock */
id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
- priv->timer_timeout = timer_timeout;
+ priv->timer_timeout = timer->timeout;
priv->timer_seqnum = timer->seqnum;
GST_OBJECT_UNLOCK (jitterbuffer);
/* release the lock so that the other end can push stuff or unlock */
JBUF_UNLOCK (priv);
+ push_rtx_events_unlocked (jitterbuffer, &events);
+
ret = gst_clock_id_wait (id, &clock_jitter);
JBUF_LOCK (priv);
+
if (!priv->timer_running) {
+ g_queue_clear_full (&events, (GDestroyNotify) gst_event_unref);
gst_clock_id_unref (id);
priv->clock_id = NULL;
break;
}
if (ret != GST_CLOCK_UNSCHEDULED) {
- now = timer_timeout + MAX (clock_jitter, 0);
+ now = priv->timer_timeout + MAX (clock_jitter, 0);
GST_DEBUG_OBJECT (jitterbuffer,
"sync done, %d, #%d, %" GST_STIME_FORMAT, ret, priv->timer_seqnum,
GST_STIME_ARGS (clock_jitter));
} else {
GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
}
+
/* and free the entry */
gst_clock_id_unref (id);
priv->clock_id = NULL;
} else {
+ push_rtx_events_unlocked (jitterbuffer, &events);
+
+ /* when draining the timers, the pusher thread will reuse our
+ * condition to wait for completion. Signal that thread before
+ * sleeping again here */
+ if (priv->eos)
+ JBUF_SIGNAL_TIMER (priv);
+
/* no timers, wait for activity */
JBUF_WAIT_TIMER (priv);
}
}
+stopping:
JBUF_UNLOCK (priv);
GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
}
/*
- * This funcion implements the main pushing loop on the source pad.
+ * This function implements the main pushing loop on the source pad.
*
* It first tries to push as many buffers as possible. If there is a seqnum
* mismatch, we wait for the next timeouts.
result = handle_next_buffer (jitterbuffer);
if (G_LIKELY (result == GST_FLOW_WAIT)) {
/* now wait for the next event */
+ JBUF_SIGNAL_QUEUE (priv);
JBUF_WAIT_EVENT (priv, flushing);
result = GST_FLOW_OK;
}
}
}
-/* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
+/* collect the info from the latest RTCP packet and the jitterbuffer sync, do
* some sanity checks and then emit the handle-sync signal with the parameters.
* This function must be called with the LOCK */
static void
}
default:
if (GST_QUERY_IS_SERIALIZED (query)) {
- RTPJitterBufferItem *item;
- gboolean head;
-
JBUF_LOCK_CHECK (priv, out_flushing);
if (rtp_jitter_buffer_get_mode (priv->jbuf) !=
RTP_JITTER_BUFFER_MODE_BUFFER) {
GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
- item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1);
- rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
- if (head)
+ if (rtp_jitter_buffer_append_query (priv->jbuf, query))
JBUF_SIGNAL_EVENT (priv);
JBUF_WAIT_QUERY (priv, out_flushing);
res = priv->last_query;
} else {
priv->ts_offset = g_value_get_int64 (value);
priv->ts_offset_remainder = 0;
+ update_timer_offsets (jitterbuffer);
}
priv->ts_discont = TRUE;
JBUF_UNLOCK (priv);
priv->do_lost = g_value_get_boolean (value);
JBUF_UNLOCK (priv);
break;
+ case PROP_POST_DROP_MESSAGES:
+ JBUF_LOCK (priv);
+ priv->post_drop_messages = g_value_get_boolean (value);
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_DROP_MESSAGES_INTERVAL:
+ JBUF_LOCK (priv);
+ priv->drop_messages_interval_ms = g_value_get_uint (value);
+ JBUF_UNLOCK (priv);
+ break;
case PROP_MODE:
JBUF_LOCK (priv);
rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
g_value_set_boolean (value, priv->do_lost);
JBUF_UNLOCK (priv);
break;
+ case PROP_POST_DROP_MESSAGES:
+ JBUF_LOCK (priv);
+ g_value_set_boolean (value, priv->post_drop_messages);
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_DROP_MESSAGES_INTERVAL:
+ JBUF_LOCK (priv);
+ g_value_set_uint (value, priv->drop_messages_interval_ms);
+ JBUF_UNLOCK (priv);
+ break;
case PROP_MODE:
JBUF_LOCK (priv);
g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));