multiqueue: handle serialized queries
authorWim Taymans <wim.taymans@collabora.co.uk>
Mon, 19 Mar 2012 10:45:27 +0000 (11:45 +0100)
committerWim Taymans <wim.taymans@collabora.co.uk>
Mon, 19 Mar 2012 10:45:27 +0000 (11:45 +0100)
plugins/elements/gstmultiqueue.c

index 7028203..cf18dc5 100644 (file)
@@ -160,6 +160,10 @@ struct _GstSingleQueue
   GstClockTime next_time;       /* End running time of next buffer to be pushed */
   GstClockTime last_time;       /* Start running time of last pushed buffer */
   GCond turn;                   /* SingleQueue turn waiting conditional */
+
+  /* for serialized queries */
+  GCond query_handled;
+  gboolean last_query;
 };
 
 
@@ -750,6 +754,8 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
         sq->id);
     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
     g_cond_signal (&sq->turn);
+    sq->last_query = FALSE;
+    g_cond_signal (&sq->query_handled);
     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
 
     GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
@@ -1073,6 +1079,18 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
         sq->id, event, GST_EVENT_TYPE_NAME (event));
 
     gst_pad_push_event (sq->srcpad, event);
+  } else if (GST_IS_QUERY (object)) {
+    GstQuery *query;
+    gboolean res;
+
+    query = GST_QUERY_CAST (object);
+
+    res = gst_pad_peer_query (sq->srcpad, query);
+
+    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+    sq->last_query = res;
+    g_cond_signal (&sq->query_handled);
+    GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
   } else {
     g_warning ("Unexpected object in singlequeue %d (refcounting problem?)",
         sq->id);
@@ -1121,7 +1139,7 @@ gst_multi_queue_buffer_item_new (GstMiniObject * object, guint32 curid)
 }
 
 static GstMultiQueueItem *
-gst_multi_queue_event_item_new (GstMiniObject * object, guint32 curid)
+gst_multi_queue_mo_item_new (GstMiniObject * object, guint32 curid)
 {
   GstMultiQueueItem *item;
 
@@ -1464,7 +1482,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
   /* Get an unique incrementing id. */
   curid = g_atomic_int_add ((gint *) & mq->counter, 1);
 
-  item = gst_multi_queue_event_item_new ((GstMiniObject *) event, curid);
+  item = gst_multi_queue_mo_item_new ((GstMiniObject *) event, curid);
 
   GST_DEBUG_OBJECT (mq,
       "SingleQueue %d : Enqueuing event %p of type %s with id %d",
@@ -1516,12 +1534,32 @@ static gboolean
 gst_multi_queue_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
 {
   gboolean res;
+  GstSingleQueue *sq;
+  GstMultiQueue *mq;
+
+  sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
+  mq = (GstMultiQueue *) parent;
 
   switch (GST_QUERY_TYPE (query)) {
     default:
       if (GST_QUERY_IS_SERIALIZED (query)) {
-        GST_WARNING_OBJECT (pad, "unhandled serialized query");
-        res = FALSE;
+        guint32 curid;
+        GstMultiQueueItem *item;
+
+        /* Get an unique incrementing id. */
+        curid = g_atomic_int_add ((gint *) & mq->counter, 1);
+
+        item = gst_multi_queue_mo_item_new ((GstMiniObject *) query, curid);
+
+        GST_DEBUG_OBJECT (mq,
+            "SingleQueue %d : Enqueuing query %p of type %s with id %d",
+            sq->id, query, GST_QUERY_TYPE_NAME (query), curid);
+
+        GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+        res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item);
+        g_cond_wait (&sq->query_handled, &mq->qlock);
+        res = sq->last_query;
+        GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
       } else {
         /* default handling */
         res = gst_pad_query_default (pad, parent, query);
@@ -1839,6 +1877,7 @@ gst_single_queue_free (GstSingleQueue * sq)
   gst_data_queue_flush (sq->queue);
   g_object_unref (sq->queue);
   g_cond_clear (&sq->turn);
+  g_cond_clear (&sq->query_handled);
   g_free (sq);
 }
 
@@ -1903,6 +1942,7 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id)
   sq->next_time = GST_CLOCK_TIME_NONE;
   sq->last_time = GST_CLOCK_TIME_NONE;
   g_cond_init (&sq->turn);
+  g_cond_init (&sq->query_handled);
 
   sq->sinktime = GST_CLOCK_TIME_NONE;
   sq->srctime = GST_CLOCK_TIME_NONE;