From 783195ccefa8690cc4a3ccffbfd77c2e86b2c391 Mon Sep 17 00:00:00 2001 From: Thiago Santos Date: Sun, 16 Mar 2014 14:27:30 -0300 Subject: [PATCH] decodebin: aggregate buffering messages Aggregate buffering messages to only post the lower value to avoid setting pipeline to playing while any multiqueue is still buffering. There are 3 scenarios where the entries should be removed from the list: 1) When decodebin is set to READY 2) When an element posts a 100% buffering (already implemented) 3) When a multiqueue is removed from decodebin. For item 3 we don't need to handle it because this should only happen when either 1 is hapenning or when it is playing a chained file, for which number 2 should have happened for the previous stream to finish https://bugzilla.gnome.org/show_bug.cgi?id=726423 --- gst/playback/gstdecodebin2.c | 79 +++++++++++++++++++++++++++++++++ tests/check/elements/decodebin.c | 94 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 173 insertions(+) diff --git a/gst/playback/gstdecodebin2.c b/gst/playback/gstdecodebin2.c index 2789a14..b7b585c 100644 --- a/gst/playback/gstdecodebin2.c +++ b/gst/playback/gstdecodebin2.c @@ -183,6 +183,8 @@ struct _GstDecodeBin gboolean expose_allstreams; /* Whether to expose unknow type streams or not */ GList *filtered; /* elements for which error messages are filtered */ + + GList *buffering_status; /* element currently buffering messages */ }; struct _GstDecodeBinClass @@ -4563,6 +4565,9 @@ gst_decode_bin_change_state (GstElement * element, GstStateChange transition) dbin->decode_chain = NULL; } EXPOSE_UNLOCK (dbin); + g_list_free_full (dbin->buffering_status, + (GDestroyNotify) gst_message_unref); + dbin->buffering_status = NULL; break; case GST_STATE_CHANGE_READY_TO_NULL: default: @@ -4597,6 +4602,80 @@ gst_decode_bin_handle_message (GstBin * bin, GstMessage * msg) GST_OBJECT_LOCK (dbin); drop = (g_list_find (dbin->filtered, GST_MESSAGE_SRC (msg)) != NULL); GST_OBJECT_UNLOCK (dbin); + } else if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_BUFFERING) { + gint perc, msg_perc; + gint smaller_perc = 100; + GstMessage *smaller = NULL; + GList *found = NULL; + GList *iter; + + /* buffering messages must be aggregated as there might be multiple + * multiqueue in the pipeline and their independent buffering messages + * will confuse the application + * + * decodebin keeps a list of messages received from elements that are + * buffering. + * Rules are: + * 1) Always post the smaller buffering % + * 2) If an element posts a 100% buffering message, remove it from the list + * 3) When there are no more messages on the list, post 100% message + * 4) When an element posts a new buffering message, update the one + * on the list to this new value + */ + + gst_message_parse_buffering (msg, &msg_perc); + + /* + * Single loop for 2 things: + * 1) Look for a message with the same source + * 1.1) If the received message is 100%, remove it from the list + * 2) Find the minimum buffering from the list + */ + for (iter = dbin->buffering_status; iter;) { + GstMessage *bufstats = iter->data; + if (GST_MESSAGE_SRC (bufstats) == GST_MESSAGE_SRC (msg)) { + found = iter; + if (msg_perc < 100) { + gst_message_unref (iter->data); + bufstats = iter->data = gst_message_ref (msg); + } else { + GList *current = iter; + + /* remove the element here and avoid confusing the loop */ + iter = g_list_next (iter); + + gst_message_unref (current->data); + dbin->buffering_status = + g_list_delete_link (dbin->buffering_status, current); + + continue; + } + } + + gst_message_parse_buffering (bufstats, &perc); + if (perc < smaller_perc) { + smaller_perc = perc; + smaller = bufstats; + } + iter = g_list_next (iter); + } + + if (found == NULL && msg_perc < 100) { + if (msg_perc < smaller_perc) { + smaller_perc = msg_perc; + smaller = msg; + } + dbin->buffering_status = + g_list_prepend (dbin->buffering_status, gst_message_ref (msg)); + } + + /* now compute the buffering message that should be posted */ + if (smaller_perc == 100) { + g_assert (dbin->buffering_status == NULL); + /* we are posting the original received msg */ + } else { + gst_message_replace (&msg, smaller); + } } if (drop) diff --git a/tests/check/elements/decodebin.c b/tests/check/elements/decodebin.c index 049b3a9..fba0b1c 100644 --- a/tests/check/elements/decodebin.c +++ b/tests/check/elements/decodebin.c @@ -623,6 +623,99 @@ GST_START_TEST (test_parser_negotiation) GST_END_TEST; +GST_START_TEST (test_buffering_aggregation) +{ + GstElement *pipe, *decodebin; + GstMessage *msg; + GstElement *mq0, *mq1, *mq2; + gint perc; + + pipe = gst_pipeline_new (NULL); + fail_unless (pipe != NULL, "failed to create pipeline"); + + decodebin = gst_element_factory_make ("decodebin", "decodebin"); + fail_unless (decodebin != NULL, "Failed to create decodebin element"); + + fail_unless (gst_bin_add (GST_BIN (pipe), decodebin)); + + /* to simulate the buffering scenarios we stuff 2 multiqueues inside + * decodebin. This is hacky, but sould make decodebin handle its buffering + * messages all the same */ + mq0 = gst_element_factory_make ("multiqueue", NULL); + mq1 = gst_element_factory_make ("multiqueue", NULL); + mq2 = gst_element_factory_make ("multiqueue", NULL); + + fail_unless (gst_bin_add (GST_BIN (decodebin), mq0)); + fail_unless (gst_bin_add (GST_BIN (decodebin), mq1)); + fail_unless (gst_bin_add (GST_BIN (decodebin), mq2)); + + fail_unless_equals_int (gst_element_set_state (pipe, GST_STATE_READY), + GST_STATE_CHANGE_SUCCESS); + fail_unless_equals_int (gst_element_set_state (pipe, GST_STATE_PAUSED), + GST_STATE_CHANGE_ASYNC); + + /* currently we shoud have no buffering messages */ + msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0); + fail_unless (msg == NULL); + + /* only a single element buffering, the buffering percent should be the + * same as it */ + gst_element_post_message (mq0, gst_message_new_buffering (GST_OBJECT (mq0), + 50)); + msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0); + fail_unless (msg != NULL); + fail_unless (GST_MESSAGE_SRC (msg) == (GstObject *) mq0); + gst_message_parse_buffering (msg, &perc); + fail_unless (perc == 50); + gst_message_unref (msg); + + /* two elements buffering, the buffering percent should be the + * lowest one */ + gst_element_post_message (mq1, gst_message_new_buffering (GST_OBJECT (mq1), + 20)); + msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0); + fail_unless (msg != NULL); + fail_unless (GST_MESSAGE_SRC (msg) == (GstObject *) mq1); + gst_message_parse_buffering (msg, &perc); + fail_unless (perc == 20); + gst_message_unref (msg); + + /* a 100% message should be ignored */ + gst_element_post_message (mq2, gst_message_new_buffering (GST_OBJECT (mq2), + 100)); + msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0); + fail_unless (msg != NULL); + fail_unless (GST_MESSAGE_SRC (msg) == (GstObject *) mq1); + gst_message_parse_buffering (msg, &perc); + fail_unless (perc == 20); + gst_message_unref (msg); + + /* a new buffering message is posted with a higher value, go with the 20 */ + gst_element_post_message (mq2, gst_message_new_buffering (GST_OBJECT (mq2), + 80)); + msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0); + fail_unless (msg != NULL); + fail_unless (GST_MESSAGE_SRC (msg) == (GstObject *) mq1); + gst_message_parse_buffering (msg, &perc); + fail_unless (perc == 20); + gst_message_unref (msg); + + /* The mq1 finishes buffering, new buffering status is now 50% from mq0 */ + gst_element_post_message (mq1, gst_message_new_buffering (GST_OBJECT (mq1), + 100)); + msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0); + fail_unless (msg != NULL); + fail_unless (GST_MESSAGE_SRC (msg) == (GstObject *) mq0); + gst_message_parse_buffering (msg, &perc); + fail_unless (perc == 50); + gst_message_unref (msg); + + gst_element_set_state (pipe, GST_STATE_NULL); + gst_object_unref (pipe); +} + +GST_END_TEST; + static Suite * decodebin_suite (void) { @@ -634,6 +727,7 @@ decodebin_suite (void) tcase_add_test (tc_chain, test_reuse_without_decoders); tcase_add_test (tc_chain, test_mp3_parser_loop); tcase_add_test (tc_chain, test_parser_negotiation); + tcase_add_test (tc_chain, test_buffering_aggregation); return s; } -- 2.7.4