aggregator: add a timeout property determining buffer wait time
authorMatthew Waters <matthew@centricular.com>
Mon, 6 Oct 2014 07:23:03 +0000 (18:23 +1100)
committerTim-Philipp Müller <tim@centricular.com>
Sat, 2 Dec 2017 15:10:26 +0000 (15:10 +0000)
Determines the amount of time that a pad will wait for a buffer before
being marked unresponsive.

Network sources may fail to produce buffers for an extended period of time,
currently causing the pipeline to stall possibly indefinitely, waiting for
these buffers to appear.

Subclasses should render unresponsive pads with either silence (audio), the
last (video) frame or what makes the most sense in the given context.

libs/gst/base/gstaggregator.c
libs/gst/base/gstaggregator.h

index dce7180..42c0d8e 100644 (file)
@@ -69,6 +69,9 @@
 /*  Might become API */
 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
     const GstTagList * tags, GstTagMergeMode mode);
+static void gst_aggregator_set_timeout (GstAggregator * agg, gint64 timeout);
+static gint64 gst_aggregator_get_timeout (GstAggregator * agg);
+
 
 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
 #define GST_CAT_DEFAULT aggregator_debug
@@ -145,6 +148,8 @@ struct _GstAggregatorPadPrivate
   gboolean pending_eos;
   gboolean flushing;
 
+  GstClockID timeout_id;
+
   GMutex event_lock;
   GCond event_cond;
 
@@ -232,6 +237,15 @@ typedef struct
   gboolean one_actually_seeked;
 } EventData;
 
+#define DEFAULT_TIMEOUT        -1
+
+enum
+{
+  PROP_0,
+  PROP_TIMEOUT,
+  PROP_LAST
+};
+
 /**
  * gst_aggregator_iterate_sinkpads:
  * @self: The #GstAggregator
@@ -311,13 +325,18 @@ no_iter:
 }
 
 static inline gboolean
-_check_all_pads_with_data_or_eos (GstAggregator * self,
+_check_all_pads_with_data_or_eos_or_timeout (GstAggregator * self,
     GstAggregatorPad * aggpad)
 {
   if (aggpad->buffer || aggpad->eos) {
     return TRUE;
   }
 
+  if (g_atomic_int_get (&aggpad->unresponsive) == TRUE) {
+    /* pad has been deemed unresponsive */
+    return TRUE;
+  }
+
   GST_LOG_OBJECT (aggpad, "Not ready to be aggregated");
 
   return FALSE;
@@ -453,8 +472,8 @@ aggregate_func (GstAggregator * self)
 
   GST_LOG_OBJECT (self, "Checking aggregate");
   while (priv->send_eos && gst_aggregator_iterate_sinkpads (self,
-          (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos,
-          NULL) && priv->running) {
+          (GstAggregatorPadForeachFunc)
+          _check_all_pads_with_data_or_eos_or_timeout, NULL) && priv->running) {
     GST_TRACE_OBJECT (self, "Actually aggregating!");
 
     priv->flow_return = klass->aggregate (self);
@@ -714,10 +733,19 @@ eat:
 }
 
 static gboolean
-_flush_pad (GstAggregator * self, GstAggregatorPad * pad, gpointer unused_udata)
+_stop_pad (GstAggregator * self, GstAggregatorPad * pad, gpointer unused_udata)
 {
   _aggpad_flush (pad, self);
 
+  PAD_LOCK_EVENT (pad);
+  /* remove the timeouts */
+  if (pad->priv->timeout_id) {
+    gst_clock_id_unschedule (pad->priv->timeout_id);
+    gst_clock_id_unref (pad->priv->timeout_id);
+    pad->priv->timeout_id = NULL;
+  }
+  PAD_UNLOCK_EVENT (pad);
+
   return TRUE;
 }
 
@@ -727,7 +755,7 @@ _stop (GstAggregator * agg)
   _reset_flow_values (agg);
 
   gst_aggregator_iterate_sinkpads (agg,
-      (GstAggregatorPadForeachFunc) _flush_pad, NULL);
+      (GstAggregatorPadForeachFunc) _stop_pad, NULL);
 
   if (agg->priv->tags)
     gst_tag_list_unref (agg->priv->tags);
@@ -795,6 +823,37 @@ _release_pad (GstElement * element, GstPad * pad)
   QUEUE_PUSH (self);
 }
 
