multiqueue: Add mode to synchronize deactivated/not-linked streams by the running...
authorSebastian Dröge <sebastian.droege@collabora.co.uk>
Tue, 22 Mar 2011 12:19:47 +0000 (13:19 +0100)
committerSebastian Dröge <sebastian.droege@collabora.co.uk>
Sat, 14 May 2011 09:39:35 +0000 (11:39 +0200)
Fixes bug #645107, #600648.

plugins/elements/gstmultiqueue.c
plugins/elements/gstmultiqueue.h

index b6b2d24..e1ddc81 100644 (file)
@@ -156,6 +156,8 @@ struct _GstSingleQueue
   guint32 nextid;               /* ID of the next object waiting to be pushed */
   guint32 oldid;                /* ID of the last object pushed (last in a series) */
   guint32 last_oldid;           /* Previously observed old_id, reset to MAXUINT32 on flush */
+  GstClockTime next_time;       /* End running time of next buffer to be pushed */
+  GstClockTime last_time;       /* Start running time of last pushed buffer */
   GCond *turn;                  /* SingleQueue turn waiting conditional */
 };
 
@@ -179,6 +181,7 @@ static void gst_single_queue_free (GstSingleQueue * squeue);
 
 static void wake_up_next_non_linked (GstMultiQueue * mq);
 static void compute_high_id (GstMultiQueue * mq);
+static void compute_high_time (GstMultiQueue * mq);
 static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
 static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
 
@@ -224,6 +227,7 @@ enum
 #define DEFAULT_USE_BUFFERING FALSE
 #define DEFAULT_LOW_PERCENT   10
 #define DEFAULT_HIGH_PERCENT  99
+#define DEFAULT_SYNC_BY_RUNNING_TIME FALSE
 
 enum
 {
@@ -237,6 +241,7 @@ enum
   PROP_USE_BUFFERING,
   PROP_LOW_PERCENT,
   PROP_HIGH_PERCENT,
+  PROP_SYNC_BY_RUNNING_TIME,
   PROP_LAST
 };
 
