aggregator: Don't count the number of times we need to wake up but instead check...
authorSebastian Dröge <sebastian@centricular.com>
Mon, 22 Dec 2014 14:00:36 +0000 (15:00 +0100)
committerTim-Philipp Müller <tim@centricular.com>
Sat, 2 Dec 2017 15:10:26 +0000 (15:10 +0000)
This simplifies the code and also makes sure that we don't forget to check all
conditions for waiting.

Also fix a potential deadlock caused by not checking if we're actually still
running before starting to wait.

libs/gst/base/gstaggregator.c

index 7e5b70e..9c2e702 100644 (file)
@@ -167,19 +167,12 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
   } G_STMT_END
 
 #define SRC_STREAM_BROADCAST(self) G_STMT_START {                          \
-  GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p",             \
-        g_thread_self());                                                  \
-  g_cond_broadcast(&(self->priv->src_cond));                               \
-  } G_STMT_END
-
-#define KICK_SRC_THREAD(self) G_STMT_START {                               \
     SRC_STREAM_LOCK (self);                                                \
-    GST_LOG_OBJECT (self, "kicking src STREAM from thread %p",             \
-          g_thread_self ());                                               \
+    GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p",           \
+        g_thread_self());                                                  \
     if (self->priv->aggregate_id)                                          \
       gst_clock_id_unschedule (self->priv->aggregate_id);                  \
-    self->priv->n_kicks++;                                                 \
-    SRC_STREAM_BROADCAST (self);                                           \
+    g_cond_broadcast(&(self->priv->src_cond));                             \
     SRC_STREAM_UNLOCK (self);                                              \
   } G_STMT_END
 
@@ -248,7 +241,6 @@ struct _GstAggregatorPrivate
 
   /* aggregate */
   GstClockID aggregate_id;
-  gint n_kicks;
   GMutex src_lock;
   GCond src_cond;
 };
@@ -507,13 +499,23 @@ _wait_and_check (GstAggregator * self, gboolean * timeout)
   }
 
   SRC_STREAM_LOCK (self);
+
+  /* Before waiting, check if we're actually still running */
+  if (!self->priv->running || !self->priv->send_eos) {
+    SRC_STREAM_UNLOCK (self);
+
+    return FALSE;
+  }
+
   start = gst_aggregator_get_next_time (self);
 
   if (!live || !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self))
       || !GST_CLOCK_TIME_IS_VALID (start)) {
-    while (self->priv->n_kicks <= 0)
-      SRC_STREAM_WAIT (self);
-    self->priv->n_kicks--;
+    /* We wake up here when something happened, and below
+     * then check if we're ready now. If we return FALSE,
+     * we will be directly called again.
+     */
+    SRC_STREAM_WAIT (self);
   } else {
     GstClockTime base_time, time;
     GstClock *clock;
@@ -559,7 +561,6 @@ _wait_and_check (GstAggregator * self, gboolean * timeout)
       gst_clock_id_unref (self->priv->aggregate_id);
       self->priv->aggregate_id = NULL;
     }
-    self->priv->n_kicks--;
 
     GST_DEBUG_OBJECT (self, "clock returned %d", status);
 
@@ -641,14 +642,14 @@ _stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
       flush_start ? "Pausing" : "Stopping");
 
   self->priv->running = FALSE;
-  KICK_SRC_THREAD (self);
+  SRC_STREAM_BROADCAST (self);
 
   if (flush_start) {
     res = gst_pad_push_event (self->srcpad, flush_start);
   }
 
   gst_pad_stop_task (self->srcpad);
-  KICK_SRC_THREAD (self);
+  SRC_STREAM_BROADCAST (self);
 
   return res;
 }
@@ -659,7 +660,6 @@ _start_srcpad_task (GstAggregator * self)
   GST_INFO_OBJECT (self, "Starting srcpad task");
 
   self->priv->running = TRUE;
-  self->priv->n_kicks = 0;
   gst_pad_start_task (GST_PAD (self->srcpad),
       (GstTaskFunction) aggregate_func, self, NULL);
 }
@@ -776,7 +776,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
             gst_pad_push_event (self->srcpad, event);
             priv->send_eos = TRUE;
             event = NULL;
-            KICK_SRC_THREAD (self);
+            SRC_STREAM_BROADCAST (self);
 
             GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
             GST_PAD_STREAM_UNLOCK (self->srcpad);
@@ -804,7 +804,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
       }
       PAD_UNLOCK_EVENT (aggpad);
 
-      KICK_SRC_THREAD (self);
+      SRC_STREAM_BROADCAST (self);
       goto eat;
     }
     case GST_EVENT_SEGMENT:
@@ -928,7 +928,7 @@ _release_pad (GstElement * element, GstPad * pad)
   gst_buffer_replace (&tmpbuf, NULL);
   gst_element_remove_pad (element, pad);
 
-  KICK_SRC_THREAD (self);
+  SRC_STREAM_BROADCAST (self);
 }
 
 static GstPad *
@@ -1655,7 +1655,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
 
   if (gst_aggregator_iterate_sinkpads (self,
           (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL))
-    KICK_SRC_THREAD (self);
+    SRC_STREAM_BROADCAST (self);
 
   GST_DEBUG_OBJECT (aggpad, "Done chaining");