queue: add support for serialized queries
authorWim Taymans <wim.taymans@collabora.co.uk>
Wed, 14 Mar 2012 14:42:33 +0000 (15:42 +0100)
committerWim Taymans <wim.taymans@collabora.co.uk>
Wed, 14 Mar 2012 14:42:33 +0000 (15:42 +0100)
plugins/elements/gstqueue.c
plugins/elements/gstqueue.h

index a95f4f9..9bbb755 100644 (file)
@@ -626,7 +626,7 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
 
 /* dequeue an item from the queue and update level stats, with QUEUE_LOCK */
 static GstMiniObject *
-gst_queue_locked_dequeue (GstQueue * queue, gboolean * is_buffer)
+gst_queue_locked_dequeue (GstQueue * queue)
 {
   GstMiniObject *item;
 
@@ -648,7 +648,6 @@ gst_queue_locked_dequeue (GstQueue * queue, gboolean * is_buffer)
     if (queue->cur_level.buffers == 0)
       queue->cur_level.time = 0;
 
-    *is_buffer = TRUE;
   } else if (GST_IS_EVENT (item)) {
     GstEvent *event = GST_EVENT_CAST (item);
 
@@ -671,8 +670,11 @@ gst_queue_locked_dequeue (GstQueue * queue, gboolean * is_buffer)
       default:
         break;
     }
+  } else if (GST_IS_QUERY (item)) {
+    GstQuery *query = GST_QUERY_CAST (item);
 
-    *is_buffer = FALSE;
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "retrieved query %p from queue", query);
   } else {
     g_warning
         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
@@ -776,14 +778,38 @@ out_eos:
 static gboolean
 gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
 {
+  GstQueue *queue = GST_QUEUE_CAST (parent);
   gboolean res;
 
   switch (GST_QUERY_TYPE (query)) {
     default:
-      res = gst_pad_query_default (pad, parent, query);
+      if (G_UNLIKELY (GST_QUERY_IS_SERIALIZED (query))) {
+        GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+        GST_LOG_OBJECT (queue, "queuing query %p (%s)", query,
+            GST_QUERY_TYPE_NAME (query));
+        g_queue_push_tail (&queue->queue, query);
+        GST_QUEUE_SIGNAL_ADD (queue);
+        while (queue->queue.length != 0) {
+          /* for as long as the queue has items, we know the query is
+           * not handled yet */
+          GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing);
+        }
+        res = queue->last_query;
+        GST_QUEUE_MUTEX_UNLOCK (queue);
+      } else {
+        res = gst_pad_query_default (pad, parent, query);
+      }
       break;
   }
   return res;
+
+  /* ERRORS */
+out_flushing:
+  {
+    GST_DEBUG_OBJECT (queue, "we are flushing");
+    GST_QUEUE_MUTEX_UNLOCK (queue);
+    return FALSE;
+  }
 }
 
 static gboolean
@@ -820,9 +846,8 @@ gst_queue_leak_downstream (GstQueue * queue)
   /* for as long as the queue is filled, dequeue an item and discard it */
   while (gst_queue_is_filled (queue)) {
     GstMiniObject *leak;
-    gboolean is_buffer;
 
-    leak = gst_queue_locked_dequeue (queue, &is_buffer);
+    leak = gst_queue_locked_dequeue (queue);
     /* there is nothing to dequeue and the queue is still filled.. This should
      * not happen */
     g_assert (leak != NULL);
@@ -977,14 +1002,13 @@ gst_queue_push_one (GstQueue * queue)
 {
   GstFlowReturn result = GST_FLOW_OK;
   GstMiniObject *data;
-  gboolean is_buffer;
 
-  data = gst_queue_locked_dequeue (queue, &is_buffer);
+  data = gst_queue_locked_dequeue (queue);
   if (data == NULL)
     goto no_item;
 
 next:
-  if (is_buffer) {
+  if (GST_IS_BUFFER (data)) {
     GstBuffer *buffer;
 
     buffer = GST_BUFFER_CAST (data);
@@ -1013,12 +1037,12 @@ next:
        * can push again, which is EOS or SEGMENT. If there is nothing in the
        * queue we can push, we set a flag to make the sinkpad refuse more
        * buffers with an EOS return value. */
-      while ((data = gst_queue_locked_dequeue (queue, &is_buffer))) {
-        if (is_buffer) {
+      while ((data = gst_queue_locked_dequeue (queue))) {
+        if (GST_IS_BUFFER (data)) {
           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
               "dropping EOS buffer %p", data);
           gst_buffer_unref (GST_BUFFER_CAST (data));
-        } else {
+        } else if (GST_IS_EVENT (data)) {
           GstEvent *event = GST_EVENT_CAST (data);
           GstEventType type = GST_EVENT_TYPE (event);
 
@@ -1032,6 +1056,12 @@ next:
           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
               "dropping EOS event %p", event);
           gst_event_unref (event);
+        } else if (GST_IS_QUERY (data)) {
+          GstQuery *query = GST_QUERY_CAST (data);
+
+          GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+              "dropping query %p because of EOS", query);
+          queue->last_query = FALSE;
         }
       }
       /* no more items in the queue. Set the unexpected flag so that upstream
@@ -1041,7 +1071,7 @@ next:
       queue->unexpected = TRUE;
       result = GST_FLOW_OK;
     }
-  } else {
+  } else if (GST_IS_EVENT (data)) {
     GstEvent *event = GST_EVENT_CAST (data);
     GstEventType type = GST_EVENT_TYPE (event);
 
@@ -1056,6 +1086,12 @@ next:
           "pushed EOS event %p, return EOS", event);
       result = GST_FLOW_EOS;
     }
+  } else if (GST_IS_QUERY (data)) {
+    GstQuery *query = GST_QUERY_CAST (data);
+
+    queue->last_query = gst_pad_peer_query (queue->srcpad, query);
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "did query %p, return %d", query, queue->last_query);
   }
   return result;
 
index 3b0752c..57ccc2a 100644 (file)
@@ -131,6 +131,8 @@ struct _GstQueue {
 
   /* whether the first new segment has been applied to src */
   gboolean newseg_applied_to_src;
+
+  gboolean last_query;
 };
 
 struct _GstQueueClass {