aggregator: Always handle serialized events/queries directly before waiting
authorSebastian Dröge <sebastian@centricular.com>
Mon, 19 Aug 2019 15:19:50 +0000 (18:19 +0300)
committerSebastian Dröge <sebastian@centricular.com>
Mon, 19 Aug 2019 15:55:07 +0000 (18:55 +0300)
Otherwise it can happen that we start waiting for another pad, while one
pad already has events that can be handled and potentially also a buffer
that can be handled. That buffer would then however not be accessible by
the subclass from GstAggregator::get_next_time() as there would be the
events in front of it, which doesn't allow the subclass then to
calculate the next time based on already available buffers.

As a side-effect this also allows removing the duplicated event handling
code in the aggregate function as we'll always report pads as not ready
when there is a serialized event or query at the top of at least one
pad's queue.

Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer/issues/428

libs/gst/base/gstaggregator.c

index e8282de..9eaaa90 100644 (file)
@@ -398,8 +398,15 @@ gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
       pad->priv->clipped_buffer == NULL);
 }
 
+/* Will return FALSE if there's no buffer available on every non-EOS pad, or
+ * if at least one of the pads has an event or query at the top of its queue.
+ *
+ * Only returns TRUE if all non-EOS pads have a buffer available at the top of
+ * their queue or a clipped buffer already.
+ */
 static gboolean
-gst_aggregator_check_pads_ready (GstAggregator * self)
+gst_aggregator_check_pads_ready (GstAggregator * self,
+    gboolean * have_event_or_query_ret)
 {
   GstAggregatorPad *pad = NULL;
   GList *l, *sinkpads;
@@ -419,18 +426,30 @@ gst_aggregator_check_pads_ready (GstAggregator * self)
 
     PAD_LOCK (pad);
 
-    if (pad->priv->num_buffers == 0) {
-      if (!gst_aggregator_pad_queue_is_empty (pad))
-        have_event_or_query = TRUE;
-      if (!pad->priv->eos) {
-        have_buffer = FALSE;
+    /* If there's an event or query at the top of the queue and we don't yet
+     * have taken the top buffer out and stored it as clip_buffer, remember
+     * that and exit the loop. We first have to handle all events/queries
+     * before we handle any buffers. */
+    if (!pad->priv->clipped_buffer
+        && (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data))
+            || GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))) {
+      PAD_UNLOCK (pad);
+      have_event_or_query = TRUE;
+      break;
+    }
 
-        /* If not live we need data on all pads, so leave the loop */
-        if (!self->priv->peer_latency_live) {
-          PAD_UNLOCK (pad);
-          goto pad_not_ready;
-        }
-      }
+    /* Otherwise check if we have a clipped buffer or a buffer at the top of
+     * the queue, and if not then this pad is not ready unless it is also EOS */
+    if (!pad->priv->clipped_buffer
+        && !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
+      /* We must not have any buffers at all in this pad then as otherwise we
+       * would've had an event/query at the top of the queue */
+      g_assert (pad->priv->num_buffers == 0);
+
+      /* Only consider this pad as worth waiting for if it's not already EOS.
+       * There's no point in waiting for buffers on EOS pads */
+      if (!pad->priv->eos)
+        have_buffer = FALSE;
     } else if (self->priv->peer_latency_live) {
       /* In live mode, having a single pad with buffers is enough to
        * generate a start time from it. In non-live mode all pads need
@@ -442,7 +461,10 @@ gst_aggregator_check_pads_ready (GstAggregator * self)
     PAD_UNLOCK (pad);
   }
 
-  if (!have_buffer && !have_event_or_query)
+  if (have_event_or_query)
+    goto pad_not_ready_but_event_or_query;
+
+  if (!have_buffer)
     goto pad_not_ready;
 
   if (have_buffer)
@@ -450,23 +472,42 @@ gst_aggregator_check_pads_ready (GstAggregator * self)
 
   GST_OBJECT_UNLOCK (self);
   GST_LOG_OBJECT (self, "pads are ready");
+
+  if (have_event_or_query_ret)
+    *have_event_or_query_ret = have_event_or_query;
+
   return TRUE;
 
 no_sinkpads:
   {
     GST_LOG_OBJECT (self, "pads not ready: no sink pads");
     GST_OBJECT_UNLOCK (self);
+
+    if (have_event_or_query_ret)
+      *have_event_or_query_ret = have_event_or_query;
+
     return FALSE;
   }
 pad_not_ready:
   {
-    if (have_event_or_query)
-      GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet,"
-          " but waking up for serialized event");
-    else
-      GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
+    GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
     GST_OBJECT_UNLOCK (self);
-    return have_event_or_query;
+
+    if (have_event_or_query_ret)
+      *have_event_or_query_ret = have_event_or_query;
+
+    return FALSE;
+  }
+pad_not_ready_but_event_or_query:
+  {
+    GST_LOG_OBJECT (pad,
+        "pad not ready to be aggregated yet, need to handle serialized event or query first");
+    GST_OBJECT_UNLOCK (self);
+
+    if (have_event_or_query_ret)
+      *have_event_or_query_ret = have_event_or_query;
+
+    return FALSE;
   }
 }
 
@@ -630,6 +671,7 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
   GstClockTime latency;
   GstClockTime start;
   gboolean res;
+  gboolean have_event_or_query = FALSE;
 
   *timeout = FALSE;
 
@@ -637,13 +679,21 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
 
   latency = gst_aggregator_get_latency_unlocked (self);
 
-  if (gst_aggregator_check_pads_ready (self)) {
+  if (gst_aggregator_check_pads_ready (self, &have_event_or_query)) {
     GST_DEBUG_OBJECT (self, "all pads have data");
     SRC_UNLOCK (self);
 
     return TRUE;
   }
 
+  /* If we have an event or query, immediately return FALSE instead of waiting
+   * and handle it immediately */
+  if (have_event_or_query) {
+    GST_DEBUG_OBJECT (self, "Have serialized event or query to handle first");
+    SRC_UNLOCK (self);
+    return FALSE;
+  }
+
   /* Before waiting, check if we're actually still running */
   if (!self->priv->running || !self->priv->send_eos) {
     SRC_UNLOCK (self);
@@ -722,7 +772,7 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
     }
   }
 
-  res = gst_aggregator_check_pads_ready (self);
+  res = gst_aggregator_check_pads_ready (self, NULL);
   SRC_UNLOCK (self);
 
   return res;
@@ -1175,17 +1225,6 @@ gst_aggregator_aggregate_func (GstAggregator * self)
     if (!gst_aggregator_wait_and_check (self, &timeout))
       continue;
 
-    events_query_data.processed_event = FALSE;
-    events_query_data.flow_ret = GST_FLOW_OK;
-    gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
-        gst_aggregator_do_events_and_queries, &events_query_data);
-
-    if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
-      goto handle_error;
-
-    if (events_query_data.processed_event)
-      continue;
-
     if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
       if (!gst_aggregator_negotiate_unlocked (self)) {
         gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self));