GstDataQueueSize max_size, extra_size;
GstClockTime cur_time;
gboolean is_eos;
+ gboolean is_segment_done;
gboolean is_sparse;
gboolean flushing;
gboolean active;
GstQuery *last_handled_query;
/* For interleave calculation */
- GThread *thread;
+ GThread *thread; /* Streaming thread of SingleQueue */
+ GstClockTime interleave; /* Calculated interleave within the thread */
};
static void gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full);
-static void calculate_interleave (GstMultiQueue * mq);
-
+static void calculate_interleave (GstMultiQueue * mq, GstSingleQueue * sq);
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
GST_PAD_SINK,
g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
/* PROPERTIES */
+
g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
"Max. amount of data in the queue (bytes, 0=disable)",
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
mq->min_interleave_time = g_value_get_uint64 (value);
if (mq->use_interleave)
- calculate_interleave (mq);
+ calculate_interleave (mq, NULL);
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
break;
default:
}
+/* Start the streaming task of @sq: runs gst_multi_queue_loop on the
+ * SingleQueue's source pad. Returns the result of gst_pad_start_task(). */
static gboolean
-gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
-    gboolean full)
+gst_single_queue_start (GstMultiQueue * mq, GstSingleQueue * sq)
+{
+  GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
+  return gst_pad_start_task (sq->srcpad,
+      (GstTaskFunction) gst_multi_queue_loop, sq->srcpad, NULL);
+}
+
+/* Pause the streaming task of @sq's source pad without flushing the
+ * queued data. Returns the result of gst_pad_pause_task(). */
+static gboolean
+gst_single_queue_pause (GstMultiQueue * mq, GstSingleQueue * sq)
{
 gboolean result;
+ GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
+ result = gst_pad_pause_task (sq->srcpad);
+ /* Mark cached sink/src times stale so levels are recomputed on restart */
+ sq->sink_tainted = sq->src_tainted = TRUE;
+ return result;
+}
+
+/* Stop the streaming task of @sq's source pad. Returns the result of
+ * gst_pad_stop_task(). Unlike _pause, this fully shuts the task down. */
+static gboolean
+gst_single_queue_stop (GstMultiQueue * mq, GstSingleQueue * sq)
+{
+  gboolean result;
+
+  GST_LOG_OBJECT (mq, "SingleQueue %d : stopping task", sq->id);
+  result = gst_pad_stop_task (sq->srcpad);
+  /* Mark cached sink/src times stale so levels are recomputed on restart */
+  sq->sink_tainted = sq->src_tainted = TRUE;
+  return result;
+}
+
+static void
+gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
+ gboolean full)
+{
GST_DEBUG_OBJECT (mq, "flush %s queue %d", (flush ? "start" : "stop"),
sq->id);
sq->last_query = FALSE;
g_cond_signal (&sq->query_handled);
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
-
- GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
- result = gst_pad_pause_task (sq->srcpad);
- sq->sink_tainted = sq->src_tainted = TRUE;
} else {
gst_single_queue_flush_queue (sq, full);
sq->cur_time = 0;
sq->max_size.visible = mq->max_size.visible;
sq->is_eos = FALSE;
+ sq->is_segment_done = FALSE;
sq->nextid = 0;
sq->oldid = 0;
sq->last_oldid = G_MAXUINT32;
sq->flushing = FALSE;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
-
- GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
- result =
- gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop,
- sq->srcpad, NULL);
}
- return result;
}
/* WITH LOCK TAKEN */
size.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
/* get bytes and time buffer levels and take the max */
- if (sq->is_eos || sq->srcresult == GST_FLOW_NOT_LINKED || sq->is_sparse) {
+ if (sq->is_eos || sq->is_segment_done || sq->srcresult == GST_FLOW_NOT_LINKED
+ || sq->is_sparse) {
buffering_level = MAX_BUFFERING_LEVEL;
} else {
buffering_level = 0;
update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
{
gint buffering_level, percent;
+
/* nothing to dowhen we are not in buffering mode */
if (!mq->use_buffering)
return;
}
static void
-calculate_interleave (GstMultiQueue * mq)
+calculate_interleave (GstMultiQueue * mq, GstSingleQueue * sq)
{
GstClockTimeDiff low, high;
- GstClockTime interleave;
+ GstClockTime interleave, other_interleave = 0;
GList *tmp;
low = high = GST_CLOCK_STIME_NONE;
interleave = mq->interleave;
/* Go over all single queues and calculate lowest/highest value */
for (tmp = mq->queues; tmp; tmp = tmp->next) {
- GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
+ GstSingleQueue *oq = (GstSingleQueue *) tmp->data;
/* Ignore sparse streams for interleave calculation */
- if (sq->is_sparse)
+ if (oq->is_sparse)
continue;
/* If a stream is not active yet (hasn't received any buffers), set
* a maximum interleave to allow it to receive more data */
- if (!sq->active) {
+ if (!oq->active) {
GST_LOG_OBJECT (mq,
- "queue %d is not active yet, forcing interleave to 5s", sq->id);
+ "queue %d is not active yet, forcing interleave to 5s", oq->id);
mq->interleave = 5 * GST_SECOND;
/* Update max-size time */
mq->max_size.time = mq->interleave;
SET_CHILD_PROPERTY (mq, time);
goto beach;
}
- if (GST_CLOCK_STIME_IS_VALID (sq->cached_sinktime)) {
- if (low == GST_CLOCK_STIME_NONE || sq->cached_sinktime < low)
- low = sq->cached_sinktime;
- if (high == GST_CLOCK_STIME_NONE || sq->cached_sinktime > high)
- high = sq->cached_sinktime;
+
+ /* Calculate within each streaming thread */
+ if (sq && sq->thread != oq->thread) {
+ if (oq->interleave > other_interleave)
+ other_interleave = oq->interleave;
+ continue;
+ }
+
+ if (GST_CLOCK_STIME_IS_VALID (oq->cached_sinktime)) {
+ if (low == GST_CLOCK_STIME_NONE || oq->cached_sinktime < low)
+ low = oq->cached_sinktime;
+ if (high == GST_CLOCK_STIME_NONE || oq->cached_sinktime > high)
+ high = oq->cached_sinktime;
}
GST_LOG_OBJECT (mq,
"queue %d , sinktime:%" GST_STIME_FORMAT " low:%" GST_STIME_FORMAT
- " high:%" GST_STIME_FORMAT, sq->id,
- GST_STIME_ARGS (sq->cached_sinktime), GST_STIME_ARGS (low),
+ " high:%" GST_STIME_FORMAT, oq->id,
+ GST_STIME_ARGS (oq->cached_sinktime), GST_STIME_ARGS (low),
GST_STIME_ARGS (high));
}
interleave = high - low;
/* Padding of interleave and minimum value */
interleave = (150 * interleave / 100) + mq->min_interleave_time;
+ if (sq)
+ sq->interleave = interleave;
+
+ interleave = MAX (interleave, other_interleave);
/* Update the stored interleave if:
* * No data has arrived yet (high == low)
sq->sink_tainted = FALSE;
if (mq->use_interleave) {
sq->cached_sinktime = sink_time;
- calculate_interleave (mq);
+ calculate_interleave (mq, sq);
}
}
} else
if (GST_CLOCK_TIME_IS_VALID (btime)) {
if (end && GST_BUFFER_DURATION_IS_VALID (buf))
btime += GST_BUFFER_DURATION (buf);
- if (btime > segment->stop)
- btime = segment->stop;
time = my_segment_to_running_time (segment, btime);
}
} else if (GST_IS_BUFFER_LIST (object)) {
if (GST_CLOCK_TIME_IS_VALID (btime)) {
if (end && GST_BUFFER_DURATION_IS_VALID (buf))
btime += GST_BUFFER_DURATION (buf);
- if (btime > segment->stop)
- btime = segment->stop;
time = my_segment_to_running_time (segment, btime);
if (!end)
goto done;
event = GST_EVENT_CAST (object);
switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_SEGMENT_DONE:
+ *allow_drop = FALSE;
+ break;
case GST_EVENT_EOS:
result = GST_FLOW_EOS;
if (G_UNLIKELY (*allow_drop))
*allow_drop = FALSE;
break;
+ case GST_EVENT_STREAM_START:
+ result = GST_FLOW_OK;
+ if (G_UNLIKELY (*allow_drop))
+ *allow_drop = FALSE;
+ break;
case GST_EVENT_SEGMENT:
apply_segment (mq, sq, event, &sq->src_segment);
/* Applying the segment may have made the queue non-full again, unblock it if needed */
* has returned an error flow return. After EOS there
* will be no further buffer which could propagate the
* error upstream */
- if (sq->is_eos && sq->srcresult < GST_FLOW_EOS) {
+ if ((sq->is_eos || sq->is_segment_done) && sq->srcresult < GST_FLOW_EOS) {
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
GST_ELEMENT_FLOW_ERROR (mq, sq->srcresult);
} else {
"Queue %d cached sink time now %" G_GINT64_FORMAT " %"
GST_STIME_FORMAT, sq->id, sq->cached_sinktime,
GST_STIME_ARGS (sq->cached_sinktime));
- calculate_interleave (mq);
+ calculate_interleave (mq, sq);
}
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
}
res = gst_pad_push_event (sq->srcpad, event);
gst_single_queue_flush (mq, sq, TRUE, FALSE);
+ gst_single_queue_pause (mq, sq);
goto done;
case GST_EVENT_FLUSH_STOP:
res = gst_pad_push_event (sq->srcpad, event);
gst_single_queue_flush (mq, sq, FALSE, FALSE);
+ gst_single_queue_start (mq, sq);
#ifdef TIZEN_FEATURE_MQ_MODIFICATION
/* need to reset the buffering data after seeking */
#endif
goto done;
case GST_EVENT_SEGMENT:
+ sq->is_segment_done = FALSE;
sref = gst_event_ref (event);
break;
case GST_EVENT_GAP:
stime = my_segment_to_running_time (&sq->sink_segment, val);
if (GST_CLOCK_STIME_IS_VALID (stime)) {
sq->cached_sinktime = stime;
- calculate_interleave (mq);
+ calculate_interleave (mq, sq);
}
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
}
/* mark EOS when we received one, we must do that after putting the
* buffer in the queue because EOS marks the buffer as filled. */
switch (type) {
+ case GST_EVENT_SEGMENT_DONE:
+ sq->is_segment_done = TRUE;
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+ update_buffering (mq, sq);
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+ single_queue_overrun_cb (sq->queue, sq);
+ gst_multi_queue_post_buffering (mq);
+ break;
case GST_EVENT_EOS:
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
sq->is_eos = TRUE;
switch (mode) {
case GST_PAD_MODE_PUSH:
if (active) {
- result = gst_single_queue_flush (mq, sq, FALSE, TRUE);
+ gst_single_queue_flush (mq, sq, FALSE, TRUE);
+ result = parent ? gst_single_queue_start (mq, sq) : TRUE;
} else {
- result = gst_single_queue_flush (mq, sq, TRUE, TRUE);
- /* make sure streaming finishes */
- result |= gst_pad_stop_task (pad);
+ gst_single_queue_flush (mq, sq, TRUE, TRUE);
+ result = gst_single_queue_stop (mq, sq);
}
break;
default:
sq->max_size.bytes, sq->cur_time, sq->max_size.time);
/* we are always filled on EOS */
- if (sq->is_eos)
+ if (sq->is_eos || sq->is_segment_done)
return TRUE;
/* we never go past the max visible items unless we are in buffering mode */
}
gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad);
gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad);
+ if (GST_STATE_TARGET (mqueue) != GST_STATE_NULL) {
+ gst_single_queue_start (mqueue, sq);
+ }
g_rec_mutex_unlock (GST_STATE_GET_LOCK (mqueue));
GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added",
GST_LOG_OBJECT (queue, "sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT,
GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time));
- if (sink_time >= src_time)
+ if (GST_CLOCK_STIME_IS_VALID (src_time)
+ && GST_CLOCK_STIME_IS_VALID (sink_time) && sink_time >= src_time)
queue->cur_level.time = sink_time - src_time;
else
queue->cur_level.time = 0;
queue = GST_QUEUE (parent);
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Received event '%s'",
+ GST_EVENT_TYPE_NAME (event));
+
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
STATUS (queue, pad, "received flush start event");
if (GST_EVENT_IS_SERIALIZED (event)) {
/* serialized events go in the queue */
GST_QUEUE_MUTEX_LOCK (queue);
+
+ /* STREAM_START and SEGMENT reset the EOS status of a
+ * pad. Change the cached sinkpad flow result accordingly */
+ if (queue->srcresult == GST_FLOW_EOS
+ && (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START
+ || GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
+ queue->srcresult = GST_FLOW_OK;
+
if (queue->srcresult != GST_FLOW_OK) {
/* Errors in sticky event pushing are no problem and ignored here
* as they will cause more meaningful errors during data flow.
goto out_flow_error;
}
}
- /* refuse more events on EOS */
- if (queue->eos)
- goto out_eos;
+
+ /* refuse more events on EOS unless they unset the EOS status */
+ if (queue->eos) {
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_STREAM_START:
+ case GST_EVENT_SEGMENT:
+ /* Restart the loop */
+ if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
+ queue->srcresult = GST_FLOW_OK;
+ queue->eos = FALSE;
+ queue->unexpected = FALSE;
+ gst_pad_start_task (queue->srcpad,
+ (GstTaskFunction) gst_queue_loop, queue->srcpad, NULL);
+ } else {
+ queue->eos = FALSE;
+ queue->unexpected = FALSE;
+ }
+
+ break;
+ default:
+ goto out_eos;
+ }
+ }
+
gst_queue_locked_enqueue_event (queue, event);
GST_QUEUE_MUTEX_UNLOCK (queue);
} else {
GstEvent *event = GST_EVENT_CAST (data);
GstEventType type = GST_EVENT_TYPE (event);
- if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
+ if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT
+ || type == GST_EVENT_STREAM_START) {
/* we found a pushable item in the queue, push it out */
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"pushing pushable event %s after EOS",
#define GET_BUFFER_LEVEL_FOR_QUANTITY(format,alt_max) \
normalize_to_buffering_level (queue->cur_level.format,queue->max_level.format,(alt_max))
- if (queue->is_eos) {
- /* on EOS we are always 100% full, we set the var here so that it we can
- * reuse the logic below to stop buffering */
+ if (queue->is_eos || queue->srcresult == GST_FLOW_NOT_LINKED) {
+ /* on EOS and NOT_LINKED we are always 100% full, we set the var
+ * here so that we can reuse the logic below to stop buffering */
buflevel = MAX_BUFFERING_LEVEL;
- GST_LOG_OBJECT (queue, "we are EOS");
+ GST_LOG_OBJECT (queue, "we are %s", queue->is_eos ? "EOS" : "NOT_LINKED");
} else {
GST_LOG_OBJECT (queue,
"Cur level bytes/time/buffers %u/%" GST_TIME_FORMAT "/%u",
queue = GST_QUEUE2 (parent);
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Received event '%s'",
+ GST_EVENT_TYPE_NAME (event));
+
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
{
default:
if (GST_EVENT_IS_SERIALIZED (event)) {
/* serialized events go in the queue */
+
+ /* STREAM_START and SEGMENT reset the EOS status of a
+ * pad. Change the cached sinkpad flow result accordingly */
+ if (queue->sinkresult == GST_FLOW_EOS
+ && (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START
+ || GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
+ queue->sinkresult = GST_FLOW_OK;
+
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
if (queue->srcresult != GST_FLOW_OK) {
/* Errors in sticky event pushing are no problem and ignored here
goto out_flow_error;
}
}
- /* refuse more events on EOS */
- if (queue->is_eos)
- goto out_eos;
+
+ /* refuse more events on EOS unless they unset the EOS status */
+ if (queue->is_eos) {
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_STREAM_START:
+ case GST_EVENT_SEGMENT:
+ /* Restart the loop */
+ if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
+ queue->srcresult = GST_FLOW_OK;
+ queue->is_eos = FALSE;
+ queue->unexpected = FALSE;
+ queue->seeking = FALSE;
+ queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
+ /* reset rate counters */
+ reset_rate_timer (queue);
+ gst_pad_start_task (queue->srcpad,
+ (GstTaskFunction) gst_queue2_loop, queue->srcpad, NULL);
+ } else {
+ queue->is_eos = FALSE;
+ queue->unexpected = FALSE;
+ queue->seeking = FALSE;
+ queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
+ }
+
+ break;
+ default:
+ goto out_eos;
+ }
+ }
+
gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_queue2_post_buffering (queue);
GstEvent *event = GST_EVENT_CAST (data);
GstEventType type = GST_EVENT_TYPE (event);
- if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
+ if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT
+ || type == GST_EVENT_STREAM_START) {
/* we found a pushable item in the queue, push it out */
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
static GstFlowReturn
gst_queue2_push_one (GstQueue2 * queue)
{
- GstFlowReturn result = queue->srcresult;
+ GstFlowReturn result;
GstMiniObject *data;
GstQueue2ItemType item_type;
goto no_item;
next:
+ result = queue->srcresult;
STATUS (queue, queue->srcpad, "We have something dequeud");
g_atomic_int_set (&queue->downstream_may_block,
item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
GST_QUEUE2_MUTEX_UNLOCK (queue);
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"pause task, reason: %s", gst_flow_get_name (queue->srcresult));
+ /* Recalculate buffering levels before stopping since the source flow
+ * might cause a different buffering level (like NOT_LINKED making
+ * the queue appear as full) */
+ if (queue->use_buffering)
+ update_buffering (queue);
+ gst_queue2_post_buffering (queue);
/* let app know about us giving up if upstream is not expected to do so */
/* EOS is already taken care of elsewhere */
if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {