gboolean expose_allstreams; /* Whether to expose unknow type streams or not */
GList *filtered; /* elements for which error messages are filtered */
+
+ GList *buffering_status; /* element currently buffering messages */
};
struct _GstDecodeBinClass
dbin->decode_chain = NULL;
}
EXPOSE_UNLOCK (dbin);
+ g_list_free_full (dbin->buffering_status,
+ (GDestroyNotify) gst_message_unref);
+ dbin->buffering_status = NULL;
break;
case GST_STATE_CHANGE_READY_TO_NULL:
default:
GST_OBJECT_LOCK (dbin);
drop = (g_list_find (dbin->filtered, GST_MESSAGE_SRC (msg)) != NULL);
GST_OBJECT_UNLOCK (dbin);
+ } else if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_BUFFERING) {
+ gint perc, msg_perc;
+ gint smaller_perc = 100;
+ GstMessage *smaller = NULL;
+ GList *found = NULL;
+ GList *iter;
+
+ /* buffering messages must be aggregated as there might be multiple
+ * multiqueue in the pipeline and their independent buffering messages
+ * will confuse the application
+ *
+ * decodebin keeps a list of messages received from elements that are
+ * buffering.
+ * Rules are:
+ * 1) Always post the smaller buffering %
+ * 2) If an element posts a 100% buffering message, remove it from the list
+ * 3) When there are no more messages on the list, post 100% message
+ * 4) When an element posts a new buffering message, update the one
+ * on the list to this new value
+ */
+
+ gst_message_parse_buffering (msg, &msg_perc);
+
+ /*
+ * Single loop for 2 things:
+ * 1) Look for a message with the same source
+ * 1.1) If the received message is 100%, remove it from the list
+ * 2) Find the minimum buffering from the list
+ */
+ for (iter = dbin->buffering_status; iter;) {
+ GstMessage *bufstats = iter->data;
+ if (GST_MESSAGE_SRC (bufstats) == GST_MESSAGE_SRC (msg)) {
+ found = iter;
+ if (msg_perc < 100) {
+ gst_message_unref (iter->data);
+ bufstats = iter->data = gst_message_ref (msg);
+ } else {
+ GList *current = iter;
+
+ /* remove the element here and avoid confusing the loop */
+ iter = g_list_next (iter);
+
+ gst_message_unref (current->data);
+ dbin->buffering_status =
+ g_list_delete_link (dbin->buffering_status, current);
+
+ continue;
+ }
+ }
+
+ gst_message_parse_buffering (bufstats, &perc);
+ if (perc < smaller_perc) {
+ smaller_perc = perc;
+ smaller = bufstats;
+ }
+ iter = g_list_next (iter);
+ }
+
+ if (found == NULL && msg_perc < 100) {
+ if (msg_perc < smaller_perc) {
+ smaller_perc = msg_perc;
+ smaller = msg;
+ }
+ dbin->buffering_status =
+ g_list_prepend (dbin->buffering_status, gst_message_ref (msg));
+ }
+
+ /* now compute the buffering message that should be posted */
+ if (smaller_perc == 100) {
+ g_assert (dbin->buffering_status == NULL);
+ /* we are posting the original received msg */
+ } else {
+ gst_message_replace (&msg, smaller);
+ }
}
if (drop)
GST_END_TEST;
+GST_START_TEST (test_buffering_aggregation)
+{
+ GstElement *pipe, *decodebin;
+ GstMessage *msg;
+ GstElement *mq0, *mq1, *mq2;
+ gint perc;
+
+ pipe = gst_pipeline_new (NULL);
+ fail_unless (pipe != NULL, "failed to create pipeline");
+
+ decodebin = gst_element_factory_make ("decodebin", "decodebin");
+ fail_unless (decodebin != NULL, "Failed to create decodebin element");
+
+ fail_unless (gst_bin_add (GST_BIN (pipe), decodebin));
+
+ /* to simulate the buffering scenarios we stuff 2 multiqueues inside
+ * decodebin. This is hacky, but sould make decodebin handle its buffering
+ * messages all the same */
+ mq0 = gst_element_factory_make ("multiqueue", NULL);
+ mq1 = gst_element_factory_make ("multiqueue", NULL);
+ mq2 = gst_element_factory_make ("multiqueue", NULL);
+
+ fail_unless (gst_bin_add (GST_BIN (decodebin), mq0));
+ fail_unless (gst_bin_add (GST_BIN (decodebin), mq1));
+ fail_unless (gst_bin_add (GST_BIN (decodebin), mq2));
+
+ fail_unless_equals_int (gst_element_set_state (pipe, GST_STATE_READY),
+ GST_STATE_CHANGE_SUCCESS);
+ fail_unless_equals_int (gst_element_set_state (pipe, GST_STATE_PAUSED),
+ GST_STATE_CHANGE_ASYNC);
+
+ /* currently we shoud have no buffering messages */
+ msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0);
+ fail_unless (msg == NULL);
+
+ /* only a single element buffering, the buffering percent should be the
+ * same as it */
+ gst_element_post_message (mq0, gst_message_new_buffering (GST_OBJECT (mq0),
+ 50));
+ msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0);
+ fail_unless (msg != NULL);
+ fail_unless (GST_MESSAGE_SRC (msg) == (GstObject *) mq0);
+ gst_message_parse_buffering (msg, &perc);
+ fail_unless (perc == 50);
+ gst_message_unref (msg);
+
+ /* two elements buffering, the buffering percent should be the
+ * lowest one */
+ gst_element_post_message (mq1, gst_message_new_buffering (GST_OBJECT (mq1),
+ 20));
+ msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0);
+ fail_unless (msg != NULL);
+ fail_unless (GST_MESSAGE_SRC (msg) == (GstObject *) mq1);
+ gst_message_parse_buffering (msg, &perc);
+ fail_unless (perc == 20);
+ gst_message_unref (msg);
+
+ /* a 100% message should be ignored */
+ gst_element_post_message (mq2, gst_message_new_buffering (GST_OBJECT (mq2),
+ 100));
+ msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0);
+ fail_unless (msg != NULL);
+ fail_unless (GST_MESSAGE_SRC (msg) == (GstObject *) mq1);
+ gst_message_parse_buffering (msg, &perc);
+ fail_unless (perc == 20);
+ gst_message_unref (msg);
+
+ /* a new buffering message is posted with a higher value, go with the 20 */
+ gst_element_post_message (mq2, gst_message_new_buffering (GST_OBJECT (mq2),
+ 80));
+ msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0);
+ fail_unless (msg != NULL);
+ fail_unless (GST_MESSAGE_SRC (msg) == (GstObject *) mq1);
+ gst_message_parse_buffering (msg, &perc);
+ fail_unless (perc == 20);
+ gst_message_unref (msg);
+
+ /* The mq1 finishes buffering, new buffering status is now 50% from mq0 */
+ gst_element_post_message (mq1, gst_message_new_buffering (GST_OBJECT (mq1),
+ 100));
+ msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0);
+ fail_unless (msg != NULL);
+ fail_unless (GST_MESSAGE_SRC (msg) == (GstObject *) mq0);
+ gst_message_parse_buffering (msg, &perc);
+ fail_unless (perc == 50);
+ gst_message_unref (msg);
+
+ gst_element_set_state (pipe, GST_STATE_NULL);
+ gst_object_unref (pipe);
+}
+
+GST_END_TEST;
+
static Suite *
decodebin_suite (void)
{
tcase_add_test (tc_chain, test_reuse_without_decoders);
tcase_add_test (tc_chain, test_mp3_parser_loop);
tcase_add_test (tc_chain, test_parser_negotiation);
+ tcase_add_test (tc_chain, test_buffering_aggregation);
return s;
}