check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data)
{
GstEvent *event = NULL;
+ GstQuery *query = NULL;
GstAggregatorClass *klass = NULL;
gboolean *processed_event = user_data;
pad->priv->eos = TRUE;
}
if (pad->priv->clipped_buffer == NULL &&
- GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) {
- event = gst_event_ref (g_queue_peek_tail (&pad->priv->buffers));
+ !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) {
+ if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers)))
+ event = gst_event_ref (g_queue_peek_tail (&pad->priv->buffers));
+ if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->buffers)))
+ query = g_queue_peek_tail (&pad->priv->buffers);
}
PAD_UNLOCK (pad);
- if (event) {
+ if (event || query) {
gboolean ret;
if (processed_event)
klass = GST_AGGREGATOR_GET_CLASS (self);
GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
- gst_event_ref (event);
- ret = klass->sink_event (self, pad, event);
- PAD_LOCK (pad);
- if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS)
- pad->priv->negotiated = ret;
- if (g_queue_peek_tail (&pad->priv->buffers) == event)
- gst_event_unref (g_queue_pop_tail (&pad->priv->buffers));
- gst_event_unref (event);
+ if (event) {
+ gst_event_ref (event);
+ ret = klass->sink_event (self, pad, event);
+
+ PAD_LOCK (pad);
+ if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS)
+ pad->priv->negotiated = ret;
+ if (g_queue_peek_tail (&pad->priv->buffers) == event)
+ gst_event_unref (g_queue_pop_tail (&pad->priv->buffers));
+ gst_event_unref (event);
+ }
+
+ if (query) {
+ GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
+ ret = klass->sink_query (self, pad, query);
+
+ PAD_LOCK (pad);
+ if (g_queue_peek_tail (&pad->priv->buffers) == query) {
+ GstStructure *s;
+
+ s = gst_query_writable_structure (query);
+ gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret,
+ NULL);
+ g_queue_pop_tail (&pad->priv->buffers);
+ }
+ }
+
PAD_BROADCAST_EVENT (pad);
PAD_UNLOCK (pad);
}
+ if (query) {
+ if (processed_event)
+ *processed_event = TRUE;
+ if (klass == NULL)
+ klass = GST_AGGREGATOR_GET_CLASS (self);
+ }
} while (event != NULL);
return TRUE;
GST_EVENT_TYPE (item->data) == GST_EVENT_EOS ||
GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT ||
!GST_EVENT_IS_STICKY (item->data)) {
- gst_mini_object_unref (item->data);
+ if (!GST_IS_QUERY (item->data))
+ gst_mini_object_unref (item->data);
g_queue_delete_link (&aggpad->priv->buffers, item);
}
item = next;
gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
GstQuery * query)
{
+ GstAggregator *self = GST_AGGREGATOR (parent);
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
if (GST_QUERY_IS_SERIALIZED (query)) {
+ GstStructure *s;
+ gboolean ret = FALSE;
+
+ SRC_LOCK (self);
PAD_LOCK (aggpad);
+ if (aggpad->priv->flow_return != GST_FLOW_OK) {
+ SRC_UNLOCK (self);
+ goto flushing;
+ }
+
+ g_queue_push_head (&aggpad->priv->buffers, query);
+ SRC_BROADCAST (self);
+ SRC_UNLOCK (self);
+
while (!gst_aggregator_pad_queue_is_empty (aggpad)
&& aggpad->priv->flow_return == GST_FLOW_OK) {
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
PAD_WAIT_EVENT (aggpad);
}
+ s = gst_query_writable_structure (query);
+ if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret))
+ gst_structure_remove_field (s, "gst-aggregator-retval");
+ else
+ g_queue_remove (&aggpad->priv->buffers, query);
+
if (aggpad->priv->flow_return != GST_FLOW_OK)
goto flushing;
PAD_UNLOCK (aggpad);
+
+ return ret;
}
- return klass->sink_query (GST_AGGREGATOR (parent),
- GST_AGGREGATOR_PAD (pad), query);
+ return klass->sink_query (self, aggpad, query);
flushing:
GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
gst_flow_get_name (aggpad->priv->flow_return));
PAD_UNLOCK (aggpad);
+
return FALSE;
}