aggregator: make the src pad task drive the pipeline for live pipelines
authorMatthew Waters <matthew@centricular.com>
Fri, 5 Dec 2014 07:19:54 +0000 (18:19 +1100)
committerTim-Philipp Müller <tim@centricular.com>
Sat, 2 Dec 2017 15:10:26 +0000 (15:10 +0000)
This removes the uses of GAsyncQueue and replaces it with explicit
GMutex, GCond and wakeup count which is used for the non-live case.

For live pipelines, the aggregator waits on the clock until either
data arrives on all sink pads or the expected output buffer time
arrives plus the timeout/latency at which time, the subclass
produces a buffer.

https://bugzilla.gnome.org/show_bug.cgi?id=741146

libs/gst/base/gstaggregator.c
libs/gst/base/gstaggregator.h

index 3503d22..f18bbdb 100644 (file)
@@ -69,8 +69,9 @@
 /*  Might become API */
 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
     const GstTagList * tags, GstTagMergeMode mode);
-static void gst_aggregator_set_timeout (GstAggregator * agg, gint64 timeout);
-static gint64 gst_aggregator_get_timeout (GstAggregator * agg);
+static void gst_aggregator_set_latency_property (GstAggregator * agg,
+    gint64 latency);
+static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
 
 
 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
@@ -141,6 +142,48 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
         g_thread_self());                                               \
   } G_STMT_END
 
+#define SRC_STREAM_LOCK(self)   G_STMT_START {                             \
+  GST_LOG_OBJECT (self, "Taking src STREAM lock from thread %p",           \
+        g_thread_self());                                                  \
+  g_mutex_lock(&self->priv->src_lock);                                     \
+  GST_LOG_OBJECT (self, "Took src STREAM lock from thread %p",             \
+        g_thread_self());                                                  \
+  } G_STMT_END
+
+#define SRC_STREAM_UNLOCK(self)  G_STMT_START {                            \
+  GST_LOG_OBJECT (self, "Releasing src STREAM lock from thread %p",        \
+        g_thread_self());                                                  \
+  g_mutex_unlock(&self->priv->src_lock);                                   \
+  GST_LOG_OBJECT (self, "Release src STREAM lock from thread %p",          \
+        g_thread_self());                                                  \
+  } G_STMT_END
+
+#define SRC_STREAM_WAIT(self)   G_STMT_START {                             \
+  GST_LOG_OBJECT (self, "Waiting for src STREAM on thread %p",             \
+        g_thread_self());                                                  \
+  g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock));           \
+  GST_LOG_OBJECT (self, "DONE Waiting for src STREAM on thread %p",        \
+        g_thread_self());                                                  \
+  } G_STMT_END
+
+#define SRC_STREAM_BROADCAST(self) {                                       \
+  GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p",             \
+        g_thread_self());                                                  \
+  g_cond_broadcast(&(self->priv->src_cond));                               \
+  }
+
+#define KICK_SRC_THREAD(self) \
+  do { \
+    SRC_STREAM_LOCK (self); \
+    GST_LOG_OBJECT (self, "kicking src STREAM from thread %p", \
+          g_thread_self ()); \
+    if (self->priv->aggregate_id) \
+      gst_clock_id_unschedule (self->priv->aggregate_id); \
+    self->priv->n_kicks++; \
+    SRC_STREAM_BROADCAST (self); \
+    SRC_STREAM_UNLOCK (self); \
+  } while (0)
+
 struct _GstAggregatorPadPrivate
 {
   gboolean pending_flush_start;
@@ -148,8 +191,6 @@ struct _GstAggregatorPadPrivate
   gboolean pending_eos;
   gboolean flushing;
 
-  GstClockID timeout_id;
-
   GMutex event_lock;
   GCond event_cond;
 
@@ -175,38 +216,10 @@ _aggpad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
  *************************************/
 static GstElementClass *aggregator_parent_class = NULL;
 
-#define AGGREGATOR_QUEUE(self) (((GstAggregator*)self)->priv->queue)
-
-#define QUEUE_PUSH(self) G_STMT_START {                              \
-  GST_LOG_OBJECT (self, "Pushing to QUEUE in thread %p",             \
-      g_thread_self());                                              \
-  g_async_queue_push (AGGREGATOR_QUEUE (self), GINT_TO_POINTER (1)); \
-} G_STMT_END
-
-#define QUEUE_POP(self) G_STMT_START {                               \
-  GST_LOG_OBJECT (self, "Waiting on QUEUE in thread %p",             \
-        g_thread_self());                                            \
-  g_async_queue_pop (AGGREGATOR_QUEUE (self));                       \
-  GST_LOG_OBJECT (self, "Waited on QUEUE in thread %p",              \
-        g_thread_self());                                            \
-} G_STMT_END
-
-#define QUEUE_FLUSH(self) G_STMT_START {                             \
-  GST_LOG_OBJECT (self, "Flushing QUEUE in thread %p",               \
-      g_thread_self());                                              \
-  g_async_queue_lock (AGGREGATOR_QUEUE (self));                      \
-  while (g_async_queue_try_pop_unlocked (AGGREGATOR_QUEUE (self)));  \
-  g_async_queue_unlock (AGGREGATOR_QUEUE (self));                    \
-  GST_LOG_OBJECT (self, "Flushed QUEUE in thread %p",                \
-      g_thread_self());                                              \
-} G_STMT_END
-
 struct _GstAggregatorPrivate
 {
   gint padcount;
 
-  GAsyncQueue *queue;
-
   /* Our state is >= PAUSED */
   gboolean running;
 
@@ -230,6 +243,12 @@ struct _GstAggregatorPrivate
   gboolean latency_live;
   GstClockTime latency_min;
   GstClockTime latency_max;
+
+  /* aggregate */
+  GstClockID aggregate_id;
+  gint n_kicks;
+  GMutex src_lock;
+  GCond src_cond;
 };
 
 typedef struct
