sq->last_query = FALSE;
g_cond_signal (&sq->query_handled);
}
+ mqueue->interleave_incomplete = FALSE;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
break;
}
{
GstClockTimeDiff low, high;
GstClockTime interleave, other_interleave = 0;
+ gboolean some_inactive = FALSE;
GList *tmp;
low = high = GST_CLOCK_STIME_NONE;
/* Ignore sparse streams for interleave calculation */
if (oq->is_sparse)
continue;
- /* If a stream is not active yet (hasn't received any buffers), set
- * a maximum interleave to allow it to receive more data */
+
+ /* If some streams aren't active yet (haven't received any buffers), we will
+ * grow interleave accordingly */
if (!oq->active) {
- GST_LOG_OBJECT (mq,
- "queue %d is not active yet, forcing interleave to 5s", oq->id);
- mq->interleave = 5 * GST_SECOND;
- /* Update max-size time */
- mq->max_size.time = mq->interleave;
- SET_CHILD_PROPERTY (mq, time);
- goto beach;
+ some_inactive = TRUE;
+ continue;
}
/* Calculate within each streaming thread */
continue;
}
- if (GST_CLOCK_STIME_IS_VALID (oq->cached_sinktime)) {
+ /* If the stream isn't EOS, update the low/high input value */
+ if (GST_CLOCK_STIME_IS_VALID (oq->cached_sinktime) && !oq->is_eos) {
if (low == GST_CLOCK_STIME_NONE || oq->cached_sinktime < low)
low = oq->cached_sinktime;
if (high == GST_CLOCK_STIME_NONE || oq->cached_sinktime > high)
high = oq->cached_sinktime;
+
+ /* If the input is before the segment start, consider as inactive to allow
+ * the interleave to grow until *all* streams have data within the segment.
+ *
+ * The reason for this is that there is no requirements for data before
+ * the segment start to be "aligned" and therefore interleave calculation
+ * can't reliably be done. For example a demuxer could provide video data
+ * from the previous keyframe but audio only from just before the segment
+ * start */
+ if (oq->cached_sinktime < 0)
+ some_inactive = TRUE;
}
GST_LOG_OBJECT (mq,
"queue %d , sinktime:%" GST_STIME_FORMAT " low:%" GST_STIME_FORMAT
}
if (GST_CLOCK_STIME_IS_VALID (low) && GST_CLOCK_STIME_IS_VALID (high)) {
+ gboolean do_update = high == low;
interleave = high - low;
/* Padding of interleave and minimum value */
interleave = (150 * interleave / 100) + mq->min_interleave_time;
interleave = MAX (interleave, other_interleave);
+ /* Progressively grow up the interleave up to 5s if some streams were inactive */
+ if (some_inactive && interleave <= mq->interleave) {
+ interleave = MIN (5 * GST_SECOND, mq->interleave + 500 * GST_MSECOND);
+ do_update = TRUE;
+ }
+
+ /* We force the interleave update if:
+ * * the interleave was previously set while some streams were not active
+ * yet but they now all are
+ * * OR the interleave was previously based on all streams being active
+ * whereas some now aren't
+ */
+ if (mq->interleave_incomplete != some_inactive)
+ do_update = TRUE;
+
+ mq->interleave_incomplete = some_inactive;
+
/* Update the stored interleave if:
* * No data has arrived yet (high == low)
* * Or it went higher
* * Or it went lower and we've gone past the previous interleave needed */
- if (high == low || interleave > mq->interleave ||
+ if (do_update || interleave > mq->interleave ||
((mq->last_interleave_update + (2 * MIN (GST_SECOND,
mq->interleave)) < low)
&& interleave < (mq->interleave * 3 / 4))) {
}
}
-beach:
GST_DEBUG_OBJECT (mq,
"low:%" GST_STIME_FORMAT " high:%" GST_STIME_FORMAT " interleave:%"
GST_TIME_FORMAT " mq->interleave:%" GST_TIME_FORMAT
GstFlowReturn result;
GstClockTimeDiff next_time;
gboolean is_buffer;
+ gboolean is_query = FALSE;
gboolean do_update_buffering = FALSE;
gboolean dropping = FALSE;
GstPad *srcpad = NULL;
srcpad = g_weak_ref_get (&sq->srcpad);
if (!mq || !srcpad)
- goto out_flushing;
+ goto done;
next:
GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
item = (GstMultiQueueItem *) sitem;
newid = item->posid;
+ is_query = item->is_query;
+
/* steal the object and destroy the item */
object = gst_multi_queue_item_steal_object (item);
gst_multi_queue_item_destroy (item);
out_flushing:
{
- if (object && !GST_IS_QUERY (object))
+ if (object && !is_query)
gst_mini_object_unref (object);
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
mq = g_weak_ref_get (&sq->mqueue);
if (!mq)
- goto flushing;
+ goto done;
/* if eos, we are always full, so avoid hanging incoming indefinitely */
if (sq->is_eos)