gst/playback/gstdecodebin.c: Also catch queue underruns but don't do anything yet.
authorWim Taymans <wim.taymans@gmail.com>
Thu, 11 May 2006 19:38:22 +0000 (19:38 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Thu, 11 May 2006 19:38:22 +0000 (19:38 +0000)
Original commit message from CVS:
* gst/playback/gstdecodebin.c: (try_to_link_1), (queue_enlarge),
(queue_underrun_cb), (queue_filled_cb):
Also catch queue underruns but don't do anything yet.
Refactor and comment queue enlarging code a bit.
* gst/playback/gstplaybasebin.c: (queue_overrun),
(queue_threshold_reached), (queue_out_of_data),
(gen_preroll_element):
If a queue over/underruns check that we don't create nasty
deadlocks when the min-threshold is not reached but the
max-bytes is. In those cases disable max-bytes when we
know that the queue is fed timed data.
Add more comments.

ChangeLog
gst/playback/gstdecodebin.c
gst/playback/gstplaybasebin.c

index 7fc10a1..7e3c94d 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,19 @@
+2006-05-11  Wim Taymans  <wim@fluendo.com>
+
+       * gst/playback/gstdecodebin.c: (try_to_link_1), (queue_enlarge),
+       (queue_underrun_cb), (queue_filled_cb):
+       Also catch queue underruns but don't do anything yet.
+       Refactor and comment queue enlarging code a bit.
+
+       * gst/playback/gstplaybasebin.c: (queue_overrun),
+       (queue_threshold_reached), (queue_out_of_data),
+       (gen_preroll_element):
+       If a queue over/underruns check that we don't create nasty
+       deadlocks when the min-threshold is not reached but the
+       max-bytes is. In those cases disable max-bytes when we
+       know that the queue is fed timed data.
+       Add more comments.
+
 2006-05-11  Tim-Philipp Müller  <tim at centricular dot net>
 
        * gst/playback/gstplaybin.c: (gen_audio_element):
index 02ff34d..5088451 100644 (file)
@@ -138,6 +138,7 @@ static void new_pad (GstElement * element, GstPad * pad, GstDynamic * dynamic);
 static void no_more_pads (GstElement * element, GstDynamic * dynamic);
 
 static void queue_filled_cb (GstElement * queue, GstDecodeBin * decode_bin);
+static void queue_underrun_cb (GstElement * queue, GstDecodeBin * decode_bin);
 
 static GstElementClass *parent_class;
 static guint gst_decode_bin_signals[LAST_SIGNAL] = { 0 };
@@ -805,6 +806,8 @@ try_to_link_1 (GstDecodeBin * decode_bin, GstElement * srcelement, GstPad * pad,
         decode_bin->queues = g_list_append (decode_bin->queues, queue);
         g_signal_connect (G_OBJECT (queue),
             "overrun", G_CALLBACK (queue_filled_cb), decode_bin);
+        g_signal_connect (G_OBJECT (queue),
+            "underrun", G_CALLBACK (queue_underrun_cb), decode_bin);
       }
 
       /* The link worked, now figure out what it was that we connected */
@@ -990,6 +993,48 @@ remove_element_chain (GstDecodeBin * decode_bin, GstPad * pad)
   gst_bin_remove (GST_BIN (decode_bin), elem);
 }
 
+/* there are @bytes bytes in @queue, enlarge it 
+ *
+ * Returns: new max number of bytes in @queue
+ */
+static guint
+queue_enlarge (GstElement * queue, guint bytes, GstDecodeBin * decode_bin)
+{
+  /* Increase the queue size by 1Mbyte if it is over 1Mb, else double its current limit
+   */
+  if (bytes > 1024 * 1024)
+    bytes += 1024 * 1024;
+  else
+    bytes *= 2;
+
+  GST_DEBUG_OBJECT (decode_bin,
+      "increasing queue %s max-size-bytes to %d", GST_ELEMENT_NAME (queue),
+      bytes);
+  g_object_set (G_OBJECT (queue), "max-size-bytes", bytes, NULL);
+
+  return bytes;
+}
+
+/* this callback is called when our queues fills up or are empty
+ * We then check the status of all other queues to make sure we
+ * never have an empty and full queue at the same time since that
+ * would block dataflow. In the case of a filled queue, we make
+ * it larger.
+ */
+static void
+queue_underrun_cb (GstElement * queue, GstDecodeBin * decode_bin)
+{
+  /* FIXME: we don't really do anything here for now. Ideally we should
+   * see if some of the queues are filled and increase their values
+   * in that case. 
+   * Note: be very carefull with thread safety here as this underrun
+   * signal is done from the streaming thread of queue srcpad which
+   * is different from the pad_added (where we add the queue to the
+   * list) and the overrun signals that are signalled from the
+   * demuxer thread.
+   */
+}
+
 /* Make sure we don't have a full queue and empty queue situation */
 static void
 queue_filled_cb (GstElement * queue, GstDecodeBin * decode_bin)
