decodebin: aggregate buffering messages
authorThiago Santos <ts.santos@sisa.samsung.com>
Sun, 16 Mar 2014 17:27:30 +0000 (14:27 -0300)
committerThiago Santos <ts.santos@sisa.samsung.com>
Thu, 29 May 2014 21:59:30 +0000 (18:59 -0300)
Aggregate buffering messages to only post the lower value
to avoid setting pipeline to playing while any multiqueue
is still buffering.

There are 3 scenarios where the entries should be removed from
the list:

1) When decodebin is set to READY
2) When an element posts a 100% buffering (already implemented)
3) When a multiqueue is removed from decodebin.

For item 3 we don't need to handle it because this should only
happen when either 1 is hapenning or when it is playing a
chained file, for which number 2 should have happened for the
previous stream to finish

https://bugzilla.gnome.org/show_bug.cgi?id=726423

gst/playback/gstdecodebin2.c
tests/check/elements/decodebin.c

index 2789a14..b7b585c 100644 (file)
@@ -183,6 +183,8 @@ struct _GstDecodeBin
   gboolean expose_allstreams;   /* Whether to expose unknow type streams or not */
 
   GList *filtered;              /* elements for which error messages are filtered */
+
+  GList *buffering_status;      /* element currently buffering messages */
 };
 
 struct _GstDecodeBinClass
@@ -4563,6 +4565,9 @@ gst_decode_bin_change_state (GstElement * element, GstStateChange transition)
         dbin->decode_chain = NULL;
       }
       EXPOSE_UNLOCK (dbin);
+      g_list_free_full (dbin->buffering_status,
+          (GDestroyNotify) gst_message_unref);
+      dbin->buffering_status = NULL;
       break;
     case GST_STATE_CHANGE_READY_TO_NULL:
     default:
@@ -4597,6 +4602,80 @@ gst_decode_bin_handle_message (GstBin * bin, GstMessage * msg)
     GST_OBJECT_LOCK (dbin);
     drop = (g_list_find (dbin->filtered, GST_MESSAGE_SRC (msg)) != NULL);
     GST_OBJECT_UNLOCK (dbin);
+  } else if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_BUFFERING) {
+    gint perc, msg_perc;
+    gint smaller_perc = 100;
+    GstMessage *smaller = NULL;
+    GList *found = NULL;
+    GList *iter;
+
+    /* buffering messages must be aggregated as there might be multiple
+     * multiqueue in the pipeline and their independent buffering messages
+     * will confuse the application
+     *
+     * decodebin keeps a list of messages received from elements that are
+     * buffering.
+     * Rules are:
+     * 1) Always post the smaller buffering %
+     * 2) If an element posts a 100% buffering message, remove it from the list
+     * 3) When there are no more messages on the list, post 100% message
+     * 4) When an element posts a new buffering message, update the one
+     *    on the list to this new value
+     */
+
+    gst_message_parse_buffering (msg, &msg_perc);
+
+    /*
+     * Single loop for 2 things:
+     * 1) Look for a message with the same source
+     *   1.1) If the received message is 100%, remove it from the list
+     * 2) Find the minimum buffering from the list
+     */
+    for (iter = dbin->buffering_status; iter;) {
+      GstMessage *bufstats = iter->data;
+      if (GST_MESSAGE_SRC (bufstats) == GST_MESSAGE_SRC (msg)) {
+        found = iter;
+        if (msg_perc < 100) {
+          gst_message_unref (iter->data);
+          bufstats = iter->data = gst_message_ref (msg);
+        } else {
+          GList *current = iter;
+
+          /* remove the element here and avoid confusing the loop */
+          iter = g_list_next (iter);
+
+          gst_message_unref (current->data);
+          dbin->buffering_status =
+              g_list_delete_link (dbin->buffering_status, current);
+
+          continue;
+        }
+      }
+
+      gst_message_parse_buffering (bufstats, &perc);
+      if (perc < smaller_perc) {
+        smaller_perc = perc;
+        smaller = bufstats;
+      }
+      iter = g_list_next (iter);
+    }
+
+    if (found == NULL && msg_perc < 100) {
+      if (msg_perc < smaller_perc) {
+        smaller_perc = msg_perc;
+        smaller = msg;
+      }
+      dbin->buffering_status =
+          g_list_prepend (dbin->buffering_status, gst_message_ref (msg));
+    }
+
+    /* now compute the buffering message that should be posted */
+    if (smaller_perc == 100) {
+      g_assert (dbin->buffering_status == NULL);
+      /* we are posting the original received msg */
+    } else {
+      gst_message_replace (&msg, smaller);
+    }
   }
 
   if (drop)
