X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=libs%2Fgst%2Fbase%2Fgstaggregator.c;h=9dddf837c46a8078a1925050a1f8a1a84990436c;hb=79d0239e2fe92ea39a6abc0ecdccc92034a64ce6;hp=3a642d8a683cb2c7b360bb002fbde2c56def1d9a;hpb=8b60b25917c224b95afa0f05792560c5bee46eac;p=platform%2Fupstream%2Fgstreamer.git

diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c
index 3a642d8..9dddf83 100644
--- a/libs/gst/base/gstaggregator.c
+++ b/libs/gst/base/gstaggregator.c
@@ -103,8 +103,8 @@ gst_aggregator_start_time_selection_get_type (void)
 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
     const GstTagList * tags, GstTagMergeMode mode);
 static void gst_aggregator_set_latency_property (GstAggregator * agg,
-    gint64 latency);
-static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
+    GstClockTime latency);
+static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
 
 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
 
@@ -286,6 +286,8 @@ struct _GstAggregatorPrivate
   /* Our state is >= PAUSED */
   gboolean running;             /* protected by src_lock */
 
+  /* seqnum from seek or segment,
+   * to be applied to synthetic segment/eos events */
   gint seqnum;
   gboolean send_stream_start;   /* protected by srcpad stream lock */
   gboolean send_segment;
@@ -354,87 +356,6 @@ enum
 static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
 
-/**
- * gst_aggregator_iterate_sinkpads:
- * @self: The #GstAggregator
- * @func: (scope call): The function to call.
- * @user_data: (closure): The data to pass to @func.
- *
- * Iterate the sinkpads of aggregator to call a function on them.
- *
- * This method guarantees that @func will be called only once for each
- * sink pad.
- *
- * Returns: %FALSE if there are no sinkpads or if @func returned %FALSE
- */
-gboolean
-gst_aggregator_iterate_sinkpads (GstAggregator * self,
-    GstAggregatorPadForeachFunc func, gpointer user_data)
-{
-  gboolean result = FALSE;
-  GstIterator *iter;
-  gboolean done = FALSE;
-  GValue item = { 0, };
-  GList *seen_pads = NULL;
-
-  iter = gst_element_iterate_sink_pads (GST_ELEMENT (self));
-
-  if (!iter)
-    goto no_iter;
-
-  while (!done) {
-    switch (gst_iterator_next (iter, &item)) {
-      case GST_ITERATOR_OK:
-      {
-        GstAggregatorPad *pad;
-
-        pad = g_value_get_object (&item);
-
-        /* if already pushed, skip. FIXME, find something faster to tag pads */
-        if (pad == NULL || g_list_find (seen_pads, pad)) {
-          g_value_reset (&item);
-          break;
-        }
-
-        GST_LOG_OBJECT (pad, "calling function %s on pad",
-            GST_DEBUG_FUNCPTR_NAME (func));
-
-        result = func (self, pad, user_data);
-
-        done = !result;
-
-        seen_pads = g_list_prepend (seen_pads, pad);
-
-        g_value_reset (&item);
-        break;
-      }
-      case GST_ITERATOR_RESYNC:
-        gst_iterator_resync (iter);
-        break;
-      case GST_ITERATOR_ERROR:
-        GST_ERROR_OBJECT (self,
-            "Could not iterate over internally linked pads");
-        done = TRUE;
-        break;
-      case GST_ITERATOR_DONE:
-        done = TRUE;
-        break;
-    }
-  }
-  g_value_unset (&item);
-  gst_iterator_free (iter);
-
-  if (seen_pads == NULL) {
-    GST_DEBUG_OBJECT (self, "No pad seen");
-    return FALSE;
-  }
-
-  g_list_free (seen_pads);
-
-no_iter:
-  return result;
-}
-
 static gboolean
 gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
 {
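The removal above relies on gst_element_foreach_sink_pad(), the core helper added in GStreamer 1.14, which gives the same guarantee of visiting each sink pad exactly once without the seen_pads bookkeeping. A minimal porting sketch for code that previously called gst_aggregator_iterate_sinkpads(), using a hypothetical count_pads callback matching GstElementForeachPadFunc:

    #include <gst/gst.h>
    #include <gst/base/gstaggregator.h>

    /* Hypothetical callback: return TRUE to keep iterating, FALSE to stop
     * early (the same contract the removed helper had). */
    static gboolean
    count_pads (GstElement * element, GstPad * pad, gpointer user_data)
    {
      guint *count = user_data;

      *count += 1;
      return TRUE;
    }

    static guint
    count_sink_pads (GstAggregator * agg)
    {
      guint count = 0;

      /* Replaces a gst_aggregator_iterate_sinkpads() call; note that the
       * callback now receives GstElement/GstPad arguments. */
      gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg), count_pads,
          &count);
      return count;
    }

Because the callback signature changed, the adapted internal callbacks in the hunks below cast their arguments back with GST_AGGREGATOR_CAST and GST_AGGREGATOR_PAD_CAST.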
@@ -561,6 +482,8 @@ gst_aggregator_push_mandatory_events (GstAggregator * self)
     segment = gst_event_new_segment (&self->segment);
 
     if (!self->priv->seqnum)
+      /* This code-path is in preparation to be able to run without a source
+       * connected. Then we won't have a seq-num from a segment event. */
       self->priv->seqnum = gst_event_get_seqnum (segment);
     else
       gst_event_set_seqnum (segment, self->priv->seqnum);
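The seqnum handling above follows the usual GStreamer convention: an event synthesized on behalf of a seek or segment should carry that operation's seqnum, while a freshly created event donates its auto-assigned seqnum when nothing is stored yet. A generic sketch of the pattern, with a hypothetical stored_seqnum out-parameter:

    #include <gst/gst.h>

    /* Stamp a synthesized event with a previously stored seqnum, or adopt
     * the event's own seqnum as the stored one if none is known yet. */
    static void
    apply_stored_seqnum (GstEvent * event, guint32 * stored_seqnum)
    {
      if (*stored_seqnum == 0)
        *stored_seqnum = gst_event_get_seqnum (event);
      else
        gst_event_set_seqnum (event, *stored_seqnum);
    }

This is the role the struct comment earlier assigns to priv->seqnum: it is applied to the synthetic segment and EOS events the aggregator pushes itself.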
@@ -758,9 +681,11 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
 }
 
 static gboolean
-gst_aggregator_do_events_and_queries (GstAggregator * self,
-    GstAggregatorPad * pad, gpointer user_data)
+gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
+    gpointer user_data)
 {
+  GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
+  GstAggregator *aggregator = GST_AGGREGATOR_CAST (self);
   GstEvent *event = NULL;
   GstQuery *query = NULL;
   GstAggregatorClass *klass = NULL;
@@ -790,7 +715,7 @@ gst_aggregator_do_events_and_queries (GstAggregator * self,
   if (event) {
     GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
     gst_event_ref (event);
-    ret = klass->sink_event (self, pad, event);
+    ret = klass->sink_event (aggregator, pad, event);
 
     PAD_LOCK (pad);
     if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS)
@@ -800,7 +725,7 @@ gst_aggregator_do_events_and_queries (GstAggregator * self,
     gst_event_unref (event);
   } else if (query) {
     GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query);
-    ret = klass->sink_query (self, pad, query);
+    ret = klass->sink_query (aggregator, pad, query);
 
     PAD_LOCK (pad);
     if (g_queue_peek_tail (&pad->priv->data) == query) {
@@ -1100,14 +1025,17 @@ gst_aggregator_aggregate_func (GstAggregator * self)
     GstFlowReturn flow_return = GST_FLOW_OK;
     gboolean processed_event = FALSE;
 
-    gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries,
-        NULL);
+    gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
+        gst_aggregator_do_events_and_queries, NULL);
 
+    /* Ensure we have buffers ready (either in clipped_buffer or at the head of
+     * the queue) */
     if (!gst_aggregator_wait_and_check (self, &timeout))
       continue;
 
-    gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries,
-        &processed_event);
+    gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
+        gst_aggregator_do_events_and_queries, &processed_event);
+
     if (processed_event)
       continue;
 
@@ -1514,11 +1442,13 @@ eat:
   return res;
 }
 
-static inline gboolean
-gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad,
-    gpointer unused_udata)
+static gboolean
+gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data)
 {
-  gst_aggregator_pad_flush (pad, self);
+  GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
+  GstAggregator *agg = GST_AGGREGATOR_CAST (self);
+
+  gst_aggregator_pad_flush (pad, agg);
 
   PAD_LOCK (pad);
   pad->priv->flow_return = GST_FLOW_FLUSHING;
@@ -1537,7 +1467,9 @@ gst_aggregator_stop (GstAggregator * agg)
 
   gst_aggregator_reset_flow_values (agg);
 
-  gst_aggregator_iterate_sinkpads (agg, gst_aggregator_stop_pad, NULL);
+  /* Application needs to make sure no pads are added while it shuts us down */
+  gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg),
+      gst_aggregator_stop_pad, NULL);
 
   klass = GST_AGGREGATOR_GET_CLASS (agg);
 
@@ -1632,6 +1564,9 @@ gst_aggregator_default_create_new_pad (GstAggregator * self,
   GstAggregatorPrivate *priv = self->priv;
   gint serial = 0;
   gchar *name = NULL;
+  GType pad_type =
+      GST_PAD_TEMPLATE_GTYPE (templ) ==
+      G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : GST_PAD_TEMPLATE_GTYPE (templ);
 
   if (templ->direction != GST_PAD_SINK)
     goto not_sink;
@@ -1652,7 +1587,7 @@ gst_aggregator_default_create_new_pad (GstAggregator * self,
   }
 
   name = g_strdup_printf ("sink_%u", serial);
-  agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
+  agg_pad = g_object_new (pad_type,
       "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
   g_free (name);
 
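The pad_type lookup above pairs with the removal of klass->sinkpads_type in the class_init hunk further down: a subclass that wants custom sink pads now attaches the pad GType to its pad template instead of the class structure. A sketch of the new registration, assuming a hypothetical MyAgg element with a MY_TYPE_AGG_PAD pad subclass; gst_element_class_add_static_pad_template_with_gtype() is the core API added in 1.14:

    static GstStaticPadTemplate sink_template =
    GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK, GST_PAD_REQUEST,
        GST_STATIC_CAPS_ANY);

    static void
    my_agg_class_init (MyAggClass * klass)
    {
      GstElementClass *element_class = GST_ELEMENT_CLASS (klass);

      /* Was: aggregator_class->sinkpads_type = MY_TYPE_AGG_PAD;
       * The GType now travels with the template and is recovered in
       * gst_aggregator_default_create_new_pad() via
       * GST_PAD_TEMPLATE_GTYPE (templ). */
      gst_element_class_add_static_pad_template_with_gtype (element_class,
          &sink_template, MY_TYPE_AGG_PAD);
    }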
@@ -2170,7 +2105,7 @@ gst_aggregator_finalize (GObject * object)
  * as unresponsive.
  */
 static void
-gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
+gst_aggregator_set_latency_property (GstAggregator * self, GstClockTime latency)
 {
   gboolean changed;
 
@@ -2221,12 +2156,12 @@ gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
  * before a pad is deemed unresponsive. A value of -1 means an
  * unlimited time.
  */
-static gint64
+static GstClockTime
 gst_aggregator_get_latency_property (GstAggregator * agg)
 {
-  gint64 res;
+  GstClockTime res;
 
-  g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
+  g_return_val_if_fail (GST_IS_AGGREGATOR (agg), GST_CLOCK_TIME_NONE);
 
   GST_OBJECT_LOCK (agg);
   res = agg->priv->latency;
@@ -2243,7 +2178,7 @@ gst_aggregator_set_property (GObject * object, guint prop_id,
 
   switch (prop_id) {
     case PROP_LATENCY:
-      gst_aggregator_set_latency_property (agg, g_value_get_int64 (value));
+      gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value));
       break;
     case PROP_START_TIME_SELECTION:
       agg->priv->start_time_selection = g_value_get_enum (value);
@@ -2265,7 +2200,7 @@ gst_aggregator_get_property (GObject * object, guint prop_id,
 
   switch (prop_id) {
     case PROP_LATENCY:
-      g_value_set_int64 (value, gst_aggregator_get_latency_property (agg));
+      g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg));
       break;
     case PROP_START_TIME_SELECTION:
       g_value_set_enum (value, agg->priv->start_time_selection);
@@ -2292,8 +2227,6 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
       GST_DEBUG_FG_MAGENTA, "GstAggregator");
 
-  klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD;
-
   klass->sink_event = gst_aggregator_default_sink_event;
   klass->sink_query = gst_aggregator_default_sink_query;
 
@@ -2318,11 +2251,10 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
   gobject_class->finalize = gst_aggregator_finalize;
 
   g_object_class_install_property (gobject_class, PROP_LATENCY,
-      g_param_spec_int64 ("latency", "Buffer latency",
+      g_param_spec_uint64 ("latency", "Buffer latency",
           "Additional latency in live mode to allow upstream "
          "to take longer to produce buffers for the current "
-          "position (in nanoseconds)", 0,
-          (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1),
+          "position (in nanoseconds)", 0, G_MAXUINT64,
          DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION,
@@ -2338,7 +2270,6 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
           G_MAXUINT64,
           DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
-  GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad);
   GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_do_events_and_queries);
 }
 
@@ -2467,6 +2398,12 @@ apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
   update_time_level (aggpad, head);
 }
 
+/*
+ * Can be called either from the sinkpad's chain function or from the srcpad's
+ * thread in the case of a buffer synthesized from a GAP event.
+ * Because of this second case, FLUSH_LOCK can't be used here.
+ */
+
 static GstFlowReturn
 gst_aggregator_pad_chain_internal (GstAggregator * self,
     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
@@ -2476,8 +2413,6 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
 
-  PAD_FLUSH_LOCK (aggpad);
-
   PAD_LOCK (aggpad);
   flow_return = aggpad->priv->flow_return;
   if (flow_return != GST_FLOW_OK)
     goto flushing;
@@ -2573,15 +2508,12 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
   GST_OBJECT_UNLOCK (self);
   SRC_UNLOCK (self);
 
-  PAD_FLUSH_UNLOCK (aggpad);
-
   GST_DEBUG_OBJECT (aggpad, "Done chaining");
 
   return flow_return;
 
 flushing:
   PAD_UNLOCK (aggpad);
-  PAD_FLUSH_UNLOCK (aggpad);
 
   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
       gst_flow_get_name (flow_return));
@@ -2594,8 +2526,17 @@
 static GstFlowReturn
 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
 {
-  return gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
-      GST_AGGREGATOR_PAD_CAST (pad), buffer, TRUE);
+  GstFlowReturn ret;
+  GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
+
+  PAD_FLUSH_LOCK (aggpad);
+
+  ret = gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
+      aggpad, buffer, TRUE);
+
+  PAD_FLUSH_UNLOCK (aggpad);
+
+  return ret;
 }
 
 static gboolean
@@ -2653,7 +2594,7 @@ flushing:
   return FALSE;
 }
 
-/* Queue serialized events and let the others go though directly.
+/* Queue serialized events and let the others go through directly.
  * The queued events will be handled from the src-pad task in
  * gst_aggregator_do_events_and_queries().
  */
@@ -2937,6 +2878,12 @@ gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
   return buffer;
 }
 
+/**
+ * gst_aggregator_pad_is_eos:
+ * @pad: an aggregator pad
+ *
+ * Returns: %TRUE if the pad is EOS, otherwise %FALSE.
+ */
 gboolean
 gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
 {
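The newly documented gst_aggregator_pad_is_eos() is typically consulted from a subclass's aggregate() vfunc to decide when the stream as a whole is finished. A sketch of that pattern for the hypothetical MyAgg element above, reusing gst_element_foreach_sink_pad():

    static gboolean
    check_eos (GstElement * element, GstPad * pad, gpointer user_data)
    {
      gboolean *all_eos = user_data;

      if (!gst_aggregator_pad_is_eos (GST_AGGREGATOR_PAD (pad)))
        *all_eos = FALSE;
      return TRUE;
    }

    static GstFlowReturn
    my_agg_aggregate (GstAggregator * agg, gboolean timeout)
    {
      gboolean all_eos = TRUE;

      gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg), check_eos,
          &all_eos);

      /* Returning GST_FLOW_EOS lets the base class synthesize and push the
       * EOS event downstream, stamped with the stored seqnum seen earlier. */
      if (all_eos)
        return GST_FLOW_EOS;

      /* ... consume queued buffers and produce output here ... */
      return GST_FLOW_OK;
    }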