aggregator: Replace GMainContext with GAsyncQueue (v2)
authorJan Alexander Steffens (heftig) <jan.steffens@gmail.com>
Wed, 17 Sep 2014 14:48:02 +0000 (16:48 +0200)
committerTim-Philipp Müller <tim@centricular.com>
Sat, 2 Dec 2017 15:10:26 +0000 (15:10 +0000)
The previous implementation kept accumulating GSources,
slowing down the iteration and leaking memory.

Instead of trying to fix the main context flushing, replace
it with a GAsyncQueue which is simple to flush and has
less overhead.

https://bugzilla.gnome.org/show_bug.cgi?id=736782

libs/gst/base/gstaggregator.c

index 55ac868..dce7180 100644 (file)
@@ -170,34 +170,41 @@ _aggpad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
  *************************************/
 static GstElementClass *aggregator_parent_class = NULL;
 
-#define MAIN_CONTEXT_LOCK(self) G_STMT_START {                       \
-  GST_LOG_OBJECT (self, "Getting MAIN_CONTEXT_LOCK in thread %p",    \
+#define AGGREGATOR_QUEUE(self) (((GstAggregator*)self)->priv->queue)
+
+#define QUEUE_PUSH(self) G_STMT_START {                              \
+  GST_LOG_OBJECT (self, "Pushing to QUEUE in thread %p",             \
+      g_thread_self());                                              \
+  g_async_queue_push (AGGREGATOR_QUEUE (self), GINT_TO_POINTER (1)); \
+} G_STMT_END
+
+#define QUEUE_POP(self) G_STMT_START {                               \
+  GST_LOG_OBJECT (self, "Waiting on QUEUE in thread %p",             \
         g_thread_self());                                            \
-  g_mutex_lock(&((GstAggregator*)self)->priv->mcontext_lock);    \
-  GST_LOG_OBJECT (self, "Got MAIN_CONTEXT_LOCK in thread %p",        \
+  g_async_queue_pop (AGGREGATOR_QUEUE (self));                       \
+  GST_LOG_OBJECT (self, "Waited on QUEUE in thread %p",              \
         g_thread_self());                                            \
 } G_STMT_END
 
-#define MAIN_CONTEXT_UNLOCK(self) G_STMT_START {                     \
-  g_mutex_unlock(&((GstAggregator*)self)->priv->mcontext_lock);  \
-  GST_LOG_OBJECT (self, "Unlocked MAIN_CONTEXT_LOCK in thread %p",   \
-        g_thread_self());                                            \
+#define QUEUE_FLUSH(self) G_STMT_START {                             \
+  GST_LOG_OBJECT (self, "Flushing QUEUE in thread %p",               \
+      g_thread_self());                                              \
+  g_async_queue_lock (AGGREGATOR_QUEUE (self));                      \
+  while (g_async_queue_try_pop_unlocked (AGGREGATOR_QUEUE (self)));  \
+  g_async_queue_unlock (AGGREGATOR_QUEUE (self));                    \
+  GST_LOG_OBJECT (self, "Flushed QUEUE in thread %p",                \
+      g_thread_self());                                              \
 } G_STMT_END
 
 struct _GstAggregatorPrivate
 {
   gint padcount;
 
-  GMainContext *mcontext;
+  GAsyncQueue *queue;
 
   /* Our state is >= PAUSED */
   gboolean running;
 
-  /* Ensure that when we remove all sources from the maincontext
-   * we can not add any source, avoiding:
-   * "g_source_attach: assertion '!SOURCE_DESTROYED (source)' failed" */
-  GMutex mcontext_lock;
-  GList *gsources;
 
   gint seqnum;
   gboolean send_stream_start;
@@ -430,31 +437,20 @@ _push_eos (GstAggregator * self)
   gst_pad_push_event (self->srcpad, event);
 }
 
-
-static void
-_destroy_gsource (GSource * source)
-{
-  g_source_destroy (source);
-  g_source_unref (source);
-}
-
 static void
