queue2: Unblock any waiting serialize queries on FLUSH_START
authorSebastian Dröge <sebastian.droege@collabora.co.uk>
Mon, 27 May 2013 13:41:14 +0000 (15:41 +0200)
committerSebastian Dröge <sebastian.droege@collabora.co.uk>
Mon, 27 May 2013 13:41:14 +0000 (15:41 +0200)
Fixes some deadlocks during flushing.

And store queue items differently to not accidentially read
already unreffed queries when flushing. Queries are owned by
upstream and not us.

plugins/elements/gstqueue2.c

index 90395df..4c3202a 100644 (file)
@@ -265,6 +265,12 @@ typedef enum
   GST_QUEUE2_ITEM_TYPE_QUERY
 } GstQueue2ItemType;
 
+typedef struct
+{
+  GstQueue2ItemType type;
+  GstMiniObject *item;
+} GstQueue2Item;
+
 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
 
 static void
@@ -467,10 +473,11 @@ gst_queue2_finalize (GObject * object)
   GST_DEBUG_OBJECT (queue, "finalizing queue");
 
   while (!g_queue_is_empty (&queue->queue)) {
-    GstMiniObject *data = g_queue_pop_head (&queue->queue);
+    GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
 
-    if (!GST_IS_QUERY (data))
-      gst_mini_object_unref (data);
+    if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
+      gst_mini_object_unref (qitem->item);
+    g_slice_free (GstQueue2Item, qitem);
   }
 
   queue->last_query = FALSE;
@@ -1528,18 +1535,21 @@ gst_queue2_locked_flush (GstQueue2 * queue, gboolean full)
     init_ranges (queue);
   } else {
     while (!g_queue_is_empty (&queue->queue)) {
-      GstMiniObject *data = g_queue_pop_head (&queue->queue);
-
-      if (!full && GST_IS_EVENT (data) && GST_EVENT_IS_STICKY (data) &&
-          GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT
-          && GST_EVENT_TYPE (data) != GST_EVENT_EOS) {
-        gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (data));
+      GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
+
+      if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
+          && GST_EVENT_IS_STICKY (qitem->item)
+          && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
+          && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
+        gst_pad_store_sticky_event (queue->srcpad,
+            GST_EVENT_CAST (qitem->item));
       }
 
       /* Then lose another reference because we are supposed to destroy that
          data when flushing */
-      if (!GST_IS_QUERY (data))
-        gst_mini_object_unref (data);
+      if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
+        gst_mini_object_unref (qitem->item);
+      g_slice_free (GstQueue2Item, qitem);
     }
   }
   g_cond_signal (&queue->query_handled);
@@ -2034,7 +2044,10 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
       update_buffering (queue);
 
     if (QUEUE_IS_USING_QUEUE (queue)) {
-      g_queue_push_tail (&queue->queue, item);
+      GstQueue2Item *qitem = g_slice_new (GstQueue2Item);
+      qitem->type = item_type;
+      qitem->item = item;
+      g_queue_push_tail (&queue->queue, qitem);
     } else {
       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
     }
@@ -2062,10 +2075,13 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
 {
   GstMiniObject *item;
 
-  if (!QUEUE_IS_USING_QUEUE (queue))
+  if (!QUEUE_IS_USING_QUEUE (queue)) {
     item = gst_queue2_read_item_from_file (queue);
-  else
-    item = g_queue_pop_head (&queue->queue);
+  } else {
+    GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
+    item = qitem->item;
+    g_slice_free (GstQueue2Item, qitem);
+  }
 
   if (item == NULL)
     goto no_item;
@@ -2182,6 +2198,8 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
         /* unblock the loop and chain functions */
         GST_QUEUE2_SIGNAL_ADD (queue);
         GST_QUEUE2_SIGNAL_DEL (queue);
+        queue->last_query = FALSE;
+        g_cond_signal (&queue->query_handled);
         GST_QUEUE2_MUTEX_UNLOCK (queue);
 
         /* make sure it pauses, this should happen since we sent
@@ -2193,6 +2211,8 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
         /* flush the sink pad */
         queue->sinkresult = GST_FLOW_FLUSHING;
         GST_QUEUE2_SIGNAL_DEL (queue);
+        queue->last_query = FALSE;
+        g_cond_signal (&queue->query_handled);
         GST_QUEUE2_MUTEX_UNLOCK (queue);
 
         gst_event_unref (event);