multiqueue: only block serialized query when it's safe
authorSebastian Dröge <slomo@circular-chaos.org>
Tue, 16 Jul 2013 09:36:50 +0000 (11:36 +0200)
committerSebastian Dröge <slomo@circular-chaos.org>
Tue, 16 Jul 2013 09:36:50 +0000 (11:36 +0200)
We must be certain that we don't cause a deadlock when blocking the serialized
queries. One such deadlock can happen when we are buffering and downstream is
blocked in preroll and a serialized query arrives. Downstream will not unblock
(and allow our query to execute) until we complete buffering and buffering will
not complete until we can answer the query..

https://bugzilla.gnome.org/show_bug.cgi?id=702840

plugins/elements/gstmultiqueue.c

index 9e9209c..7a9cd5f 100644 (file)
@@ -1614,17 +1614,30 @@ gst_multi_queue_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
         if (sq->srcresult != GST_FLOW_OK)
           goto out_flushing;
 
-        /* Get an unique incrementing id. */
-        curid = g_atomic_int_add ((gint *) & mq->counter, 1);
-
-        item = gst_multi_queue_mo_item_new ((GstMiniObject *) query, curid);
-
-        GST_DEBUG_OBJECT (mq,
-            "SingleQueue %d : Enqueuing query %p of type %s with id %d",
-            sq->id, query, GST_QUERY_TYPE_NAME (query), curid);
-        res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item);
-        g_cond_wait (&sq->query_handled, &mq->qlock);
-        res = sq->last_query;
+        /* serialized events go in the queue. We need to be certain that we
+         * don't cause deadlocks waiting for the query return value. We check if
+         * the queue is empty (nothing is blocking downstream and the query can
+         * be pushed for sure) or we are not buffering. If we are buffering,
+         * the pipeline waits to unblock downstream until our queue fills up
+         * completely, which can not happen if we block on the query..
+         * Therefore we only potentially block when we are not buffering. */
+        if (!mq->use_buffering || gst_data_queue_is_empty (sq->queue)) {
+          /* Get an unique incrementing id. */
+          curid = g_atomic_int_add ((gint *) & mq->counter, 1);
+
+          item = gst_multi_queue_mo_item_new ((GstMiniObject *) query, curid);
+
+          GST_DEBUG_OBJECT (mq,
+              "SingleQueue %d : Enqueuing query %p of type %s with id %d",
+              sq->id, query, GST_QUERY_TYPE_NAME (query), curid);
+          res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item);
+          g_cond_wait (&sq->query_handled, &mq->qlock);
+          res = sq->last_query;
+        } else {
+          GST_DEBUG_OBJECT (mq, "refusing query, we are buffering and the "
+              "queue is not empty");
+          res = FALSE;
+        }
         GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
       } else {
         /* default handling */