@@ -241,12 +260,12 @@ typedef struct
   gboolean one_actually_seeked;
 } EventData;
 
-#define DEFAULT_TIMEOUT        -1
+#define DEFAULT_LATENCY        -1
 
 enum
 {
   PROP_0,
-  PROP_TIMEOUT,
+  PROP_LATENCY,
   PROP_LAST
 };
 
@@ -329,18 +348,13 @@ no_iter:
 }
 
 static inline gboolean
-_check_all_pads_with_data_or_eos_or_timeout (GstAggregator * self,
-    GstAggregatorPad * aggpad)
+_check_all_pads_with_data_or_eos (GstAggregator * self,
+    GstAggregatorPad * aggpad, gpointer user_data)
 {
   if (aggpad->buffer || aggpad->eos) {
     return TRUE;
   }
 
-  if (g_atomic_int_get (&aggpad->unresponsive) == TRUE) {
-    /* pad has been deemed unresponsive */
-    return TRUE;
-  }
-
   GST_LOG_OBJECT (aggpad, "Not ready to be aggregated");
 
   return FALSE;
@@ -460,6 +474,95 @@ _push_eos (GstAggregator * self)
   gst_pad_push_event (self->srcpad, event);
 }
 
+static GstClockTime
+gst_aggregator_get_next_time (GstAggregator * self)
+{
+  GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
+
+  if (klass->get_next_time)
+    return klass->get_next_time (self);
+
+  return GST_CLOCK_TIME_NONE;
+}
+
+/* called with the src STREAM lock */
+static gboolean
+_wait_and_check (GstAggregator * self)
+{
+  GstClockTime latency_max, latency_min;
+  GstClockTime start;
+  gboolean live;
+
+  gst_aggregator_get_latency (self, &live, &latency_min, &latency_max);
+
+  if (gst_aggregator_iterate_sinkpads (self,
+          (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos,
+          NULL)) {
+    GST_DEBUG_OBJECT (self, "all pads have data");
+    return TRUE;
+  }
+
+  SRC_STREAM_LOCK (self);
+  start = gst_aggregator_get_next_time (self);
+
+  if (!live || !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self))
+      || !GST_CLOCK_TIME_IS_VALID (start)) {
+    while (self->priv->n_kicks <= 0)
+      SRC_STREAM_WAIT (self);
+    self->priv->n_kicks--;
+  } else {
+    GstClockTime time;
+    GstClockReturn status;
+
+    GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (start));
+
+    time = GST_ELEMENT_CAST (self)->base_time + start;
+
+    if (GST_CLOCK_TIME_IS_VALID (latency_max)) {
+      time += latency_max;
+    } else if (GST_CLOCK_TIME_IS_VALID (latency_min)) {
+      time += latency_min;
+    } else {
+      time += self->latency;
+    }
+
+    GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
+        GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
+        " latency max %" GST_TIME_FORMAT " latency min %" GST_TIME_FORMAT
+        " current %" GST_TIME_FORMAT ")", GST_TIME_ARGS (time),
+        GST_TIME_ARGS (GST_ELEMENT_CAST (self)->base_time),
+        GST_TIME_ARGS (start), GST_TIME_ARGS (latency_max),
+        GST_TIME_ARGS (latency_min),
+        GST_TIME_ARGS (gst_clock_get_time (GST_ELEMENT_CLOCK (self))));
+
+    self->priv->aggregate_id =
+        gst_clock_new_single_shot_id (GST_ELEMENT_CLOCK (self), time);
+    SRC_STREAM_UNLOCK (self);
+
+    status = gst_clock_id_wait (self->priv->aggregate_id, NULL);
+
+    SRC_STREAM_LOCK (self);
+    if (self->priv->aggregate_id) {
+      gst_clock_id_unref (self->priv->aggregate_id);
+      self->priv->aggregate_id = NULL;
+    }
+    self->priv->n_kicks--;
+
+    GST_DEBUG_OBJECT (self, "clock returned %d", status);
+
+    /* we timed out */
+    if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
+      SRC_STREAM_UNLOCK (self);
+      return TRUE;
+    }
+  }
+  SRC_STREAM_UNLOCK (self);
+
+  return gst_aggregator_iterate_sinkpads (self,
+      (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL);
+}
+
 static void
 aggregate_func (GstAggregator * self)
 {
@@ -468,22 +571,19 @@ aggregate_func (GstAggregator * self)
 
   if (self->priv->running == FALSE) {
     GST_DEBUG_OBJECT (self, "Not running anymore");
-
     return;
   }
 
-  QUEUE_POP (self);
-
   GST_LOG_OBJECT (self, "Checking aggregate");
-  while (priv->send_eos && gst_aggregator_iterate_sinkpads (self,
-          (GstAggregatorPadForeachFunc)
-          _check_all_pads_with_data_or_eos_or_timeout, NULL) && priv->running) {
+  while (priv->send_eos && priv->running) {
+    if (!_wait_and_check (self))
+      continue;
+
     GST_TRACE_OBJECT (self, "Actually aggregating!");
 
     priv->flow_return = klass->aggregate (self);
 
     if (priv->flow_return == GST_FLOW_EOS) {
-      QUEUE_FLUSH (self);
       _push_eos (self);
     }
 
@@ -497,7 +597,6 @@ aggregate_func (GstAggregator * self)
     if (priv->flow_return != GST_FLOW_OK)
       break;
   }
-
 }
 
 static gboolean
@@ -528,14 +627,14 @@ _stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
       flush_start ? "Pausing" : "Stopping");
 
   self->priv->running = FALSE;
