X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=libs%2Fgst%2Fbase%2Fgstaggregator.c;h=94d481604fb8564afb251f0be5e567a6d45d7b8b;hb=5bf13cdd5314bc3c6c81bd620e712acdcab14eb2;hp=89778a3dfb9ec9ed1e63439c29bc490cf410717c;hpb=f1aba33090ba04596984a11570773512ea44f5d6;p=platform%2Fupstream%2Fgstreamer.git diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index 89778a3..94d4816 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -61,6 +61,10 @@ * flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE * to ease their identification and subsequent processing. * + * * Subclasses must use (a subclass of) #GstAggregatorPad for both their + * sink and source pads. + * See gst_element_class_add_static_pad_template_with_gtype(). + * * This class used to live in gst-plugins-bad and was moved to core. * * Since: 1.14 @@ -126,7 +130,8 @@ static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg); static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self); -static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad); +static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, + GstBuffer * buffer); GST_DEBUG_CATEGORY_STATIC (aggregator_debug); #define GST_CAT_DEFAULT aggregator_debug @@ -257,6 +262,9 @@ struct _GstAggregatorPadPrivate * the chain function is also happening. */ GMutex flush_lock; + + /* properties */ + gboolean emit_signals; }; /* Must be called with PAD_LOCK held */ @@ -287,7 +295,7 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg) PAD_UNLOCK (aggpad); if (klass->flush) - return klass->flush (aggpad, agg); + return (klass->flush (aggpad, agg) == GST_FLOW_OK); return TRUE; } @@ -296,6 +304,7 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg) * GstAggregator implementation * *************************************/ static GstElementClass *aggregator_parent_class = NULL; +static gint aggregator_private_offset = 0; /* All members are protected by the object lock unless otherwise noted */ @@ -328,6 +337,8 @@ struct _GstAggregatorPrivate GstClockTime sub_latency_min; /* protected by src_lock */ GstClockTime sub_latency_max; /* protected by src_lock */ + GstClockTime upstream_latency_min; /* protected by src_lock */ + /* aggregate */ GstClockID aggregate_id; /* protected by src_lock */ GMutex src_lock; @@ -361,6 +372,7 @@ typedef struct } EventData; #define DEFAULT_LATENCY 0 +#define DEFAULT_MIN_UPSTREAM_LATENCY 0 #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO #define DEFAULT_START_TIME (-1) @@ -368,6 +380,7 @@ enum { PROP_0, PROP_LATENCY, + PROP_MIN_UPSTREAM_LATENCY, PROP_START_TIME_SELECTION, PROP_START_TIME, PROP_LAST @@ -461,7 +474,8 @@ gst_aggregator_reset_flow_values (GstAggregator * self) GST_OBJECT_LOCK (self); self->priv->send_stream_start = TRUE; self->priv->send_segment = TRUE; - gst_segment_init (&self->segment, GST_FORMAT_TIME); + gst_segment_init (&GST_AGGREGATOR_PAD (self->srcpad)->segment, + GST_FORMAT_TIME); self->priv->first_buffer = TRUE; GST_OBJECT_UNLOCK (self); } @@ -479,7 +493,8 @@ gst_aggregator_push_mandatory_events (GstAggregator * self) GST_INFO_OBJECT (self, "pushing stream start"); /* stream-start (FIXME: create id based on input ids) */ g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ()); - if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) { + if (!gst_pad_push_event (GST_PAD (self->srcpad), + gst_event_new_stream_start (s_id))) { GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed"); } self->priv->send_stream_start = FALSE; @@ -489,7 +504,7 @@ gst_aggregator_push_mandatory_events (GstAggregator * self) GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT, self->priv->srccaps); - if (!gst_pad_push_event (self->srcpad, + if (!gst_pad_push_event (GST_PAD (self->srcpad), gst_event_new_caps (self->priv->srccaps))) { GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed"); } @@ -499,7 +514,8 @@ gst_aggregator_push_mandatory_events (GstAggregator * self) GST_OBJECT_LOCK (self); if (self->priv->send_segment && !self->priv->flush_seeking) { - segment = gst_event_new_segment (&self->segment); + segment = + gst_event_new_segment (&GST_AGGREGATOR_PAD (self->srcpad)->segment); if (!self->priv->seqnum) /* This code-path is in preparation to be able to run without a source @@ -710,6 +726,12 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) return res; } +typedef struct +{ + gboolean processed_event; + GstFlowReturn flow_ret; +} DoHandleEventsAndQueriesData; + static gboolean gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad, gpointer user_data) @@ -719,7 +741,7 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad, GstEvent *event = NULL; GstQuery *query = NULL; GstAggregatorClass *klass = NULL; - gboolean *processed_event = user_data; + DoHandleEventsAndQueriesData *data = user_data; do { event = NULL; @@ -737,8 +759,7 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad, if (event || query) { gboolean ret; - if (processed_event) - *processed_event = TRUE; + data->processed_event = TRUE; if (klass == NULL) klass = GST_AGGREGATOR_GET_CLASS (self); @@ -748,8 +769,11 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad, ret = klass->sink_event (aggregator, pad, event); PAD_LOCK (pad); - if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) + if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) { pad->priv->negotiated = ret; + if (!ret) + pad->priv->flow_return = data->flow_ret = GST_FLOW_NOT_NEGOTIATED; + } if (g_queue_peek_tail (&pad->priv->data) == event) gst_event_unref (g_queue_pop_tail (&pad->priv->data)); gst_event_unref (event); @@ -797,7 +821,7 @@ gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad, if (GST_IS_BUFFER (item->data) && klass->skip_buffer (aggpad, agg, item->data)) { GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data); - gst_aggregator_pad_buffer_consumed (aggpad); + gst_aggregator_pad_buffer_consumed (aggpad, GST_BUFFER (item->data)); gst_buffer_unref (item->data); g_queue_delete_link (&aggpad->priv->data, item); } else { @@ -1089,10 +1113,13 @@ gst_aggregator_aggregate_func (GstAggregator * self) GST_LOG_OBJECT (self, "Checking aggregate"); while (priv->send_eos && priv->running) { GstFlowReturn flow_return = GST_FLOW_OK; - gboolean processed_event = FALSE; + DoHandleEventsAndQueriesData events_query_data = { FALSE, GST_FLOW_OK }; gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), - gst_aggregator_do_events_and_queries, NULL); + gst_aggregator_do_events_and_queries, &events_query_data); + + if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK) + goto handle_error; if (self->priv->peer_latency_live) gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), @@ -1103,10 +1130,15 @@ gst_aggregator_aggregate_func (GstAggregator * self) if (!gst_aggregator_wait_and_check (self, &timeout)) continue; + events_query_data.processed_event = FALSE; + events_query_data.flow_ret = GST_FLOW_OK; gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), - gst_aggregator_do_events_and_queries, &processed_event); + gst_aggregator_do_events_and_queries, &events_query_data); + + if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK) + goto handle_error; - if (processed_event) + if (events_query_data.processed_event) continue; if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) { @@ -1136,6 +1168,7 @@ gst_aggregator_aggregate_func (GstAggregator * self) gst_aggregator_push_eos (self); } + handle_error: GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return)); if (flow_return != GST_FLOW_OK) { @@ -1442,7 +1475,6 @@ gst_aggregator_default_sink_event (GstAggregator * self, GstBuffer *gapbuf; gst_event_parse_gap (event, &pts, &duration); - gapbuf = gst_buffer_new (); if (GST_CLOCK_TIME_IS_VALID (duration)) endpts = pts + duration; @@ -1464,14 +1496,17 @@ gst_aggregator_default_sink_event (GstAggregator * self, else duration = GST_CLOCK_TIME_NONE; + gapbuf = gst_buffer_new (); GST_BUFFER_PTS (gapbuf) = pts; GST_BUFFER_DURATION (gapbuf) = duration; GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP); GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE); /* Remove GAP event so we can replace it with the buffer */ + PAD_LOCK (aggpad); if (g_queue_peek_tail (&aggpad->priv->data) == event) gst_event_unref (g_queue_pop_tail (&aggpad->priv->data)); + PAD_UNLOCK (aggpad); if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) != GST_FLOW_OK) { @@ -1634,17 +1669,26 @@ gst_aggregator_default_create_new_pad (GstAggregator * self, GST_OBJECT_LOCK (self); if (req_name == NULL || strlen (req_name) < 6 - || !g_str_has_prefix (req_name, "sink_")) { + || !g_str_has_prefix (req_name, "sink_") + || strrchr (req_name, '%') != NULL) { /* no name given when requesting the pad, use next available int */ serial = ++priv->max_padserial; } else { + gchar *endptr = NULL; + /* parse serial number from requested padname */ - serial = g_ascii_strtoull (&req_name[5], NULL, 10); - if (serial > priv->max_padserial) - priv->max_padserial = serial; + serial = g_ascii_strtoull (&req_name[5], &endptr, 10); + if (endptr != NULL && *endptr == '\0') { + if (serial > priv->max_padserial) { + priv->max_padserial = serial; + } + } else { + serial = ++priv->max_padserial; + } } name = g_strdup_printf ("sink_%u", serial); + g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD)); agg_pad = g_object_new (pad_type, "name", name, "direction", GST_PAD_SINK, "template", templ, NULL); g_free (name); @@ -1717,6 +1761,16 @@ gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query) return FALSE; } + if (self->priv->upstream_latency_min > min) { + GstClockTimeDiff diff = + GST_CLOCK_DIFF (min, self->priv->upstream_latency_min); + + min += diff; + if (GST_CLOCK_TIME_IS_VALID (max)) { + max += diff; + } + } + if (min > max && GST_CLOCK_TIME_IS_VALID (max)) { GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL), ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %" @@ -1828,8 +1882,8 @@ gst_aggregator_send_event (GstElement * element, GstEvent * event) &start, &stop_type, &stop); GST_OBJECT_LOCK (self); - gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start, - stop_type, stop, NULL); + gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt, + flags, start_type, start, stop_type, stop, NULL); self->priv->seqnum = gst_event_get_seqnum (event); self->priv->first_buffer = FALSE; GST_OBJECT_UNLOCK (self); @@ -1888,7 +1942,6 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data) } else { ret = gst_pad_send_event (peer, gst_event_ref (evdata->event)); GST_DEBUG_OBJECT (pad, "return of event push is %d", ret); - gst_object_unref (peer); } } @@ -1928,6 +1981,9 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data) evdata->result &= ret; + if (peer) + gst_object_unref (peer); + /* Always send to all pads */ return FALSE; } @@ -1987,8 +2043,8 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event) priv->flush_seeking = TRUE; } - gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start, - stop_type, stop, NULL); + gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt, + flags, start_type, start, stop_type, stop, NULL); /* Seeking sets a position */ self->priv->first_buffer = FALSE; @@ -2238,6 +2294,11 @@ gst_aggregator_set_property (GObject * object, guint prop_id, case PROP_LATENCY: gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value)); break; + case PROP_MIN_UPSTREAM_LATENCY: + SRC_LOCK (agg); + agg->priv->upstream_latency_min = g_value_get_uint64 (value); + SRC_UNLOCK (agg); + break; case PROP_START_TIME_SELECTION: agg->priv->start_time_selection = g_value_get_enum (value); break; @@ -2260,6 +2321,11 @@ gst_aggregator_get_property (GObject * object, guint prop_id, case PROP_LATENCY: g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg)); break; + case PROP_MIN_UPSTREAM_LATENCY: + SRC_LOCK (agg); + g_value_set_uint64 (value, agg->priv->upstream_latency_min); + SRC_UNLOCK (agg); + break; case PROP_START_TIME_SELECTION: g_value_set_enum (value, agg->priv->start_time_selection); break; @@ -2280,11 +2346,13 @@ gst_aggregator_class_init (GstAggregatorClass * klass) GstElementClass *gstelement_class = (GstElementClass *) klass; aggregator_parent_class = g_type_class_peek_parent (klass); - g_type_class_add_private (klass, sizeof (GstAggregatorPrivate)); GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator", GST_DEBUG_FG_MAGENTA, "GstAggregator"); + if (aggregator_private_offset != 0) + g_type_class_adjust_private_offset (klass, &aggregator_private_offset); + klass->finish_buffer = gst_aggregator_default_finish_buffer; klass->sink_event = gst_aggregator_default_sink_event; @@ -2317,6 +2385,27 @@ gst_aggregator_class_init (GstAggregatorClass * klass) "position (in nanoseconds)", 0, G_MAXUINT64, DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * GstAggregator:min-upstream-latency: + * + * Force minimum upstream latency (in nanoseconds). When sources with a + * higher latency are expected to be plugged in dynamically after the + * aggregator has started playing, this allows overriding the minimum + * latency reported by the initial source(s). This is only taken into + * account when larger than the actually reported minimum latency. + * + * Since: 1.16 + */ + g_object_class_install_property (gobject_class, PROP_MIN_UPSTREAM_LATENCY, + g_param_spec_uint64 ("min-upstream-latency", "Buffer latency", + "When sources with a higher latency are expected to be plugged " + "in dynamically after the aggregator has started playing, " + "this allows overriding the minimum latency reported by the " + "initial source(s). This is only taken into account when larger " + "than the actually reported minimum latency. (nanoseconds)", + 0, G_MAXUINT64, + DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION, g_param_spec_enum ("start-time-selection", "Start Time Selection", "Decides which start time is output", @@ -2331,17 +2420,22 @@ gst_aggregator_class_init (GstAggregatorClass * klass) DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); } +static inline gpointer +gst_aggregator_get_instance_private (GstAggregator * self) +{ + return (G_STRUCT_MEMBER_P (self, aggregator_private_offset)); +} + static void gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) { GstPadTemplate *pad_template; GstAggregatorPrivate *priv; + GType pad_type; g_return_if_fail (klass->aggregate != NULL); - self->priv = - G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR, - GstAggregatorPrivate); + self->priv = gst_aggregator_get_instance_private (self); priv = self->priv; @@ -2356,9 +2450,17 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) self->priv->peer_latency_min = self->priv->sub_latency_min = 0; self->priv->peer_latency_max = self->priv->sub_latency_max = 0; self->priv->has_peer_latency = FALSE; - gst_aggregator_reset_flow_values (self); - self->srcpad = gst_pad_new_from_template (pad_template, "src"); + pad_type = + GST_PAD_TEMPLATE_GTYPE (pad_template) == + G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : + GST_PAD_TEMPLATE_GTYPE (pad_template); + g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD)); + self->srcpad = + g_object_new (pad_type, "name", "src", "direction", GST_PAD_SRC, + "template", pad_template, NULL); + + gst_aggregator_reset_flow_values (self); gst_pad_set_event_function (self->srcpad, GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func)); @@ -2369,6 +2471,7 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) gst_element_add_pad (GST_ELEMENT (self), self->srcpad); + self->priv->upstream_latency_min = DEFAULT_MIN_UPSTREAM_LATENCY; self->priv->latency = DEFAULT_LATENCY; self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION; self->priv->start_time = DEFAULT_START_TIME; @@ -2400,6 +2503,10 @@ gst_aggregator_get_type (void) _type = g_type_register_static (GST_TYPE_ELEMENT, "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT); + + aggregator_private_offset = + g_type_add_instance_private (_type, sizeof (GstAggregatorPrivate)); + g_once_init_leave (&type, _type); } return type; @@ -2517,6 +2624,7 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, if (self->priv->first_buffer) { GstClockTime start_time; + GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad); switch (self->priv->start_time_selection) { case GST_AGGREGATOR_START_TIME_SELECTION_ZERO: @@ -2550,10 +2658,10 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, } if (start_time != -1) { - if (self->segment.position == -1) - self->segment.position = start_time; + if (srcpad->segment.position == -1) + srcpad->segment.position = start_time; else - self->segment.position = MIN (start_time, self->segment.position); + srcpad->segment.position = MIN (start_time, srcpad->segment.position); GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT, GST_TIME_ARGS (start_time)); @@ -2732,7 +2840,23 @@ gst_aggregator_pad_activate_mode_func (GstPad * pad, /*********************************** * GstAggregatorPad implementation * ************************************/ -G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD); +G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD); + +#define DEFAULT_PAD_EMIT_SIGNALS FALSE + +enum +{ + PAD_PROP_0, + PAD_PROP_EMIT_SIGNALS, +}; + +enum +{ + PAD_SIGNAL_BUFFER_CONSUMED, + PAD_LAST_SIGNAL, +}; + +static guint gst_aggregator_pad_signals[PAD_LAST_SIGNAL] = { 0 }; static void gst_aggregator_pad_constructed (GObject * object) @@ -2774,23 +2898,80 @@ gst_aggregator_pad_dispose (GObject * object) } static void +gst_aggregator_pad_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object); + + switch (prop_id) { + case PAD_PROP_EMIT_SIGNALS: + pad->priv->emit_signals = g_value_get_boolean (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_aggregator_pad_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object); + + switch (prop_id) { + case PAD_PROP_EMIT_SIGNALS: + g_value_set_boolean (value, pad->priv->emit_signals); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void gst_aggregator_pad_class_init (GstAggregatorPadClass * klass) { GObjectClass *gobject_class = (GObjectClass *) klass; - g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate)); - gobject_class->constructed = gst_aggregator_pad_constructed; gobject_class->finalize = gst_aggregator_pad_finalize; gobject_class->dispose = gst_aggregator_pad_dispose; + gobject_class->set_property = gst_aggregator_pad_set_property; + gobject_class->get_property = gst_aggregator_pad_get_property; + + /** + * GstAggregatorPad:buffer-consumed: + * + * Signals that a buffer was consumed. As aggregator pads store buffers + * in an internal queue, there is no direct match between input and output + * buffers at any given time. This signal can be useful to forward metas + * such as #GstVideoTimeCodeMeta or #GstVideoCaptionMeta at the right time. + * + * Since: 1.16 + */ + gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED] = + g_signal_new ("buffer-consumed", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_FIRST, 0, NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 1, GST_TYPE_BUFFER); + + /** + * GstAggregatorPad:emit-signals: + * + * Enables the emission of signals such as #GstAggregatorPad::buffer-consumed + * + * Since: 1.16 + */ + g_object_class_install_property (gobject_class, PAD_PROP_EMIT_SIGNALS, + g_param_spec_boolean ("emit-signals", "Emit signals", + "Send signals to signal data consumption", DEFAULT_PAD_EMIT_SIGNALS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); } static void gst_aggregator_pad_init (GstAggregatorPad * pad) { - pad->priv = - G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD, - GstAggregatorPadPrivate); + pad->priv = gst_aggregator_pad_get_instance_private (pad); g_queue_init (&pad->priv->data); g_cond_init (&pad->priv->event_cond); @@ -2800,14 +2981,19 @@ gst_aggregator_pad_init (GstAggregatorPad * pad) gst_aggregator_pad_reset_unlocked (pad); pad->priv->negotiated = FALSE; + pad->priv->emit_signals = DEFAULT_PAD_EMIT_SIGNALS; } /* Must be called with the PAD_LOCK held */ static void -gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad) +gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer) { pad->priv->num_buffers--; - GST_TRACE_OBJECT (pad, "Consuming buffer"); + GST_TRACE_OBJECT (pad, "Consuming buffer %" GST_PTR_FORMAT, buffer); + if (buffer && pad->priv->emit_signals) { + g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED], + 0, buffer); + } PAD_BROADCAST_EVENT (pad); } @@ -2844,7 +3030,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) buffer = aggclass->clip (self, pad, buffer); if (buffer == NULL) { - gst_aggregator_pad_buffer_consumed (pad); + gst_aggregator_pad_buffer_consumed (pad, buffer); GST_TRACE_OBJECT (pad, "Clipping consumed the buffer"); } } @@ -2872,13 +3058,18 @@ gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad) PAD_LOCK (pad); + if (pad->priv->flow_return != GST_FLOW_OK) { + PAD_UNLOCK (pad); + return NULL; + } + gst_aggregator_pad_clip_buffer_unlocked (pad); buffer = pad->priv->clipped_buffer; if (buffer) { pad->priv->clipped_buffer = NULL; - gst_aggregator_pad_buffer_consumed (pad); + gst_aggregator_pad_buffer_consumed (pad, buffer); GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer); } @@ -2924,6 +3115,11 @@ gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad) PAD_LOCK (pad); + if (pad->priv->flow_return != GST_FLOW_OK) { + PAD_UNLOCK (pad); + return NULL; + } + gst_aggregator_pad_clip_buffer_unlocked (pad); if (pad->priv->clipped_buffer) { @@ -2937,6 +3133,31 @@ gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad) } /** + * gst_aggregator_pad_has_buffer: + * @pad: the pad to check the buffer on + * + * This checks if a pad has a buffer available that will be returned by + * a call to gst_aggregator_pad_peek_buffer() or + * gst_aggregator_pad_pop_buffer(). + * + * Returns: %TRUE if the pad has a buffer available as the next thing. + * + * Since: 1.14.1 + */ +gboolean +gst_aggregator_pad_has_buffer (GstAggregatorPad * pad) +{ + gboolean has_buffer; + + PAD_LOCK (pad); + gst_aggregator_pad_clip_buffer_unlocked (pad); + has_buffer = (pad->priv->clipped_buffer != NULL); + PAD_UNLOCK (pad); + + return has_buffer; +} + +/** * gst_aggregator_pad_is_eos: * @pad: an aggregator pad * @@ -3080,3 +3301,40 @@ gst_aggregator_get_allocator (GstAggregator * self, if (params) *params = self->priv->allocation_params; } + +/** + * gst_aggregator_simple_get_next_time: + * @self: A #GstAggregator + * + * This is a simple #GstAggregator::get_next_time implementation that + * just looks at the #GstSegment on the srcpad of the aggregator and bases + * the next time on the running time there. + * + * This is the desired behaviour in most cases where you have a live source + * and you have a dead line based aggregator subclass. + * + * Returns: The running time based on the position + * + * Since: 1.16 + */ +GstClockTime +gst_aggregator_simple_get_next_time (GstAggregator * self) +{ + GstClockTime next_time; + GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad); + GstSegment *segment = &srcpad->segment; + + GST_OBJECT_LOCK (self); + if (segment->position == -1 || segment->position < segment->start) + next_time = segment->start; + else + next_time = segment->position; + + if (segment->stop != -1 && next_time > segment->stop) + next_time = segment->stop; + + next_time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, next_time); + GST_OBJECT_UNLOCK (self); + + return next_time; +}