aggregator: Add a streaming lock so to secure flush start action
authorThibault Saunier <tsaunier@gnome.org>
Sat, 2 Aug 2014 16:25:01 +0000 (18:25 +0200)
committerTim-Philipp Müller <tim@centricular.com>
Sat, 2 Dec 2017 15:10:25 +0000 (15:10 +0000)
Without a lock that is taken in FLUSH_START we had a rare race where we
end up aggregating a buffer that was before the whole FLUSH_START/STOP
dance. That could lead to very wrong behaviour in subclasses.

libs/gst/base/gstaggregator.c

index 5f6de3e..99767d0 100644 (file)
@@ -122,6 +122,22 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
         g_thread_self());                                           \
   } G_STMT_END
 
+#define PAD_STREAM_LOCK(pad)   G_STMT_START {                            \
+  GST_LOG_OBJECT (pad, "Taking lock from thread %p",              \
+        g_thread_self());                                               \
+  g_mutex_lock(&pad->priv->stream_lock);                                 \
+  GST_LOG_OBJECT (pad, "Took lock from thread %p",              \
+        g_thread_self());                                               \
+  } G_STMT_END
+
+#define PAD_STREAM_UNLOCK(pad)  G_STMT_START {                           \
+  GST_LOG_OBJECT (pad, "Releasing lock from thread %p",          \
+        g_thread_self());                                               \
+  g_mutex_unlock(&pad->priv->stream_lock);                               \
+  GST_LOG_OBJECT (pad, "Release lock from thread %p",          \
+        g_thread_self());                                               \
+  } G_STMT_END
+
 struct _GstAggregatorPadPrivate
 {
   gboolean pending_flush_start;
@@ -131,6 +147,8 @@ struct _GstAggregatorPadPrivate
 
   GMutex event_lock;
   GCond event_cond;
+
+  GMutex stream_lock;
 };
 
 static gboolean
@@ -581,6 +599,47 @@ _all_flush_stop_received (GstAggregator * self)
   return TRUE;
 }
 
+static void
+_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
+{
+  GstBuffer *tmpbuf;
+  GstAggregatorPrivate *priv = self->priv;
+  GstAggregatorPadPrivate *padpriv = aggpad->priv;
+
+  g_atomic_int_set (&aggpad->priv->flushing, TRUE);
+  /*  Remove pad buffer and wake up the streaming thread */
+  tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
+  gst_buffer_replace (&tmpbuf, NULL);
+  PAD_STREAM_LOCK (aggpad);
+  if (g_atomic_int_compare_and_exchange (&padpriv->pending_flush_start,
+          TRUE, FALSE) == TRUE) {
+    GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
+    g_atomic_int_set (&padpriv->pending_flush_stop, TRUE);
+  }
+
+  if (g_atomic_int_get (&priv->flush_seeking)) {
+    /* If flush_seeking we forward the first FLUSH_START */
+    if (g_atomic_int_compare_and_exchange (&priv->pending_flush_start,
+            TRUE, FALSE) == TRUE) {
+
+      GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
+      _stop_srcpad_task (self, event);
+      priv->flow_return = GST_FLOW_OK;
+
+      GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
+      GST_PAD_STREAM_LOCK (self->srcpad);
+      GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
+      event = NULL;
+    }
+  } else {
+    gst_event_unref (event);
+  }
+  PAD_STREAM_UNLOCK (aggpad);
+
+  tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
+  gst_buffer_replace (&tmpbuf, NULL);
+}
+
 /* GstAggregator vmethods default implementations */
 static gboolean
 _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
@@ -588,41 +647,13 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
   gboolean res = TRUE;
   GstPad *pad = GST_PAD (aggpad);
   GstAggregatorPrivate *priv = self->priv;
-  GstAggregatorPadPrivate *padpriv = aggpad->priv;
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
     {
-      GstBuffer *tmpbuf;
-
-      g_atomic_int_set (&aggpad->priv->flushing, TRUE);
-      /*  Remove pad buffer and wake up the streaming thread */
-      tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
-      gst_buffer_replace (&tmpbuf, NULL);
-      if (g_atomic_int_compare_and_exchange (&padpriv->pending_flush_start,
-              TRUE, FALSE) == TRUE) {
-        GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
-        g_atomic_int_set (&padpriv->pending_flush_stop, TRUE);
-      }
-
-      if (g_atomic_int_get (&priv->flush_seeking)) {
-        /* If flush_seeking we forward the first FLUSH_START */
-        if (g_atomic_int_compare_and_exchange (&priv->pending_flush_start,
-                TRUE, FALSE) == TRUE) {
-
-          GST_DEBUG_OBJECT (self, "Flushing, pausing srcpad task");
-          _stop_srcpad_task (self, event);
-          priv->flow_return = GST_FLOW_OK;
-
-          GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
-          GST_PAD_STREAM_LOCK (self->srcpad);
-          GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
-          event = NULL;
-          goto eat;
-        }
-      }
-
+      _flush_start (self, aggpad, event);
       /* We forward only in one case: right after flush_seeking */
+      event = NULL;
       goto eat;
     }
     case GST_EVENT_FLUSH_STOP:
@@ -1258,6 +1289,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
 
   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
 
+  PAD_STREAM_LOCK (aggpad);
   if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
     goto flushing;
 
@@ -1274,7 +1306,6 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
   if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
     goto flushing;
 
-
   if (aggclass->clip) {
     aggclass->clip (self, aggpad, buffer, &actual_buf);
   }
@@ -1284,6 +1315,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
     gst_buffer_unref (aggpad->buffer);
   aggpad->buffer = actual_buf;
   PAD_UNLOCK_EVENT (aggpad);
+  PAD_STREAM_UNLOCK (aggpad);
 
   _add_aggregate_gsource (self);
 
@@ -1292,6 +1324,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
   return priv->flow_return;
 
 flushing:
+  PAD_STREAM_UNLOCK (aggpad);
 
   gst_buffer_unref (buffer);
   GST_DEBUG_OBJECT (aggpad, "We are flushing");
@@ -1299,6 +1332,7 @@ flushing:
   return GST_FLOW_FLUSHING;
 
 eos:
+  PAD_STREAM_UNLOCK (aggpad);
 
   gst_buffer_unref (buffer);
   GST_DEBUG_OBJECT (pad, "We are EOS already...");
@@ -1374,6 +1408,7 @@ gst_aggregator_pad_finalize (GObject * object)
 
   g_mutex_clear (&pad->priv->event_lock);
   g_cond_clear (&pad->priv->event_cond);
+  g_mutex_clear (&pad->priv->stream_lock);
 
   G_OBJECT_CLASS (aggregator_pad_parent_class)->finalize (object);
 }
@@ -1415,6 +1450,7 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
   g_mutex_init (&pad->priv->event_lock);
   g_cond_init (&pad->priv->event_cond);
 
+  g_mutex_init (&pad->priv->stream_lock);
 }
 
 /**