-  QUEUE_PUSH (self);
+  KICK_SRC_THREAD (self);
 
   if (flush_start) {
     res = gst_pad_push_event (self->srcpad, flush_start);
   }
 
   gst_pad_stop_task (self->srcpad);
-  QUEUE_FLUSH (self);
+  KICK_SRC_THREAD (self);
 
   return res;
 }
@@ -546,6 +645,7 @@ _start_srcpad_task (GstAggregator * self)
   GST_INFO_OBJECT (self, "Starting srcpad task");
 
   self->priv->running = TRUE;
+  self->priv->n_kicks = 0;
   gst_pad_start_task (GST_PAD (self->srcpad),
       (GstTaskFunction) aggregate_func, self, NULL);
 }
@@ -662,7 +762,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
             gst_pad_push_event (self->srcpad, event);
             priv->send_eos = TRUE;
             event = NULL;
-            QUEUE_PUSH (self);
+            KICK_SRC_THREAD (self);
 
             GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
             GST_PAD_STREAM_UNLOCK (self->srcpad);
@@ -690,7 +790,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
       }
       PAD_UNLOCK_EVENT (aggpad);
 
-      QUEUE_PUSH (self);
+      KICK_SRC_THREAD (self);
       goto eat;
     }
     case GST_EVENT_SEGMENT:
