From 6f042f13aeb4cc2c8b92d81f4f5899d250fce5e3 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 27 May 2013 15:59:07 +0200 Subject: [PATCH] queue: Fix handling of serialized queries During FLUSH_START the query needs to be unblocked already, otherwise it can lead to deadlocks if the FLUSH_START is the result of something done from the streaming thread of the srcpad (the queue will never be emptied!). --- plugins/elements/gstqueue.c | 92 +++++++++++++++++++++++++++------------------ plugins/elements/gstqueue.h | 1 + 2 files changed, 57 insertions(+), 36 deletions(-) diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 8348265..aae51aa 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -219,6 +219,13 @@ static gboolean gst_queue_sink_activate_mode (GstPad * pad, GstObject * parent, static gboolean gst_queue_is_empty (GstQueue * queue); static gboolean gst_queue_is_filled (GstQueue * queue); + +typedef struct +{ + gboolean is_query; + GstMiniObject *item; +} GstQueueItem; + #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ()) static GType @@ -437,6 +444,7 @@ gst_queue_init (GstQueue * queue) g_mutex_init (&queue->qlock); g_cond_init (&queue->item_add); g_cond_init (&queue->item_del); + g_cond_init (&queue->query_handled); queue->queue = gst_queue_array_new (DEFAULT_MAX_SIZE_BUFFERS * 3 / 2); @@ -456,22 +464,23 @@ gst_queue_init (GstQueue * queue) static void gst_queue_finalize (GObject * object) { - GstMiniObject *data; GstQueue *queue = GST_QUEUE (object); GST_DEBUG_OBJECT (queue, "finalizing queue"); while (!gst_queue_array_is_empty (queue->queue)) { - data = gst_queue_array_pop_head (queue->queue); + GstQueueItem *qitem = gst_queue_array_pop_head (queue->queue); /* FIXME: if it's a query, shouldn't we unref that too? */ - if (!GST_IS_QUERY (data)) - gst_mini_object_unref (data); + if (!qitem->is_query) + gst_mini_object_unref (qitem->item); + g_slice_free (GstQueueItem, qitem); } gst_queue_array_free (queue->queue); g_mutex_clear (&queue->qlock); g_cond_clear (&queue->item_add); g_cond_clear (&queue->item_del); + g_cond_clear (&queue->query_handled); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -575,21 +584,22 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment, static void gst_queue_locked_flush (GstQueue * queue, gboolean full) { - GstMiniObject *data; - while (!gst_queue_array_is_empty (queue->queue)) { - data = gst_queue_array_pop_head (queue->queue); + GstQueueItem *qitem = gst_queue_array_pop_head (queue->queue); + /* Then lose another reference because we are supposed to destroy that data when flushing */ - if (!full && GST_IS_EVENT (data) && GST_EVENT_IS_STICKY (data) && - GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT - && GST_EVENT_TYPE (data) != GST_EVENT_EOS) { - gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (data)); + if (!full && GST_IS_EVENT (qitem->item) && GST_EVENT_IS_STICKY (qitem->item) + && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT + && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) { + gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (qitem->item)); } - if (!GST_IS_QUERY (data)) - gst_mini_object_unref (data); + if (!qitem->is_query) + gst_mini_object_unref (qitem->item); + g_slice_free (GstQueueItem, qitem); } queue->last_query = FALSE; + g_cond_signal (&queue->query_handled); GST_QUEUE_CLEAR_LEVEL (queue->cur_level); queue->min_threshold.buffers = queue->orig_min_threshold.buffers; queue->min_threshold.bytes = queue->orig_min_threshold.bytes; @@ -616,8 +626,12 @@ gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item) queue->cur_level.bytes += gst_buffer_get_size (buffer); apply_buffer (queue, buffer, &queue->sink_segment, TRUE, TRUE); - if (item) - gst_queue_array_push_tail (queue->queue, item); + if (item) { + GstQueueItem *qitem = g_slice_new (GstQueueItem); + qitem->item = item; + qitem->is_query = FALSE; + gst_queue_array_push_tail (queue->queue, qitem); + } GST_QUEUE_SIGNAL_ADD (queue); } @@ -654,8 +668,12 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item) break; } - if (item) - gst_queue_array_push_tail (queue->queue, item); + if (item) { + GstQueueItem *qitem = g_slice_new (GstQueueItem); + qitem->item = item; + qitem->is_query = FALSE; + gst_queue_array_push_tail (queue->queue, qitem); + } GST_QUEUE_SIGNAL_ADD (queue); } @@ -663,12 +681,16 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item) static GstMiniObject * gst_queue_locked_dequeue (GstQueue * queue) { + GstQueueItem *qitem; GstMiniObject *item; - item = gst_queue_array_pop_head (queue->queue); - if (item == NULL) + qitem = gst_queue_array_pop_head (queue->queue); + if (qitem == NULL) goto no_item; + item = qitem->item; + g_slice_free (GstQueueItem, qitem); + if (GST_IS_BUFFER (item)) { GstBuffer *buffer = GST_BUFFER_CAST (item); @@ -748,6 +770,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) /* unblock the loop and chain functions */ GST_QUEUE_SIGNAL_ADD (queue); GST_QUEUE_SIGNAL_DEL (queue); + queue->last_query = FALSE; + g_cond_signal (&queue->query_handled); GST_QUEUE_MUTEX_UNLOCK (queue); /* make sure it pauses, this should happen since we sent @@ -830,16 +854,19 @@ gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query) switch (GST_QUERY_TYPE (query)) { default: if (G_UNLIKELY (GST_QUERY_IS_SERIALIZED (query))) { + GstQueueItem *qitem; + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); GST_LOG_OBJECT (queue, "queuing query %p (%s)", query, GST_QUERY_TYPE_NAME (query)); - gst_queue_array_push_tail (queue->queue, query); + qitem = g_slice_new (GstQueueItem); + qitem->item = GST_MINI_OBJECT_CAST (query); + qitem->is_query = TRUE; + gst_queue_array_push_tail (queue->queue, qitem); GST_QUEUE_SIGNAL_ADD (queue); - while (!gst_queue_array_is_empty (queue->queue)) { - /* for as long as the queue has items, we know the query is - * not handled yet */ - GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing); - } + g_cond_wait (&queue->query_handled, &queue->qlock); + if (queue->srcresult != GST_FLOW_OK) + goto out_flushing; res = queue->last_query; GST_QUEUE_MUTEX_UNLOCK (queue); } else { @@ -852,16 +879,7 @@ gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query) /* ERRORS */ out_flushing: { - gint index; - GST_DEBUG_OBJECT (queue, "we are flushing"); - - /* Remove query from queue if still there, since we hold no ref to it */ - index = gst_queue_array_find (queue->queue, NULL, query); - - if (index >= 0) - gst_queue_array_drop_element (queue->queue, index); - GST_QUEUE_MUTEX_UNLOCK (queue); return FALSE; } @@ -870,7 +888,7 @@ out_flushing: static gboolean gst_queue_is_empty (GstQueue * queue) { - GstMiniObject *head; + GstQueueItem *head; if (gst_queue_array_is_empty (queue->queue)) return TRUE; @@ -880,7 +898,7 @@ gst_queue_is_empty (GstQueue * queue) * we would block forever on serialized queries. */ head = gst_queue_array_peek_head (queue->queue); - if (!GST_IS_BUFFER (head) && !GST_IS_BUFFER_LIST (head)) + if (!GST_IS_BUFFER (head->item) && !GST_IS_BUFFER_LIST (head->item)) return FALSE; /* It is possible that a max size is reached before all min thresholds are. @@ -1134,6 +1152,7 @@ next: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping query %p because of EOS", query); queue->last_query = FALSE; + g_cond_signal (&queue->query_handled); } } /* no more items in the queue. Set the unexpected flag so that upstream @@ -1162,6 +1181,7 @@ next: GstQuery *query = GST_QUERY_CAST (data); queue->last_query = gst_pad_peer_query (queue->srcpad, query); + g_cond_signal (&queue->query_handled); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "did query %p, return %d", query, queue->last_query); } diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h index 12bea13..5134578 100644 --- a/plugins/elements/gstqueue.h +++ b/plugins/elements/gstqueue.h @@ -133,6 +133,7 @@ struct _GstQueue { /* whether the first new segment has been applied to src */ gboolean newseg_applied_to_src; + GCond query_handled; gboolean last_query; gboolean flush_on_eos; /* flush on EOS */ -- 2.7.4