index 049b3a9..fba0b1c 100644 (file)
@@ -623,6 +623,99 @@ GST_START_TEST (test_parser_negotiation)
 
 GST_END_TEST;
 
+GST_START_TEST (test_buffering_aggregation)
+{
+  GstElement *pipe, *decodebin;
+  GstMessage *msg;
+  GstElement *mq0, *mq1, *mq2;
+  gint perc;
+
+  pipe = gst_pipeline_new (NULL);
+  fail_unless (pipe != NULL, "failed to create pipeline");
+
+  decodebin = gst_element_factory_make ("decodebin", "decodebin");
+  fail_unless (decodebin != NULL, "Failed to create decodebin element");
+
+  fail_unless (gst_bin_add (GST_BIN (pipe), decodebin));
+
+  /* to simulate the buffering scenarios we stuff 2 multiqueues inside
+   * decodebin. This is hacky, but sould make decodebin handle its buffering
+   * messages all the same */
+  mq0 = gst_element_factory_make ("multiqueue", NULL);
+  mq1 = gst_element_factory_make ("multiqueue", NULL);
+  mq2 = gst_element_factory_make ("multiqueue", NULL);
+
+  fail_unless (gst_bin_add (GST_BIN (decodebin), mq0));
+  fail_unless (gst_bin_add (GST_BIN (decodebin), mq1));
+  fail_unless (gst_bin_add (GST_BIN (decodebin), mq2));
+
+  fail_unless_equals_int (gst_element_set_state (pipe, GST_STATE_READY),
+      GST_STATE_CHANGE_SUCCESS);
+  fail_unless_equals_int (gst_element_set_state (pipe, GST_STATE_PAUSED),
+      GST_STATE_CHANGE_ASYNC);
+
+  /* currently we shoud have no buffering messages */
+  msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0);
+  fail_unless (msg == NULL);
+
+  /* only a single element buffering, the buffering percent should be the
+   * same as it */
+  gst_element_post_message (mq0, gst_message_new_buffering (GST_OBJECT (mq0),
+          50));
+  msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0);
+  fail_unless (msg != NULL);
+  fail_unless (GST_MESSAGE_SRC (msg) == (GstObject *) mq0);
+  gst_message_parse_buffering (msg, &perc);
+  fail_unless (perc == 50);
+  gst_message_unref (msg);
+
+  /* two elements buffering, the buffering percent should be the
+   * lowest one */
+  gst_element_post_message (mq1, gst_message_new_buffering (GST_OBJECT (mq1),
+          20));
+  msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0);
+  fail_unless (msg != NULL);
+  fail_unless (GST_MESSAGE_SRC (msg) == (GstObject *) mq1);
+  gst_message_parse_buffering (msg, &perc);
+  fail_unless (perc == 20);
+  gst_message_unref (msg);
+
+  /* a 100% message should be ignored */
+  gst_element_post_message (mq2, gst_message_new_buffering (GST_OBJECT (mq2),
+          100));
+  msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0);
+  fail_unless (msg != NULL);
+  fail_unless (GST_MESSAGE_SRC (msg) == (GstObject *) mq1);
+  gst_message_parse_buffering (msg, &perc);
+  fail_unless (perc == 20);
+  gst_message_unref (msg);
+
+  /* a new buffering message is posted with a higher value, go with the 20 */
+  gst_element_post_message (mq2, gst_message_new_buffering (GST_OBJECT (mq2),
+          80));
+  msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0);
+  fail_unless (msg != NULL);
+  fail_unless (GST_MESSAGE_SRC (msg) == (GstObject *) mq1);
+  gst_message_parse_buffering (msg, &perc);
+  fail_unless (perc == 20);
+  gst_message_unref (msg);
+
+  /* The mq1 finishes buffering, new buffering status is now 50% from mq0 */
+  gst_element_post_message (mq1, gst_message_new_buffering (GST_OBJECT (mq1),
+          100));
+  msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0);
+  fail_unless (msg != NULL);
+  fail_unless (GST_MESSAGE_SRC (msg) == (GstObject *) mq0);
+  gst_message_parse_buffering (msg, &perc);
+  fail_unless (perc == 50);
+  gst_message_unref (msg);
+
+  gst_element_set_state (pipe, GST_STATE_NULL);
+  gst_object_unref (pipe);
+}
+
+GST_END_TEST;
+
 static Suite *
 decodebin_suite (void)
 {
@@ -634,6 +727,7 @@ decodebin_suite (void)
   tcase_add_test (tc_chain, test_reuse_without_decoders);
   tcase_add_test (tc_chain, test_mp3_parser_loop);
   tcase_add_test (tc_chain, test_parser_negotiation);
+  tcase_add_test (tc_chain, test_buffering_aggregation);
 
   return s;
 }