GstClockTime next_time; /* End running time of next buffer to be pushed */
GstClockTime last_time; /* Start running time of last pushed buffer */
GCond turn; /* SingleQueue turn waiting conditional */
+
+ /* for serialized queries */
+ GCond query_handled;
+ gboolean last_query;
};
sq->id);
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
g_cond_signal (&sq->turn);
+ sq->last_query = FALSE;
+ g_cond_signal (&sq->query_handled);
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
sq->id, event, GST_EVENT_TYPE_NAME (event));
gst_pad_push_event (sq->srcpad, event);
+ } else if (GST_IS_QUERY (object)) {
+ GstQuery *query;
+ gboolean res;
+
+ query = GST_QUERY_CAST (object);
+
+ res = gst_pad_peer_query (sq->srcpad, query);
+
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+ sq->last_query = res;
+ g_cond_signal (&sq->query_handled);
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
} else {
g_warning ("Unexpected object in singlequeue %d (refcounting problem?)",
sq->id);
}
static GstMultiQueueItem *
-gst_multi_queue_event_item_new (GstMiniObject * object, guint32 curid)
+gst_multi_queue_mo_item_new (GstMiniObject * object, guint32 curid)
{
GstMultiQueueItem *item;
/* Get an unique incrementing id. */
curid = g_atomic_int_add ((gint *) & mq->counter, 1);
- item = gst_multi_queue_event_item_new ((GstMiniObject *) event, curid);
+ item = gst_multi_queue_mo_item_new ((GstMiniObject *) event, curid);
GST_DEBUG_OBJECT (mq,
"SingleQueue %d : Enqueuing event %p of type %s with id %d",
gst_multi_queue_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
{
gboolean res;
+ GstSingleQueue *sq;
+ GstMultiQueue *mq;
+
+ sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
+ mq = (GstMultiQueue *) parent;
switch (GST_QUERY_TYPE (query)) {
default:
if (GST_QUERY_IS_SERIALIZED (query)) {
- GST_WARNING_OBJECT (pad, "unhandled serialized query");
- res = FALSE;
+ guint32 curid;
+ GstMultiQueueItem *item;
+
+ /* Get an unique incrementing id. */
+ curid = g_atomic_int_add ((gint *) & mq->counter, 1);
+
+ item = gst_multi_queue_mo_item_new ((GstMiniObject *) query, curid);
+
+ GST_DEBUG_OBJECT (mq,
+ "SingleQueue %d : Enqueuing query %p of type %s with id %d",
+ sq->id, query, GST_QUERY_TYPE_NAME (query), curid);
+
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+ res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item);
+ g_cond_wait (&sq->query_handled, &mq->qlock);
+ res = sq->last_query;
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
} else {
/* default handling */
res = gst_pad_query_default (pad, parent, query);
gst_data_queue_flush (sq->queue);
g_object_unref (sq->queue);
g_cond_clear (&sq->turn);
+ g_cond_clear (&sq->query_handled);
g_free (sq);
}
sq->next_time = GST_CLOCK_TIME_NONE;
sq->last_time = GST_CLOCK_TIME_NONE;
g_cond_init (&sq->turn);
+ g_cond_init (&sq->query_handled);
sq->sinktime = GST_CLOCK_TIME_NONE;
sq->srctime = GST_CLOCK_TIME_NONE;