@@ -741,15 +841,6 @@ _stop_pad (GstAggregator * self, GstAggregatorPad * pad, gpointer unused_udata)
 {
   _aggpad_flush (pad, self);
 
-  PAD_LOCK_EVENT (pad);
-  /* remove the timeouts */
-  if (pad->priv->timeout_id) {
-    gst_clock_id_unschedule (pad->priv->timeout_id);
-    gst_clock_id_unref (pad->priv->timeout_id);
-    pad->priv->timeout_id = NULL;
-  }
-  PAD_UNLOCK_EVENT (pad);
-
   return TRUE;
 }
 
@@ -811,9 +902,9 @@ failure:
 static void
 _release_pad (GstElement * element, GstPad * pad)
 {
+  GstAggregator *self = GST_AGGREGATOR (element);
   GstBuffer *tmpbuf;
 
-  GstAggregator *self = GST_AGGREGATOR (element);
   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
 
   GST_INFO_OBJECT (pad, "Removing pad");
@@ -823,39 +914,7 @@ _release_pad (GstElement * element, GstPad * pad)
   gst_buffer_replace (&tmpbuf, NULL);
   gst_element_remove_pad (element, pad);
 
-  /* Something changed make sure we try to aggregate */
-  QUEUE_PUSH (self);
-}
-
-static gboolean
-_unresponsive_timeout (GstClock * clock, GstClockTime time, GstClockID id,
-    gpointer user_data)
-{
-  GstAggregatorPad *aggpad;
-  GstAggregator *self;
-
-  if (user_data == NULL)
-    return FALSE;
-
-  aggpad = GST_AGGREGATOR_PAD (user_data);
-
-  /* avoid holding the last reference to the parent element here */
-  PAD_LOCK_EVENT (aggpad);
-
-  self = GST_AGGREGATOR (gst_pad_get_parent (GST_PAD (aggpad)));
-
-  GST_DEBUG_OBJECT (aggpad, "marked unresponsive");
-
-  g_atomic_int_set (&aggpad->unresponsive, TRUE);
-
-  if (self) {
-    QUEUE_PUSH (self);
-    gst_object_unref (self);
-  }
-
-  PAD_UNLOCK_EVENT (aggpad);
-
-  return TRUE;
+  KICK_SRC_THREAD (self);
 }
 
 static GstPad *
