* * Base class for mixers and muxers. Subclasses should at least implement
* the #GstAggregatorClass.aggregate() virtual method.
*
- * * When data is queued on all pads, tha aggregate vmethod is called.
+ * * When data is queued on all pads, the aggregate vmethod is called.
*
* * One can peek at the data on any given GstAggregatorPad with the
* gst_aggregator_pad_get_buffer () method, and take ownership of it
gboolean first_buffer;
- GQueue buffers;
+ GQueue data; /* buffers, events and queries */
GstBuffer *clipped_buffer;
guint num_buffers;
GstClockTime head_position;
GstClockTime tail_position;
- GstClockTime head_time;
+ GstClockTime head_time; /* running time */
GstClockTime tail_time;
- GstClockTime time_level;
+ GstClockTime time_level; /* how much head is ahead of tail */
GstSegment head_segment; /* segment before the queue */
gboolean negotiated;
*
* This method guarantees that @func will be called only once for each
* sink pad.
+ *
+ * Returns: %FALSE if there are no sinkpads or if @func returned %FALSE
*/
gboolean
gst_aggregator_iterate_sinkpads (GstAggregator * self,
static gboolean
gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
{
- return (g_queue_peek_tail (&pad->priv->buffers) == NULL &&
+ return (g_queue_peek_tail (&pad->priv->data) == NULL &&
pad->priv->clipped_buffer == NULL);
}
}
static gboolean
-check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data)
+gst_aggregator_do_events_and_queries (GstAggregator * self,
+ GstAggregatorPad * pad, gpointer user_data)
{
GstEvent *event = NULL;
+ GstQuery *query = NULL;
GstAggregatorClass *klass = NULL;
gboolean *processed_event = user_data;
do {
event = NULL;
+ query = NULL;
PAD_LOCK (pad);
if (pad->priv->num_buffers == 0 && pad->priv->pending_eos) {
pad->priv->eos = TRUE;
}
if (pad->priv->clipped_buffer == NULL &&
- GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) {
- event = gst_event_ref (g_queue_peek_tail (&pad->priv->buffers));
+ !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
+ if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data)))
+ event = gst_event_ref (g_queue_peek_tail (&pad->priv->data));
+ if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))
+ query = g_queue_peek_tail (&pad->priv->data);
}
PAD_UNLOCK (pad);
- if (event) {
+ if (event || query) {
gboolean ret;
if (processed_event)
if (klass == NULL)
klass = GST_AGGREGATOR_GET_CLASS (self);
- GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
- gst_event_ref (event);
- ret = klass->sink_event (self, pad, event);
+ if (event) {
+ GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
+ gst_event_ref (event);
+ ret = klass->sink_event (self, pad, event);
+
+ PAD_LOCK (pad);
+ if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS)
+ pad->priv->negotiated = ret;
+ if (g_queue_peek_tail (&pad->priv->data) == event)
+ gst_event_unref (g_queue_pop_tail (&pad->priv->data));
+ gst_event_unref (event);
+ } else if (query) {
+ GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query);
+ ret = klass->sink_query (self, pad, query);
+
+ PAD_LOCK (pad);
+ if (g_queue_peek_tail (&pad->priv->data) == query) {
+ GstStructure *s;
+
+ s = gst_query_writable_structure (query);
+ gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret,
+ NULL);
+ g_queue_pop_tail (&pad->priv->data);
+ }
+ }
- PAD_LOCK (pad);
- if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS)
- pad->priv->negotiated = ret;
- if (g_queue_peek_tail (&pad->priv->buffers) == event)
- gst_event_unref (g_queue_pop_tail (&pad->priv->buffers));
- gst_event_unref (event);
PAD_BROADCAST_EVENT (pad);
PAD_UNLOCK (pad);
}
- } while (event != NULL);
+ } while (event || query);
return TRUE;
}
else
aggpad->priv->flow_return = flow_return;
- item = g_queue_peek_head_link (&aggpad->priv->buffers);
+ item = g_queue_peek_head_link (&aggpad->priv->data);
while (item) {
GList *next = item->next;
GST_EVENT_TYPE (item->data) == GST_EVENT_EOS ||
GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT ||
!GST_EVENT_IS_STICKY (item->data)) {
- gst_mini_object_unref (item->data);
- g_queue_delete_link (&aggpad->priv->buffers, item);
+ if (!GST_IS_QUERY (item->data))
+ gst_mini_object_unref (item->data);
+ g_queue_delete_link (&aggpad->priv->data, item);
}
item = next;
}
GstFlowReturn flow_return = GST_FLOW_OK;
gboolean processed_event = FALSE;
- gst_aggregator_iterate_sinkpads (self, check_events, NULL);
+ gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries,
+ NULL);
if (!gst_aggregator_wait_and_check (self, &timeout))
continue;
- gst_aggregator_iterate_sinkpads (self, check_events, &processed_event);
+ gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries,
+ &processed_event);
if (processed_event)
continue;
GstPad *pad = GST_PAD (aggpad);
GstAggregatorPrivate *priv = self->priv;
+ GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event);
+
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
{
}
case GST_EVENT_FLUSH_STOP:
{
- GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP");
-
gst_aggregator_pad_flush (aggpad, self);
GST_OBJECT_LOCK (self);
if (priv->flush_seeking) {
}
case GST_EVENT_EOS:
{
- GST_DEBUG_OBJECT (aggpad, "EOS");
-
/* We still have a buffer, and we don't want the subclass to have to
* check for it. Mark pending_eos, eos will be set when steal_buffer is
* called
PAD_LOCK (aggpad);
GST_OBJECT_LOCK (aggpad);
gst_event_copy_segment (event, &aggpad->segment);
+ /* We've got a new segment, tail_position is now meaningless
+ * and may interfere with the time_level calculation
+ */
+ aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
update_time_level (aggpad, FALSE);
GST_OBJECT_UNLOCK (aggpad);
PAD_UNLOCK (aggpad);
GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
+ /* Remove GAP event so we can replace it with the buffer */
+ if (g_queue_peek_tail (&aggpad->priv->data) == event)
+ gst_event_unref (g_queue_pop_tail (&aggpad->priv->data));
+
if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
GST_FLOW_OK) {
GST_WARNING_OBJECT (self, "Failed to chain gap buffer");
gst_aggregator_pad_flush (pad, self);
PAD_LOCK (pad);
+ pad->priv->flow_return = GST_FLOW_FLUSHING;
pad->priv->negotiated = FALSE;
+ PAD_BROADCAST_EVENT (pad);
PAD_UNLOCK (pad);
return TRUE;
DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad);
+ GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_do_events_and_queries);
}
static void
if (gst_aggregator_pad_has_space (self, aggpad)
&& aggpad->priv->flow_return == GST_FLOW_OK) {
if (head)
- g_queue_push_head (&aggpad->priv->buffers, buffer);
+ g_queue_push_head (&aggpad->priv->data, buffer);
else
- g_queue_push_tail (&aggpad->priv->buffers, buffer);
+ g_queue_push_tail (&aggpad->priv->data, buffer);
apply_buffer (aggpad, buffer, head);
aggpad->priv->num_buffers++;
buffer = NULL;
gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
GstQuery * query)
{
+ GstAggregator *self = GST_AGGREGATOR (parent);
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
if (GST_QUERY_IS_SERIALIZED (query)) {
+ GstStructure *s;
+ gboolean ret = FALSE;
+
+ SRC_LOCK (self);
PAD_LOCK (aggpad);
+ if (aggpad->priv->flow_return != GST_FLOW_OK) {
+ SRC_UNLOCK (self);
+ goto flushing;
+ }
+
+ g_queue_push_head (&aggpad->priv->data, query);
+ SRC_BROADCAST (self);
+ SRC_UNLOCK (self);
+
while (!gst_aggregator_pad_queue_is_empty (aggpad)
&& aggpad->priv->flow_return == GST_FLOW_OK) {
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
PAD_WAIT_EVENT (aggpad);
}
+ s = gst_query_writable_structure (query);
+ if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret))
+ gst_structure_remove_field (s, "gst-aggregator-retval");
+ else
+ g_queue_remove (&aggpad->priv->data, query);
+
if (aggpad->priv->flow_return != GST_FLOW_OK)
goto flushing;
PAD_UNLOCK (aggpad);
+
+ return ret;
}
- return klass->sink_query (GST_AGGREGATOR (parent),
- GST_AGGREGATOR_PAD (pad), query);
+ return klass->sink_query (self, aggpad, query);
flushing:
GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
gst_flow_get_name (aggpad->priv->flow_return));
PAD_UNLOCK (aggpad);
+
return FALSE;
}
if (GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT,
event);
- g_queue_push_head (&aggpad->priv->buffers, event);
+ g_queue_push_head (&aggpad->priv->data, event);
event = NULL;
SRC_BROADCAST (self);
}
G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
GstAggregatorPadPrivate);
- g_queue_init (&pad->priv->buffers);
+ g_queue_init (&pad->priv->data);
g_cond_init (&pad->priv->event_cond);
g_mutex_init (&pad->priv->flush_lock);
gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
{
GstAggregator *self = NULL;
- GstAggregatorClass *aggclass;
+ GstAggregatorClass *aggclass = NULL;
GstBuffer *buffer = NULL;
while (pad->priv->clipped_buffer == NULL &&
- GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) {
- buffer = g_queue_pop_tail (&pad->priv->buffers);
+ GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
+ buffer = g_queue_pop_tail (&pad->priv->data);
apply_buffer (pad, buffer, FALSE);
gst_aggregator_pad_clip_buffer_unlocked (pad);
buffer = pad->priv->clipped_buffer;
- pad->priv->clipped_buffer = NULL;
if (buffer) {
+ pad->priv->clipped_buffer = NULL;
gst_aggregator_pad_buffer_consumed (pad);
GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
}