*/
/**
* SECTION: gstaggregator
+ * @title: GstAggregator
* @short_description: manages a set of pads with the purpose of
* aggregating their buffers.
* @see_also: gstcollectpads for historical reasons.
*
* Manages a set of pads with the purpose of aggregating their buffers.
* Control is given to the subclass when all pads have data.
- * <itemizedlist>
- * <listitem><para>
- * Base class for mixers and muxers. Subclasses should at least implement
+ *
+ * * Base class for mixers and muxers. Subclasses should at least implement
* the #GstAggregatorClass.aggregate() virtual method.
- * </para></listitem>
- * <listitem><para>
- * When data is queued on all pads, tha aggregate vmethod is called.
- * </para></listitem>
- * <listitem><para>
- * One can peek at the data on any given GstAggregatorPad with the
+ *
+ * * When data is queued on all pads, tha aggregate vmethod is called.
+ *
+ * * One can peek at the data on any given GstAggregatorPad with the
* gst_aggregator_pad_get_buffer () method, and take ownership of it
* with the gst_aggregator_pad_steal_buffer () method. When a buffer
* has been taken with steal_buffer (), a new buffer can be queued
* on that pad.
- * </para></listitem>
- * <listitem><para>
- * If the subclass wishes to push a buffer downstream in its aggregate
+ *
+ * * If the subclass wishes to push a buffer downstream in its aggregate
* implementation, it should do so through the
* gst_aggregator_finish_buffer () method. This method will take care
* of sending and ordering mandatory events such as stream start, caps
* and segment.
- * </para></listitem>
- * <listitem><para>
- * Same goes for EOS events, which should not be pushed directly by the
+ *
+ * * Same goes for EOS events, which should not be pushed directly by the
* subclass, it should instead return GST_FLOW_EOS in its aggregate
* implementation.
- * </para></listitem>
- * <listitem><para>
- * Note that the aggregator logic regarding gap event handling is to turn
+ *
+ * * Note that the aggregator logic regarding gap event handling is to turn
* these into gap buffers with matching PTS and duration. It will also
* flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE
* to ease their identification and subsequent processing.
- * </para></listitem>
- * </itemizedlist>
+ *
*/
#ifdef HAVE_CONFIG_H
GMutex flush_lock;
};
-static gboolean
-gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
+/* Must be called with PAD_LOCK held */
+static void
+gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
{
- GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
-
- PAD_LOCK (aggpad);
aggpad->priv->pending_eos = FALSE;
aggpad->priv->eos = FALSE;
aggpad->priv->flow_return = GST_FLOW_OK;
aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
aggpad->priv->time_level = 0;
+ aggpad->priv->first_buffer = TRUE;
+}
+
+static gboolean
+gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
+{
+ GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
+
+ PAD_LOCK (aggpad);
+ gst_aggregator_pad_reset_unlocked (aggpad);
PAD_UNLOCK (aggpad);
if (klass->flush)
{
GstAggregatorPad *pad;
GList *l, *sinkpads;
- gboolean have_data = TRUE;
+ gboolean have_buffer = TRUE;
+ gboolean have_event = FALSE;
GST_LOG_OBJECT (self, "checking pads");
PAD_LOCK (pad);
- if (gst_aggregator_pad_queue_is_empty (pad)) {
+ if (pad->priv->num_buffers == 0) {
+ if (!gst_aggregator_pad_queue_is_empty (pad))
+ have_event = TRUE;
if (!pad->priv->eos) {
- have_data = FALSE;
+ have_buffer = FALSE;
/* If not live we need data on all pads, so leave the loop */
if (!self->priv->peer_latency_live) {
PAD_UNLOCK (pad);
}
- if (!have_data)
+ if (!have_buffer && !have_event)
goto pad_not_ready;
- self->priv->first_buffer = FALSE;
+ if (have_buffer)
+ self->priv->first_buffer = FALSE;
GST_OBJECT_UNLOCK (self);
GST_LOG_OBJECT (self, "pads are ready");
}
pad_not_ready:
{
- GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
+ if (have_event)
+ GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet,"
+ " but waking up for serialized event");
+ else
+ GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
GST_OBJECT_UNLOCK (self);
- return FALSE;
+ return have_event;
}
}
GST_FORMAT_TIME, aggpad->priv->head_position);
else
aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
+
+ if (!GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_time))
+ aggpad->priv->tail_time = aggpad->priv->head_time;
} else {
if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_position) &&
aggpad->segment.format == GST_FORMAT_TIME)
GST_OBJECT_UNLOCK (self);
}
- aggpad->priv->first_buffer = TRUE;
-
/* We never forward the event */
goto eat;
}
gst_aggregator_pad_chain_internal (GstAggregator * self,
GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
{
- GstBuffer *actual_buf = buffer;
GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
GstFlowReturn flow_return;
GstClockTime buf_pts;
PAD_UNLOCK (aggpad);
if (aggclass->clip && head) {
- aggclass->clip (self, aggpad, buffer, &actual_buf);
+ buffer = aggclass->clip (self, aggpad, buffer);
}
- if (actual_buf == NULL) {
- GST_LOG_OBJECT (actual_buf, "Buffer dropped by clip function");
+ if (buffer == NULL) {
+ GST_LOG_OBJECT (aggpad, "Buffer dropped by clip function");
goto done;
}
- buf_pts = GST_BUFFER_PTS (actual_buf);
+ buf_pts = GST_BUFFER_PTS (buffer);
aggpad->priv->first_buffer = FALSE;
if (gst_aggregator_pad_has_space (self, aggpad)
&& aggpad->priv->flow_return == GST_FLOW_OK) {
if (head)
- g_queue_push_head (&aggpad->priv->buffers, actual_buf);
+ g_queue_push_head (&aggpad->priv->buffers, buffer);
else
- g_queue_push_tail (&aggpad->priv->buffers, actual_buf);
- apply_buffer (aggpad, actual_buf, head);
+ g_queue_push_tail (&aggpad->priv->buffers, buffer);
+ apply_buffer (aggpad, buffer, head);
aggpad->priv->num_buffers++;
- actual_buf = buffer = NULL;
+ buffer = NULL;
SRC_BROADCAST (self);
break;
}
}
if (event) {
+ gboolean is_caps = (GST_EVENT_TYPE (event) == GST_EVENT_CAPS);
+
if (!klass->sink_event (self, aggpad, event)) {
/* Copied from GstPad to convert boolean to a GstFlowReturn in
* the event handling func */
- switch (GST_EVENT_TYPE (event)) {
- case GST_EVENT_CAPS:
- ret = GST_FLOW_NOT_NEGOTIATED;
- break;
- default:
- ret = GST_FLOW_ERROR;
- break;
- }
+ ret = is_caps ? GST_FLOW_NOT_NEGOTIATED : GST_FLOW_ERROR;
}
}
gst_pad_set_chain_function (pad,
GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
gst_pad_set_event_full_function_full (pad,
- GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func),
- NULL, NULL);
+ GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
gst_pad_set_query_function (pad,
GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
gst_pad_set_activatemode_function (pad,
g_mutex_init (&pad->priv->flush_lock);
g_mutex_init (&pad->priv->lock);
- pad->priv->first_buffer = TRUE;
+ gst_aggregator_pad_reset_unlocked (pad);
}
/**