@@ -970,11 +1029,11 @@ gst_aggregator_get_latency (GstAggregator * self, gboolean * live,
   min = self->priv->latency_min;
   max = self->priv->latency_max;
 
-  if (GST_CLOCK_TIME_IS_VALID (self->timeout)) {
+  if (GST_CLOCK_TIME_IS_VALID (self->latency)) {
     if (GST_CLOCK_TIME_IS_VALID (min))
-      min += self->timeout;
+      min += self->latency;
     if (GST_CLOCK_TIME_IS_VALID (max))
-      max += self->timeout;
+      max += self->latency;
   }
 
   if (live)
@@ -998,13 +1057,13 @@ gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
   gst_aggregator_iterate_sinkpads (self,
       (GstAggregatorPadForeachFunc) _latency_query, &data);
 
-  if (data.live && GST_CLOCK_TIME_IS_VALID (self->timeout) &&
-      self->timeout > data.max) {
+  if (data.live && GST_CLOCK_TIME_IS_VALID (self->latency) &&
+      self->latency > data.max) {
     GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
-        ("%s", "Timeout too big"),
-        ("The requested timeout value is too big for the latency in the "
-            "current pipeline.  Limiting to %" G_GINT64_FORMAT, data.max));
-    self->timeout = data.max;
+        ("%s", "Latency too big"),
+        ("The requested latency value is too big for the current pipeline.  "
+            "Limiting to %" G_GINT64_FORMAT, data.max));
+    self->latency = data.max;
   }
 
   self->priv->latency_live = data.live;
@@ -1012,11 +1071,11 @@ gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
   self->priv->latency_max = data.max;
 
   /* add our own */
-  if (GST_CLOCK_TIME_IS_VALID (self->timeout)) {
+  if (GST_CLOCK_TIME_IS_VALID (self->latency)) {
     if (GST_CLOCK_TIME_IS_VALID (data.min))
-      data.min += self->timeout;
+      data.min += self->latency;
     if (GST_CLOCK_TIME_IS_VALID (data.max))
-      data.max += self->timeout;
+      data.max += self->latency;
   }
 
   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
@@ -1317,6 +1376,8 @@ gst_aggregator_finalize (GObject * object)
 
   gst_object_unref (self->clock);
   g_mutex_clear (&self->priv->setcaps_lock);
+  g_mutex_clear (&self->priv->src_lock);
+  g_cond_clear (&self->priv->src_cond);
 
   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
 }
@@ -1324,51 +1385,44 @@ gst_aggregator_finalize (GObject * object)
 static void
 gst_aggregator_dispose (GObject * object)
 {
-  GstAggregator *self = (GstAggregator *) object;
-
   G_OBJECT_CLASS (aggregator_parent_class)->dispose (object);
-
-  if (AGGREGATOR_QUEUE (self)) {
-    g_async_queue_unref (AGGREGATOR_QUEUE (self));
-    AGGREGATOR_QUEUE (self) = NULL;
-  }
 }
 
-/**
- * gst_aggregator_set_timeout:
+/*
+ * gst_aggregator_set_latency_property:
  * @agg: a #GstAggregator
- * @timeout: the new timeout value.
+ * @latency: the new latency value.
  *
- * Sets the new timeout value to @timeout. This value is used to limit the
+ * Sets the new latency value to @latency. This value is used to limit the
  * amount of time a pad waits for data to appear before considering the pad
  * as unresponsive.
  */
 static void
-gst_aggregator_set_timeout (GstAggregator * self, gint64 timeout)
+gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
 {
   g_return_if_fail (GST_IS_AGGREGATOR (self));
 
   GST_OBJECT_LOCK (self);
 
   if (self->priv->latency_live && self->priv->latency_max != 0 &&
-      GST_CLOCK_TIME_IS_VALID (timeout) && timeout > self->priv->latency_max) {
+      GST_CLOCK_TIME_IS_VALID (latency) && latency > self->priv->latency_max) {
     GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
-        ("%s", "Timeout too big"),
-        ("The requested timeout value is too big for the latency in the "
+        ("%s", "Latency too big"),
+        ("The requested latency value is too big for the latency in the "
             "current pipeline.  Limiting to %" G_GINT64_FORMAT,
             self->priv->latency_max));
-    timeout = self->priv->latency_max;
+    latency = self->priv->latency_max;
   }
 
-  self->timeout = timeout;
+  self->latency = latency;
   GST_OBJECT_UNLOCK (self);
 }
 
-/**
- * gst_aggregator_get_timeout:
+/*
+ * gst_aggregator_get_latency_property:
  * @agg: a #GstAggregator
  *
- * Gets the timeout value. See gst_aggregator_set_timeout for
+ * Gets the latency value. See gst_aggregator_set_latency for
  * more details.
  *
  * Returns: The time in nanoseconds to wait for data to arrive on a sink pad 
@@ -1376,14 +1430,14 @@ gst_aggregator_set_timeout (GstAggregator * self, gint64 timeout)
  * unlimited time.
  */
 static gint64
-gst_aggregator_get_timeout (GstAggregator * agg)
+gst_aggregator_get_latency_property (GstAggregator * agg)
 {
   gint64 res;
 
   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
 
   GST_OBJECT_LOCK (agg);
-  res = agg->timeout;
+  res = agg->latency;
   GST_OBJECT_UNLOCK (agg);
 
   return res;
@@ -1396,8 +1450,8 @@ gst_aggregator_set_property (GObject * object, guint prop_id,
   GstAggregator *agg = GST_AGGREGATOR (object);
 
   switch (prop_id) {
-    case PROP_TIMEOUT:
-      gst_aggregator_set_timeout (agg, g_value_get_int64 (value));
+    case PROP_LATENCY:
+      gst_aggregator_set_latency_property (agg, g_value_get_int64 (value));
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -1412,8 +1466,8 @@ gst_aggregator_get_property (GObject * object, guint prop_id,
   GstAggregator *agg = GST_AGGREGATOR (object);
 
   switch (prop_id) {
-    case PROP_TIMEOUT:
-      g_value_set_int64 (value, gst_aggregator_get_timeout (agg));
+    case PROP_LATENCY:
+      g_value_set_int64 (value, gst_aggregator_get_latency_property (agg));
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -1454,12 +1508,12 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
   gobject_class->finalize = gst_aggregator_finalize;
   gobject_class->dispose = gst_aggregator_dispose;
 
-  g_object_class_install_property (gobject_class, PROP_TIMEOUT,
-      g_param_spec_int64 ("timeout", "Buffer timeout",
+  g_object_class_install_property (gobject_class, PROP_LATENCY,
+      g_param_spec_int64 ("latency", "Buffer latency",
           "Number of nanoseconds to wait for a buffer to arrive on a sink pad"
           "before the pad is deemed unresponsive (-1 unlimited)", -1,
           (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1),
-          DEFAULT_TIMEOUT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 }
 
 static void
@@ -1488,7 +1542,6 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
   self->priv->latency_max = GST_CLOCK_TIME_NONE;
   _reset_flow_values (self);
 
-  AGGREGATOR_QUEUE (self) = g_async_queue_new ();
   self->srcpad = gst_pad_new_from_template (pad_template, "src");
 
   gst_pad_set_event_function (self->srcpad,
@@ -1501,9 +1554,11 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
 
   self->clock = gst_system_clock_obtain ();
-  self->timeout = -1;
+  self->latency = -1;
 
   g_mutex_init (&self->priv->setcaps_lock);
+  g_mutex_init (&self->priv->src_lock);
+  g_cond_init (&self->priv->src_cond);
 }
 
 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
@@ -1542,16 +1597,8 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
   GstAggregatorPrivate *priv = self->priv;
   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
-  GstClockTime timeout = gst_aggregator_get_timeout (self);
-  GstClockTime now;
 
   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
-  if (aggpad->priv->timeout_id) {
-    gst_clock_id_unschedule (aggpad->priv->timeout_id);
-    gst_clock_id_unref (aggpad->priv->timeout_id);
-    aggpad->priv->timeout_id = NULL;
-  }
-  g_atomic_int_set (&aggpad->unresponsive, FALSE);
 
   PAD_STREAM_LOCK (aggpad);
 
@@ -1583,15 +1630,9 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
   PAD_UNLOCK_EVENT (aggpad);
   PAD_STREAM_UNLOCK (aggpad);
 
-  QUEUE_PUSH (self);
-
-  if (GST_CLOCK_TIME_IS_VALID (timeout)) {
-    now = gst_clock_get_time (self->clock);
-    aggpad->priv->timeout_id =
-        gst_clock_new_single_shot_id (self->clock, now + timeout);
-    gst_clock_id_wait_async (aggpad->priv->timeout_id, _unresponsive_timeout,
-        gst_object_ref (aggpad), gst_object_unref);
-  }
+  if (gst_aggregator_iterate_sinkpads (self,
+          (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL))
+    KICK_SRC_THREAD (self);
 
   GST_DEBUG_OBJECT (aggpad, "Done chaining");
 
index 8bf8fbd..19b04e9 100644 (file)
@@ -73,7 +73,6 @@ struct _GstAggregatorPad
   GstBuffer                 *  buffer;
   GstSegment                   segment;
   gboolean                     eos;
-  gboolean                     unresponsive;
 
   /* < Private > */
   GstAggregatorPadPrivate   *  priv;
@@ -141,7 +140,7 @@ struct _GstAggregator
   GstClock              *  clock;
 
   /* properties */
-  gint64                   timeout;
+  gint64                   latency;
 
   gpointer                 _gst_reserved[GST_PADDING];
 };
@@ -189,6 +188,12 @@ struct _GstAggregator
  *                  Should be linked up first. Called when the element goes from
  *                  READY to PAUSED. The subclass should get ready to process
  *                  aggregated buffers.
+ * @get_next_time:  Optional.
+ *                  Called when the element needs to know the time of the next
+ *                  rendered buffer for live pipelines. This causes deadline
+ *                  based aggregation to occur. Defaults to returning
+ *                  GST_CLOCK_TIME_NONE causing the element to wait for buffers
+ *                  on all sink pads before aggregating.
  *
  * The aggregator base class will handle in a thread-safe way all manners of
  * concurrent flushes, seeks, pad additions and removals, leaving to the
@@ -238,6 +243,8 @@ struct _GstAggregatorClass {
 
   gboolean          (*start)          (GstAggregator    *  aggregator);
 
+  GstClockTime      (*get_next_time)  (GstAggregator    *  aggregator);
+
   /*< private >*/
   gpointer          _gst_reserved[GST_PADDING];
 };