-_remove_all_sources (GstAggregator * self)
-{
-  GstAggregatorPrivate *priv = self->priv;
-
-  MAIN_CONTEXT_LOCK (self);
-  g_list_free_full (priv->gsources, (GDestroyNotify) _destroy_gsource);
-  priv->gsources = NULL;
-  MAIN_CONTEXT_UNLOCK (self);
-}
-
-static gboolean
 aggregate_func (GstAggregator * self)
 {
   GstAggregatorPrivate *priv = self->priv;
   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
 
+  if (self->priv->running == FALSE) {
+    GST_DEBUG_OBJECT (self, "Not running anymore");
+
+    return;
+  }
+
+  QUEUE_POP (self);
+
   GST_LOG_OBJECT (self, "Checking aggregate");
   while (priv->send_eos && gst_aggregator_iterate_sinkpads (self,
           (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos,
@@ -464,8 +460,7 @@ aggregate_func (GstAggregator * self)
     priv->flow_return = klass->aggregate (self);
 
     if (priv->flow_return == GST_FLOW_EOS) {
-      g_main_context_wakeup (self->priv->mcontext);
-      _remove_all_sources (self);
+      QUEUE_FLUSH (self);
       _push_eos (self);
     }
 
@@ -480,19 +475,6 @@ aggregate_func (GstAggregator * self)
       break;
   }
 
-  return G_SOURCE_REMOVE;
-}
-
-static void
-iterate_main_context_func (GstAggregator * self)
-{
-  if (self->priv->running == FALSE) {
-    GST_DEBUG_OBJECT (self, "Not running anymore");
-
-    return;
-  }
-
-  g_main_context_iteration (self->priv->mcontext, TRUE);
 }
 
 static gboolean
@@ -523,15 +505,14 @@ _stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
       flush_start ? "Pausing" : "Stopping");
 
   self->priv->running = FALSE;
+  QUEUE_PUSH (self);
 
-  /*  Clean the stack of GSource set on the MainContext */
-  g_main_context_wakeup (self->priv->mcontext);
-  _remove_all_sources (self);
   if (flush_start) {
     res = gst_pad_push_event (self->srcpad, flush_start);
   }
 
   gst_pad_stop_task (self->srcpad);
+  QUEUE_FLUSH (self);
 
   return res;
 }
@@ -543,21 +524,7 @@ _start_srcpad_task (GstAggregator * self)
 
   self->priv->running = TRUE;
   gst_pad_start_task (GST_PAD (self->srcpad),
-      (GstTaskFunction) iterate_main_context_func, self, NULL);
-}
-
-static inline void
-_add_aggregate_gsource (GstAggregator * self)
-{
-  GSource *source;
-  GstAggregatorPrivate *priv = self->priv;
-
-  MAIN_CONTEXT_LOCK (self);
-  source = g_idle_source_new ();
-  g_source_set_callback (source, (GSourceFunc) aggregate_func, self, NULL);
-  priv->gsources = g_list_prepend (priv->gsources, source);
-  g_source_attach (source, priv->mcontext);
-  MAIN_CONTEXT_UNLOCK (self);
+      (GstTaskFunction) aggregate_func, self, NULL);
 }
 
 static GstFlowReturn
@@ -672,7 +639,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
             gst_pad_push_event (self->srcpad, event);
             priv->send_eos = TRUE;
             event = NULL;
-            _add_aggregate_gsource (self);
+            QUEUE_PUSH (self);
 
             GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
             GST_PAD_STREAM_UNLOCK (self->srcpad);
@@ -700,7 +667,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
       }
       PAD_UNLOCK_EVENT (aggpad);
 
-      _add_aggregate_gsource (self);
+      QUEUE_PUSH (self);
       goto eat;
     }
     case GST_EVENT_SEGMENT:
@@ -825,7 +792,7 @@ _release_pad (GstElement * element, GstPad * pad)
   gst_element_remove_pad (element, pad);
 
   /* Something changed make sure we try to aggregate */
-  _add_aggregate_gsource (self);
+  QUEUE_PUSH (self);
 }
 
 static GstPad *
@@ -1160,7 +1127,6 @@ gst_aggregator_finalize (GObject * object)
 {
   GstAggregator *self = (GstAggregator *) object;
 
-  g_mutex_clear (&self->priv->mcontext_lock);
   g_mutex_clear (&self->priv->setcaps_lock);
 
   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
@@ -1173,8 +1139,10 @@ gst_aggregator_dispose (GObject * object)
 
   G_OBJECT_CLASS (aggregator_parent_class)->dispose (object);
 
-  g_main_context_unref (self->priv->mcontext);
-  _remove_all_sources (self);
+  if (AGGREGATOR_QUEUE (self)) {
+    g_async_queue_unref (AGGREGATOR_QUEUE (self));
+    AGGREGATOR_QUEUE (self) = NULL;
+  }
 }
 
 /* GObject vmethods implementations */
@@ -1231,7 +1199,7 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
   priv->tags_changed = FALSE;
   _reset_flow_values (self);
 
-  priv->mcontext = g_main_context_new ();
+  AGGREGATOR_QUEUE (self) = g_async_queue_new ();
   self->srcpad = gst_pad_new_from_template (pad_template, "src");
 
   gst_pad_set_event_function (self->srcpad,
@@ -1243,7 +1211,6 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
 
   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
 
-  g_mutex_init (&self->priv->mcontext_lock);
   g_mutex_init (&self->priv->setcaps_lock);
 }
 
@@ -1314,7 +1281,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
   PAD_UNLOCK_EVENT (aggpad);
   PAD_STREAM_UNLOCK (aggpad);
 
-  _add_aggregate_gsource (self);
+  QUEUE_PUSH (self);
 
   GST_DEBUG_OBJECT (aggpad, "Done chaining");