From f49cb86d169e06489c741929827000913573e085 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 14 Sep 2007 20:24:22 +0000 Subject: [PATCH] plugins/elements/gstqueue.*: When downstream returns UNEXPECTED from pushing a buffer, don't try to push more buffers... Original commit message from CVS: * plugins/elements/gstqueue.c: (gst_queue_locked_enqueue), (gst_queue_handle_sink_event), (gst_queue_chain), (gst_queue_push_one), (gst_queue_handle_src_query), (gst_queue_sink_activate_push), (gst_queue_src_activate_push): * plugins/elements/gstqueue.h: When downstream returns UNEXPECTED from pushing a buffer, don't try to push more buffers but allow pushing of EOS and NEWSEGMENT. Add some more debug info here and there. Fixes #476514. --- ChangeLog | 11 ++++++ plugins/elements/gstqueue.c | 92 ++++++++++++++++++++++++++++++++++++++++++++- plugins/elements/gstqueue.h | 2 + 3 files changed, 103 insertions(+), 2 deletions(-) diff --git a/ChangeLog b/ChangeLog index 49a3e35..58c66a3 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,16 @@ 2007-09-14 Wim Taymans + * plugins/elements/gstqueue.c: (gst_queue_locked_enqueue), + (gst_queue_handle_sink_event), (gst_queue_chain), + (gst_queue_push_one), (gst_queue_handle_src_query), + (gst_queue_sink_activate_push), (gst_queue_src_activate_push): + * plugins/elements/gstqueue.h: + When downstream returns UNEXPECTED from pushing a buffer, don't try to + push more buffers but allow pushing of EOS and NEWSEGMENT. + Add some more debug info here and there. Fixes #476514. + +2007-09-14 Wim Taymans + * libs/gst/base/gstbasesink.c: (gst_base_sink_init), (gst_base_sink_preroll_queue_flush), (gst_base_sink_commit_state), (gst_base_sink_wait_preroll), (gst_base_sink_needs_preroll), diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 61481e4..6c313ed 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -616,7 +616,7 @@ gst_queue_locked_flush (GstQueue * queue) GST_QUEUE_SIGNAL_DEL (queue); } -/* enqueue an item an update the level stats */ +/* enqueue an item an update the level stats, with QUEUE_LOCK */ static void gst_queue_locked_enqueue (GstQueue * queue, gpointer item) { @@ -636,9 +636,15 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item) /* Zero the thresholds, this makes sure the queue is completely * filled and we can read all data from the queue. */ GST_QUEUE_CLEAR_LEVEL (queue->min_threshold); + /* mark the queue as EOS. This prevents us from accepting more data. */ + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from upstream"); + queue->eos = TRUE; break; case GST_EVENT_NEWSEGMENT: apply_segment (queue, event, &queue->sink_segment); + /* a new segment allows us to accept more buffers if we got UNEXPECTED + * from downstream */ + queue->unexpected = FALSE; break; default: break; @@ -747,6 +753,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) GST_QUEUE_MUTEX_LOCK (queue); gst_queue_locked_flush (queue); queue->srcresult = GST_FLOW_OK; + queue->eos = FALSE; + queue->unexpected = FALSE; if (gst_pad_is_linked (queue->srcpad)) { gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop, queue->srcpad); @@ -762,6 +770,9 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) if (GST_EVENT_IS_SERIALIZED (event)) { /* serialized events go in the queue */ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + /* refuse more events on EOS */ + if (queue->eos) + goto out_eos; gst_queue_locked_enqueue (queue, event); GST_QUEUE_MUTEX_UNLOCK (queue); } else { @@ -776,6 +787,15 @@ done: /* ERRORS */ out_flushing: { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "refusing event, we are flushing"); + GST_QUEUE_MUTEX_UNLOCK (queue); + gst_buffer_unref (event); + return FALSE; + } +out_eos: + { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "refusing event, we are EOS"); GST_QUEUE_MUTEX_UNLOCK (queue); gst_buffer_unref (event); return FALSE; @@ -815,6 +835,11 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) /* we have to lock the queue since we span threads */ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + /* when we received EOS, we refuse any more data */ + if (queue->eos) + goto out_eos; + if (queue->unexpected) + goto out_unexpected; timestamp = GST_BUFFER_TIMESTAMP (buffer); duration = GST_BUFFER_DURATION (buffer); @@ -910,6 +935,25 @@ out_flushing: return ret; } +out_eos: + { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS"); + GST_QUEUE_MUTEX_UNLOCK (queue); + + gst_buffer_unref (buffer); + + return GST_FLOW_UNEXPECTED; + } +out_unexpected: + { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "exit because we received UNEXPECTED"); + GST_QUEUE_MUTEX_UNLOCK (queue); + + gst_buffer_unref (buffer); + + return GST_FLOW_UNEXPECTED; + } } /* dequeue an item from the queue an push it downstream. This functions returns @@ -924,6 +968,7 @@ gst_queue_push_one (GstQueue * queue) if (data == NULL) goto no_item; +next: if (GST_IS_BUFFER (data)) { GstBuffer *buffer = GST_BUFFER_CAST (data); @@ -933,6 +978,42 @@ gst_queue_push_one (GstQueue * queue) /* need to check for srcresult here as well */ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + + if (result == GST_FLOW_UNEXPECTED) { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "got UNEXPECTED from downstream"); + /* stop pushing buffers, we dequeue all items until we see an item that we + * can push again, which is EOS or NEWSEGMENT. If there is nothing in the + * queue we can push, we set a flag to make the sinkpad refuse more + * buffers with an UNEXPECTED return value. */ + while ((data = gst_queue_locked_dequeue (queue))) { + if (GST_IS_BUFFER (data)) { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "dropping UNEXPECTED buffer %p", data); + gst_buffer_unref (GST_BUFFER_CAST (data)); + } else if (GST_IS_EVENT (data)) { + GstEvent *event = GST_EVENT_CAST (data); + GstEventType type = GST_EVENT_TYPE (event); + + if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) { + /* we found a pushable item in the queue, push it out */ + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "pushing pushable event %s after UNEXPECTED %p", + GST_EVENT_TYPE_NAME (event)); + goto next; + } + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "dropping UNEXPECTED event %p", event); + gst_event_unref (event); + } + } + /* no more items in the queue. Set the unexpected flag so that upstream + * make us refuse any more buffers on the sinkpad. Since we will still + * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the + * task function does not shut down. */ + queue->unexpected = TRUE; + result = GST_FLOW_OK; + } } else if (GST_IS_EVENT (data)) { GstEvent *event = GST_EVENT_CAST (data); GstEventType type = GST_EVENT_TYPE (event); @@ -943,8 +1024,11 @@ gst_queue_push_one (GstQueue * queue) GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); /* if we're EOS, return UNEXPECTED so that the task pauses. */ - if (type == GST_EVENT_EOS) + if (type == GST_EVENT_EOS) { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "pushed EOS event %p, return UNEXPECTED", event); result = GST_FLOW_UNEXPECTED; + } } return result; @@ -1106,6 +1190,8 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active) if (active) { GST_QUEUE_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_OK; + queue->eos = FALSE; + queue->unexpected = FALSE; GST_QUEUE_MUTEX_UNLOCK (queue); } else { /* step 1, unblock chain function */ @@ -1131,6 +1217,8 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active) if (active) { GST_QUEUE_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_OK; + queue->eos = FALSE; + queue->unexpected = FALSE; /* we do not start the task yet if the pad is not connected */ if (gst_pad_is_linked (pad)) result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h index 8ed5af8..aaff87c 100644 --- a/plugins/elements/gstqueue.h +++ b/plugins/elements/gstqueue.h @@ -87,6 +87,8 @@ struct _GstQueue { /* flowreturn when srcpad is paused */ GstFlowReturn srcresult; + gboolean unexpected; + gboolean eos; /* the queue of data we're keeping our grubby hands on */ GQueue *queue; -- 2.7.4