queue: Fix handling of serialized queries
authorSebastian Dröge <sebastian.droege@collabora.co.uk>
Mon, 27 May 2013 13:59:07 +0000 (15:59 +0200)
committerSebastian Dröge <sebastian.droege@collabora.co.uk>
Mon, 27 May 2013 13:59:07 +0000 (15:59 +0200)
During FLUSH_START the query needs to be unblocked already, otherwise
it can lead to deadlocks if the FLUSH_START is the result of something
done from the streaming thread of the srcpad (the queue will never be
emptied!).

plugins/elements/gstqueue.c
plugins/elements/gstqueue.h

index 8348265..aae51aa 100644 (file)
@@ -219,6 +219,13 @@ static gboolean gst_queue_sink_activate_mode (GstPad * pad, GstObject * parent,
 static gboolean gst_queue_is_empty (GstQueue * queue);
 static gboolean gst_queue_is_filled (GstQueue * queue);
 
+
+typedef struct
+{
+  gboolean is_query;
+  GstMiniObject *item;
+} GstQueueItem;
+
 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
 
 static GType
@@ -437,6 +444,7 @@ gst_queue_init (GstQueue * queue)
   g_mutex_init (&queue->qlock);
   g_cond_init (&queue->item_add);
   g_cond_init (&queue->item_del);
+  g_cond_init (&queue->query_handled);
 
   queue->queue = gst_queue_array_new (DEFAULT_MAX_SIZE_BUFFERS * 3 / 2);
 
@@ -456,22 +464,23 @@ gst_queue_init (GstQueue * queue)
 static void
 gst_queue_finalize (GObject * object)
 {
-  GstMiniObject *data;
   GstQueue *queue = GST_QUEUE (object);
 
   GST_DEBUG_OBJECT (queue, "finalizing queue");
 
   while (!gst_queue_array_is_empty (queue->queue)) {
-    data = gst_queue_array_pop_head (queue->queue);
+    GstQueueItem *qitem = gst_queue_array_pop_head (queue->queue);
     /* FIXME: if it's a query, shouldn't we unref that too? */
-    if (!GST_IS_QUERY (data))
-      gst_mini_object_unref (data);
+    if (!qitem->is_query)
+      gst_mini_object_unref (qitem->item);
+    g_slice_free (GstQueueItem, qitem);
   }
   gst_queue_array_free (queue->queue);
 
   g_mutex_clear (&queue->qlock);
   g_cond_clear (&queue->item_add);
   g_cond_clear (&queue->item_del);
+  g_cond_clear (&queue->query_handled);
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
@@ -575,21 +584,22 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
 static void
 gst_queue_locked_flush (GstQueue * queue, gboolean full)
 {
-  GstMiniObject *data;
-
   while (!gst_queue_array_is_empty (queue->queue)) {
-    data = gst_queue_array_pop_head (queue->queue);
+    GstQueueItem *qitem = gst_queue_array_pop_head (queue->queue);
+
     /* Then lose another reference because we are supposed to destroy that
        data when flushing */
-    if (!full && GST_IS_EVENT (data) && GST_EVENT_IS_STICKY (data) &&
-        GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT
-        && GST_EVENT_TYPE (data) != GST_EVENT_EOS) {
-      gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (data));
+    if (!full && GST_IS_EVENT (qitem->item) && GST_EVENT_IS_STICKY (qitem->item)
+        && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
+        && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
+      gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (qitem->item));
     }
-    if (!GST_IS_QUERY (data))
-      gst_mini_object_unref (data);
+    if (!qitem->is_query)
+      gst_mini_object_unref (qitem->item);
+    g_slice_free (GstQueueItem, qitem);
   }
   queue->last_query = FALSE;
+  g_cond_signal (&queue->query_handled);
   GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
   queue->min_threshold.buffers = queue->orig_min_threshold.buffers;
   queue->min_threshold.bytes = queue->orig_min_threshold.bytes;
@@ -616,8 +626,12 @@ gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item)
   queue->cur_level.bytes += gst_buffer_get_size (buffer);
   apply_buffer (queue, buffer, &queue->sink_segment, TRUE, TRUE);
 
-  if (item)
-    gst_queue_array_push_tail (queue->queue, item);
+  if (item) {
+    GstQueueItem *qitem = g_slice_new (GstQueueItem);
+    qitem->item = item;
+    qitem->is_query = FALSE;
+    gst_queue_array_push_tail (queue->queue, qitem);
+  }
   GST_QUEUE_SIGNAL_ADD (queue);
 }
 
@@ -654,8 +668,12 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
       break;
   }
 