@@ -998,43 +1043,48 @@ queue_filled_cb (GstElement * queue, GstDecodeBin * decode_bin)
   gboolean increase = FALSE;
   guint bytes;
 
+  /* get current byte level from the queue that is filled */
   g_object_get (G_OBJECT (queue), "current-level-bytes", &bytes, NULL);
   GST_DEBUG_OBJECT (decode_bin, "One of the queues is full at %d bytes", bytes);
 
-  if (bytes > (20 * 1024 * 1024)) {
-    GST_WARNING_OBJECT (decode_bin,
-        "Queue is bigger than 20Mbytes, something else is going wrong");
-    return;
-  }
+  /* we do not buffer more than 20Mb */
+  if (bytes > (20 * 1024 * 1024))
+    goto too_large;
 
+  /* check all other queue to see if one is empty, in that case
+   * we need to enlarge @queue */
   for (tmp = decode_bin->queues; tmp; tmp = g_list_next (tmp)) {
     GstElement *aqueue = GST_ELEMENT (tmp->data);
     guint levelbytes = -1;
 
     if (aqueue != queue) {
-      g_object_get (G_OBJECT (aqueue),
-          "current-level-bytes", &levelbytes, NULL);
+      g_object_get (G_OBJECT (aqueue), "current-level-bytes", &levelbytes,
+          NULL);
       if (levelbytes == 0) {
+        /* yup, found an empty queue, we can stop the search and
+         * need to enlarge the queue */
         increase = TRUE;
+        break;
       }
     }
   }
 
   if (increase) {
-    /* 
-     * Increase the queue size by 1Mbyte if it is over 1Mb, else double its current limit
-     */
-    if (bytes > 1024 * 1024)
-      bytes += 1024 * 1024;
-    else
-      bytes *= 2;
-    GST_DEBUG_OBJECT (decode_bin,
-        "One of the other queues is empty, increasing queue byte limit to %d",
-        bytes);
-    g_object_set (G_OBJECT (queue), "max-size-bytes", bytes, NULL);
-  } else
+    /* enlarge @queue */
+    queue_enlarge (queue, bytes, decode_bin);
+  } else {
     GST_DEBUG_OBJECT (decode_bin,
         "Queue is full but other queues are not empty, not doing anything");
+  }
+  return;
+
+  /* errors */
+too_large:
+  {
+    GST_WARNING_OBJECT (decode_bin,
+        "Queue is bigger than 20Mbytes, something else is going wrong");
+    return;
+  }
 }
 
 /* This function will be called when a dynamic pad is created on an element.
index 578a98f..913ba33 100644 (file)
@@ -496,21 +496,50 @@ check_queue (GstPad * pad, GstBuffer * data, gpointer user_data)
 
 /* this signal will be fired when one of the queues with raw
  * data is filled. This means that the group building stage is over 
- * and playback of the new queued group should start */
+ * and playback of the new queued group should start
+ *
+ * If this queue overruns we can potentially create a deadlock when:
+ *
+ *  1) the max-bytes is hit and
+ *  2) the min-time is not hit.
+ *
+ * We recover from this situation in the overrun callback by
+ * setting the max-bytes to unlimited if we see that there is
+ * a current-time-level (which means some sort of timestamping is
+ * done).
+ */
 static void
-queue_overrun (GstElement * element, GstPlayBaseBin * play_base_bin)
+queue_overrun (GstElement * queue, GstPlayBaseBin * play_base_bin)
 {
-  GST_DEBUG ("queue %s overrun", GST_ELEMENT_NAME (element));
+  GST_DEBUG ("queue %s overrun", GST_ELEMENT_NAME (queue));
+
+  /* if we're streaming, check if we had a deadlock with the 
+   * max-bytes <> min-time thresholds */
+  if (play_base_bin->is_stream) {
+    guint64 time, min_time;
+
+    g_object_get (G_OBJECT (queue), "current-level-time", &time,
+        "min-threshold-time", &min_time, NULL);
+
+    GST_DEBUG_OBJECT (play_base_bin, "streaming mode, queue %s current %"
+        GST_TIME_FORMAT ", min %" GST_TIME_FORMAT,
+        GST_ELEMENT_NAME (queue),
+        GST_TIME_ARGS (time), GST_TIME_ARGS (min_time));
+    if (time != 0) {
+      /* queue knows about time, disable bytes checking. */
+      g_object_set (G_OBJECT (queue), "max-size-bytes", 0, NULL);
+    }
+  }
 
   group_commit (play_base_bin, FALSE,
-      GST_OBJECT_PARENT (GST_OBJECT_CAST (element)) ==
+      GST_OBJECT_PARENT (GST_OBJECT_CAST (queue)) ==
       GST_OBJECT (play_base_bin->subtitle));
 
-  g_signal_handlers_disconnect_by_func (element,
+  g_signal_handlers_disconnect_by_func (queue,
       G_CALLBACK (queue_overrun), play_base_bin);
   /* We have disconnected this signal, remove the signal_id from the object
      data */
-  g_object_set_data (G_OBJECT (element), "signal_id", NULL);
+  g_object_set_data (G_OBJECT (queue), "signal_id", NULL);
 }
 
 /* Used for time-based buffering. */
@@ -522,6 +551,8 @@ queue_threshold_reached (GstElement * queue, GstPlayBaseBin * play_base_bin)
   GST_DEBUG ("Running");
 
   /* play */
+  g_signal_handlers_disconnect_by_func (queue,
+      G_CALLBACK (queue_threshold_reached), play_base_bin);
   g_object_set (queue, "min-threshold-time", (guint64) 0, NULL);
 
   data = g_object_get_data (G_OBJECT (queue), "probe");
@@ -542,14 +573,41 @@ queue_threshold_reached (GstElement * queue, GstPlayBaseBin * play_base_bin)
   }
 }
 
