tee: Allocate one more buffer when multi-plexing
[platform/upstream/gstreamer.git] / plugins / elements / gstmultiqueue.c
index a9dbdbd..09b67a6 100644 (file)
@@ -168,7 +168,8 @@ struct _GstSingleQueue
   GstQuery *last_handled_query;
 
   /* For interleave calculation */
-  GThread *thread;
+  GThread *thread;              /* Streaming thread of SingleQueue */
+  GstClockTime interleave;      /* Calculated interleve within the thread */
 };
 
 
@@ -203,6 +204,8 @@ static void recheck_buffering_status (GstMultiQueue * mq);
 
 static void gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full);
 
+static void calculate_interleave (GstMultiQueue * mq, GstSingleQueue * sq);
+
 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
     GST_PAD_SINK,
     GST_PAD_REQUEST,
@@ -249,6 +252,8 @@ enum
 #define DEFAULT_USE_INTERLEAVE FALSE
 #define DEFAULT_UNLINKED_CACHE_TIME 250 * GST_MSECOND
 
+#define DEFAULT_MINIMUM_INTERLEAVE (250 * GST_MSECOND)
+
 enum
 {
   PROP_0,
@@ -266,6 +271,7 @@ enum
   PROP_SYNC_BY_RUNNING_TIME,
   PROP_USE_INTERLEAVE,
   PROP_UNLINKED_CACHE_TIME,
+  PROP_MINIMUM_INTERLEAVE,
   PROP_LAST
 };
 
