* * Base class for mixers and muxers. Subclasses should at least implement
* the #GstAggregatorClass.aggregate() virtual method.
*
- * * When data is queued on all pads, tha aggregate vmethod is called.
+ * * When data is queued on all pads, the aggregate vmethod is called.
*
* * One can peek at the data on any given GstAggregatorPad with the
* gst_aggregator_pad_get_buffer () method, and take ownership of it
gboolean first_buffer;
- GQueue buffers;
+ GQueue data; /* buffers, events and queries */
GstBuffer *clipped_buffer;
guint num_buffers;
GstClockTime head_position;
GstClockTime tail_position;
- GstClockTime head_time;
+ GstClockTime head_time; /* running time */
GstClockTime tail_time;
- GstClockTime time_level;
+ GstClockTime time_level; /* how much head is ahead of tail */
GstSegment head_segment; /* segment before the queue */
gboolean negotiated;
*
* This method guarantees that @func will be called only once for each
* sink pad.
+ *
+ * Returns: %FALSE if there are no sinkpads or if @func returned %FALSE
*/
gboolean
gst_aggregator_iterate_sinkpads (GstAggregator * self,
static gboolean
gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
{
- return (g_queue_peek_tail (&pad->priv->buffers) == NULL &&
+ return (g_queue_peek_tail (&pad->priv->data) == NULL &&
pad->priv->clipped_buffer == NULL);
}
}
static gboolean
-check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data)
+gst_aggregator_do_events_and_queries (GstAggregator * self,
+ GstAggregatorPad * pad, gpointer user_data)
{
GstEvent *event = NULL;
GstQuery *query = NULL;
do {
event = NULL;
+ query = NULL;
PAD_LOCK (pad);
if (pad->priv->num_buffers == 0 && pad->priv->pending_eos) {
pad->priv->eos = TRUE;
}
if (pad->priv->clipped_buffer == NULL &&
- !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) {
- if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers)))
- event = gst_event_ref (g_queue_peek_tail (&pad->priv->buffers));
- if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->buffers)))
- query = g_queue_peek_tail (&pad->priv->buffers);
+ !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
+ if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data)))
+ event = gst_event_ref (g_queue_peek_tail (&pad->priv->data));
+ if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))
+ query = g_queue_peek_tail (&pad->priv->data);
}
PAD_UNLOCK (pad);
if (event || query) {
if (klass == NULL)
klass = GST_AGGREGATOR_GET_CLASS (self);
- GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
-
if (event) {
+ GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
gst_event_ref (event);
ret = klass->sink_event (self, pad, event);
PAD_LOCK (pad);
if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS)
pad->priv->negotiated = ret;
- if (g_queue_peek_tail (&pad->priv->buffers) == event)
- gst_event_unref (g_queue_pop_tail (&pad->priv->buffers));
+ if (g_queue_peek_tail (&pad->priv->data) == event)
+ gst_event_unref (g_queue_pop_tail (&pad->priv->data));
gst_event_unref (event);
- }
-
- if (query) {
- GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
+ } else if (query) {
+ GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query);
ret = klass->sink_query (self, pad, query);
PAD_LOCK (pad);
- if (g_queue_peek_tail (&pad->priv->buffers) == query) {
+ if (g_queue_peek_tail (&pad->priv->data) == query) {
GstStructure *s;
s = gst_query_writable_structure (query);
gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret,
NULL);
- g_queue_pop_tail (&pad->priv->buffers);
+ g_queue_pop_tail (&pad->priv->data);
}
}
PAD_BROADCAST_EVENT (pad);
PAD_UNLOCK (pad);
}
- if (query) {
- if (processed_event)
- *processed_event = TRUE;
- if (klass == NULL)
- klass = GST_AGGREGATOR_GET_CLASS (self);
- }
- } while (event != NULL);
+ } while (event || query);
return TRUE;
}
else
aggpad->priv->flow_return = flow_return;
- item = g_queue_peek_head_link (&aggpad->priv->buffers);
+ item = g_queue_peek_head_link (&aggpad->priv->data);
while (item) {
GList *next = item->next;
!GST_EVENT_IS_STICKY (item->data)) {
if (!GST_IS_QUERY (item->data))
gst_mini_object_unref (item->data);
- g_queue_delete_link (&aggpad->priv->buffers, item);
+ g_queue_delete_link (&aggpad->priv->data, item);
}
item = next;
}
GstFlowReturn flow_return = GST_FLOW_OK;
gboolean processed_event = FALSE;
- gst_aggregator_iterate_sinkpads (self, check_events, NULL);
+ gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries,
+ NULL);
if (!gst_aggregator_wait_and_check (self, &timeout))
continue;
- gst_aggregator_iterate_sinkpads (self, check_events, &processed_event);
+ gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries,
+ &processed_event);
if (processed_event)
continue;
GstPad *pad = GST_PAD (aggpad);
GstAggregatorPrivate *priv = self->priv;
+ GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event);
+
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
{
}
case GST_EVENT_FLUSH_STOP:
{
- GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP");
-
gst_aggregator_pad_flush (aggpad, self);
GST_OBJECT_LOCK (self);
if (priv->flush_seeking) {
}
case GST_EVENT_EOS:
{
- GST_DEBUG_OBJECT (aggpad, "EOS");
-
/* We still have a buffer, and we don't want the subclass to have to
* check for it. Mark pending_eos, eos will be set when steal_buffer is
* called
GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
+ /* Remove GAP event so we can replace it with the buffer */
+ if (g_queue_peek_tail (&aggpad->priv->data) == event)
+ gst_event_unref (g_queue_pop_tail (&aggpad->priv->data));
+
if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
GST_FLOW_OK) {
GST_WARNING_OBJECT (self, "Failed to chain gap buffer");
DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad);
+ GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_do_events_and_queries);
}
static void
if (gst_aggregator_pad_has_space (self, aggpad)
&& aggpad->priv->flow_return == GST_FLOW_OK) {
if (head)
- g_queue_push_head (&aggpad->priv->buffers, buffer);
+ g_queue_push_head (&aggpad->priv->data, buffer);
else
- g_queue_push_tail (&aggpad->priv->buffers, buffer);
+ g_queue_push_tail (&aggpad->priv->data, buffer);
apply_buffer (aggpad, buffer, head);
aggpad->priv->num_buffers++;
buffer = NULL;
goto flushing;
}
- g_queue_push_head (&aggpad->priv->buffers, query);
+ g_queue_push_head (&aggpad->priv->data, query);
SRC_BROADCAST (self);
SRC_UNLOCK (self);
if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret))
gst_structure_remove_field (s, "gst-aggregator-retval");
else
- g_queue_remove (&aggpad->priv->buffers, query);
+ g_queue_remove (&aggpad->priv->data, query);
if (aggpad->priv->flow_return != GST_FLOW_OK)
goto flushing;
if (GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT,
event);
- g_queue_push_head (&aggpad->priv->buffers, event);
+ g_queue_push_head (&aggpad->priv->data, event);
event = NULL;
SRC_BROADCAST (self);
}
G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
GstAggregatorPadPrivate);
- g_queue_init (&pad->priv->buffers);
+ g_queue_init (&pad->priv->data);
g_cond_init (&pad->priv->event_cond);
g_mutex_init (&pad->priv->flush_lock);
GstBuffer *buffer = NULL;
while (pad->priv->clipped_buffer == NULL &&
- GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) {
- buffer = g_queue_pop_tail (&pad->priv->buffers);
+ GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
+ buffer = g_queue_pop_tail (&pad->priv->data);
apply_buffer (pad, buffer, FALSE);
gst_aggregator_pad_clip_buffer_unlocked (pad);
buffer = pad->priv->clipped_buffer;
- pad->priv->clipped_buffer = NULL;
if (buffer) {
+ pad->priv->clipped_buffer = NULL;
gst_aggregator_pad_buffer_consumed (pad);
GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
}