+/* this signal is only added when in streaming mode to catch underruns
+ */
 static void
 queue_out_of_data (GstElement * queue, GstPlayBaseBin * play_base_bin)
 {
+  guint64 time, min_time;
+  guint bytes, max_bytes;
+
   GST_DEBUG ("Underrun, re-caching");
 
+  g_object_get (G_OBJECT (queue), "current-level-time", &time,
+      "current-level-bytes", &bytes,
+      "max-size-bytes", &max_bytes, "min-threshold-time", &min_time, NULL);
+
+  GST_DEBUG_OBJECT (play_base_bin, "streaming mode, queue %s current %"
+      GST_TIME_FORMAT ", min %" GST_TIME_FORMAT
+      ", bytes %d",
+      GST_ELEMENT_NAME (queue),
+      GST_TIME_ARGS (time), GST_TIME_ARGS (min_time), bytes);
+
+  /* if the bytes in the queue represent time, we disable bytes
+   * overrun checking to not cause deadlocks.
+   */
+  if (bytes && time != 0 && time < min_time) {
+    /* queue knows about time but is filled with bytes that do
+     * not represent min-threshold time, disable bytes checking so
+     * the queue can grow some more. */
+    g_object_set (G_OBJECT (queue), "max-size-bytes", 0, NULL);
+  }
+
   /* On underrun, we want to temoprarily pause playback, set a "min-size"
    * threshold and wait for the running signal and then play again. Take
    * care of possible deadlocks and so on, */
+  g_signal_connect (G_OBJECT (queue), "running",
+      G_CALLBACK (queue_threshold_reached), play_base_bin);
   g_object_set (queue, "min-threshold-time",
       (guint64) play_base_bin->queue_threshold, NULL);
 
@@ -611,6 +669,19 @@ gen_preroll_element (GstPlayBaseBin * play_base_bin,
   g_free (name);
   g_free (padname);
 
+  /* for buffering of raw data we ideally want to buffer a
+   * very small amount of buffers since the memory used by
+   * this raw data can be enormously huge.
+   *
+   * FIXME: we abuse this buffer to do network buffering since
+   * we can then easily do time-based buffering. The better
+   * solution would be to add a specific network queue right
+   * after the source that measures the datarate and scales this
+   * queue of encoded data instead.
+   *
+   * We use an upper limit of typically a few seconds here but
+   * cap in case no timestamps are set on the raw data (bad!).
+   */
   g_object_set (G_OBJECT (preroll),
       "max-size-buffers", 0, "max-size-bytes",
       ((type == GST_STREAM_TYPE_VIDEO) ? 25 : 1) * 1024 * 1024,
@@ -641,9 +712,22 @@ gen_preroll_element (GstPlayBaseBin * play_base_bin,
     g_object_set_data (G_OBJECT (preroll), "pbb", play_base_bin);
     g_object_set_data (G_OBJECT (preroll), "probe", GINT_TO_POINTER (id));
 
+    /* 
+     * When we connect this queue, it will start running and immediatly
+     * fire an underrun when:
+     *
+     * 1) the max-bytes is hit
+     * 2) the min-time is not hit.
+     *
+     * We recover from this situation in the out_of_data callback by
+     * setting the max-bytes to unlimited if we see that there is
+     * a current-time-level (which means some sort of timestamping is
+     * done).
+     */
     g_signal_connect (G_OBJECT (preroll), "underrun",
         G_CALLBACK (queue_out_of_data), play_base_bin);
   }
+
   /* keep a ref to the signal id so that we can disconnect the signal callback
    * when we are done with the preroll */
   g_object_set_data (G_OBJECT (preroll), "signal_id", GINT_TO_POINTER (sig));