@@ -396,6 +401,22 @@ gst_multi_queue_class_init (GstMultiQueueClass * klass)
           "High threshold for buffering to finish", 0, 100,
           DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  /**
+   * GstMultiQueue:sync-by-running-time
+   * 
+   * If enabled multiqueue will synchronize deactivated or not-linked streams
+   * to the activated and linked streams by taking the running time.
+   * Otherwise multiqueue will synchronize the deactivated or not-linked
+   * streams by keeping the order in which buffers and events arrived compared
+   * to active and linked streams.
+   *
+   * Since: 0.10.33
+   */
+  g_object_class_install_property (gobject_class, PROP_SYNC_BY_RUNNING_TIME,
+      g_param_spec_boolean ("sync-by-running-time", "Sync By Running Time",
+          "Synchronize deactivated or not-linked streams by running time",
+          DEFAULT_SYNC_BY_RUNNING_TIME,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   gobject_class->finalize = gst_multi_queue_finalize;
 
@@ -425,8 +446,11 @@ gst_multi_queue_init (GstMultiQueue * mqueue, GstMultiQueueClass * klass)
   mqueue->low_percent = DEFAULT_LOW_PERCENT;
   mqueue->high_percent = DEFAULT_HIGH_PERCENT;
 
+  mqueue->sync_by_running_time = DEFAULT_SYNC_BY_RUNNING_TIME;
+
   mqueue->counter = 1;
   mqueue->highid = -1;
+  mqueue->high_time = GST_CLOCK_TIME_NONE;
 
   mqueue->qlock = g_mutex_new ();
 }
@@ -499,6 +523,9 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
     case PROP_HIGH_PERCENT:
       mq->high_percent = g_value_get_int (value);
       break;
+    case PROP_SYNC_BY_RUNNING_TIME:
+      mq->sync_by_running_time = g_value_get_boolean (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -541,6 +568,9 @@ gst_multi_queue_get_property (GObject * object, guint prop_id,
     case PROP_HIGH_PERCENT:
       g_value_set_int (value, mq->high_percent);
       break;
+    case PROP_SYNC_BY_RUNNING_TIME:
+      g_value_set_boolean (value, mq->sync_by_running_time);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -740,8 +770,15 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
     sq->nextid = 0;
     sq->oldid = 0;
     sq->last_oldid = G_MAXUINT32;
+    sq->next_time = GST_CLOCK_TIME_NONE;
+    sq->last_time = GST_CLOCK_TIME_NONE;
     gst_data_queue_set_flushing (sq->queue, FALSE);
 
+    /* Reset high time to be recomputed next */
+    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+    mq->high_time = GST_CLOCK_TIME_NONE;
+    GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+
     sq->flushing = FALSE;
 
     GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
@@ -946,6 +983,71 @@ apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
 }
 
+static GstClockTime
+get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end)
+{
+  GstClockTime time = GST_CLOCK_TIME_NONE;
+
+  if (GST_IS_BUFFER (object)) {
+    GstBuffer *buf = GST_BUFFER_CAST (object);
+
+    if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
+      time = GST_BUFFER_TIMESTAMP (buf);
+      if (end && GST_BUFFER_DURATION_IS_VALID (buf))
+        time += GST_BUFFER_DURATION (buf);
+      if (time > segment->stop)
+        time = segment->stop;
+      time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, time);
+    }
+  } else if (GST_IS_BUFFER_LIST (object)) {
+    GstBufferList *list = GST_BUFFER_LIST_CAST (object);
+    GstBufferListIterator *it = gst_buffer_list_iterate (list);
+    GstBuffer *buf;
+
+    do {
+      while ((buf = gst_buffer_list_iterator_next (it))) {
+        if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
+          time = GST_BUFFER_TIMESTAMP (buf);
+          if (end && GST_BUFFER_DURATION_IS_VALID (buf))
+            time += GST_BUFFER_DURATION (buf);
+          if (time > segment->stop)
+            time = segment->stop;
+          time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, time);
+          if (!end)
+            goto done;
+        } else if (!end) {
+          goto done;
+        }
+      }
+    } while (gst_buffer_list_iterator_next_group (it));
+  } else if (GST_IS_EVENT (object)) {
+    GstEvent *event = GST_EVENT_CAST (object);
+
+    /* For newsegment events return the running time of the start position */
+    if (GST_EVENT_TYPE (event) == GST_EVENT_NEWSEGMENT) {
+      GstSegment new_segment = *segment;
+      gboolean update;
+      gdouble rate, applied_rate;
+      GstFormat format;
+      gint64 start, stop, position;
+
+      gst_event_parse_new_segment_full (event, &update, &rate, &applied_rate,
+          &format, &start, &stop, &position);
+      if (format == GST_FORMAT_TIME) {
+        gst_segment_set_newsegment_full (&new_segment, update, rate,
+            applied_rate, format, start, stop, position);
+
+        time =
+            gst_segment_to_running_time (&new_segment, GST_FORMAT_TIME,
+            new_segment.start);
+      }
+    }
+  }
+
+done:
+  return time;
+}
+
 static GstFlowReturn
 gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
     GstMiniObject * object)
@@ -1078,6 +1180,7 @@ gst_multi_queue_loop (GstPad * pad)
   GstMiniObject *object = NULL;
   guint32 newid;
   GstFlowReturn result;
+  GstClockTime next_time;
 
   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
   mq = sq->mqueue;
@@ -1099,6 +1202,9 @@ gst_multi_queue_loop (GstPad * pad)
   object = gst_multi_queue_item_steal_object (item);
   gst_multi_queue_item_destroy (item);
 
+  /* Get running time of the item. Events will have GST_CLOCK_TIME_NONE */
+  next_time = get_running_time (&sq->src_segment, object, TRUE);
+
   GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
       sq->id, newid, sq->last_oldid);
 
@@ -1107,9 +1213,9 @@ gst_multi_queue_loop (GstPad * pad)
    * or it's the first loop, or we just passed the previous highid, 
    * we might need to wake some sleeping pad up, so there's extra work 
    * there too */
