X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=libs%2Fgst%2Fbase%2Fgstaggregator.c;h=94d481604fb8564afb251f0be5e567a6d45d7b8b;hb=5bf13cdd5314bc3c6c81bd620e712acdcab14eb2;hp=7eeea595578b16cf26648b6f2096b67121cc4b54;hpb=96c3a635ce098c9bf2d692354e0c1e9b9bf64640;p=platform%2Fupstream%2Fgstreamer.git diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index 7eeea59..94d4816 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -130,7 +130,8 @@ static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg); static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self); -static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad); +static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, + GstBuffer * buffer); GST_DEBUG_CATEGORY_STATIC (aggregator_debug); #define GST_CAT_DEFAULT aggregator_debug @@ -261,6 +262,9 @@ struct _GstAggregatorPadPrivate * the chain function is also happening. */ GMutex flush_lock; + + /* properties */ + gboolean emit_signals; }; /* Must be called with PAD_LOCK held */ @@ -291,7 +295,7 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg) PAD_UNLOCK (aggpad); if (klass->flush) - return klass->flush (aggpad, agg); + return (klass->flush (aggpad, agg) == GST_FLOW_OK); return TRUE; } @@ -300,6 +304,7 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg) * GstAggregator implementation * *************************************/ static GstElementClass *aggregator_parent_class = NULL; +static gint aggregator_private_offset = 0; /* All members are protected by the object lock unless otherwise noted */ @@ -332,6 +337,8 @@ struct _GstAggregatorPrivate GstClockTime sub_latency_min; /* protected by src_lock */ GstClockTime sub_latency_max; /* protected by src_lock */ + GstClockTime upstream_latency_min; /* protected by src_lock */ + /* aggregate */ GstClockID aggregate_id; /* protected by src_lock */ GMutex src_lock; @@ -365,6 +372,7 @@ typedef struct } EventData; #define DEFAULT_LATENCY 0 +#define DEFAULT_MIN_UPSTREAM_LATENCY 0 #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO #define DEFAULT_START_TIME (-1) @@ -372,6 +380,7 @@ enum { PROP_0, PROP_LATENCY, + PROP_MIN_UPSTREAM_LATENCY, PROP_START_TIME_SELECTION, PROP_START_TIME, PROP_LAST @@ -717,6 +726,12 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) return res; } +typedef struct +{ + gboolean processed_event; + GstFlowReturn flow_ret; +} DoHandleEventsAndQueriesData; + static gboolean gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad, gpointer user_data) @@ -726,7 +741,7 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad, GstEvent *event = NULL; GstQuery *query = NULL; GstAggregatorClass *klass = NULL; - gboolean *processed_event = user_data; + DoHandleEventsAndQueriesData *data = user_data; do { event = NULL; @@ -744,8 +759,7 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad, if (event || query) { gboolean ret; - if (processed_event) - *processed_event = TRUE; + data->processed_event = TRUE; if (klass == NULL) klass = GST_AGGREGATOR_GET_CLASS (self); @@ -755,8 +769,11 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad, ret = klass->sink_event (aggregator, pad, event); PAD_LOCK (pad); - if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) + if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) { pad->priv->negotiated = ret; + if (!ret) + pad->priv->flow_return = data->flow_ret = GST_FLOW_NOT_NEGOTIATED; + } if (g_queue_peek_tail (&pad->priv->data) == event) gst_event_unref (g_queue_pop_tail (&pad->priv->data)); gst_event_unref (event); @@ -804,7 +821,7 @@ gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad, if (GST_IS_BUFFER (item->data) && klass->skip_buffer (aggpad, agg, item->data)) { GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data); - gst_aggregator_pad_buffer_consumed (aggpad); + gst_aggregator_pad_buffer_consumed (aggpad, GST_BUFFER (item->data)); gst_buffer_unref (item->data); g_queue_delete_link (&aggpad->priv->data, item); } else { @@ -1096,10 +1113,13 @@ gst_aggregator_aggregate_func (GstAggregator * self) GST_LOG_OBJECT (self, "Checking aggregate"); while (priv->send_eos && priv->running) { GstFlowReturn flow_return = GST_FLOW_OK; - gboolean processed_event = FALSE; + DoHandleEventsAndQueriesData events_query_data = { FALSE, GST_FLOW_OK }; gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), - gst_aggregator_do_events_and_queries, NULL); + gst_aggregator_do_events_and_queries, &events_query_data); + + if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK) + goto handle_error; if (self->priv->peer_latency_live) gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), @@ -1110,10 +1130,15 @@ gst_aggregator_aggregate_func (GstAggregator * self) if (!gst_aggregator_wait_and_check (self, &timeout)) continue; + events_query_data.processed_event = FALSE; + events_query_data.flow_ret = GST_FLOW_OK; gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), - gst_aggregator_do_events_and_queries, &processed_event); + gst_aggregator_do_events_and_queries, &events_query_data); + + if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK) + goto handle_error; - if (processed_event) + if (events_query_data.processed_event) continue; if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) { @@ -1143,6 +1168,7 @@ gst_aggregator_aggregate_func (GstAggregator * self) gst_aggregator_push_eos (self); } + handle_error: GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return)); if (flow_return != GST_FLOW_OK) { @@ -1449,7 +1475,6 @@ gst_aggregator_default_sink_event (GstAggregator * self, GstBuffer *gapbuf; gst_event_parse_gap (event, &pts, &duration); - gapbuf = gst_buffer_new (); if (GST_CLOCK_TIME_IS_VALID (duration)) endpts = pts + duration; @@ -1471,14 +1496,17 @@ gst_aggregator_default_sink_event (GstAggregator * self, else duration = GST_CLOCK_TIME_NONE; + gapbuf = gst_buffer_new (); GST_BUFFER_PTS (gapbuf) = pts; GST_BUFFER_DURATION (gapbuf) = duration; GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP); GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE); /* Remove GAP event so we can replace it with the buffer */ + PAD_LOCK (aggpad); if (g_queue_peek_tail (&aggpad->priv->data) == event) gst_event_unref (g_queue_pop_tail (&aggpad->priv->data)); + PAD_UNLOCK (aggpad); if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) != GST_FLOW_OK) { @@ -1641,17 +1669,26 @@ gst_aggregator_default_create_new_pad (GstAggregator * self, GST_OBJECT_LOCK (self); if (req_name == NULL || strlen (req_name) < 6 - || !g_str_has_prefix (req_name, "sink_")) { + || !g_str_has_prefix (req_name, "sink_") + || strrchr (req_name, '%') != NULL) { /* no name given when requesting the pad, use next available int */ serial = ++priv->max_padserial; } else { + gchar *endptr = NULL; + /* parse serial number from requested padname */ - serial = g_ascii_strtoull (&req_name[5], NULL, 10); - if (serial > priv->max_padserial) - priv->max_padserial = serial; + serial = g_ascii_strtoull (&req_name[5], &endptr, 10); + if (endptr != NULL && *endptr == '\0') { + if (serial > priv->max_padserial) { + priv->max_padserial = serial; + } + } else { + serial = ++priv->max_padserial; + } } name = g_strdup_printf ("sink_%u", serial); + g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD)); agg_pad = g_object_new (pad_type, "name", name, "direction", GST_PAD_SINK, "template", templ, NULL); g_free (name); @@ -1724,6 +1761,16 @@ gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query) return FALSE; } + if (self->priv->upstream_latency_min > min) { + GstClockTimeDiff diff = + GST_CLOCK_DIFF (min, self->priv->upstream_latency_min); + + min += diff; + if (GST_CLOCK_TIME_IS_VALID (max)) { + max += diff; + } + } + if (min > max && GST_CLOCK_TIME_IS_VALID (max)) { GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL), ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %" @@ -1895,7 +1942,6 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data) } else { ret = gst_pad_send_event (peer, gst_event_ref (evdata->event)); GST_DEBUG_OBJECT (pad, "return of event push is %d", ret); - gst_object_unref (peer); } } @@ -1935,6 +1981,9 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data) evdata->result &= ret; + if (peer) + gst_object_unref (peer); + /* Always send to all pads */ return FALSE; } @@ -2245,6 +2294,11 @@ gst_aggregator_set_property (GObject * object, guint prop_id, case PROP_LATENCY: gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value)); break; + case PROP_MIN_UPSTREAM_LATENCY: + SRC_LOCK (agg); + agg->priv->upstream_latency_min = g_value_get_uint64 (value); + SRC_UNLOCK (agg); + break; case PROP_START_TIME_SELECTION: agg->priv->start_time_selection = g_value_get_enum (value); break; @@ -2267,6 +2321,11 @@ gst_aggregator_get_property (GObject * object, guint prop_id, case PROP_LATENCY: g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg)); break; + case PROP_MIN_UPSTREAM_LATENCY: + SRC_LOCK (agg); + g_value_set_uint64 (value, agg->priv->upstream_latency_min); + SRC_UNLOCK (agg); + break; case PROP_START_TIME_SELECTION: g_value_set_enum (value, agg->priv->start_time_selection); break; @@ -2287,11 +2346,13 @@ gst_aggregator_class_init (GstAggregatorClass * klass) GstElementClass *gstelement_class = (GstElementClass *) klass; aggregator_parent_class = g_type_class_peek_parent (klass); - g_type_class_add_private (klass, sizeof (GstAggregatorPrivate)); GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator", GST_DEBUG_FG_MAGENTA, "GstAggregator"); + if (aggregator_private_offset != 0) + g_type_class_adjust_private_offset (klass, &aggregator_private_offset); + klass->finish_buffer = gst_aggregator_default_finish_buffer; klass->sink_event = gst_aggregator_default_sink_event; @@ -2324,6 +2385,27 @@ gst_aggregator_class_init (GstAggregatorClass * klass) "position (in nanoseconds)", 0, G_MAXUINT64, DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * GstAggregator:min-upstream-latency: + * + * Force minimum upstream latency (in nanoseconds). When sources with a + * higher latency are expected to be plugged in dynamically after the + * aggregator has started playing, this allows overriding the minimum + * latency reported by the initial source(s). This is only taken into + * account when larger than the actually reported minimum latency. + * + * Since: 1.16 + */ + g_object_class_install_property (gobject_class, PROP_MIN_UPSTREAM_LATENCY, + g_param_spec_uint64 ("min-upstream-latency", "Buffer latency", + "When sources with a higher latency are expected to be plugged " + "in dynamically after the aggregator has started playing, " + "this allows overriding the minimum latency reported by the " + "initial source(s). This is only taken into account when larger " + "than the actually reported minimum latency. (nanoseconds)", + 0, G_MAXUINT64, + DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION, g_param_spec_enum ("start-time-selection", "Start Time Selection", "Decides which start time is output", @@ -2338,17 +2420,22 @@ gst_aggregator_class_init (GstAggregatorClass * klass) DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); } +static inline gpointer +gst_aggregator_get_instance_private (GstAggregator * self) +{ + return (G_STRUCT_MEMBER_P (self, aggregator_private_offset)); +} + static void gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) { GstPadTemplate *pad_template; GstAggregatorPrivate *priv; + GType pad_type; g_return_if_fail (klass->aggregate != NULL); - self->priv = - G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR, - GstAggregatorPrivate); + self->priv = gst_aggregator_get_instance_private (self); priv = self->priv; @@ -2364,7 +2451,14 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) self->priv->peer_latency_max = self->priv->sub_latency_max = 0; self->priv->has_peer_latency = FALSE; - self->srcpad = gst_pad_new_from_template (pad_template, "src"); + pad_type = + GST_PAD_TEMPLATE_GTYPE (pad_template) == + G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : + GST_PAD_TEMPLATE_GTYPE (pad_template); + g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD)); + self->srcpad = + g_object_new (pad_type, "name", "src", "direction", GST_PAD_SRC, + "template", pad_template, NULL); gst_aggregator_reset_flow_values (self); @@ -2377,6 +2471,7 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) gst_element_add_pad (GST_ELEMENT (self), self->srcpad); + self->priv->upstream_latency_min = DEFAULT_MIN_UPSTREAM_LATENCY; self->priv->latency = DEFAULT_LATENCY; self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION; self->priv->start_time = DEFAULT_START_TIME; @@ -2408,6 +2503,10 @@ gst_aggregator_get_type (void) _type = g_type_register_static (GST_TYPE_ELEMENT, "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT); + + aggregator_private_offset = + g_type_add_instance_private (_type, sizeof (GstAggregatorPrivate)); + g_once_init_leave (&type, _type); } return type; @@ -2741,7 +2840,23 @@ gst_aggregator_pad_activate_mode_func (GstPad * pad, /*********************************** * GstAggregatorPad implementation * ************************************/ -G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD); +G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD); + +#define DEFAULT_PAD_EMIT_SIGNALS FALSE + +enum +{ + PAD_PROP_0, + PAD_PROP_EMIT_SIGNALS, +}; + +enum +{ + PAD_SIGNAL_BUFFER_CONSUMED, + PAD_LAST_SIGNAL, +}; + +static guint gst_aggregator_pad_signals[PAD_LAST_SIGNAL] = { 0 }; static void gst_aggregator_pad_constructed (GObject * object) @@ -2783,23 +2898,80 @@ gst_aggregator_pad_dispose (GObject * object) } static void +gst_aggregator_pad_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object); + + switch (prop_id) { + case PAD_PROP_EMIT_SIGNALS: + pad->priv->emit_signals = g_value_get_boolean (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_aggregator_pad_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object); + + switch (prop_id) { + case PAD_PROP_EMIT_SIGNALS: + g_value_set_boolean (value, pad->priv->emit_signals); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void gst_aggregator_pad_class_init (GstAggregatorPadClass * klass) { GObjectClass *gobject_class = (GObjectClass *) klass; - g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate)); - gobject_class->constructed = gst_aggregator_pad_constructed; gobject_class->finalize = gst_aggregator_pad_finalize; gobject_class->dispose = gst_aggregator_pad_dispose; + gobject_class->set_property = gst_aggregator_pad_set_property; + gobject_class->get_property = gst_aggregator_pad_get_property; + + /** + * GstAggregatorPad:buffer-consumed: + * + * Signals that a buffer was consumed. As aggregator pads store buffers + * in an internal queue, there is no direct match between input and output + * buffers at any given time. This signal can be useful to forward metas + * such as #GstVideoTimeCodeMeta or #GstVideoCaptionMeta at the right time. + * + * Since: 1.16 + */ + gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED] = + g_signal_new ("buffer-consumed", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_FIRST, 0, NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 1, GST_TYPE_BUFFER); + + /** + * GstAggregatorPad:emit-signals: + * + * Enables the emission of signals such as #GstAggregatorPad::buffer-consumed + * + * Since: 1.16 + */ + g_object_class_install_property (gobject_class, PAD_PROP_EMIT_SIGNALS, + g_param_spec_boolean ("emit-signals", "Emit signals", + "Send signals to signal data consumption", DEFAULT_PAD_EMIT_SIGNALS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); } static void gst_aggregator_pad_init (GstAggregatorPad * pad) { - pad->priv = - G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD, - GstAggregatorPadPrivate); + pad->priv = gst_aggregator_pad_get_instance_private (pad); g_queue_init (&pad->priv->data); g_cond_init (&pad->priv->event_cond); @@ -2809,14 +2981,19 @@ gst_aggregator_pad_init (GstAggregatorPad * pad) gst_aggregator_pad_reset_unlocked (pad); pad->priv->negotiated = FALSE; + pad->priv->emit_signals = DEFAULT_PAD_EMIT_SIGNALS; } /* Must be called with the PAD_LOCK held */ static void -gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad) +gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer) { pad->priv->num_buffers--; - GST_TRACE_OBJECT (pad, "Consuming buffer"); + GST_TRACE_OBJECT (pad, "Consuming buffer %" GST_PTR_FORMAT, buffer); + if (buffer && pad->priv->emit_signals) { + g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED], + 0, buffer); + } PAD_BROADCAST_EVENT (pad); } @@ -2853,7 +3030,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) buffer = aggclass->clip (self, pad, buffer); if (buffer == NULL) { - gst_aggregator_pad_buffer_consumed (pad); + gst_aggregator_pad_buffer_consumed (pad, buffer); GST_TRACE_OBJECT (pad, "Clipping consumed the buffer"); } } @@ -2881,13 +3058,18 @@ gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad) PAD_LOCK (pad); + if (pad->priv->flow_return != GST_FLOW_OK) { + PAD_UNLOCK (pad); + return NULL; + } + gst_aggregator_pad_clip_buffer_unlocked (pad); buffer = pad->priv->clipped_buffer; if (buffer) { pad->priv->clipped_buffer = NULL; - gst_aggregator_pad_buffer_consumed (pad); + gst_aggregator_pad_buffer_consumed (pad, buffer); GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer); } @@ -2933,6 +3115,11 @@ gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad) PAD_LOCK (pad); + if (pad->priv->flow_return != GST_FLOW_OK) { + PAD_UNLOCK (pad); + return NULL; + } + gst_aggregator_pad_clip_buffer_unlocked (pad); if (pad->priv->clipped_buffer) { @@ -2955,7 +3142,7 @@ gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad) * * Returns: %TRUE if the pad has a buffer available as the next thing. * - * Since: 1.16 + * Since: 1.14.1 */ gboolean gst_aggregator_pad_has_buffer (GstAggregatorPad * pad) @@ -3114,3 +3301,40 @@ gst_aggregator_get_allocator (GstAggregator * self, if (params) *params = self->priv->allocation_params; } + +/** + * gst_aggregator_simple_get_next_time: + * @self: A #GstAggregator + * + * This is a simple #GstAggregator::get_next_time implementation that + * just looks at the #GstSegment on the srcpad of the aggregator and bases + * the next time on the running time there. + * + * This is the desired behaviour in most cases where you have a live source + * and you have a dead line based aggregator subclass. + * + * Returns: The running time based on the position + * + * Since: 1.16 + */ +GstClockTime +gst_aggregator_simple_get_next_time (GstAggregator * self) +{ + GstClockTime next_time; + GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad); + GstSegment *segment = &srcpad->segment; + + GST_OBJECT_LOCK (self); + if (segment->position == -1 || segment->position < segment->start) + next_time = segment->start; + else + next_time = segment->position; + + if (segment->stop != -1 && next_time > segment->stop) + next_time = segment->stop; + + next_time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, next_time); + GST_OBJECT_UNLOCK (self); + + return next_time; +}