aggregator: Take the stream lock when iterating sink pads
author: Nirbheek Chauhan <nirbheek@centricular.com>
Fri, 26 Dec 2014 22:49:52 +0000 (04:19 +0530)
committer: Tim-Philipp Müller <tim@centricular.com>
Sat, 2 Dec 2017 15:10:26 +0000 (15:10 +0000)
When iterating sink pads to collect some data, we should take the stream lock so
we don't get stale data and possibly deadlock because of that. This fixes
a deterministic deadlock in _wait_and_check() that manifests with high max
latencies in a live pipeline, and fixes other possible race conditions.

libs/gst/base/gstaggregator.c

index 47f60de..6a3796f 100644 (file)
@@ -166,13 +166,17 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
         g_thread_self());                                                  \
   } G_STMT_END
 
-#define SRC_STREAM_BROADCAST(self) G_STMT_START {                          \
-    SRC_STREAM_LOCK (self);                                                \
+#define SRC_STREAM_BROADCAST_UNLOCKED(self) G_STMT_START {                 \
     GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p",           \
         g_thread_self());                                                  \
     if (self->priv->aggregate_id)                                          \
       gst_clock_id_unschedule (self->priv->aggregate_id);                  \
     g_cond_broadcast(&(self->priv->src_cond));                             \
+  } G_STMT_END
+
+#define SRC_STREAM_BROADCAST(self) G_STMT_START {                          \
+    SRC_STREAM_LOCK (self);                                                \
+    SRC_STREAM_BROADCAST_UNLOCKED (self);                                  \
     SRC_STREAM_UNLOCK (self);                                              \
   } G_STMT_END
 
@@ -485,21 +489,23 @@ _wait_and_check (GstAggregator * self, gboolean * timeout)
 {
   GstClockTime latency_max, latency_min;
   GstClockTime start;
-  gboolean live;
+  gboolean live, res;
 
   *timeout = FALSE;
 
+  SRC_STREAM_LOCK (self);
+
   gst_aggregator_get_latency (self, &live, &latency_min, &latency_max);
 
   if (gst_aggregator_iterate_sinkpads (self,
           (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos,
           NULL)) {
     GST_DEBUG_OBJECT (self, "all pads have data");
+    SRC_STREAM_UNLOCK (self);
+
     return TRUE;
   }
 
-  SRC_STREAM_LOCK (self);
-
   /* Before waiting, check if we're actually still running */
   if (!self->priv->running || !self->priv->send_eos) {
     SRC_STREAM_UNLOCK (self);
@@ -571,10 +577,12 @@ _wait_and_check (GstAggregator * self, gboolean * timeout)
       return TRUE;
     }
   }
-  SRC_STREAM_UNLOCK (self);
 
-  return gst_aggregator_iterate_sinkpads (self,
+  res = gst_aggregator_iterate_sinkpads (self,
       (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL);
+  SRC_STREAM_UNLOCK (self);
+
+  return res;
 }
 
 static void
@@ -1074,8 +1082,10 @@ gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
   data.live = FALSE;
 
   /* query upstream's latency */
+  SRC_STREAM_LOCK (self);
   gst_aggregator_iterate_sinkpads (self,
       (GstAggregatorPadForeachFunc) _latency_query, &data);
+  SRC_STREAM_UNLOCK (self);
 
   if (data.live && GST_CLOCK_TIME_IS_VALID (self->latency) &&
       self->latency > data.max) {
@@ -1678,9 +1688,11 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
   PAD_UNLOCK_EVENT (aggpad);
   PAD_STREAM_UNLOCK (aggpad);
 
+  SRC_STREAM_LOCK (self);
   if (gst_aggregator_iterate_sinkpads (self,
           (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL))
-    SRC_STREAM_BROADCAST (self);
+    SRC_STREAM_BROADCAST_UNLOCKED (self);
+  SRC_STREAM_UNLOCK (self);
 
   GST_DEBUG_OBJECT (aggpad, "Done chaining");