-  if (sq->srcresult == GST_FLOW_NOT_LINKED ||
-      (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1)) ||
-      sq->last_oldid > mq->highid) {
+  if (sq->srcresult == GST_FLOW_NOT_LINKED
+      || (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1))
+      || sq->last_oldid > mq->highid) {
     GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
         gst_flow_get_name (sq->srcresult));
 
@@ -1124,6 +1230,7 @@ gst_multi_queue_loop (GstPad * pad)
 
     /* Update the nextid so other threads know when to wake us up */
     sq->nextid = newid;
+    sq->next_time = next_time;
 
     /* Update the oldid (the last ID we output) for highid tracking */
     if (sq->last_oldid != G_MAXUINT32)
@@ -1134,10 +1241,20 @@ gst_multi_queue_loop (GstPad * pad)
 
       /* Recompute the highid */
       compute_high_id (mq);
-      while (newid > mq->highid && sq->srcresult == GST_FLOW_NOT_LINKED) {
-        GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with "
-            "newid %u and highid %u", sq->id, newid, mq->highid);
+      /* Recompute the high time */
+      compute_high_time (mq);
 
+      while (((mq->sync_by_running_time && next_time != GST_CLOCK_TIME_NONE &&
+                  (mq->high_time == GST_CLOCK_TIME_NONE
+                      || next_time >= mq->high_time))
+              || (!mq->sync_by_running_time && newid > mq->highid))
+          && sq->srcresult == GST_FLOW_NOT_LINKED) {
+
+        GST_DEBUG_OBJECT (mq,
+            "queue %d sleeping for not-linked wakeup with "
+            "newid %u, highid %u, next_time %" GST_TIME_FORMAT
+            ", high_time %" GST_TIME_FORMAT, sq->id, newid, mq->highid,
+            GST_TIME_ARGS (next_time), GST_TIME_ARGS (mq->high_time));
 
         /* Wake up all non-linked pads before we sleep */
         wake_up_next_non_linked (mq);
@@ -1151,8 +1268,13 @@ gst_multi_queue_loop (GstPad * pad)
           goto out_flushing;
         }
 
+        /* Recompute the high time */
+        compute_high_time (mq);
+
         GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
-            "wakeup with newid %u and highid %u", sq->id, newid, mq->highid);
+            "wakeup with newid %u, highid %u, next_time %" GST_TIME_FORMAT
+            ", high_time %" GST_TIME_FORMAT, sq->id, newid, mq->highid,
+            GST_TIME_ARGS (next_time), GST_TIME_ARGS (mq->high_time));
       }
 
       /* Re-compute the high_id in case someone else pushed */
@@ -1162,8 +1284,9 @@ gst_multi_queue_loop (GstPad * pad)
       /* Wake up all non-linked pads */
       wake_up_next_non_linked (mq);
     }
-    /* We're done waiting, we can clear the nextid */
+    /* We're done waiting, we can clear the nextid and nexttime */
     sq->nextid = 0;
+    sq->next_time = GST_CLOCK_TIME_NONE;
 
     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
   }
@@ -1174,6 +1297,18 @@ gst_multi_queue_loop (GstPad * pad)
   GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s",
       gst_flow_get_name (sq->srcresult));
 
+  /* Update time stats */
+  next_time = get_running_time (&sq->src_segment, object, FALSE);
+  if (next_time != GST_CLOCK_TIME_NONE) {
+    if (sq->last_time == GST_CLOCK_TIME_NONE || sq->last_time < next_time)
+      sq->last_time = next_time;
+    if (mq->high_time == GST_CLOCK_TIME_NONE || mq->high_time <= next_time) {
+      /* Wake up all non-linked pads now that we advanceed the high time */
+      mq->high_time = next_time;
+      wake_up_next_non_linked (mq);
+    }
+  }
+
   /* Try to push out the new object */
   result = gst_single_queue_push_one (mq, sq, object);
   sq->srcresult = result;
@@ -1187,6 +1322,7 @@ gst_multi_queue_loop (GstPad * pad)
       gst_flow_get_name (sq->srcresult));
 
   sq->last_oldid = newid;
