/* dequeue an item from the queue and update level stats, with QUEUE_LOCK */
static GstMiniObject *
-gst_queue_locked_dequeue (GstQueue * queue, gboolean * is_buffer)
+gst_queue_locked_dequeue (GstQueue * queue)
{
GstMiniObject *item;
if (queue->cur_level.buffers == 0)
queue->cur_level.time = 0;
- *is_buffer = TRUE;
} else if (GST_IS_EVENT (item)) {
GstEvent *event = GST_EVENT_CAST (item);
default:
break;
}
+ } else if (GST_IS_QUERY (item)) {
+ GstQuery *query = GST_QUERY_CAST (item);
- *is_buffer = FALSE;
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "retrieved query %p from queue", query);
} else {
g_warning
("Unexpected item %p dequeued from queue %s (refcounting problem?)",
static gboolean
gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
{
+ GstQueue *queue = GST_QUEUE_CAST (parent);
gboolean res;
switch (GST_QUERY_TYPE (query)) {
default:
- res = gst_pad_query_default (pad, parent, query);
+ if (G_UNLIKELY (GST_QUERY_IS_SERIALIZED (query))) {
+ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+ GST_LOG_OBJECT (queue, "queuing query %p (%s)", query,
+ GST_QUERY_TYPE_NAME (query));
+ g_queue_push_tail (&queue->queue, query);
+ GST_QUEUE_SIGNAL_ADD (queue);
+ while (queue->queue.length != 0) {
+ /* for as long as the queue has items, we know the query is
+ * not handled yet */
+ GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing);
+ }
+ res = queue->last_query;
+ GST_QUEUE_MUTEX_UNLOCK (queue);
+ } else {
+ res = gst_pad_query_default (pad, parent, query);
+ }
break;
}
return res;
+
+ /* ERRORS */
+out_flushing:
+ {
+ GST_DEBUG_OBJECT (queue, "we are flushing");
+ GST_QUEUE_MUTEX_UNLOCK (queue);
+ return FALSE;
+ }
}
static gboolean
/* for as long as the queue is filled, dequeue an item and discard it */
while (gst_queue_is_filled (queue)) {
GstMiniObject *leak;
- gboolean is_buffer;
- leak = gst_queue_locked_dequeue (queue, &is_buffer);
+ leak = gst_queue_locked_dequeue (queue);
/* there is nothing to dequeue and the queue is still filled.. This should
* not happen */
g_assert (leak != NULL);
{
GstFlowReturn result = GST_FLOW_OK;
GstMiniObject *data;
- gboolean is_buffer;
- data = gst_queue_locked_dequeue (queue, &is_buffer);
+ data = gst_queue_locked_dequeue (queue);
if (data == NULL)
goto no_item;
next:
- if (is_buffer) {
+ if (GST_IS_BUFFER (data)) {
GstBuffer *buffer;
buffer = GST_BUFFER_CAST (data);
* can push again, which is EOS or SEGMENT. If there is nothing in the
* queue we can push, we set a flag to make the sinkpad refuse more
* buffers with an EOS return value. */
- while ((data = gst_queue_locked_dequeue (queue, &is_buffer))) {
- if (is_buffer) {
+ while ((data = gst_queue_locked_dequeue (queue))) {
+ if (GST_IS_BUFFER (data)) {
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"dropping EOS buffer %p", data);
gst_buffer_unref (GST_BUFFER_CAST (data));
- } else {
+ } else if (GST_IS_EVENT (data)) {
GstEvent *event = GST_EVENT_CAST (data);
GstEventType type = GST_EVENT_TYPE (event);
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"dropping EOS event %p", event);
gst_event_unref (event);
+ } else if (GST_IS_QUERY (data)) {
+ GstQuery *query = GST_QUERY_CAST (data);
+
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "dropping query %p because of EOS", query);
+ queue->last_query = FALSE;
}
}
/* no more items in the queue. Set the unexpected flag so that upstream
queue->unexpected = TRUE;
result = GST_FLOW_OK;
}
- } else {
+ } else if (GST_IS_EVENT (data)) {
GstEvent *event = GST_EVENT_CAST (data);
GstEventType type = GST_EVENT_TYPE (event);
"pushed EOS event %p, return EOS", event);
result = GST_FLOW_EOS;
}
+ } else if (GST_IS_QUERY (data)) {
+ GstQuery *query = GST_QUERY_CAST (data);
+
+ queue->last_query = gst_pad_peer_query (queue->srcpad, query);
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "did query %p, return %d", query, queue->last_query);
}
return result;