@@ -611,6 +617,12 @@ gst_multi_queue_class_init (GstMultiQueueClass * klass)
           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
           G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (gobject_class, PROP_MINIMUM_INTERLEAVE,
+      g_param_spec_uint64 ("min-interleave-time", "Minimum interleave time",
+          "Minimum extra buffering for deinterleaving (size of the queues) when use-interleave=true",
+          0, G_MAXUINT64, DEFAULT_MINIMUM_INTERLEAVE,
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
 
   gobject_class->finalize = gst_multi_queue_finalize;
 
@@ -648,6 +660,7 @@ gst_multi_queue_init (GstMultiQueue * mqueue)
 
   mqueue->sync_by_running_time = DEFAULT_SYNC_BY_RUNNING_TIME;
   mqueue->use_interleave = DEFAULT_USE_INTERLEAVE;
+  mqueue->min_interleave_time = DEFAULT_MINIMUM_INTERLEAVE;
   mqueue->unlinked_cache_time = DEFAULT_UNLINKED_CACHE_TIME;
 
   mqueue->counter = 1;
@@ -790,6 +803,13 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
       gst_multi_queue_post_buffering (mq);
       break;
+    case PROP_MINIMUM_INTERLEAVE:
+      GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+      mq->min_interleave_time = g_value_get_uint64 (value);
+      if (mq->use_interleave)
+        calculate_interleave (mq, NULL);
+      GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -849,6 +869,9 @@ gst_multi_queue_get_property (GObject * object, guint prop_id,
     case PROP_UNLINKED_CACHE_TIME:
       g_value_set_uint64 (value, mq->unlinked_cache_time);
       break;
+    case PROP_MINIMUM_INTERLEAVE:
+      g_value_set_uint64 (value, mq->min_interleave_time);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -1237,49 +1260,60 @@ recheck_buffering_status (GstMultiQueue * mq)
 }
 
 static void
-calculate_interleave (GstMultiQueue * mq)
+calculate_interleave (GstMultiQueue * mq, GstSingleQueue * sq)
 {
   GstClockTimeDiff low, high;
-  GstClockTime interleave;
+  GstClockTime interleave, other_interleave = 0;
   GList *tmp;
 
   low = high = GST_CLOCK_STIME_NONE;
   interleave = mq->interleave;
   /* Go over all single queues and calculate lowest/highest value */
   for (tmp = mq->queues; tmp; tmp = tmp->next) {
-    GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
+    GstSingleQueue *oq = (GstSingleQueue *) tmp->data;
     /* Ignore sparse streams for interleave calculation */
-    if (sq->is_sparse)
+    if (oq->is_sparse)
       continue;
     /* If a stream is not active yet (hasn't received any buffers), set
      * a maximum interleave to allow it to receive more data */
-    if (!sq->active) {
+    if (!oq->active) {
       GST_LOG_OBJECT (mq,
-          "queue %d is not active yet, forcing interleave to 5s", sq->id);
+          "queue %d is not active yet, forcing interleave to 5s", oq->id);
       mq->interleave = 5 * GST_SECOND;
       /* Update max-size time */
       mq->max_size.time = mq->interleave;
       SET_CHILD_PROPERTY (mq, time);
       goto beach;
     }
-    if (GST_CLOCK_STIME_IS_VALID (sq->cached_sinktime)) {
-      if (low == GST_CLOCK_STIME_NONE || sq->cached_sinktime < low)
-        low = sq->cached_sinktime;
-      if (high == GST_CLOCK_STIME_NONE || sq->cached_sinktime > high)
-        high = sq->cached_sinktime;
+
+    /* Calculate within each streaming thread */
+    if (sq && sq->thread != oq->thread) {
+      if (oq->interleave > other_interleave)
+        other_interleave = oq->interleave;
+      continue;
+    }
+
+    if (GST_CLOCK_STIME_IS_VALID (oq->cached_sinktime)) {
+      if (low == GST_CLOCK_STIME_NONE || oq->cached_sinktime < low)
+        low = oq->cached_sinktime;
+      if (high == GST_CLOCK_STIME_NONE || oq->cached_sinktime > high)
+        high = oq->cached_sinktime;
     }
     GST_LOG_OBJECT (mq,
         "queue %d , sinktime:%" GST_STIME_FORMAT " low:%" GST_STIME_FORMAT
-        " high:%" GST_STIME_FORMAT, sq->id,
-        GST_STIME_ARGS (sq->cached_sinktime), GST_STIME_ARGS (low),
+        " high:%" GST_STIME_FORMAT, oq->id,
+        GST_STIME_ARGS (oq->cached_sinktime), GST_STIME_ARGS (low),
         GST_STIME_ARGS (high));
   }
 
   if (GST_CLOCK_STIME_IS_VALID (low) && GST_CLOCK_STIME_IS_VALID (high)) {
     interleave = high - low;
     /* Padding of interleave and minimum value */
-    /* FIXME : Make the minimum time interleave a property */
-    interleave = (150 * interleave / 100) + 250 * GST_MSECOND;
+    interleave = (150 * interleave / 100) + mq->min_interleave_time;
+    if (sq)
+      sq->interleave = interleave;
+
+    interleave = MAX (interleave, other_interleave);
 
     /* Update the stored interleave if:
      * * No data has arrived yet (high == low)
@@ -1338,7 +1372,7 @@ update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
       sq->sink_tainted = FALSE;
       if (mq->use_interleave) {
         sq->cached_sinktime = sink_time;
-        calculate_interleave (mq);
+        calculate_interleave (mq, sq);
       }
     }
   } else
@@ -1597,6 +1631,11 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
         if (G_UNLIKELY (*allow_drop))
           *allow_drop = FALSE;
         break;
+      case GST_EVENT_STREAM_START:
+        result = GST_FLOW_OK;
+        if (G_UNLIKELY (*allow_drop))
+          *allow_drop = FALSE;
+        break;
       case GST_EVENT_SEGMENT:
         apply_segment (mq, sq, event, &sq->src_segment);
         /* Applying the segment may have made the queue non-full again, unblock it if needed */
@@ -2080,7 +2119,7 @@ gst_multi_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
           "Queue %d cached sink time now %" G_GINT64_FORMAT " %"
           GST_STIME_FORMAT, sq->id, sq->cached_sinktime,
           GST_STIME_ARGS (sq->cached_sinktime));
-      calculate_interleave (mq);
+      calculate_interleave (mq, sq);
     }
     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
   }
@@ -2239,7 +2278,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
           stime = my_segment_to_running_time (&sq->sink_segment, val);
           if (GST_CLOCK_STIME_IS_VALID (stime)) {
             sq->cached_sinktime = stime;
-            calculate_interleave (mq);
+            calculate_interleave (mq, sq);
           }
           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
         }