multiqueue: Not post BUFFERING message if one of the singlequeue doesn't need it
authorThibault Saunier <thibault.saunier@collabora.com>
Thu, 7 Aug 2014 10:18:04 +0000 (12:18 +0200)
committerSebastian Dröge <sebastian@centricular.com>
Wed, 13 Aug 2014 15:25:12 +0000 (18:25 +0300)
Imagine the following 'pipeline'

                --------------
            p1/| 'fullqueue'  |--- 'laggy' downstream
  ---------  / |              |
-| demuxer |   | multiqueue   |
  ---------  \ |              |
            p2\| 'emptyqueue' |--- 'fast' downstream
                --------------

In the case downstream of one single queue (fullqueue) has (a lot of) latency
(for example for reverse playback with video), we can end up having the other
SingleQueue (emptyqueue) emptied, before that fullqueue gets
unblocked. In the meantime, the demuxer tries to push on fullqueue, and
is blocking there.

In that case the current code will post a BUFFERING message on the bus when
emptyqueue gets emptied, that leads to the application setting the pipeline state to
PAUSED. So now we end up in a situation where 'laggy downstream' is
prerolled and will not unblock anymore because the pipeline is set to
PAUSED, the fullequeue does not have a chance to be emptied and
the emptyqueue can not get filled anymore so no more BUFERRING message
will be posted and the pipeline is stucked in PAUSED for the eternity.

Making sure that we do not try to "buffer" if one of the single queue
does not need buffering, prevents this situtation from happening though it lets the
oportunity for buffering in all other cases.

That implements a new logic where we need all singlequeue to need
buffering for the multiqueue to actually state buffering is needed,
taking the maximum buffering of the single queue as the reference point.

https://bugzilla.gnome.org/show_bug.cgi?id=734412

plugins/elements/gstmultiqueue.c

index 9f58806..1769dd7 100644 (file)
@@ -832,8 +832,9 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
     result = gst_pad_pause_task (sq->srcpad);
     sq->sink_tainted = sq->src_tainted = TRUE;
   } else {
-    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
     gst_single_queue_flush_queue (sq, full);
+
+    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
     gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
     gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
     sq->has_src_segment = FALSE;
@@ -864,20 +865,16 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
   return result;
 }
 
-static void
-update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
+/* WITH LOCK TAKEN */
+static gint
+get_percentage (GstSingleQueue * sq)
 {
   GstDataQueueSize size;
   gint percent, tmp;
-  gboolean post = FALSE;
-
-  /* nothing to dowhen we are not in buffering mode */
-  if (!mq->use_buffering)
-    return;
 
   gst_data_queue_get_level (sq->queue, &size);
 
-  GST_DEBUG_OBJECT (mq,
+  GST_DEBUG_OBJECT (sq->mqueue,
       "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
       G_GUINT64_FORMAT, sq->id, size.visible, sq->max_size.visible,
       size.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
@@ -897,6 +894,22 @@ update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
     }
   }
 
+  return percent;
+}
+
+/* WITH LOCK TAKEN */
+static void
+update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
+{
+  gint percent;
+  gboolean post = FALSE;
+
+  /* nothing to dowhen we are not in buffering mode */
+  if (!mq->use_buffering)
+    return;
+
+  percent = get_percentage (sq);
+
   if (mq->buffering) {
     post = TRUE;
     if (percent >= mq->high_percent) {
@@ -912,7 +925,20 @@ update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
       /* else keep last value we posted */
       mq->percent = percent;
   } else {
-    if (percent < mq->low_percent) {
+    GList *iter;
+    gboolean is_buffering = TRUE;
+
+    for (iter = mq->queues; iter; iter = g_list_next (iter)) {
+      GstSingleQueue *oq = (GstSingleQueue *) iter->data;
+
+      if (get_percentage (oq) >= 100) {
+        is_buffering = FALSE;
+
+        break;
+      }
+    }
+
+    if (is_buffering && percent < mq->low_percent) {
       mq->buffering = TRUE;
       mq->percent = percent;
       post = TRUE;
@@ -1698,7 +1724,9 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       }
 
       /* EOS affects the buffering state */
+      GST_MULTI_QUEUE_MUTEX_LOCK (mq);
       update_buffering (mq, sq);
+      GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
       single_queue_overrun_cb (sq->queue, sq);
       break;
     case GST_EVENT_SEGMENT:
@@ -2165,7 +2193,9 @@ gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full)
   if (was_flushing)
     gst_data_queue_set_flushing (sq->queue, TRUE);
 
+  GST_MULTI_QUEUE_MUTEX_LOCK (sq->mqueue);
   update_buffering (sq->mqueue, sq);
+  GST_MULTI_QUEUE_MUTEX_UNLOCK (sq->mqueue);
 }
 
 static void