multiqueue: Fix high_time wakeup logic
authorEdward Hervey <edward@centricular.com>
Tue, 23 Aug 2016 05:57:33 +0000 (14:57 +0900)
committerSebastian Dröge <sebastian@centricular.com>
Tue, 30 Aug 2016 09:28:40 +0000 (12:28 +0300)
When calculating the high_time, cache the group value in each singlequeue.

This fixes the issue by which wake_up_next_non_linked() would use the global
high-time to decide whether to wake-up a waiting thread, instead of the group
one, resulting in those threads constantly spinning.

Tidy up a bit the waiting logic while we're at it.

With this patch, we go from 212% playing a 8 audio / 8 video file down to less
than 10% (most of it being the video decoding).

https://bugzilla.gnome.org/show_bug.cgi?id=770225

plugins/elements/gstmultiqueue.c

index 9503c2a..e382ba2 100644 (file)
@@ -139,6 +139,7 @@ struct _GstSingleQueue
   guint id;
   /* group of streams to which this queue belongs to */
   guint groupid;
+  GstClockTimeDiff group_high_time;
 
   GstMultiQueue *mqueue;
 
@@ -216,7 +217,7 @@ static void gst_single_queue_free (GstSingleQueue * squeue);
 
 static void wake_up_next_non_linked (GstMultiQueue * mq);
 static void compute_high_id (GstMultiQueue * mq);
-static GstClockTimeDiff compute_high_time (GstMultiQueue * mq, guint groupid);
+static void compute_high_time (GstMultiQueue * mq, guint groupid);
 static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
 static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
 
@@ -1019,6 +1020,7 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
     sq->next_time = GST_CLOCK_STIME_NONE;
     sq->last_time = GST_CLOCK_STIME_NONE;
     sq->cached_sinktime = GST_CLOCK_STIME_NONE;
+    sq->group_high_time = GST_CLOCK_STIME_NONE;
     gst_data_queue_set_flushing (sq->queue, FALSE);
 
     /* We will become active again on the next buffer/gap */
@@ -1733,30 +1735,37 @@ next:
       sq->oldid = sq->last_oldid;
 
     if (sq->srcresult == GST_FLOW_NOT_LINKED) {
-      GstClockTimeDiff high_time;
+      gboolean should_wait;
       /* Go to sleep until it's time to push this buffer */
 
       /* Recompute the highid */
       compute_high_id (mq);
       /* Recompute the high time */
-      high_time = compute_high_time (mq, sq->groupid);
+      compute_high_time (mq, sq->groupid);
 
       GST_DEBUG_OBJECT (mq,
           "groupid %d high_time %" GST_STIME_FORMAT " next_time %"
-          GST_STIME_FORMAT, sq->groupid, GST_STIME_ARGS (high_time),
+          GST_STIME_FORMAT, sq->groupid, GST_STIME_ARGS (sq->group_high_time),
           GST_STIME_ARGS (next_time));
 
-      while (((mq->sync_by_running_time && GST_CLOCK_STIME_IS_VALID (next_time)
-                  && (high_time == GST_CLOCK_STIME_NONE
-                      || next_time > high_time))
-              || (!mq->sync_by_running_time && newid > mq->highid))
-          && sq->srcresult == GST_FLOW_NOT_LINKED) {
+      if (mq->sync_by_running_time)
+        /* In this case we only need to wait if:
+         * 1) there is a time against which to wait
+         * 2) and either we have gone over the high_time or there is no
+         *   high_time */
+        should_wait = GST_CLOCK_STIME_IS_VALID (next_time) &&
+            (sq->group_high_time == GST_CLOCK_STIME_NONE
+            || next_time > sq->group_high_time);
+      else
+        should_wait = newid > mq->highid;
+
+      while (should_wait && sq->srcresult == GST_FLOW_NOT_LINKED) {
 
         GST_DEBUG_OBJECT (mq,
             "queue %d sleeping for not-linked wakeup with "
             "newid %u, highid %u, next_time %" GST_STIME_FORMAT
             ", high_time %" GST_STIME_FORMAT, sq->id, newid, mq->highid,
-            GST_STIME_ARGS (next_time), GST_STIME_ARGS (high_time));
+            GST_STIME_ARGS (next_time), GST_STIME_ARGS (sq->group_high_time));
 
         /* Wake up all non-linked pads before we sleep */
         wake_up_next_non_linked (mq);
@@ -1771,13 +1780,20 @@ next:
         }
 
         /* Recompute the high time and ID */
-        high_time = compute_high_time (mq, sq->groupid);
+        compute_high_time (mq, sq->groupid);
         compute_high_id (mq);
 
         GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
             "wakeup with newid %u, highid %u, next_time %" GST_STIME_FORMAT
             ", high_time %" GST_STIME_FORMAT, sq->id, newid, mq->highid,
-            GST_STIME_ARGS (next_time), GST_STIME_ARGS (high_time));
+            GST_STIME_ARGS (next_time), GST_STIME_ARGS (sq->group_high_time));
+
+        if (mq->sync_by_running_time)
+          should_wait = GST_CLOCK_STIME_IS_VALID (next_time) &&
+              (sq->group_high_time == GST_CLOCK_STIME_NONE
+              || next_time > sq->group_high_time);
+        else
+          should_wait = newid > mq->highid;
       }
 
       /* Re-compute the high_id in case someone else pushed */