-  if (item)
-    gst_queue_array_push_tail (queue->queue, item);
+  if (item) {
+    GstQueueItem *qitem = g_slice_new (GstQueueItem);
+    qitem->item = item;
+    qitem->is_query = FALSE;
+    gst_queue_array_push_tail (queue->queue, qitem);
+  }
   GST_QUEUE_SIGNAL_ADD (queue);
 }
 
@@ -663,12 +681,16 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
 static GstMiniObject *
 gst_queue_locked_dequeue (GstQueue * queue)
 {
+  GstQueueItem *qitem;
   GstMiniObject *item;
 
-  item = gst_queue_array_pop_head (queue->queue);
-  if (item == NULL)
+  qitem = gst_queue_array_pop_head (queue->queue);
+  if (qitem == NULL)
     goto no_item;
 
+  item = qitem->item;
+  g_slice_free (GstQueueItem, qitem);
+
   if (GST_IS_BUFFER (item)) {
     GstBuffer *buffer = GST_BUFFER_CAST (item);
 
@@ -748,6 +770,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       /* unblock the loop and chain functions */
       GST_QUEUE_SIGNAL_ADD (queue);
       GST_QUEUE_SIGNAL_DEL (queue);
+      queue->last_query = FALSE;
+      g_cond_signal (&queue->query_handled);
       GST_QUEUE_MUTEX_UNLOCK (queue);
 
       /* make sure it pauses, this should happen since we sent
@@ -830,16 +854,19 @@ gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
   switch (GST_QUERY_TYPE (query)) {
     default:
       if (G_UNLIKELY (GST_QUERY_IS_SERIALIZED (query))) {
+        GstQueueItem *qitem;
+
         GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
         GST_LOG_OBJECT (queue, "queuing query %p (%s)", query,
             GST_QUERY_TYPE_NAME (query));
-        gst_queue_array_push_tail (queue->queue, query);
+        qitem = g_slice_new (GstQueueItem);
+        qitem->item = GST_MINI_OBJECT_CAST (query);
+        qitem->is_query = TRUE;
+        gst_queue_array_push_tail (queue->queue, qitem);
         GST_QUEUE_SIGNAL_ADD (queue);
-        while (!gst_queue_array_is_empty (queue->queue)) {
-          /* for as long as the queue has items, we know the query is
-           * not handled yet */
-          GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing);
-        }
+        g_cond_wait (&queue->query_handled, &queue->qlock);
+        if (queue->srcresult != GST_FLOW_OK)
+          goto out_flushing;
         res = queue->last_query;
         GST_QUEUE_MUTEX_UNLOCK (queue);
       } else {
@@ -852,16 +879,7 @@ gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
   /* ERRORS */
 out_flushing:
   {
-    gint index;
-
     GST_DEBUG_OBJECT (queue, "we are flushing");
-
-    /* Remove query from queue if still there, since we hold no ref to it */
-    index = gst_queue_array_find (queue->queue, NULL, query);
-
-    if (index >= 0)
-      gst_queue_array_drop_element (queue->queue, index);
-
     GST_QUEUE_MUTEX_UNLOCK (queue);
     return FALSE;
   }
@@ -870,7 +888,7 @@ out_flushing:
 static gboolean
 gst_queue_is_empty (GstQueue * queue)
 {
-  GstMiniObject *head;
+  GstQueueItem *head;
 
   if (gst_queue_array_is_empty (queue->queue))
     return TRUE;
@@ -880,7 +898,7 @@ gst_queue_is_empty (GstQueue * queue)
    * we would block forever on serialized queries.
    */
   head = gst_queue_array_peek_head (queue->queue);
-  if (!GST_IS_BUFFER (head) && !GST_IS_BUFFER_LIST (head))
+  if (!GST_IS_BUFFER (head->item) && !GST_IS_BUFFER_LIST (head->item))
     return FALSE;
 
   /* It is possible that a max size is reached before all min thresholds are.
@@ -1134,6 +1152,7 @@ next:
           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
               "dropping query %p because of EOS", query);
           queue->last_query = FALSE;
+          g_cond_signal (&queue->query_handled);
         }
       }
       /* no more items in the queue. Set the unexpected flag so that upstream
@@ -1162,6 +1181,7 @@ next:
     GstQuery *query = GST_QUERY_CAST (data);
 
     queue->last_query = gst_pad_peer_query (queue->srcpad, query);
+    g_cond_signal (&queue->query_handled);
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "did query %p, return %d", query, queue->last_query);
   }
index 12bea13..5134578 100644 (file)
@@ -133,6 +133,7 @@ struct _GstQueue {
   /* whether the first new segment has been applied to src */
   gboolean newseg_applied_to_src;
 
+  GCond query_handled;
   gboolean last_query;
 
   gboolean flush_on_eos; /* flush on EOS */