+
   return;
 
 out_flushing:
@@ -1516,7 +1652,10 @@ wake_up_next_non_linked (GstMultiQueue * mq)
     GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
 
     if (sq->srcresult == GST_FLOW_NOT_LINKED) {
-      if (sq->nextid != 0 && sq->nextid <= mq->highid) {
+      if ((mq->sync_by_running_time && mq->high_time != GST_CLOCK_TIME_NONE
+              && sq->next_time != GST_CLOCK_TIME_NONE
+              && sq->next_time >= mq->high_time)
+          || (sq->nextid != 0 && sq->nextid <= mq->highid)) {
         GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
         g_cond_signal (sq->turn);
       }
@@ -1567,6 +1706,49 @@ compute_high_id (GstMultiQueue * mq)
       lowest);
 }
 
+/* WITH LOCK TAKEN */
+static void
+compute_high_time (GstMultiQueue * mq)
+{
+  /* The high-id is either the highest id among the linked pads, or if all
+   * pads are not-linked, it's the lowest not-linked pad */
+  GList *tmp;
+  GstClockTime highest = GST_CLOCK_TIME_NONE;
+  GstClockTime lowest = GST_CLOCK_TIME_NONE;
+
+  for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
+    GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
+
+    GST_LOG_OBJECT (mq,
+        "inspecting sq:%d , next_time:%" GST_TIME_FORMAT ", last_time:%"
+        GST_TIME_FORMAT ", srcresult:%s", sq->id, GST_TIME_ARGS (sq->next_time),
+        GST_TIME_ARGS (sq->last_time), gst_flow_get_name (sq->srcresult));
+
+    if (sq->srcresult == GST_FLOW_NOT_LINKED) {
+      /* No need to consider queues which are not waiting */
+      if (sq->next_time == GST_CLOCK_TIME_NONE) {
+        GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
+        continue;
+      }
+
+      if (lowest == GST_CLOCK_TIME_NONE || sq->next_time < lowest)
+        lowest = sq->next_time;
+    } else if (sq->srcresult != GST_FLOW_UNEXPECTED) {
+      /* If we don't have a global highid, or the global highid is lower than
+       * this single queue's last outputted id, store the queue's one, 
+       * unless the singlequeue is at EOS (srcresult = UNEXPECTED) */
+      if (highest == GST_CLOCK_TIME_NONE || sq->last_time > highest)
+        highest = sq->last_time;
+    }
+  }
+
+  mq->high_time = highest;
+
+  GST_LOG_OBJECT (mq,
+      "High time is now : %" GST_TIME_FORMAT ", lowest non-linked %"
+      GST_TIME_FORMAT, GST_TIME_ARGS (mq->high_time), GST_TIME_ARGS (lowest));
+}
+
 #define IS_FILLED(q, format, value) (((q)->max_size.format) != 0 && \
      ((q)->max_size.format) <= (value))
 
@@ -1770,6 +1952,8 @@ gst_single_queue_new (GstMultiQueue * mqueue, gint id)
 
   sq->nextid = 0;
   sq->oldid = 0;
+  sq->next_time = GST_CLOCK_TIME_NONE;
+  sq->last_time = GST_CLOCK_TIME_NONE;
   sq->turn = g_cond_new ();
 
   sq->sinktime = GST_CLOCK_TIME_NONE;
index b9c28cd..bb3d840 100644 (file)
@@ -50,6 +50,8 @@ typedef struct _GstMultiQueueClass GstMultiQueueClass;
 struct _GstMultiQueue {
   GstElement element;
 
+  gboolean sync_by_running_time;
+
   /* number of queues */
   guint        nbqueues;
 
@@ -65,6 +67,7 @@ struct _GstMultiQueue {
 
   guint32  counter;    /* incoming object counter, use atomic accesses */
   guint32  highid;     /* contains highest id of last outputted object */
+  GstClockTime high_time; /* highest start running time */
 
   GMutex * qlock;      /* Global queue lock (vs object lock or individual */
                        /* queues lock). Protects nbqueues, queues, global */