/* position of src/sink */
GstClockTime sinktime, srctime;
+ /* cached input value, used for interleave */
+ GstClockTimeDiff cached_sinktime;
/* TRUE if either position needs to be recalculated */
gboolean sink_tainted, src_tainted;
gboolean is_eos;
gboolean is_sparse;
gboolean flushing;
+ gboolean active;
/* Protected by global lock */
guint32 nextid; /* ID of the next object waiting to be pushed */
GCond query_handled;
gboolean last_query;
GstQuery *last_handled_query;
+
+ /* For interleave calculation */
+ GThread *thread;
};
#define DEFAULT_LOW_PERCENT 10
#define DEFAULT_HIGH_PERCENT 99
#define DEFAULT_SYNC_BY_RUNNING_TIME FALSE
+#define DEFAULT_USE_INTERLEAVE FALSE
enum
{
PROP_LOW_PERCENT,
PROP_HIGH_PERCENT,
PROP_SYNC_BY_RUNNING_TIME,
+ PROP_USE_INTERLEAVE,
PROP_LAST
};
DEFAULT_SYNC_BY_RUNNING_TIME,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_USE_INTERLEAVE,
+ g_param_spec_boolean ("use-interleave", "Use interleave",
+ "Adjust time limits based on input interleave",
+ DEFAULT_USE_INTERLEAVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
gobject_class->finalize = gst_multi_queue_finalize;
gst_element_class_set_static_metadata (gstelement_class,
mqueue->high_percent = DEFAULT_HIGH_PERCENT;
mqueue->sync_by_running_time = DEFAULT_SYNC_BY_RUNNING_TIME;
+ mqueue->use_interleave = DEFAULT_USE_INTERLEAVE;
mqueue->counter = 1;
mqueue->highid = -1;
case PROP_SYNC_BY_RUNNING_TIME:
mq->sync_by_running_time = g_value_get_boolean (value);
break;
+ case PROP_USE_INTERLEAVE:
+ mq->use_interleave = g_value_get_boolean (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
case PROP_SYNC_BY_RUNNING_TIME:
g_value_set_boolean (value, mq->sync_by_running_time);
break;
+ case PROP_USE_INTERLEAVE:
+ g_value_set_boolean (value, mq->use_interleave);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
sq->last_oldid = G_MAXUINT32;
sq->next_time = GST_CLOCK_TIME_NONE;
sq->last_time = GST_CLOCK_TIME_NONE;
+ sq->cached_sinktime = GST_CLOCK_TIME_NONE;
gst_data_queue_set_flushing (sq->queue, FALSE);
/* Reset high time to be recomputed next */
g_mutex_unlock (&mq->buffering_post_lock);
}
+static void
+calculate_interleave (GstMultiQueue * mq)
+{
+ GstClockTimeDiff low, high;
+ GstClockTime interleave;
+ GList *tmp;
+
+ low = high = GST_CLOCK_STIME_NONE;
+ interleave = mq->interleave;
+ /* Go over all single queues and calculate lowest/highest value */
+ for (tmp = mq->queues; tmp; tmp = tmp->next) {
+ GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
+ /* Ignore sparse streams for interleave calculation */
+ if (sq->is_sparse)
+ continue;
+ /* If a stream is not active yet (hasn't received any buffers), set
+ * a maximum interleave to allow it to receive more data */
+ if (!sq->active) {
+ GST_LOG_OBJECT (mq,
+ "queue %d is not active yet, forcing interleave to 5s", sq->id);
+ mq->interleave = 5 * GST_SECOND;
+ /* Update max-size time */
+ mq->max_size.time = mq->interleave;
+ SET_CHILD_PROPERTY (mq, time);
+ goto beach;
+ }
+ if (GST_CLOCK_STIME_IS_VALID (sq->cached_sinktime)) {
+ if (low == GST_CLOCK_STIME_NONE || sq->cached_sinktime < low)
+ low = sq->cached_sinktime;
+ if (high == GST_CLOCK_STIME_NONE || sq->cached_sinktime > high)
+ high = sq->cached_sinktime;
+ }
+ GST_LOG_OBJECT (mq,
+ "queue %d , sinktime:%" GST_STIME_FORMAT " low:%" GST_STIME_FORMAT
+ " high:%" GST_STIME_FORMAT, sq->id,
+ GST_STIME_ARGS (sq->cached_sinktime), GST_STIME_ARGS (low),
+ GST_STIME_ARGS (high));
+ }
+
+ if (GST_CLOCK_STIME_IS_VALID (low) && GST_CLOCK_STIME_IS_VALID (high)) {
+ interleave = high - low;
+ /* Padding of interleave and minimum value */
+ /* FIXME : Make the minimum time interleave a property */
+ interleave = (150 * interleave / 100) + 250 * GST_MSECOND;
+
+ /* Update the stored interleave if:
+ * * No data has arrived yet (high == low)
+ * * Or it went higher
+ * * Or it went lower and we've gone past the previous interleave needed */
+ if (high == low || interleave > mq->interleave ||
+ ((mq->last_interleave_update + (2 * MIN (GST_SECOND,
+ mq->interleave)) < low)
+ && interleave < (mq->interleave * 3 / 4))) {
+ /* Update the interleave */
+ mq->interleave = interleave;
+ mq->last_interleave_update = high;
+ /* Update max-size time */
+ mq->max_size.time = mq->interleave;
+ SET_CHILD_PROPERTY (mq, time);
+ }
+ }
+
+beach:
+ GST_DEBUG_OBJECT (mq,
+ "low:%" GST_STIME_FORMAT " high:%" GST_STIME_FORMAT " interleave:%"
+ GST_TIME_FORMAT " mq->interleave:%" GST_TIME_FORMAT
+ " last_interleave_update:%" GST_TIME_FORMAT, GST_TIME_ARGS (low),
+ GST_TIME_ARGS (high), GST_TIME_ARGS (interleave),
+ GST_TIME_ARGS (mq->interleave),
+ GST_TIME_ARGS (mq->last_interleave_update));
+}
+
+
/* calculate the diff between running time on the sink and src of the queue.
* This is the total amount of time in the queue.
* WITH LOCK TAKEN */
if (G_UNLIKELY (sink_time != GST_CLOCK_TIME_NONE)) {
/* if we have a time, we become untainted and use the time */
sq->sink_tainted = FALSE;
+ if (mq->use_interleave) {
+ sq->cached_sinktime = sink_time;
+ calculate_interleave (mq);
+ }
+ }
} else
sink_time = sq->sinktime;
src_time = sq->srctime =
gst_segment_to_running_time (segment, GST_FORMAT_TIME, position);
/* if we have a time, we become untainted and use the time */
- if (G_UNLIKELY (src_time != GST_CLOCK_TIME_NONE))
+ if (G_UNLIKELY (src_time != GST_CLOCK_TIME_NONE)) {
sq->src_tainted = FALSE;
+ }
} else
src_time = sq->srctime;
if (duration != GST_CLOCK_TIME_NONE)
timestamp += duration;
- GST_DEBUG_OBJECT (mq, "queue %d, position updated to %" GST_TIME_FORMAT,
- sq->id, GST_TIME_ARGS (timestamp));
+ GST_DEBUG_OBJECT (mq, "queue %d, %s position updated to %" GST_TIME_FORMAT,
+ sq->id, segment == &sq->sink_segment ? "sink" : "src",
+ GST_TIME_ARGS (timestamp));
segment->position = timestamp;
if (sq->is_eos)
goto was_eos;
+ sq->active = TRUE;
+
/* Get a unique incrementing id */
curid = g_atomic_int_add ((gint *) & mq->counter, 1);
- GST_LOG_OBJECT (mq, "SingleQueue %d : about to enqueue buffer %p with id %d",
- sq->id, buffer, curid);
+ timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
+ duration = GST_BUFFER_DURATION (buffer);
+
+ GST_LOG_OBJECT (mq,
+ "SingleQueue %d : about to enqueue buffer %p with id %d (pts:%"
+ GST_TIME_FORMAT " dts:%" GST_TIME_FORMAT " dur:%" GST_TIME_FORMAT ")",
+ sq->id, buffer, curid, GST_TIME_ARGS (GST_BUFFER_PTS (buffer)),
+ GST_TIME_ARGS (GST_BUFFER_DTS (buffer)), GST_TIME_ARGS (duration));
item = gst_multi_queue_buffer_item_new (GST_MINI_OBJECT_CAST (buffer), curid);
- timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
- duration = GST_BUFFER_DURATION (buffer);
+ /* Update interleave before pushing data into queue */
+ if (mq->use_interleave) {
+ GstClockTime val = timestamp;
+ GstClockTimeDiff dval;
+
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+ if (val == GST_CLOCK_TIME_NONE)
+ val = sq->sink_segment.position;
+ if (duration != GST_CLOCK_TIME_NONE)
+ val += duration;
+
+ dval = my_segment_to_running_time (&sq->sink_segment, val);
+ if (GST_CLOCK_STIME_IS_VALID (dval)) {
+ sq->cached_sinktime = dval;
+ GST_DEBUG_OBJECT (mq,
+ "Queue %d cached sink time now %" G_GINT64_FORMAT " %"
+ GST_STIME_FORMAT, sq->id, sq->cached_sinktime,
+ GST_STIME_ARGS (sq->cached_sinktime));
+ calculate_interleave (mq);
+ }
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+ }
if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
goto flushing;
sq->thread = g_thread_self ();
}
+ sq->thread = g_thread_self ();
+
/* Remove EOS flag */
sq->is_eos = FALSE;
}
goto done;
case GST_EVENT_SEGMENT:
+ sref = gst_event_ref (event);
+ break;
case GST_EVENT_GAP:
/* take ref because the queue will take ownership and we need the event
* afterwards to update the segment */
sref = gst_event_ref (event);
+ if (mq->use_interleave) {
+ GstClockTime val, dur;
+ gst_event_parse_gap (event, &val, &dur);
+ if (GST_CLOCK_TIME_IS_VALID (val)) {
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+ if (GST_CLOCK_TIME_IS_VALID (dur))
+ val += dur;
+ val =
+ gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
+ val);
+ if (GST_CLOCK_TIME_IS_VALID (val)) {
+ sq->cached_sinktime = val;
+ calculate_interleave (mq);
+ }
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+ }
+ }
break;
default:
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
break;
case GST_EVENT_GAP:
+ sq->active = TRUE;
apply_gap (mq, sq, sref, &sq->sink_segment);
gst_event_unref (sref);
default:
sq->is_eos = FALSE;
sq->is_sparse = FALSE;
sq->flushing = FALSE;
+ sq->active = FALSE;
gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);