static void no_more_pads (GstElement * element, GstDynamic * dynamic);
static void queue_filled_cb (GstElement * queue, GstDecodeBin * decode_bin);
+static void queue_underrun_cb (GstElement * queue, GstDecodeBin * decode_bin);
static GstElementClass *parent_class;
static guint gst_decode_bin_signals[LAST_SIGNAL] = { 0 };
decode_bin->queues = g_list_append (decode_bin->queues, queue);
g_signal_connect (G_OBJECT (queue),
"overrun", G_CALLBACK (queue_filled_cb), decode_bin);
+ g_signal_connect (G_OBJECT (queue),
+ "underrun", G_CALLBACK (queue_underrun_cb), decode_bin);
}
/* The link worked, now figure out what it was that we connected */
gst_bin_remove (GST_BIN (decode_bin), elem);
}
+/* there are @bytes bytes in @queue, enlarge it
+ *
+ * Returns: new max number of bytes in @queue
+ */
+static guint
+queue_enlarge (GstElement * queue, guint bytes, GstDecodeBin * decode_bin)
+{
+ /* Increase the queue size by 1Mbyte if it is over 1Mb, else double its current limit
+ */
+ if (bytes > 1024 * 1024)
+ bytes += 1024 * 1024;
+ else
+ bytes *= 2;
+
+ GST_DEBUG_OBJECT (decode_bin,
+ "increasing queue %s max-size-bytes to %d", GST_ELEMENT_NAME (queue),
+ bytes);
+ g_object_set (G_OBJECT (queue), "max-size-bytes", bytes, NULL);
+
+ return bytes;
+}
+
+/* this callback is called when our queues fills up or are empty
+ * We then check the status of all other queues to make sure we
+ * never have an empty and full queue at the same time since that
+ * would block dataflow. In the case of a filled queue, we make
+ * it larger.
+ */
+static void
+queue_underrun_cb (GstElement * queue, GstDecodeBin * decode_bin)
+{
+ /* FIXME: we don't really do anything here for now. Ideally we should
+ * see if some of the queues are filled and increase their values
+ * in that case.
+ * Note: be very carefull with thread safety here as this underrun
+ * signal is done from the streaming thread of queue srcpad which
+ * is different from the pad_added (where we add the queue to the
+ * list) and the overrun signals that are signalled from the
+ * demuxer thread.
+ */
+}
+
/* Make sure we don't have a full queue and empty queue situation */
static void
queue_filled_cb (GstElement * queue, GstDecodeBin * decode_bin)
gboolean increase = FALSE;
guint bytes;
+ /* get current byte level from the queue that is filled */
g_object_get (G_OBJECT (queue), "current-level-bytes", &bytes, NULL);
GST_DEBUG_OBJECT (decode_bin, "One of the queues is full at %d bytes", bytes);
- if (bytes > (20 * 1024 * 1024)) {
- GST_WARNING_OBJECT (decode_bin,
- "Queue is bigger than 20Mbytes, something else is going wrong");
- return;
- }
+ /* we do not buffer more than 20Mb */
+ if (bytes > (20 * 1024 * 1024))
+ goto too_large;
+ /* check all other queue to see if one is empty, in that case
+ * we need to enlarge @queue */
for (tmp = decode_bin->queues; tmp; tmp = g_list_next (tmp)) {
GstElement *aqueue = GST_ELEMENT (tmp->data);
guint levelbytes = -1;
if (aqueue != queue) {
- g_object_get (G_OBJECT (aqueue),
- "current-level-bytes", &levelbytes, NULL);
+ g_object_get (G_OBJECT (aqueue), "current-level-bytes", &levelbytes,
+ NULL);
if (levelbytes == 0) {
+ /* yup, found an empty queue, we can stop the search and
+ * need to enlarge the queue */
increase = TRUE;
+ break;
}
}
}
if (increase) {
- /*
- * Increase the queue size by 1Mbyte if it is over 1Mb, else double its current limit
- */
- if (bytes > 1024 * 1024)
- bytes += 1024 * 1024;
- else
- bytes *= 2;
- GST_DEBUG_OBJECT (decode_bin,
- "One of the other queues is empty, increasing queue byte limit to %d",
- bytes);
- g_object_set (G_OBJECT (queue), "max-size-bytes", bytes, NULL);
- } else
+ /* enlarge @queue */
+ queue_enlarge (queue, bytes, decode_bin);
+ } else {
GST_DEBUG_OBJECT (decode_bin,
"Queue is full but other queues are not empty, not doing anything");
+ }
+ return;
+
+ /* errors */
+too_large:
+ {
+ GST_WARNING_OBJECT (decode_bin,
+ "Queue is bigger than 20Mbytes, something else is going wrong");
+ return;
+ }
}
/* This function will be called when a dynamic pad is created on an element.
/* this signal will be fired when one of the queues with raw
* data is filled. This means that the group building stage is over
- * and playback of the new queued group should start */
+ * and playback of the new queued group should start
+ *
+ * If this queue overruns we can potentially create a deadlock when:
+ *
+ * 1) the max-bytes is hit and
+ * 2) the min-time is not hit.
+ *
+ * We recover from this situation in the overrun callback by
+ * setting the max-bytes to unlimited if we see that there is
+ * a current-time-level (which means some sort of timestamping is
+ * done).
+ */
static void
-queue_overrun (GstElement * element, GstPlayBaseBin * play_base_bin)
+queue_overrun (GstElement * queue, GstPlayBaseBin * play_base_bin)
{
- GST_DEBUG ("queue %s overrun", GST_ELEMENT_NAME (element));
+ GST_DEBUG ("queue %s overrun", GST_ELEMENT_NAME (queue));
+
+ /* if we're streaming, check if we had a deadlock with the
+ * max-bytes <> min-time thresholds */
+ if (play_base_bin->is_stream) {
+ guint64 time, min_time;
+
+ g_object_get (G_OBJECT (queue), "current-level-time", &time,
+ "min-threshold-time", &min_time, NULL);
+
+ GST_DEBUG_OBJECT (play_base_bin, "streaming mode, queue %s current %"
+ GST_TIME_FORMAT ", min %" GST_TIME_FORMAT,
+ GST_ELEMENT_NAME (queue),
+ GST_TIME_ARGS (time), GST_TIME_ARGS (min_time));
+ if (time != 0) {
+ /* queue knows about time, disable bytes checking. */
+ g_object_set (G_OBJECT (queue), "max-size-bytes", 0, NULL);
+ }
+ }
group_commit (play_base_bin, FALSE,
- GST_OBJECT_PARENT (GST_OBJECT_CAST (element)) ==
+ GST_OBJECT_PARENT (GST_OBJECT_CAST (queue)) ==
GST_OBJECT (play_base_bin->subtitle));
- g_signal_handlers_disconnect_by_func (element,
+ g_signal_handlers_disconnect_by_func (queue,
G_CALLBACK (queue_overrun), play_base_bin);
/* We have disconnected this signal, remove the signal_id from the object
data */
- g_object_set_data (G_OBJECT (element), "signal_id", NULL);
+ g_object_set_data (G_OBJECT (queue), "signal_id", NULL);
}
/* Used for time-based buffering. */
GST_DEBUG ("Running");
/* play */
+ g_signal_handlers_disconnect_by_func (queue,
+ G_CALLBACK (queue_threshold_reached), play_base_bin);
g_object_set (queue, "min-threshold-time", (guint64) 0, NULL);
data = g_object_get_data (G_OBJECT (queue), "probe");
}
}
+/* this signal is only added when in streaming mode to catch underruns
+ */
static void
queue_out_of_data (GstElement * queue, GstPlayBaseBin * play_base_bin)
{
+ guint64 time, min_time;
+ guint bytes, max_bytes;
+
GST_DEBUG ("Underrun, re-caching");
+ g_object_get (G_OBJECT (queue), "current-level-time", &time,
+ "current-level-bytes", &bytes,
+ "max-size-bytes", &max_bytes, "min-threshold-time", &min_time, NULL);
+
+ GST_DEBUG_OBJECT (play_base_bin, "streaming mode, queue %s current %"
+ GST_TIME_FORMAT ", min %" GST_TIME_FORMAT
+ ", bytes %d",
+ GST_ELEMENT_NAME (queue),
+ GST_TIME_ARGS (time), GST_TIME_ARGS (min_time), bytes);
+
+ /* if the bytes in the queue represent time, we disable bytes
+ * overrun checking to not cause deadlocks.
+ */
+ if (bytes && time != 0 && time < min_time) {
+ /* queue knows about time but is filled with bytes that do
+ * not represent min-threshold time, disable bytes checking so
+ * the queue can grow some more. */
+ g_object_set (G_OBJECT (queue), "max-size-bytes", 0, NULL);
+ }
+
/* On underrun, we want to temoprarily pause playback, set a "min-size"
* threshold and wait for the running signal and then play again. Take
* care of possible deadlocks and so on, */
+ g_signal_connect (G_OBJECT (queue), "running",
+ G_CALLBACK (queue_threshold_reached), play_base_bin);
g_object_set (queue, "min-threshold-time",
(guint64) play_base_bin->queue_threshold, NULL);
g_free (name);
g_free (padname);
+ /* for buffering of raw data we ideally want to buffer a
+ * very small amount of buffers since the memory used by
+ * this raw data can be enormously huge.
+ *
+ * FIXME: we abuse this buffer to do network buffering since
+ * we can then easily do time-based buffering. The better
+ * solution would be to add a specific network queue right
+ * after the source that measures the datarate and scales this
+ * queue of encoded data instead.
+ *
+ * We use an upper limit of typically a few seconds here but
+ * cap in case no timestamps are set on the raw data (bad!).
+ */
g_object_set (G_OBJECT (preroll),
"max-size-buffers", 0, "max-size-bytes",
((type == GST_STREAM_TYPE_VIDEO) ? 25 : 1) * 1024 * 1024,
g_object_set_data (G_OBJECT (preroll), "pbb", play_base_bin);
g_object_set_data (G_OBJECT (preroll), "probe", GINT_TO_POINTER (id));
+ /*
+ * When we connect this queue, it will start running and immediatly
+ * fire an underrun when:
+ *
+ * 1) the max-bytes is hit
+ * 2) the min-time is not hit.
+ *
+ * We recover from this situation in the out_of_data callback by
+ * setting the max-bytes to unlimited if we see that there is
+ * a current-time-level (which means some sort of timestamping is
+ * done).
+ */
g_signal_connect (G_OBJECT (preroll), "underrun",
G_CALLBACK (queue_out_of_data), play_base_bin);
}
+
/* keep a ref to the signal id so that we can disconnect the signal callback
* when we are done with the preroll */
g_object_set_data (G_OBJECT (preroll), "signal_id", GINT_TO_POINTER (sig));