/**
* SECTION: gstaggregator
* @title: GstAggregator
- * @short_description: manages a set of pads with the purpose of
- * aggregating their buffers.
+ * @short_description: Base class for mixers and muxers, manages a set of input
+ * pads and aggregates their streams
* @see_also: gstcollectpads for historical reasons.
*
* Manages a set of pads with the purpose of aggregating their buffers.
* * When data is queued on all pads, the aggregate vmethod is called.
*
* * One can peek at the data on any given GstAggregatorPad with the
- * gst_aggregator_pad_get_buffer () method, and take ownership of it
- * with the gst_aggregator_pad_steal_buffer () method. When a buffer
- * has been taken with steal_buffer (), a new buffer can be queued
+ * gst_aggregator_pad_peek_buffer () method, and remove it from the pad
+ * with the gst_aggregator_pad_pop_buffer () method. When a buffer
+ * has been taken with pop_buffer (), a new buffer can be queued
* on that pad.
*
* * If the subclass wishes to push a buffer downstream in its aggregate
* flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE
* to ease their identification and subsequent processing.
*
+ * * Subclasses must use (a subclass of) #GstAggregatorPad for both their
+ * sink and source pads.
+ * See gst_element_class_add_static_pad_template_with_gtype().
+ *
+ * This class used to live in gst-plugins-bad and was moved to core.
+ *
+ * Since: 1.14
+ */
+
+/**
+ * SECTION: gstaggregatorpad
+ * @title: GstAggregatorPad
+ * @short_description: #GstPad subclass for pads managed by #GstAggregator
+ * @see_also: gstcollectpads for historical reasons.
+ *
+ * Pads managed by a #GstAggregator subclass.
+ *
+ * This class used to live in gst-plugins-bad and was moved to core.
+ *
+ * Since: 1.14
*/
#ifdef HAVE_CONFIG_H
}
/* Might become API */
+#if 0
static void gst_aggregator_merge_tags (GstAggregator * aggregator,
const GstTagList * tags, GstTagMergeMode mode);
+#endif
static void gst_aggregator_set_latency_property (GstAggregator * agg,
GstClockTime latency);
static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
+static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad,
+ GstBuffer * buffer);
+
GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
#define GST_CAT_DEFAULT aggregator_debug
* the chain function is also happening.
*/
GMutex flush_lock;
+
+ /* properties */
+ gboolean emit_signals;
};
/* Must be called with PAD_LOCK held */
PAD_UNLOCK (aggpad);
if (klass->flush)
- return klass->flush (aggpad, agg);
+ return (klass->flush (aggpad, agg) == GST_FLOW_OK);
return TRUE;
}
* GstAggregator implementation *
*************************************/
static GstElementClass *aggregator_parent_class = NULL;
+static gint aggregator_private_offset = 0;
/* All members are protected by the object lock unless otherwise noted */
GstClockTime sub_latency_min; /* protected by src_lock */
GstClockTime sub_latency_max; /* protected by src_lock */
+ GstClockTime upstream_latency_min; /* protected by src_lock */
+
/* aggregate */
GstClockID aggregate_id; /* protected by src_lock */
GMutex src_lock;
} EventData;
#define DEFAULT_LATENCY 0
+#define DEFAULT_MIN_UPSTREAM_LATENCY 0
#define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO
#define DEFAULT_START_TIME (-1)
{
PROP_0,
PROP_LATENCY,
+ PROP_MIN_UPSTREAM_LATENCY,
PROP_START_TIME_SELECTION,
PROP_START_TIME,
PROP_LAST
static gboolean
gst_aggregator_check_pads_ready (GstAggregator * self)
{
- GstAggregatorPad *pad;
+ GstAggregatorPad *pad = NULL;
GList *l, *sinkpads;
gboolean have_buffer = TRUE;
gboolean have_event_or_query = FALSE;
GST_OBJECT_LOCK (self);
self->priv->send_stream_start = TRUE;
self->priv->send_segment = TRUE;
- gst_segment_init (&self->segment, GST_FORMAT_TIME);
+ gst_segment_init (&GST_AGGREGATOR_PAD (self->srcpad)->segment,
+ GST_FORMAT_TIME);
self->priv->first_buffer = TRUE;
GST_OBJECT_UNLOCK (self);
}
GST_INFO_OBJECT (self, "pushing stream start");
/* stream-start (FIXME: create id based on input ids) */
g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
- if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) {
+ if (!gst_pad_push_event (GST_PAD (self->srcpad),
+ gst_event_new_stream_start (s_id))) {
GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
}
self->priv->send_stream_start = FALSE;
GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
self->priv->srccaps);
- if (!gst_pad_push_event (self->srcpad,
+ if (!gst_pad_push_event (GST_PAD (self->srcpad),
gst_event_new_caps (self->priv->srccaps))) {
GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
}
GST_OBJECT_LOCK (self);
if (self->priv->send_segment && !self->priv->flush_seeking) {
- segment = gst_event_new_segment (&self->segment);
+ segment =
+ gst_event_new_segment (&GST_AGGREGATOR_PAD (self->srcpad)->segment);
if (!self->priv->seqnum)
/* This code-path is in preparation to be able to run without a source
GST_PAD_STREAM_UNLOCK (self->srcpad);
}
-/**
- * gst_aggregator_finish_buffer:
- * @self: The #GstAggregator
- * @buffer: (transfer full): the #GstBuffer to push.
- *
- * This method will push the provided output buffer downstream. If needed,
- * mandatory events such as stream-start, caps, and segment events will be
- * sent before pushing the buffer.
- */
-GstFlowReturn
-gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
+static GstFlowReturn
+gst_aggregator_default_finish_buffer (GstAggregator * self, GstBuffer * buffer)
{
gst_aggregator_push_mandatory_events (self);
}
}
+/**
+ * gst_aggregator_finish_buffer:
+ * @aggregator: The #GstAggregator
+ * @buffer: (transfer full): the #GstBuffer to push.
+ *
+ * This method will push the provided output buffer downstream. If needed,
+ * mandatory events such as stream-start, caps, and segment events will be
+ * sent before pushing the buffer.
+ *
+ * Returns: the #GstFlowReturn returned by the
+ * #GstAggregatorClass::finish_buffer implementation.
+ */
+GstFlowReturn
+gst_aggregator_finish_buffer (GstAggregator * aggregator, GstBuffer * buffer)
+{
+  GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator);
+
+  /* the base class installs a default implementation in class_init,
+   * so this can only fire if a subclass explicitly cleared the vfunc */
+  g_assert (klass->finish_buffer != NULL);
+
+  return klass->finish_buffer (aggregator, buffer);
+}
+
static void
gst_aggregator_push_eos (GstAggregator * self)
{
return res;
}
+/* user_data for gst_aggregator_do_events_and_queries(): records whether any
+ * serialized event/query was processed on a pad, and the resulting flow
+ * return (e.g. GST_FLOW_NOT_NEGOTIATED when a CAPS event is refused). */
+typedef struct
+{
+  gboolean processed_event;
+  GstFlowReturn flow_ret;
+} DoHandleEventsAndQueriesData;
+
static gboolean
gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
gpointer user_data)
GstEvent *event = NULL;
GstQuery *query = NULL;
GstAggregatorClass *klass = NULL;
- gboolean *processed_event = user_data;
+ DoHandleEventsAndQueriesData *data = user_data;
do {
event = NULL;
if (event || query) {
gboolean ret;
- if (processed_event)
- *processed_event = TRUE;
+ data->processed_event = TRUE;
if (klass == NULL)
klass = GST_AGGREGATOR_GET_CLASS (self);
ret = klass->sink_event (aggregator, pad, event);
PAD_LOCK (pad);
- if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS)
+ if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) {
pad->priv->negotiated = ret;
+ if (!ret)
+ pad->priv->flow_return = data->flow_ret = GST_FLOW_NOT_NEGOTIATED;
+ }
if (g_queue_peek_tail (&pad->priv->data) == event)
gst_event_unref (g_queue_pop_tail (&pad->priv->data));
gst_event_unref (event);
return TRUE;
}
+/* gst_element_foreach_sink_pad() callback.
+ * Pops and discards buffers queued at the head of @epad for which the
+ * subclass' GstAggregatorPadClass::skip_buffer vfunc returns TRUE, stopping
+ * at the first event/query or non-skippable buffer. Each dropped buffer is
+ * still accounted for via gst_aggregator_pad_buffer_consumed() (called with
+ * the PAD_LOCK held, as that helper requires). Returns FALSE (stop
+ * iterating the remaining pads) only when the pad class provides no
+ * skip_buffer implementation. */
+static gboolean
+gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
+    gpointer user_data)
+{
+  GList *item;
+  GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
+  GstAggregator *agg = (GstAggregator *) self;
+  GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
+
+  if (!klass->skip_buffer)
+    return FALSE;
+
+  PAD_LOCK (aggpad);
+
+  item = g_queue_peek_head_link (&aggpad->priv->data);
+  while (item) {
+    GList *next = item->next;
+
+    if (GST_IS_BUFFER (item->data)
+        && klass->skip_buffer (aggpad, agg, item->data)) {
+      GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
+      gst_aggregator_pad_buffer_consumed (aggpad, GST_BUFFER (item->data));
+      gst_buffer_unref (item->data);
+      g_queue_delete_link (&aggpad->priv->data, item);
+    } else {
+      break;
+    }
+
+    item = next;
+  }
+
+  PAD_UNLOCK (aggpad);
+
+  return TRUE;
+}
+
static void
gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
GstFlowReturn flow_return, gboolean full)
GST_LOG_OBJECT (self, "Checking aggregate");
while (priv->send_eos && priv->running) {
GstFlowReturn flow_return = GST_FLOW_OK;
- gboolean processed_event = FALSE;
+ DoHandleEventsAndQueriesData events_query_data = { FALSE, GST_FLOW_OK };
gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
- gst_aggregator_do_events_and_queries, NULL);
+ gst_aggregator_do_events_and_queries, &events_query_data);
+
+ if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
+ goto handle_error;
+
+ if (self->priv->peer_latency_live)
+ gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
+ gst_aggregator_pad_skip_buffers, NULL);
/* Ensure we have buffers ready (either in clipped_buffer or at the head of
* the queue */
if (!gst_aggregator_wait_and_check (self, &timeout))
continue;
+ events_query_data.processed_event = FALSE;
+ events_query_data.flow_ret = GST_FLOW_OK;
gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
- gst_aggregator_do_events_and_queries, &processed_event);
+ gst_aggregator_do_events_and_queries, &events_query_data);
- if (processed_event)
+ if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
+ goto handle_error;
+
+ if (events_query_data.processed_event)
continue;
if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
gst_aggregator_push_eos (self);
}
+ handle_error:
GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
if (flow_return != GST_FLOW_OK) {
GstBuffer *gapbuf;
gst_event_parse_gap (event, &pts, &duration);
- gapbuf = gst_buffer_new ();
if (GST_CLOCK_TIME_IS_VALID (duration))
endpts = pts + duration;
else
duration = GST_CLOCK_TIME_NONE;
+ gapbuf = gst_buffer_new ();
GST_BUFFER_PTS (gapbuf) = pts;
GST_BUFFER_DURATION (gapbuf) = duration;
GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
/* Remove GAP event so we can replace it with the buffer */
+ PAD_LOCK (aggpad);
if (g_queue_peek_tail (&aggpad->priv->data) == event)
gst_event_unref (g_queue_pop_tail (&aggpad->priv->data));
+ PAD_UNLOCK (aggpad);
if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
GST_FLOW_OK) {
goto eat;
}
case GST_EVENT_TAG:
- {
- GstTagList *tags;
-
- gst_event_parse_tag (event, &tags);
-
- if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) {
- gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE);
- gst_event_unref (event);
- event = NULL;
- goto eat;
- }
- break;
- }
+ goto eat;
default:
{
break;
GST_OBJECT_LOCK (self);
if (req_name == NULL || strlen (req_name) < 6
- || !g_str_has_prefix (req_name, "sink_")) {
+ || !g_str_has_prefix (req_name, "sink_")
+ || strrchr (req_name, '%') != NULL) {
/* no name given when requesting the pad, use next available int */
serial = ++priv->max_padserial;
} else {
+ gchar *endptr = NULL;
+
/* parse serial number from requested padname */
- serial = g_ascii_strtoull (&req_name[5], NULL, 10);
- if (serial > priv->max_padserial)
- priv->max_padserial = serial;
+ serial = g_ascii_strtoull (&req_name[5], &endptr, 10);
+ if (endptr != NULL && *endptr == '\0') {
+ if (serial > priv->max_padserial) {
+ priv->max_padserial = serial;
+ }
+ } else {
+ serial = ++priv->max_padserial;
+ }
}
name = g_strdup_printf ("sink_%u", serial);
+ g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD));
agg_pad = g_object_new (pad_type,
"name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
g_free (name);
return FALSE;
}
+ if (self->priv->upstream_latency_min > min) {
+ GstClockTimeDiff diff =
+ GST_CLOCK_DIFF (min, self->priv->upstream_latency_min);
+
+ min += diff;
+ if (GST_CLOCK_TIME_IS_VALID (max)) {
+ max += diff;
+ }
+ }
+
if (min > max && GST_CLOCK_TIME_IS_VALID (max)) {
GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL),
("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
&start, &stop_type, &stop);
GST_OBJECT_LOCK (self);
- gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
- stop_type, stop, NULL);
+ gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
+ flags, start_type, start, stop_type, stop, NULL);
self->priv->seqnum = gst_event_get_seqnum (event);
self->priv->first_buffer = FALSE;
GST_OBJECT_UNLOCK (self);
} else {
ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
- gst_object_unref (peer);
}
}
evdata->result &= ret;
+ if (peer)
+ gst_object_unref (peer);
+
/* Always send to all pads */
return FALSE;
}
priv->flush_seeking = TRUE;
}
- gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
- stop_type, stop, NULL);
+ gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
+ flags, start_type, start, stop_type, stop, NULL);
/* Seeking sets a position */
self->priv->first_buffer = FALSE;
case PROP_LATENCY:
gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value));
break;
+ case PROP_MIN_UPSTREAM_LATENCY:
+ SRC_LOCK (agg);
+ agg->priv->upstream_latency_min = g_value_get_uint64 (value);
+ SRC_UNLOCK (agg);
+ break;
case PROP_START_TIME_SELECTION:
agg->priv->start_time_selection = g_value_get_enum (value);
break;
case PROP_LATENCY:
g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg));
break;
+ case PROP_MIN_UPSTREAM_LATENCY:
+ SRC_LOCK (agg);
+ g_value_set_uint64 (value, agg->priv->upstream_latency_min);
+ SRC_UNLOCK (agg);
+ break;
case PROP_START_TIME_SELECTION:
g_value_set_enum (value, agg->priv->start_time_selection);
break;
GstElementClass *gstelement_class = (GstElementClass *) klass;
aggregator_parent_class = g_type_class_peek_parent (klass);
- g_type_class_add_private (klass, sizeof (GstAggregatorPrivate));
GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
GST_DEBUG_FG_MAGENTA, "GstAggregator");
+ if (aggregator_private_offset != 0)
+ g_type_class_adjust_private_offset (klass, &aggregator_private_offset);
+
+ klass->finish_buffer = gst_aggregator_default_finish_buffer;
+
klass->sink_event = gst_aggregator_default_sink_event;
klass->sink_query = gst_aggregator_default_sink_query;
"position (in nanoseconds)", 0, G_MAXUINT64,
DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ /**
+ * GstAggregator:min-upstream-latency:
+ *
+ * Force minimum upstream latency (in nanoseconds). When sources with a
+ * higher latency are expected to be plugged in dynamically after the
+ * aggregator has started playing, this allows overriding the minimum
+ * latency reported by the initial source(s). This is only taken into
+ * account when larger than the actually reported minimum latency.
+ *
+ * Since: 1.16
+ */
+ g_object_class_install_property (gobject_class, PROP_MIN_UPSTREAM_LATENCY,
+ g_param_spec_uint64 ("min-upstream-latency", "Buffer latency",
+ "When sources with a higher latency are expected to be plugged "
+ "in dynamically after the aggregator has started playing, "
+ "this allows overriding the minimum latency reported by the "
+ "initial source(s). This is only taken into account when larger "
+ "than the actually reported minimum latency. (nanoseconds)",
+ 0, G_MAXUINT64,
+ DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION,
g_param_spec_enum ("start-time-selection", "Start Time Selection",
"Decides which start time is output",
"Start time to use if start-time-selection=set", 0,
G_MAXUINT64,
DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+}
- GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_do_events_and_queries);
+/* Resolve the GstAggregatorPrivate area from the manually registered
+ * private-struct offset (aggregator_private_offset is filled in by
+ * g_type_add_instance_private() when the GstAggregator type is created). */
+static inline gpointer
+gst_aggregator_get_instance_private (GstAggregator * self)
+{
+  return (G_STRUCT_MEMBER_P (self, aggregator_private_offset));
+}
static void
{
GstPadTemplate *pad_template;
GstAggregatorPrivate *priv;
+ GType pad_type;
g_return_if_fail (klass->aggregate != NULL);
- self->priv =
- G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR,
- GstAggregatorPrivate);
+ self->priv = gst_aggregator_get_instance_private (self);
priv = self->priv;
self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
self->priv->has_peer_latency = FALSE;
- gst_aggregator_reset_flow_values (self);
- self->srcpad = gst_pad_new_from_template (pad_template, "src");
+ pad_type =
+ GST_PAD_TEMPLATE_GTYPE (pad_template) ==
+ G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD :
+ GST_PAD_TEMPLATE_GTYPE (pad_template);
+ g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD));
+ self->srcpad =
+ g_object_new (pad_type, "name", "src", "direction", GST_PAD_SRC,
+ "template", pad_template, NULL);
+
+ gst_aggregator_reset_flow_values (self);
gst_pad_set_event_function (self->srcpad,
GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
+ self->priv->upstream_latency_min = DEFAULT_MIN_UPSTREAM_LATENCY;
self->priv->latency = DEFAULT_LATENCY;
self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION;
self->priv->start_time = DEFAULT_START_TIME;
_type = g_type_register_static (GST_TYPE_ELEMENT,
"GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
+
+ aggregator_private_offset =
+ g_type_add_instance_private (_type, sizeof (GstAggregatorPrivate));
+
g_once_init_leave (&type, _type);
}
return type;
GstFlowReturn flow_return;
GstClockTime buf_pts;
- GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
-
PAD_LOCK (aggpad);
flow_return = aggpad->priv->flow_return;
if (flow_return != GST_FLOW_OK)
if (self->priv->first_buffer) {
GstClockTime start_time;
+ GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
switch (self->priv->start_time_selection) {
case GST_AGGREGATOR_START_TIME_SELECTION_ZERO:
}
if (start_time != -1) {
- if (self->segment.position == -1)
- self->segment.position = start_time;
+ if (srcpad->segment.position == -1)
+ srcpad->segment.position = start_time;
else
- self->segment.position = MIN (start_time, self->segment.position);
+ srcpad->segment.position = MIN (start_time, srcpad->segment.position);
GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT,
GST_TIME_ARGS (start_time));
/***********************************
* GstAggregatorPad implementation *
************************************/
-G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
+G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
+
+#define DEFAULT_PAD_EMIT_SIGNALS FALSE
+
+/* GstAggregatorPad property IDs */
+enum
+{
+  PAD_PROP_0,
+  PAD_PROP_EMIT_SIGNALS,
+};
+
+/* GstAggregatorPad signal IDs */
+enum
+{
+  PAD_SIGNAL_BUFFER_CONSUMED,
+  PAD_LAST_SIGNAL,
+};
+
+static guint gst_aggregator_pad_signals[PAD_LAST_SIGNAL] = { 0 };
static void
gst_aggregator_pad_constructed (GObject * object)
{
GstPad *pad = GST_PAD (object);
- gst_pad_set_chain_function (pad,
- GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
- gst_pad_set_event_full_function_full (pad,
- GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
- gst_pad_set_query_function (pad,
- GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
- gst_pad_set_activatemode_function (pad,
- GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
+ if (GST_PAD_IS_SINK (pad)) {
+ gst_pad_set_chain_function (pad,
+ GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
+ gst_pad_set_event_full_function_full (pad,
+ GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
+ gst_pad_set_query_function (pad,
+ GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
+ gst_pad_set_activatemode_function (pad,
+ GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
+ }
}
static void
}
+/* GObject::set_property vfunc for #GstAggregatorPad; only the
+ * #GstAggregatorPad:emit-signals property is handled here. */
static void
+gst_aggregator_pad_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec)
+{
+  GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object);
+
+  switch (prop_id) {
+    case PAD_PROP_EMIT_SIGNALS:
+      pad->priv->emit_signals = g_value_get_boolean (value);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+/* GObject::get_property vfunc for #GstAggregatorPad; only the
+ * #GstAggregatorPad:emit-signals property is handled here. */
+static void
+gst_aggregator_pad_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec)
+{
+  GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object);
+
+  switch (prop_id) {
+    case PAD_PROP_EMIT_SIGNALS:
+      g_value_set_boolean (value, pad->priv->emit_signals);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
{
GObjectClass *gobject_class = (GObjectClass *) klass;
- g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate));
-
gobject_class->constructed = gst_aggregator_pad_constructed;
gobject_class->finalize = gst_aggregator_pad_finalize;
gobject_class->dispose = gst_aggregator_pad_dispose;
+ gobject_class->set_property = gst_aggregator_pad_set_property;
+ gobject_class->get_property = gst_aggregator_pad_get_property;
+
+ /**
+ * GstAggregatorPad::buffer-consumed:
+ *
+ * Signals that a buffer was consumed. As aggregator pads store buffers
+ * in an internal queue, there is no direct match between input and output
+ * buffers at any given time. This signal can be useful to forward metas
+ * such as #GstVideoTimeCodeMeta or #GstVideoCaptionMeta at the right time.
+ *
+ * Since: 1.16
+ */
+ gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED] =
+ g_signal_new ("buffer-consumed", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_FIRST, 0, NULL, NULL, g_cclosure_marshal_generic,
+ G_TYPE_NONE, 1, GST_TYPE_BUFFER);
+
+ /**
+ * GstAggregatorPad:emit-signals:
+ *
+ * Enables the emission of signals such as #GstAggregatorPad::buffer-consumed
+ *
+ * Since: 1.16
+ */
+ g_object_class_install_property (gobject_class, PAD_PROP_EMIT_SIGNALS,
+ g_param_spec_boolean ("emit-signals", "Emit signals",
+ "Send signals to signal data consumption", DEFAULT_PAD_EMIT_SIGNALS,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
}
static void
gst_aggregator_pad_init (GstAggregatorPad * pad)
{
- pad->priv =
- G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
- GstAggregatorPadPrivate);
+ pad->priv = gst_aggregator_pad_get_instance_private (pad);
g_queue_init (&pad->priv->data);
g_cond_init (&pad->priv->event_cond);
gst_aggregator_pad_reset_unlocked (pad);
pad->priv->negotiated = FALSE;
+ pad->priv->emit_signals = DEFAULT_PAD_EMIT_SIGNALS;
}
/* Must be called with the PAD_LOCK held */
+/* Accounts for one queued buffer leaving @pad: decrements num_buffers,
+ * optionally emits ::buffer-consumed (only when the emit-signals property
+ * is TRUE), then wakes up any thread waiting on the pad. @buffer may be
+ * NULL — e.g. when the clip vfunc dropped the buffer — in which case no
+ * signal is emitted. */
static void
-gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad)
+gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer)
{
pad->priv->num_buffers--;
-  GST_TRACE_OBJECT (pad, "Consuming buffer");
+  GST_TRACE_OBJECT (pad, "Consuming buffer %" GST_PTR_FORMAT, buffer);
+  /* NOTE(review): the signal is emitted with the PAD_LOCK held — handlers
+   * must not call back into pad API that takes the same lock */
+  if (buffer && pad->priv->emit_signals) {
+    g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED],
+        0, buffer);
+  }
PAD_BROADCAST_EVENT (pad);
}
buffer = aggclass->clip (self, pad, buffer);
if (buffer == NULL) {
- gst_aggregator_pad_buffer_consumed (pad);
+ gst_aggregator_pad_buffer_consumed (pad, buffer);
GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
}
}
}
/**
- * gst_aggregator_pad_steal_buffer:
+ * gst_aggregator_pad_pop_buffer:
* @pad: the pad to get buffer from
*
* Steal the ref to the buffer currently queued in @pad.
* queued. You should unref the buffer after usage.
*/
GstBuffer *
-gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
+gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad)
{
GstBuffer *buffer;
PAD_LOCK (pad);
+ if (pad->priv->flow_return != GST_FLOW_OK) {
+ PAD_UNLOCK (pad);
+ return NULL;
+ }
+
gst_aggregator_pad_clip_buffer_unlocked (pad);
buffer = pad->priv->clipped_buffer;
if (buffer) {
pad->priv->clipped_buffer = NULL;
- gst_aggregator_pad_buffer_consumed (pad);
+ gst_aggregator_pad_buffer_consumed (pad, buffer);
GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
}
{
GstBuffer *buf;
- buf = gst_aggregator_pad_steal_buffer (pad);
+ buf = gst_aggregator_pad_pop_buffer (pad);
if (buf == NULL)
return FALSE;
}
/**
- * gst_aggregator_pad_get_buffer:
+ * gst_aggregator_pad_peek_buffer:
* @pad: the pad to get buffer from
*
* Returns: (transfer full): A reference to the buffer in @pad or
* usage.
*/
GstBuffer *
-gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
+gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
{
GstBuffer *buffer;
PAD_LOCK (pad);
+ if (pad->priv->flow_return != GST_FLOW_OK) {
+ PAD_UNLOCK (pad);
+ return NULL;
+ }
+
gst_aggregator_pad_clip_buffer_unlocked (pad);
if (pad->priv->clipped_buffer) {
}
/**
+ * gst_aggregator_pad_has_buffer:
+ * @pad: the pad to check the buffer on
+ *
+ * This checks if a pad has a buffer available that will be returned by
+ * a call to gst_aggregator_pad_peek_buffer() or
+ * gst_aggregator_pad_pop_buffer().
+ *
+ * Returns: %TRUE if the pad has a buffer available as the next thing.
+ *
+ * Since: 1.14.1
+ */
+gboolean
+gst_aggregator_pad_has_buffer (GstAggregatorPad * pad)
+{
+  gboolean ret;
+
+  PAD_LOCK (pad);
+  /* make sure clipping has run so clipped_buffer reflects the queue head */
+  gst_aggregator_pad_clip_buffer_unlocked (pad);
+  ret = pad->priv->clipped_buffer != NULL;
+  PAD_UNLOCK (pad);
+
+  return ret;
+}
+
+/**
* gst_aggregator_pad_is_eos:
* @pad: an aggregator pad
*
return is_eos;
}
-/**
+#if 0
+/*
* gst_aggregator_merge_tags:
* @self: a #GstAggregator
* @tags: a #GstTagList to merge
self->priv->tags_changed = TRUE;
GST_OBJECT_UNLOCK (self);
}
+#endif
/**
* gst_aggregator_set_latency:
if (params)
*params = self->priv->allocation_params;
}
+
+/**
+ * gst_aggregator_simple_get_next_time:
+ * @self: A #GstAggregator
+ *
+ * This is a simple #GstAggregator::get_next_time implementation that
+ * just looks at the #GstSegment on the srcpad of the aggregator and bases
+ * the next time on the running time there.
+ *
+ * This is the desired behaviour in most cases where you have a live source
+ * and you have a deadline based aggregator subclass.
+ *
+ * Returns: The running time based on the position
+ *
+ * Since: 1.16
+ */
+GstClockTime
+gst_aggregator_simple_get_next_time (GstAggregator * self)
+{
+  GstClockTime next_time;
+  GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
+  GstSegment *segment = &srcpad->segment;
+
+  GST_OBJECT_LOCK (self);
+  /* clamp the candidate position into [segment->start, segment->stop] */
+  if (segment->position == -1 || segment->position < segment->start)
+    next_time = segment->start;
+  else
+    next_time = segment->position;
+
+  if (segment->stop != -1 && next_time > segment->stop)
+    next_time = segment->stop;
+
+  /* convert the clamped stream position into running time */
+  next_time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, next_time);
+  GST_OBJECT_UNLOCK (self);
+
+  return next_time;
+}