@@ -1836,6 +1852,7 @@ next:
         sq->id);
 
     compute_high_id (mq);
+    compute_high_time (mq, sq->groupid);
     do_update_buffering = TRUE;
 
     /* maybe no-one is waiting */
@@ -2419,8 +2436,9 @@ wake_up_next_non_linked (GstMultiQueue * mq)
     for (tmp = mq->queues; tmp; tmp = tmp->next) {
       GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
       if (sq->srcresult == GST_FLOW_NOT_LINKED
+          && GST_CLOCK_STIME_IS_VALID (sq->group_high_time)
           && GST_CLOCK_STIME_IS_VALID (sq->next_time)
-          && sq->next_time <= mq->high_time) {
+          && sq->next_time <= sq->group_high_time) {
         GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
         g_cond_signal (&sq->turn);
       }
@@ -2482,7 +2500,7 @@ compute_high_id (GstMultiQueue * mq)
 }
 
 /* WITH LOCK TAKEN */
-static GstClockTimeDiff
+static void
 compute_high_time (GstMultiQueue * mq, guint groupid)
 {
   /* The high-time is either the highest last time among the linked
@@ -2493,11 +2511,13 @@ compute_high_time (GstMultiQueue * mq, guint groupid)
   GstClockTimeDiff lowest = GST_CLOCK_STIME_NONE;
   GstClockTimeDiff group_high = GST_CLOCK_STIME_NONE;
   GstClockTimeDiff group_low = GST_CLOCK_STIME_NONE;
+  GstClockTimeDiff res;
   /* Number of streams which belong to groupid */
   guint group_count = 0;
 
   if (!mq->sync_by_running_time)
-    return GST_CLOCK_STIME_NONE;
+    /* return GST_CLOCK_STIME_NONE; */
+    return;
 
   for (tmp = mq->queues; tmp; tmp = tmp->next) {
     GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
@@ -2557,11 +2577,17 @@ compute_high_time (GstMultiQueue * mq, guint groupid)
 
   /* If there's only one stream of a given type, use the global high */
   if (group_count < 2)
-    return mq->high_time;
+    res = mq->high_time;
+  else if (group_high == GST_CLOCK_STIME_NONE)
+    res = group_low;
+  else
+    res = group_high;
 
-  if (group_high == GST_CLOCK_STIME_NONE)
-    return group_low;
-  return group_high;
+  for (tmp = mq->queues; tmp; tmp = tmp->next) {
+    GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
+    if (groupid == sq->groupid)
+      sq->group_high_time = res;
+  }
 }
 
 #define IS_FILLED(q, format, value) (((q)->max_size.format) != 0 && \
@@ -2807,6 +2833,7 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id)
   mqueue->nbqueues++;
   sq->id = temp_id;
   sq->groupid = DEFAULT_PAD_GROUP_ID;
+  sq->group_high_time = GST_CLOCK_STIME_NONE;
 
   mqueue->queues = g_list_insert_before (mqueue->queues, tmp, sq);
   mqueue->queues_cookie++;