/**
* SECTION: gstaggregator
* @title: GstAggregator
- * @short_description: manages a set of pads with the purpose of
- * aggregating their buffers.
+ * @short_description: Base class for mixers and muxers, manages a set of input
+ * pads and aggregates their streams
* @see_also: gstcollectpads for historical reasons.
*
* Manages a set of pads with the purpose of aggregating their buffers.
* * Base class for mixers and muxers. Subclasses should at least implement
* the #GstAggregatorClass.aggregate() virtual method.
*
- * * When data is queued on all pads, tha aggregate vmethod is called.
+ * * Installs a #GstPadChainFunction, a #GstPadEventFullFunction and a
+ * #GstPadQueryFunction to queue all serialized data packets per sink pad.
+ * Subclasses should not overwrite those, but instead implement
+ * #GstAggregatorClass.sink_event() and #GstAggregatorClass.sink_query() as
+ * needed.
+ *
+ * * When data is queued on all pads, the aggregate vmethod is called.
*
* * One can peek at the data on any given GstAggregatorPad with the
- * gst_aggregator_pad_get_buffer () method, and take ownership of it
- * with the gst_aggregator_pad_steal_buffer () method. When a buffer
- * has been taken with steal_buffer (), a new buffer can be queued
+ * gst_aggregator_pad_peek_buffer () method, and remove it from the pad
+ * with the gst_aggregator_pad_pop_buffer () method. When a buffer
+ * has been taken with pop_buffer (), a new buffer can be queued
* on that pad.
*
* * If the subclass wishes to push a buffer downstream in its aggregate
* flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE
* to ease their identification and subsequent processing.
*
+ * This class used to live in gst-plugins-bad and was moved to core.
+ *
+ * Since: 1.14
+ */
+
+/**
+ * SECTION: gstaggregatorpad
+ * @title: GstAggregatorPad
+ * @short_description: #GstPad subclass for pads managed by #GstAggregator
+ * @see_also: gstcollectpads for historical reasons.
+ *
+ * Pads managed by a #GstAggregator subclass.
+ *
+ * This class used to live in gst-plugins-bad and was moved to core.
+ *
+ * Since: 1.14
*/
#ifdef HAVE_CONFIG_H
}
/* Might become API */
+#if 0
static void gst_aggregator_merge_tags (GstAggregator * aggregator,
const GstTagList * tags, GstTagMergeMode mode);
+#endif
static void gst_aggregator_set_latency_property (GstAggregator * agg,
- gint64 latency);
-static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
+ GstClockTime latency);
+static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
+
+static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
+GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
+#define GST_CAT_DEFAULT aggregator_debug
/* Locking order, locks in this element must always be taken in this order
*
* standard sink pad object lock -> GST_OBJECT_LOCK(aggpad)
*/
-
-static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
-
-GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
-#define GST_CAT_DEFAULT aggregator_debug
-
/* GstAggregatorPad definitions */
#define PAD_LOCK(pad) G_STMT_START { \
GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p", \
GstFlowReturn flow_return;
gboolean pending_flush_start;
gboolean pending_flush_stop;
- gboolean pending_eos;
gboolean first_buffer;
- GQueue buffers;
+ GQueue data; /* buffers, events and queries */
GstBuffer *clipped_buffer;
guint num_buffers;
+
+ /* used to track fill state of queues, only used with live-src and when
+ * latency property is set to > 0 */
GstClockTime head_position;
GstClockTime tail_position;
- GstClockTime head_time;
+ GstClockTime head_time; /* running time */
GstClockTime tail_time;
- GstClockTime time_level;
+ GstClockTime time_level; /* how much head is ahead of tail */
GstSegment head_segment; /* segment before the queue */
gboolean negotiated;
static void
gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
{
- aggpad->priv->pending_eos = FALSE;
aggpad->priv->eos = FALSE;
aggpad->priv->flow_return = GST_FLOW_OK;
GST_OBJECT_LOCK (aggpad);
/* Our state is >= PAUSED */
gboolean running; /* protected by src_lock */
+ /* seqnum from seek or segment,
+ * to be applied to synthetic segment/eos events */
gint seqnum;
gboolean send_stream_start; /* protected by srcpad stream lock */
gboolean send_segment;
gint64 latency; /* protected by both src_lock and all pad locks */
};
+/* Seek event forwarding helper */
typedef struct
{
+ /* parameters */
GstEvent *event;
- gboolean result;
gboolean flush;
gboolean only_to_active_pads;
+ /* results */
+ gboolean result;
gboolean one_actually_seeked;
} EventData;
static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
-/**
- * gst_aggregator_iterate_sinkpads:
- * @self: The #GstAggregator
- * @func: (scope call): The function to call.
- * @user_data: (closure): The data to pass to @func.
- *
- * Iterate the sinkpads of aggregator to call a function on them.
- *
- * This method guarantees that @func will be called only once for each
- * sink pad.
- *
- * Returns: %FALSE if there are no sinkpads or if @func returned %FALSE
- */
-gboolean
-gst_aggregator_iterate_sinkpads (GstAggregator * self,
- GstAggregatorPadForeachFunc func, gpointer user_data)
-{
- gboolean result = FALSE;
- GstIterator *iter;
- gboolean done = FALSE;
- GValue item = { 0, };
- GList *seen_pads = NULL;
-
- iter = gst_element_iterate_sink_pads (GST_ELEMENT (self));
-
- if (!iter)
- goto no_iter;
-
- while (!done) {
- switch (gst_iterator_next (iter, &item)) {
- case GST_ITERATOR_OK:
- {
- GstAggregatorPad *pad;
-
- pad = g_value_get_object (&item);
-
- /* if already pushed, skip. FIXME, find something faster to tag pads */
- if (pad == NULL || g_list_find (seen_pads, pad)) {
- g_value_reset (&item);
- break;
- }
-
- GST_LOG_OBJECT (pad, "calling function %s on pad",
- GST_DEBUG_FUNCPTR_NAME (func));
-
- result = func (self, pad, user_data);
-
- done = !result;
-
- seen_pads = g_list_prepend (seen_pads, pad);
-
- g_value_reset (&item);
- break;
- }
- case GST_ITERATOR_RESYNC:
- gst_iterator_resync (iter);
- break;
- case GST_ITERATOR_ERROR:
- GST_ERROR_OBJECT (self,
- "Could not iterate over internally linked pads");
- done = TRUE;
- break;
- case GST_ITERATOR_DONE:
- done = TRUE;
- break;
- }
- }
- g_value_unset (&item);
- gst_iterator_free (iter);
-
- if (seen_pads == NULL) {
- GST_DEBUG_OBJECT (self, "No pad seen");
- return FALSE;
- }
-
- g_list_free (seen_pads);
-
-no_iter:
- return result;
-}
-
static gboolean
gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
{
- return (g_queue_peek_tail (&pad->priv->buffers) == NULL &&
+ return (g_queue_peek_tail (&pad->priv->data) == NULL &&
pad->priv->clipped_buffer == NULL);
}
static gboolean
gst_aggregator_check_pads_ready (GstAggregator * self)
{
- GstAggregatorPad *pad;
+ GstAggregatorPad *pad = NULL;
GList *l, *sinkpads;
gboolean have_buffer = TRUE;
- gboolean have_event = FALSE;
+ gboolean have_event_or_query = FALSE;
GST_LOG_OBJECT (self, "checking pads");
if (pad->priv->num_buffers == 0) {
if (!gst_aggregator_pad_queue_is_empty (pad))
- have_event = TRUE;
+ have_event_or_query = TRUE;
if (!pad->priv->eos) {
have_buffer = FALSE;
PAD_UNLOCK (pad);
}
- if (!have_buffer && !have_event)
+ if (!have_buffer && !have_event_or_query)
goto pad_not_ready;
if (have_buffer)
}
pad_not_ready:
{
- if (have_event)
+ if (have_event_or_query)
GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet,"
" but waking up for serialized event");
else
GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
GST_OBJECT_UNLOCK (self);
- return have_event;
+ return have_event_or_query;
}
}
segment = gst_event_new_segment (&self->segment);
if (!self->priv->seqnum)
+ /* This code-path is in preparation to be able to run without a source
+ * connected. Then we won't have a seq-num from a segment event. */
self->priv->seqnum = gst_event_get_seqnum (segment);
else
gst_event_set_seqnum (segment, self->priv->seqnum);
GST_PAD_STREAM_UNLOCK (self->srcpad);
}
-/**
- * gst_aggregator_finish_buffer:
- * @self: The #GstAggregator
- * @buffer: (transfer full): the #GstBuffer to push.
- *
- * This method will push the provided output buffer downstream. If needed,
- * mandatory events such as stream-start, caps, and segment events will be
- * sent before pushing the buffer.
- */
-GstFlowReturn
-gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
+static GstFlowReturn
+gst_aggregator_default_finish_buffer (GstAggregator * self, GstBuffer * buffer)
{
gst_aggregator_push_mandatory_events (self);
}
}
+/**
+ * gst_aggregator_finish_buffer:
+ * @aggregator: The #GstAggregator
+ * @buffer: (transfer full): the #GstBuffer to push.
+ *
+ * This method will push the provided output buffer downstream. If needed,
+ * mandatory events such as stream-start, caps, and segment events will be
+ * sent before pushing the buffer.
+ */
+GstFlowReturn
+gst_aggregator_finish_buffer (GstAggregator * aggregator, GstBuffer * buffer)
+{
+ GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator);
+
+ g_assert (klass->finish_buffer != NULL);
+
+ return klass->finish_buffer (aggregator, buffer);
+}
+
static void
gst_aggregator_push_eos (GstAggregator * self)
{
}
static gboolean
-check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data)
+gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
+ gpointer user_data)
{
+ GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
+ GstAggregator *aggregator = GST_AGGREGATOR_CAST (self);
GstEvent *event = NULL;
GstQuery *query = NULL;
GstAggregatorClass *klass = NULL;
do {
event = NULL;
+ query = NULL;
PAD_LOCK (pad);
- if (pad->priv->num_buffers == 0 && pad->priv->pending_eos) {
- pad->priv->pending_eos = FALSE;
- pad->priv->eos = TRUE;
- }
if (pad->priv->clipped_buffer == NULL &&
- !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) {
- if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers)))
- event = gst_event_ref (g_queue_peek_tail (&pad->priv->buffers));
- if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->buffers)))
- query = g_queue_peek_tail (&pad->priv->buffers);
+ !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
+ if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data)))
+ event = gst_event_ref (g_queue_peek_tail (&pad->priv->data));
+ if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))
+ query = g_queue_peek_tail (&pad->priv->data);
}
PAD_UNLOCK (pad);
if (event || query) {
if (event) {
GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
gst_event_ref (event);
- ret = klass->sink_event (self, pad, event);
+ ret = klass->sink_event (aggregator, pad, event);
PAD_LOCK (pad);
if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS)
pad->priv->negotiated = ret;
- if (g_queue_peek_tail (&pad->priv->buffers) == event)
- gst_event_unref (g_queue_pop_tail (&pad->priv->buffers));
+ if (g_queue_peek_tail (&pad->priv->data) == event)
+ gst_event_unref (g_queue_pop_tail (&pad->priv->data));
gst_event_unref (event);
- }
-
- if (query) {
+ } else if (query) {
GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query);
- ret = klass->sink_query (self, pad, query);
+ ret = klass->sink_query (aggregator, pad, query);
PAD_LOCK (pad);
- if (g_queue_peek_tail (&pad->priv->buffers) == query) {
+ if (g_queue_peek_tail (&pad->priv->data) == query) {
GstStructure *s;
s = gst_query_writable_structure (query);
gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret,
NULL);
- g_queue_pop_tail (&pad->priv->buffers);
+ g_queue_pop_tail (&pad->priv->data);
}
}
PAD_BROADCAST_EVENT (pad);
PAD_UNLOCK (pad);
}
- } while (event != NULL);
+ } while (event || query);
return TRUE;
}
else
aggpad->priv->flow_return = flow_return;
- item = g_queue_peek_head_link (&aggpad->priv->buffers);
+ item = g_queue_peek_head_link (&aggpad->priv->data);
while (item) {
GList *next = item->next;
!GST_EVENT_IS_STICKY (item->data)) {
if (!GST_IS_QUERY (item->data))
gst_mini_object_unref (item->data);
- g_queue_delete_link (&aggpad->priv->buffers, item);
+ g_queue_delete_link (&aggpad->priv->data, item);
}
item = next;
}
GstFlowReturn flow_return = GST_FLOW_OK;
gboolean processed_event = FALSE;
- gst_aggregator_iterate_sinkpads (self, check_events, NULL);
+ gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
+ gst_aggregator_do_events_and_queries, NULL);
+ /* Ensure we have buffers ready (either in clipped_buffer or at the head of
+   * the queue) */
if (!gst_aggregator_wait_and_check (self, &timeout))
continue;
- gst_aggregator_iterate_sinkpads (self, check_events, &processed_event);
+ gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
+ gst_aggregator_do_events_and_queries, &processed_event);
+
if (processed_event)
continue;
static void
update_time_level (GstAggregatorPad * aggpad, gboolean head)
{
+ GstAggregatorPadPrivate *priv = aggpad->priv;
+
if (head) {
- if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) &&
- aggpad->priv->head_segment.format == GST_FORMAT_TIME)
- aggpad->priv->head_time =
- gst_segment_to_running_time (&aggpad->priv->head_segment,
- GST_FORMAT_TIME, aggpad->priv->head_position);
+ if (GST_CLOCK_TIME_IS_VALID (priv->head_position) &&
+ priv->head_segment.format == GST_FORMAT_TIME)
+ priv->head_time = gst_segment_to_running_time (&priv->head_segment,
+ GST_FORMAT_TIME, priv->head_position);
else
- aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
+ priv->head_time = GST_CLOCK_TIME_NONE;
- if (!GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_time))
- aggpad->priv->tail_time = aggpad->priv->head_time;
+ if (!GST_CLOCK_TIME_IS_VALID (priv->tail_time))
+ priv->tail_time = priv->head_time;
} else {
- if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_position) &&
+ if (GST_CLOCK_TIME_IS_VALID (priv->tail_position) &&
aggpad->segment.format == GST_FORMAT_TIME)
- aggpad->priv->tail_time =
- gst_segment_to_running_time (&aggpad->segment,
- GST_FORMAT_TIME, aggpad->priv->tail_position);
+ priv->tail_time = gst_segment_to_running_time (&aggpad->segment,
+ GST_FORMAT_TIME, priv->tail_position);
else
- aggpad->priv->tail_time = aggpad->priv->head_time;
+ priv->tail_time = priv->head_time;
}
- if (aggpad->priv->head_time == GST_CLOCK_TIME_NONE ||
- aggpad->priv->tail_time == GST_CLOCK_TIME_NONE) {
- aggpad->priv->time_level = 0;
+ if (priv->head_time == GST_CLOCK_TIME_NONE ||
+ priv->tail_time == GST_CLOCK_TIME_NONE) {
+ priv->time_level = 0;
return;
}
- if (aggpad->priv->tail_time > aggpad->priv->head_time)
- aggpad->priv->time_level = 0;
+ if (priv->tail_time > priv->head_time)
+ priv->time_level = 0;
else
- aggpad->priv->time_level = aggpad->priv->head_time -
- aggpad->priv->tail_time;
+ priv->time_level = priv->head_time - priv->tail_time;
}
}
case GST_EVENT_EOS:
{
- /* We still have a buffer, and we don't want the subclass to have to
- * check for it. Mark pending_eos, eos will be set when steal_buffer is
- * called
- */
SRC_LOCK (self);
PAD_LOCK (aggpad);
- if (aggpad->priv->num_buffers == 0) {
- aggpad->priv->eos = TRUE;
- } else {
- aggpad->priv->pending_eos = TRUE;
- }
+ g_assert (aggpad->priv->num_buffers == 0);
+ aggpad->priv->eos = TRUE;
PAD_UNLOCK (aggpad);
-
SRC_BROADCAST (self);
SRC_UNLOCK (self);
goto eat;
GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
/* Remove GAP event so we can replace it with the buffer */
- if (g_queue_peek_tail (&aggpad->priv->buffers) == event)
- gst_event_unref (g_queue_pop_tail (&aggpad->priv->buffers));
+ if (g_queue_peek_tail (&aggpad->priv->data) == event)
+ gst_event_unref (g_queue_pop_tail (&aggpad->priv->data));
if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
GST_FLOW_OK) {
goto eat;
}
case GST_EVENT_TAG:
- {
- GstTagList *tags;
-
- gst_event_parse_tag (event, &tags);
-
- if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) {
- gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE);
- gst_event_unref (event);
- event = NULL;
- goto eat;
- }
- break;
- }
+ goto eat;
default:
{
break;
return res;
}
-static inline gboolean
-gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad,
- gpointer unused_udata)
+static gboolean
+gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data)
{
- gst_aggregator_pad_flush (pad, self);
+ GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
+ GstAggregator *agg = GST_AGGREGATOR_CAST (self);
+
+ gst_aggregator_pad_flush (pad, agg);
PAD_LOCK (pad);
pad->priv->flow_return = GST_FLOW_FLUSHING;
gst_aggregator_reset_flow_values (agg);
- gst_aggregator_iterate_sinkpads (agg, gst_aggregator_stop_pad, NULL);
+ /* Application needs to make sure no pads are added while it shuts us down */
+ gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg),
+ gst_aggregator_stop_pad, NULL);
klass = GST_AGGREGATOR_GET_CLASS (agg);
agg->priv->has_peer_latency = FALSE;
agg->priv->peer_latency_live = FALSE;
- agg->priv->peer_latency_min = agg->priv->peer_latency_max = FALSE;
+ agg->priv->peer_latency_min = agg->priv->peer_latency_max = 0;
if (agg->priv->tags)
gst_tag_list_unref (agg->priv->tags);
GstAggregatorPrivate *priv = self->priv;
gint serial = 0;
gchar *name = NULL;
+ GType pad_type =
+ GST_PAD_TEMPLATE_GTYPE (templ) ==
+ G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : GST_PAD_TEMPLATE_GTYPE (templ);
if (templ->direction != GST_PAD_SINK)
goto not_sink;
}
name = g_strdup_printf ("sink_%u", serial);
- agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
+ agg_pad = g_object_new (pad_type,
"name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
g_free (name);
gst_query_parse_latency (query, &live, &min, &max);
- our_latency = self->priv->latency;
-
if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
return FALSE;
}
+ our_latency = self->priv->latency;
+
self->priv->peer_latency_live = live;
self->priv->peer_latency_min = min;
self->priv->peer_latency_max = max;
return FALSE;
}
-static EventData
+static void
gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
- GstEvent * event, gboolean flush, gboolean only_to_active_pads)
+ EventData * evdata)
{
- EventData evdata;
-
- evdata.event = event;
- evdata.result = TRUE;
- evdata.flush = flush;
- evdata.one_actually_seeked = FALSE;
- evdata.only_to_active_pads = only_to_active_pads;
+ evdata->result = TRUE;
+ evdata->one_actually_seeked = FALSE;
/* We first need to set all pads as flushing in a first pass
* as flush_start flush_stop is sometimes sent synchronously
* while we send the seek event */
- if (flush) {
+ if (evdata->flush) {
GList *l;
GST_OBJECT_LOCK (self);
GST_OBJECT_UNLOCK (self);
}
- gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, &evdata);
-
- gst_event_unref (event);
+ gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata);
- return evdata;
+ gst_event_unref (evdata->event);
}
static gboolean
GstSeekType start_type, stop_type;
gint64 start, stop;
gboolean flush;
- EventData evdata;
+ EventData evdata = { 0, };
GstAggregatorPrivate *priv = self->priv;
gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
GST_OBJECT_UNLOCK (self);
/* forward the seek upstream */
- evdata =
- gst_aggregator_forward_event_to_all_sinkpads (self, event, flush, FALSE);
+ evdata.event = event;
+ evdata.flush = flush;
+ evdata.only_to_active_pads = FALSE;
+ gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
event = NULL;
if (!evdata.result || !evdata.one_actually_seeked) {
static gboolean
gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
{
- EventData evdata;
- gboolean res = TRUE;
+ EventData evdata = { 0, };
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:
- {
- gst_event_ref (event);
- res = gst_aggregator_do_seek (self, event);
- gst_event_unref (event);
- event = NULL;
- goto done;
- }
+ /* _do_seek() unrefs the event. */
+ return gst_aggregator_do_seek (self, event);
case GST_EVENT_NAVIGATION:
- {
/* navigation is rather pointless. */
- res = FALSE;
gst_event_unref (event);
- goto done;
- }
+ return FALSE;
default:
- {
break;
- }
}
/* Don't forward QOS events to pads that had no active buffer yet. Otherwise
* they will receive a QOS event that has earliest_time=0 (because we can't
* have negative timestamps), and consider their buffer as too late */
- evdata =
- gst_aggregator_forward_event_to_all_sinkpads (self, event, FALSE,
- GST_EVENT_TYPE (event) == GST_EVENT_QOS);
- res = evdata.result;
-
-done:
- return res;
+ evdata.event = event;
+ evdata.flush = FALSE;
+ evdata.only_to_active_pads = GST_EVENT_TYPE (event) == GST_EVENT_QOS;
+ gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
+ return evdata.result;
}
static gboolean
* as unresponsive.
*/
static void
-gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
+gst_aggregator_set_latency_property (GstAggregator * self, GstClockTime latency)
{
gboolean changed;
* before a pad is deemed unresponsive. A value of -1 means an
* unlimited time.
*/
-static gint64
+static GstClockTime
gst_aggregator_get_latency_property (GstAggregator * agg)
{
- gint64 res;
+ GstClockTime res;
- g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
+ g_return_val_if_fail (GST_IS_AGGREGATOR (agg), GST_CLOCK_TIME_NONE);
GST_OBJECT_LOCK (agg);
res = agg->priv->latency;
switch (prop_id) {
case PROP_LATENCY:
- gst_aggregator_set_latency_property (agg, g_value_get_int64 (value));
+ gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value));
break;
case PROP_START_TIME_SELECTION:
agg->priv->start_time_selection = g_value_get_enum (value);
switch (prop_id) {
case PROP_LATENCY:
- g_value_set_int64 (value, gst_aggregator_get_latency_property (agg));
+ g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg));
break;
case PROP_START_TIME_SELECTION:
g_value_set_enum (value, agg->priv->start_time_selection);
GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
GST_DEBUG_FG_MAGENTA, "GstAggregator");
- klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD;
+ klass->finish_buffer = gst_aggregator_default_finish_buffer;
klass->sink_event = gst_aggregator_default_sink_event;
klass->sink_query = gst_aggregator_default_sink_query;
gobject_class->finalize = gst_aggregator_finalize;
g_object_class_install_property (gobject_class, PROP_LATENCY,
- g_param_spec_int64 ("latency", "Buffer latency",
+ g_param_spec_uint64 ("latency", "Buffer latency",
"Additional latency in live mode to allow upstream "
"to take longer to produce buffers for the current "
- "position (in nanoseconds)", 0,
- (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1),
+ "position (in nanoseconds)", 0, G_MAXUINT64,
DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION,
"Start time to use if start-time-selection=set", 0,
G_MAXUINT64,
DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
- GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad);
}
static void
update_time_level (aggpad, head);
}
+/*
+ * Can be called either from the sinkpad's chain function or from the srcpad's
+ * thread in the case of a buffer synthesized from a GAP event.
+ * Because of this second case, FLUSH_LOCK can't be used here.
+ */
+
static GstFlowReturn
gst_aggregator_pad_chain_internal (GstAggregator * self,
GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
{
GstFlowReturn flow_return;
GstClockTime buf_pts;
+ GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
- PAD_FLUSH_LOCK (aggpad);
-
PAD_LOCK (aggpad);
flow_return = aggpad->priv->flow_return;
if (flow_return != GST_FLOW_OK)
goto flushing;
- if (aggpad->priv->pending_eos == TRUE)
- goto eos;
+ if (klass->skip_buffer && klass->skip_buffer (aggpad, self, buffer))
+ goto skipped;
PAD_UNLOCK (aggpad);
aggpad->priv->first_buffer = FALSE;
}
- if (gst_aggregator_pad_has_space (self, aggpad)
+ if ((gst_aggregator_pad_has_space (self, aggpad) || !head)
&& aggpad->priv->flow_return == GST_FLOW_OK) {
if (head)
- g_queue_push_head (&aggpad->priv->buffers, buffer);
+ g_queue_push_head (&aggpad->priv->data, buffer);
else
- g_queue_push_tail (&aggpad->priv->buffers, buffer);
+ g_queue_push_tail (&aggpad->priv->data, buffer);
apply_buffer (aggpad, buffer, head);
aggpad->priv->num_buffers++;
buffer = NULL;
GST_OBJECT_UNLOCK (self);
SRC_UNLOCK (self);
- PAD_FLUSH_UNLOCK (aggpad);
-
GST_DEBUG_OBJECT (aggpad, "Done chaining");
return flow_return;
flushing:
PAD_UNLOCK (aggpad);
- PAD_FLUSH_UNLOCK (aggpad);
GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
gst_flow_get_name (flow_return));
return flow_return;
-eos:
+skipped:
PAD_UNLOCK (aggpad);
- PAD_FLUSH_UNLOCK (aggpad);
+ GST_DEBUG_OBJECT (aggpad, "Skipped buffer %" GST_PTR_FORMAT, buffer);
gst_buffer_unref (buffer);
- GST_DEBUG_OBJECT (aggpad, "We are EOS already...");
- return GST_FLOW_EOS;
+ return GST_FLOW_OK;
}
static GstFlowReturn
gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
{
- return gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
- GST_AGGREGATOR_PAD_CAST (pad), buffer, TRUE);
+ GstFlowReturn ret;
+ GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
+
+ PAD_FLUSH_LOCK (aggpad);
+
+ ret = gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
+ aggpad, buffer, TRUE);
+
+ PAD_FLUSH_UNLOCK (aggpad);
+
+ return ret;
}
static gboolean
{
GstAggregator *self = GST_AGGREGATOR (parent);
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
- GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
if (GST_QUERY_IS_SERIALIZED (query)) {
GstStructure *s;
goto flushing;
}
- g_queue_push_head (&aggpad->priv->buffers, query);
+ g_queue_push_head (&aggpad->priv->data, query);
SRC_BROADCAST (self);
SRC_UNLOCK (self);
if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret))
gst_structure_remove_field (s, "gst-aggregator-retval");
else
- g_queue_remove (&aggpad->priv->buffers, query);
+ g_queue_remove (&aggpad->priv->data, query);
if (aggpad->priv->flow_return != GST_FLOW_OK)
goto flushing;
PAD_UNLOCK (aggpad);
return ret;
- }
+ } else {
+ GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
- return klass->sink_query (self, aggpad, query);
+ return klass->sink_query (self, aggpad, query);
+ }
flushing:
GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
return FALSE;
}
+/* Queue serialized events and let the others go through directly.
+ * The queued events will be handled from the src-pad task in
+ * gst_aggregator_do_events_and_queries().
+ */
static GstFlowReturn
gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
GstEvent * event)
GstFlowReturn ret = GST_FLOW_OK;
GstAggregator *self = GST_AGGREGATOR (parent);
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
- GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
- if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS
- /* && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE */ ) {
+ if (GST_EVENT_IS_SERIALIZED (event)
+ && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
SRC_LOCK (self);
PAD_LOCK (aggpad);
- if (aggpad->priv->flow_return != GST_FLOW_OK
- && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
- ret = aggpad->priv->flow_return;
+ if (aggpad->priv->flow_return != GST_FLOW_OK)
goto flushing;
- }
if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
GST_OBJECT_LOCK (aggpad);
GST_OBJECT_UNLOCK (aggpad);
}
- if (GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
- GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT,
- event);
- g_queue_push_head (&aggpad->priv->buffers, event);
- event = NULL;
- SRC_BROADCAST (self);
- }
+ GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event);
+ g_queue_push_head (&aggpad->priv->data, event);
+ SRC_BROADCAST (self);
PAD_UNLOCK (aggpad);
SRC_UNLOCK (self);
- }
+ } else {
+ GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
- if (event) {
if (!klass->sink_event (self, aggpad, event)) {
/* Copied from GstPad to convert boolean to a GstFlowReturn in
* the event handling func */
gst_pad_store_sticky_event (pad, event);
gst_event_unref (event);
- return ret;
+ return aggpad->priv->flow_return;
}
static gboolean
G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
GstAggregatorPadPrivate);
- g_queue_init (&pad->priv->buffers);
+ g_queue_init (&pad->priv->data);
g_cond_init (&pad->priv->event_cond);
g_mutex_init (&pad->priv->flush_lock);
{
pad->priv->num_buffers--;
GST_TRACE_OBJECT (pad, "Consuming buffer");
- if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) {
- pad->priv->pending_eos = FALSE;
- pad->priv->eos = TRUE;
- }
PAD_BROADCAST_EVENT (pad);
}
GstBuffer *buffer = NULL;
while (pad->priv->clipped_buffer == NULL &&
- GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) {
- buffer = g_queue_pop_tail (&pad->priv->buffers);
+ GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
+ buffer = g_queue_pop_tail (&pad->priv->data);
apply_buffer (pad, buffer, FALSE);
}
/**
- * gst_aggregator_pad_steal_buffer:
+ * gst_aggregator_pad_pop_buffer:
* @pad: the pad to get buffer from
*
* Steal the ref to the buffer currently queued in @pad.
* queued. You should unref the buffer after usage.
*/
GstBuffer *
-gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
+gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad)
{
GstBuffer *buffer;
gst_aggregator_pad_clip_buffer_unlocked (pad);
buffer = pad->priv->clipped_buffer;
- pad->priv->clipped_buffer = NULL;
if (buffer) {
+ pad->priv->clipped_buffer = NULL;
gst_aggregator_pad_buffer_consumed (pad);
GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
}
{
GstBuffer *buf;
- buf = gst_aggregator_pad_steal_buffer (pad);
+ buf = gst_aggregator_pad_pop_buffer (pad);
if (buf == NULL)
return FALSE;
}
/**
- * gst_aggregator_pad_get_buffer:
+ * gst_aggregator_pad_peek_buffer:
* @pad: the pad to get buffer from
*
* Returns: (transfer full): A reference to the buffer in @pad or
* usage.
*/
GstBuffer *
-gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
+gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
{
GstBuffer *buffer;
return buffer;
}
+/**
+ * gst_aggregator_pad_is_eos:
+ * @pad: an aggregator pad
+ *
+ * Returns: %TRUE if the pad is EOS, otherwise %FALSE.
+ */
gboolean
gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
{
return is_eos;
}
-/**
+#if 0
+/*
* gst_aggregator_merge_tags:
* @self: a #GstAggregator
* @tags: a #GstTagList to merge
self->priv->tags_changed = TRUE;
GST_OBJECT_UNLOCK (self);
}
+#endif
/**
* gst_aggregator_set_latency: