/* flowreturn when srcpad is paused */
GstFlowReturn srcresult;
gboolean is_eos;
+ gboolean unexpected;
/* the queue of data we're keeping our hands on */
GQueue *queue;
GstQueueSize max_level; /* max. amount of data allowed in the queue */
gboolean use_buffering;
gboolean use_rate_estimate;
+ GstClockTime buffering_interval;
gint low_percent; /* low/high watermarks for buffering */
gint high_percent;
/* current buffering state */
gboolean is_buffering;
+ guint buffering_iteration;
/* for measuring input/output rates */
guint64 bytes_in;
* below the low threshold */
if (percent < queue->low_percent) {
queue->is_buffering = TRUE;
+ queue->buffering_iteration++;
post = TRUE;
}
}
GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
- queue->is_eos = FALSE;
if (queue->starting_segment != NULL)
gst_event_unref (queue->starting_segment);
queue->starting_segment = NULL;
queue->segment_event_received = TRUE;
queue->starting_segment = event;
}
+ /* a new segment allows us to accept more buffers if we got UNEXPECTED
+ * from downstream */
+ queue->unexpected = FALSE;
break;
default:
if (QUEUE_IS_USING_TEMP_FILE (queue))
GST_QUEUE_MUTEX_LOCK (queue);
gst_queue_locked_flush (queue);
queue->srcresult = GST_FLOW_OK;
+ queue->is_eos = FALSE;
+ queue->unexpected = FALSE;
/* reset rate counters */
reset_rate_timer (queue);
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
if (GST_EVENT_IS_SERIALIZED (event)) {
/* serialized events go in the queue */
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+ /* refuse more events on EOS */
+ if (queue->is_eos)
+ goto out_eos;
gst_queue_locked_enqueue (queue, event);
GST_QUEUE_MUTEX_UNLOCK (queue);
} else {
/* ERRORS */
out_flushing:
{
- GST_DEBUG_OBJECT (queue, "we are flushing");
+ GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
GST_QUEUE_MUTEX_UNLOCK (queue);
-
- gst_buffer_unref (event);
+ gst_event_unref (event);
+ return FALSE;
+ }
+out_eos:
+ {
+ GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
+ GST_QUEUE_MUTEX_UNLOCK (queue);
+ gst_event_unref (event);
return FALSE;
}
}
/* we have to lock the queue since we span threads */
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+ /* when we received EOS, we refuse more data */
+ if (queue->is_eos)
+ goto out_eos;
+ /* when we received unexpected from downstream, refuse more buffers */
+ if (queue->unexpected)
+ goto out_unexpected;
/* We make space available if we're "full" according to whatever
* the user defined as "full". */
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"exit because task paused, reason: %s", gst_flow_get_name (ret));
GST_QUEUE_MUTEX_UNLOCK (queue);
-
gst_buffer_unref (buffer);
return ret;
}
+out_eos:
+ {
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
+ GST_QUEUE_MUTEX_UNLOCK (queue);
+ gst_buffer_unref (buffer);
+
+ return GST_FLOW_UNEXPECTED;
+ }
+out_unexpected:
+ {
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "exit because we received UNEXPECTED");
+ GST_QUEUE_MUTEX_UNLOCK (queue);
+ gst_buffer_unref (buffer);
+
+ return GST_FLOW_UNEXPECTED;
+ }
}
/* dequeue an item from the queue an push it downstream. This functions returns
if (data == NULL)
goto no_item;
+next:
if (GST_IS_BUFFER (data)) {
GstBuffer *buffer = GST_BUFFER_CAST (data);
/* need to check for srcresult here as well */
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+ if (result == GST_FLOW_UNEXPECTED) {
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "got UNEXPECTED from downstream");
+ /* stop pushing buffers, we dequeue all items until we see an item that we
+ * can push again, which is EOS or NEWSEGMENT. If there is nothing in the
+ * queue we can push, we set a flag to make the sinkpad refuse more
+ * buffers with an UNEXPECTED return value until we receive something
+ * pushable again or we get flushed. */
+ while ((data = gst_queue_locked_dequeue (queue))) {
+ if (GST_IS_BUFFER (data)) {
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "dropping UNEXPECTED buffer %p", data);
+ gst_buffer_unref (GST_BUFFER_CAST (data));
+ } else if (GST_IS_EVENT (data)) {
+ GstEvent *event = GST_EVENT_CAST (data);
+ GstEventType type = GST_EVENT_TYPE (event);
+
+ if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) {
+ /* we found a pushable item in the queue, push it out */
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "pushing pushable event %s after UNEXPECTED %p",
+ GST_EVENT_TYPE_NAME (event));
+ goto next;
+ }
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "dropping UNEXPECTED event %p", event);
+ gst_event_unref (event);
+ }
+ }
+ /* no more items in the queue. Set the unexpected flag so that upstream
+ * make us refuse any more buffers on the sinkpad. Since we will still
+ * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the
+ * task function does not shut down. */
+ queue->unexpected = TRUE;
+ result = GST_FLOW_OK;
+ }
} else if (GST_IS_EVENT (data)) {
GstEvent *event = GST_EVENT_CAST (data);
GstEventType type = GST_EVENT_TYPE (event);
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
/* if we're EOS, return UNEXPECTED so that the task pauses. */
- if (type == GST_EVENT_EOS)
+ if (type == GST_EVENT_EOS) {
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "pushed EOS event %p, return UNEXPECTED", event);
result = GST_FLOW_UNEXPECTED;
+ }
}
return result;
GST_QUEUE_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "activating push mode");
queue->srcresult = GST_FLOW_OK;
+ queue->is_eos = FALSE;
+ queue->unexpected = FALSE;
reset_rate_timer (queue);
GST_QUEUE_MUTEX_UNLOCK (queue);
} else {
GST_QUEUE_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "activating push mode");
queue->srcresult = GST_FLOW_OK;
+ queue->is_eos = FALSE;
+ queue->unexpected = FALSE;
result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
GST_QUEUE_MUTEX_UNLOCK (queue);
} else {
GST_QUEUE_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "activating pull mode");
queue->srcresult = GST_FLOW_OK;
+ queue->is_eos = FALSE;
+ queue->unexpected = FALSE;
result = TRUE;
GST_QUEUE_MUTEX_UNLOCK (queue);
} else {