aggregator: Release pads' peeked buffer when removing the pad or finalizing it
authorSebastian Dröge <sebastian@centricular.com>
Tue, 6 Apr 2021 17:56:55 +0000 (20:56 +0300)
committerSebastian Dröge <sebastian@centricular.com>
Tue, 6 Apr 2021 18:17:56 +0000 (21:17 +0300)
The peeked buffer was always reset after calling ::aggregate() but under
no other circumstances. If a pad was removed after peeking and before
::aggregate() returned then the peeked buffer would be leaked.

This can easily happen if pads are removed from the aggregator from a
pad probe downstream of the source pad but still in the source pad's
streaming thread.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/784>

libs/gst/base/gstaggregator.c
tests/check/libs/aggregator.c

index c55f4cc..f4ee40c 100644 (file)
@@ -1938,6 +1938,7 @@ gst_aggregator_release_pad (GstElement * element, GstPad * pad)
 
   SRC_LOCK (self);
   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
+  gst_buffer_replace (&aggpad->priv->peeked_buffer, NULL);
   gst_element_remove_pad (element, pad);
 
   self->priv->has_peer_latency = FALSE;
@@ -3199,6 +3200,7 @@ gst_aggregator_pad_finalize (GObject * object)
 {
   GstAggregatorPad *pad = (GstAggregatorPad *) object;
 
+  gst_buffer_replace (&pad->priv->peeked_buffer, NULL);
   g_cond_clear (&pad->priv->event_cond);
   g_mutex_clear (&pad->priv->flush_lock);
   g_mutex_clear (&pad->priv->lock);
index 24df768..73c1242 100644 (file)
@@ -59,6 +59,7 @@ struct _GstTestAggregator
   guint64 timestamp;
   gboolean gap_expected;
   gboolean do_flush_on_aggregate;
+  gboolean do_remove_pad_on_aggregate;
 };
 
 struct _GstTestAggregatorClass
@@ -113,6 +114,14 @@ gst_test_aggregator_aggregate (GstAggregator * aggregator, gboolean timeout)
           fail_unless (buf == popped_buf);
           gst_buffer_unref (buf);
           gst_buffer_unref (popped_buf);
+        } else if (testagg->do_remove_pad_on_aggregate) {
+          buf = gst_aggregator_pad_peek_buffer (pad);
+
+          GST_DEBUG_OBJECT (pad, "Removing pad on aggregate");
+
+          gst_buffer_unref (buf);
+          gst_element_release_request_pad (GST_ELEMENT (aggregator),
+              GST_PAD (pad));
         } else {
           gst_aggregator_pad_drop_buffer (pad);
         }
@@ -1304,6 +1313,35 @@ GST_START_TEST (test_flush_on_aggregate)
 
 GST_END_TEST;
 
+GST_START_TEST (test_remove_pad_on_aggregate)
+{
+  GThread *thread1, *thread2;
+  ChainData data1 = { 0, };
+  ChainData data2 = { 0, };
+  TestData test = { 0, };
+
+  _test_data_init (&test, FALSE);
+  ((GstTestAggregator *) test.aggregator)->do_remove_pad_on_aggregate = TRUE;
+  _chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL);
+  _chain_data_init (&data2, test.aggregator, gst_buffer_new (), NULL);
+
+  thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL);
+  thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
+
+  g_main_loop_run (test.ml);
+  g_source_remove (test.timeout_id);
+
+  /* these will return immediately as when the data is popped the threads are
+   * unlocked and will terminate */
+  g_thread_join (thread1);
+  g_thread_join (thread2);
+
+  _chain_data_clear (&data1);
+  _chain_data_clear (&data2);
+  _test_data_clear (&test);
+}
+
+GST_END_TEST;
 
 static Suite *
 gst_aggregator_suite (void)
@@ -1333,6 +1371,7 @@ gst_aggregator_suite (void)
   tcase_add_test (general, test_add_remove);
   tcase_add_test (general, test_change_state_intensive);
   tcase_add_test (general, test_flush_on_aggregate);
+  tcase_add_test (general, test_remove_pad_on_aggregate);
 
   return suite;
 }