+static gboolean
+_unresponsive_timeout (GstClock * clock, GstClockTime time, GstClockID id,
+    gpointer user_data)
+{
+  GstAggregatorPad *aggpad;
+  GstAggregator *self;
+
+  if (user_data == NULL)
+    return FALSE;
+
+  aggpad = GST_AGGREGATOR_PAD (user_data);
+
+  /* avoid holding the last reference to the parent element here */
+  PAD_LOCK_EVENT (aggpad);
+
+  self = GST_AGGREGATOR (gst_pad_get_parent (GST_PAD (aggpad)));
+
+  GST_DEBUG_OBJECT (aggpad, "marked unresponsive");
+
+  g_atomic_int_set (&aggpad->unresponsive, TRUE);
+
+  if (self) {
+    QUEUE_PUSH (self);
+    gst_object_unref (self);
+  }
+
+  PAD_UNLOCK_EVENT (aggpad);
+
+  return TRUE;
+}
+
 static GstPad *
 _request_new_pad (GstElement * element,
     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
@@ -827,6 +886,7 @@ _request_new_pad (GstElement * element,
     agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
         "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
     g_free (name);
+
     GST_OBJECT_UNLOCK (element);
 
   } else {
@@ -1127,6 +1187,7 @@ gst_aggregator_finalize (GObject * object)
 {
   GstAggregator *self = (GstAggregator *) object;
 
+  gst_object_unref (self->clock);
   g_mutex_clear (&self->priv->setcaps_lock);
 
   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
@@ -1145,6 +1206,82 @@ gst_aggregator_dispose (GObject * object)
   }
 }
 
+/**
+ * gst_aggregator_set_timeout:
+ * @agg: a #GstAggregator
+ * @timeout: the new timeout value.
+ *
+ * Sets the new timeout value to @timeout. This value is used to limit the
+ * amount of time a pad waits for data to appear before considering the pad
+ * as unresponsive.
+ */
+static void
+gst_aggregator_set_timeout (GstAggregator * agg, gint64 timeout)
+{
+  g_return_if_fail (GST_IS_AGGREGATOR (agg));
+
+  GST_OBJECT_LOCK (agg);
+  agg->timeout = timeout;
+  GST_OBJECT_UNLOCK (agg);
+}
+
+/**
+ * gst_aggregator_get_timeout:
+ * @agg: a #GstAggregator
+ *
+ * Gets the timeout value. See gst_aggregator_set_timeout for
+ * more details.
+ *
+ * Returns: The time in nanoseconds to wait for data to arrive on a sink pad 
+ * before a pad is deemed unresponsive. A value of -1 means an
+ * unlimited time.
+ */
+static gint64
+gst_aggregator_get_timeout (GstAggregator * agg)
+{
+  gint64 res;
+
+  g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
+
+  GST_OBJECT_LOCK (agg);
+  res = agg->timeout;
+  GST_OBJECT_UNLOCK (agg);
+
+  return res;
+}
+
+static void
+gst_aggregator_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec)
+{
+  GstAggregator *agg = GST_AGGREGATOR (object);
+
+  switch (prop_id) {
+    case PROP_TIMEOUT:
+      gst_aggregator_set_timeout (agg, g_value_get_int64 (value));
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
+gst_aggregator_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec)
+{
+  GstAggregator *agg = GST_AGGREGATOR (object);
+
+  switch (prop_id) {
+    case PROP_TIMEOUT:
+      g_value_set_int64 (value, gst_aggregator_get_timeout (agg));
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
 /* GObject vmethods implementations */
 static void
 gst_aggregator_class_init (GstAggregatorClass * klass)
@@ -1173,8 +1310,17 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (_release_pad);
   gstelement_class->change_state = GST_DEBUG_FUNCPTR (_change_state);
 
+  gobject_class->set_property = gst_aggregator_set_property;
+  gobject_class->get_property = gst_aggregator_get_property;
   gobject_class->finalize = gst_aggregator_finalize;
   gobject_class->dispose = gst_aggregator_dispose;
+
+  g_object_class_install_property (gobject_class, PROP_TIMEOUT,
+      g_param_spec_int64 ("timeout", "Buffer timeout",
+          "Number of nanoseconds to wait for a buffer to arrive on a sink pad"
+          "before the pad is deemed unresponsive (-1 unlimited)", -1,
+          G_MAXINT64, DEFAULT_TIMEOUT,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 }
 
 static void
@@ -1211,6 +1357,9 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
 
   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
 
+  self->clock = gst_system_clock_obtain ();
+  self->timeout = -1;
+
   g_mutex_init (&self->priv->setcaps_lock);
 }
 
@@ -1250,10 +1399,19 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
   GstAggregatorPrivate *priv = self->priv;
   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
+  GstClockTime timeout = gst_aggregator_get_timeout (self);
+  GstClockTime now;
 
   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
+  if (aggpad->priv->timeout_id) {
+    gst_clock_id_unschedule (aggpad->priv->timeout_id);
+    gst_clock_id_unref (aggpad->priv->timeout_id);
+    aggpad->priv->timeout_id = NULL;
+  }
+  g_atomic_int_set (&aggpad->unresponsive, FALSE);
 
   PAD_STREAM_LOCK (aggpad);
+
   if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
     goto flushing;
 
@@ -1261,6 +1419,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
     goto eos;
 
   PAD_LOCK_EVENT (aggpad);
+
   if (aggpad->buffer) {
     GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
     PAD_WAIT_EVENT (aggpad);
@@ -1283,6 +1442,14 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
 
   QUEUE_PUSH (self);
 
+  if (GST_CLOCK_TIME_IS_VALID (timeout)) {
+    now = gst_clock_get_time (self->clock);
+    aggpad->priv->timeout_id =
+        gst_clock_new_single_shot_id (self->clock, now + timeout);
+    gst_clock_id_wait_async (aggpad->priv->timeout_id, _unresponsive_timeout,
+        gst_object_ref (aggpad), gst_object_unref);
+  }
+
   GST_DEBUG_OBJECT (aggpad, "Done chaining");
 
   return priv->flow_return;
index 507da13..837fdbb 100644 (file)
@@ -73,6 +73,7 @@ struct _GstAggregatorPad
   GstBuffer                 *  buffer;
   GstSegment                   segment;
   gboolean                     eos;
+  gboolean                     unresponsive;
 
   /* < Private > */
   GstAggregatorPadPrivate   *  priv;
@@ -137,6 +138,11 @@ struct _GstAggregator
   /*< private >*/
   GstAggregatorPrivate  *  priv;
 
+  GstClock              *  clock;
+
+  /* properties */
+  gint64                   timeout;
+
   gpointer                 _gst_reserved[GST_PADDING];
 };