From defe732ae0ccc61cfb2f98f0768bba459dbf08af Mon Sep 17 00:00:00 2001 From: =?utf8?q?Sebastian=20Dr=C3=B6ge?= Date: Tue, 6 Apr 2021 20:56:55 +0300 Subject: [PATCH] aggregator: Release pads' peeked buffer when removing the pad or finalizing it The peeked buffer was always reset after calling ::aggregate() but under no other circumstances. If a pad was removed after peeking and before ::aggregate() returned then the peeked buffer would be leaked. This can easily happen if pads are removed from the aggregator from a pad probe downstream of the source pad but still in the source pad's streaming thread. Part-of: --- libs/gst/base/gstaggregator.c | 2 ++ tests/check/libs/aggregator.c | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index c55f4cc..f4ee40c 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -1938,6 +1938,7 @@ gst_aggregator_release_pad (GstElement * element, GstPad * pad) SRC_LOCK (self); gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE); + gst_buffer_replace (&aggpad->priv->peeked_buffer, NULL); gst_element_remove_pad (element, pad); self->priv->has_peer_latency = FALSE; @@ -3199,6 +3200,7 @@ gst_aggregator_pad_finalize (GObject * object) { GstAggregatorPad *pad = (GstAggregatorPad *) object; + gst_buffer_replace (&pad->priv->peeked_buffer, NULL); g_cond_clear (&pad->priv->event_cond); g_mutex_clear (&pad->priv->flush_lock); g_mutex_clear (&pad->priv->lock); diff --git a/tests/check/libs/aggregator.c b/tests/check/libs/aggregator.c index 24df768..73c1242 100644 --- a/tests/check/libs/aggregator.c +++ b/tests/check/libs/aggregator.c @@ -59,6 +59,7 @@ struct _GstTestAggregator guint64 timestamp; gboolean gap_expected; gboolean do_flush_on_aggregate; + gboolean do_remove_pad_on_aggregate; }; struct _GstTestAggregatorClass @@ -113,6 +114,14 @@ gst_test_aggregator_aggregate (GstAggregator * aggregator, gboolean timeout) fail_unless (buf == popped_buf); gst_buffer_unref (buf); gst_buffer_unref (popped_buf); + } else if (testagg->do_remove_pad_on_aggregate) { + buf = gst_aggregator_pad_peek_buffer (pad); + + GST_DEBUG_OBJECT (pad, "Removing pad on aggregate"); + + gst_buffer_unref (buf); + gst_element_release_request_pad (GST_ELEMENT (aggregator), + GST_PAD (pad)); } else { gst_aggregator_pad_drop_buffer (pad); } @@ -1304,6 +1313,35 @@ GST_START_TEST (test_flush_on_aggregate) GST_END_TEST; +GST_START_TEST (test_remove_pad_on_aggregate) +{ + GThread *thread1, *thread2; + ChainData data1 = { 0, }; + ChainData data2 = { 0, }; + TestData test = { 0, }; + + _test_data_init (&test, FALSE); + ((GstTestAggregator *) test.aggregator)->do_remove_pad_on_aggregate = TRUE; + _chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL); + _chain_data_init (&data2, test.aggregator, gst_buffer_new (), NULL); + + thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL); + thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL); + + g_main_loop_run (test.ml); + g_source_remove (test.timeout_id); + + /* these will return immediately as when the data is popped the threads are + * unlocked and will terminate */ + g_thread_join (thread1); + g_thread_join (thread2); + + _chain_data_clear (&data1); + _chain_data_clear (&data2); + _test_data_clear (&test); +} + +GST_END_TEST; static Suite * gst_aggregator_suite (void) @@ -1333,6 +1371,7 @@ gst_aggregator_suite (void) tcase_add_test (general, test_add_remove); tcase_add_test (general, test_change_state_intensive); tcase_add_test (general, test_flush_on_aggregate); + tcase_add_test (general, test_remove_pad_on_aggregate); return suite; } -- 2.7.4