diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c
index 9d31649..8865255 100644
--- a/libs/gst/base/gstaggregator.c
+++ b/libs/gst/base/gstaggregator.c
@@ -215,6 +215,8 @@ struct _GstAggregatorPadPrivate
   gboolean pending_flush_stop;
   gboolean pending_eos;
 
+  gboolean first_buffer;
+
   GQueue buffers;
   guint num_buffers;
   GstClockTime head_position;
@@ -288,7 +290,7 @@ struct _GstAggregatorPrivate
   gboolean peer_latency_live;   /* protected by src_lock */
   GstClockTime peer_latency_min; /* protected by src_lock */
   GstClockTime peer_latency_max; /* protected by src_lock */
-  gboolean has_peer_latency;
+  gboolean has_peer_latency;    /* protected by src_lock */
 
   GstClockTime sub_latency_min; /* protected by src_lock */
   GstClockTime sub_latency_max; /* protected by src_lock */
@@ -298,7 +300,7 @@ struct _GstAggregatorPrivate
   GMutex src_lock;
   GCond src_cond;
 
-  gboolean first_buffer;
+  gboolean first_buffer;        /* protected by object lock */
   GstAggregatorStartTimeSelection start_time_selection;
   GstClockTime start_time;
 
@@ -311,6 +313,7 @@ typedef struct
   GstEvent *event;
   gboolean result;
   gboolean flush;
+  gboolean only_to_active_pads;
 
   gboolean one_actually_seeked;
 } EventData;
@@ -858,7 +861,6 @@ gst_aggregator_start (GstAggregator * self)
   GstAggregatorClass *klass;
   gboolean result;
 
-  self->priv->running = TRUE;
   self->priv->send_stream_start = TRUE;
   self->priv->send_segment = TRUE;
   self->priv->send_eos = TRUE;
@@ -1086,6 +1088,8 @@ gst_aggregator_default_sink_event (GstAggregator * self,
     GST_OBJECT_UNLOCK (self);
   }
 
+  aggpad->priv->first_buffer = TRUE;
+
   /* We never forward the event */
   goto eat;
 }
@@ -1303,23 +1307,19 @@ gst_aggregator_release_pad (GstElement * element, GstPad * pad)
   SRC_UNLOCK (self);
 }
 
-static GstPad *
-gst_aggregator_request_new_pad (GstElement * element,
+static GstAggregatorPad *
+gst_aggregator_default_create_new_pad (GstAggregator * self,
     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
 {
-  GstAggregator *self;
   GstAggregatorPad *agg_pad;
-
-  GstElementClass *klass = GST_ELEMENT_GET_CLASS (element);
-  GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
-
-  self = GST_AGGREGATOR (element);
+  GstElementClass *klass = GST_ELEMENT_GET_CLASS (self);
+  GstAggregatorPrivate *priv = self->priv;
 
   if (templ == gst_element_class_get_pad_template (klass, "sink_%u")) {
     gint serial = 0;
     gchar *name = NULL;
 
-    GST_OBJECT_LOCK (element);
+    GST_OBJECT_LOCK (self);
     if (req_name == NULL || strlen (req_name) < 6
         || !g_str_has_prefix (req_name, "sink_")) {
       /* no name given when requesting the pad, use next available int */
@@ -1336,11 +1336,30 @@ gst_aggregator_request_new_pad (GstElement * element,
         "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
     g_free (name);
 
-    GST_OBJECT_UNLOCK (element);
+    GST_OBJECT_UNLOCK (self);
+
+    return agg_pad;
   } else {
     return NULL;
   }
+}
+
+static GstPad *
+gst_aggregator_request_new_pad (GstElement * element,
+    GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
+{
+  GstAggregator *self;
+  GstAggregatorPad *agg_pad;
+  GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (element);
+  GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
+
+  self = GST_AGGREGATOR (element);
+
+  agg_pad = klass->create_new_pad (self, templ, req_name, caps);
+  if (!agg_pad) {
+    GST_ERROR_OBJECT (element, "Couldn't create new pad");
+    return NULL;
+  }
 
   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
 
   self->priv->has_peer_latency = FALSE;
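The hunks above split the actual pad creation out of gst_aggregator_request_new_pad() into a new create_new_pad() class virtual method, so a subclass can hand out its own GstAggregatorPad subclass while the base class keeps the request-pad bookkeeping. A minimal sketch of what an override could look like, assuming a hypothetical MyAgg element (a real subclass would also need pad templates and an aggregate() implementation; the names here are illustrative, not part of this change):

#define GST_USE_UNSTABLE_API    /* the aggregator API was still unstable at this point */
#include <gst/base/gstaggregator.h>

typedef struct _MyAgg { GstAggregator parent; } MyAgg;
typedef struct _MyAggClass { GstAggregatorClass parent_class; } MyAggClass;

G_DEFINE_TYPE (MyAgg, my_agg, GST_TYPE_AGGREGATOR);

static GstAggregatorPad *
my_agg_create_new_pad (GstAggregator * self, GstPadTemplate * templ,
    const gchar * req_name, const GstCaps * caps)
{
  /* chain up to gst_aggregator_default_create_new_pad() so the "sink_%u"
   * naming and serial handling above still apply */
  GstAggregatorPad *pad =
      GST_AGGREGATOR_CLASS (my_agg_parent_class)->create_new_pad (self,
      templ, req_name, caps);

  if (pad != NULL) {
    /* attach subclass-specific per-pad state here */
  }

  return pad;
}

static void
my_agg_class_init (MyAggClass * klass)
{
  GST_AGGREGATOR_CLASS (klass)->create_new_pad = my_agg_create_new_pad;
}

static void
my_agg_init (MyAgg * self)
{
}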
@@ -1401,18 +1420,6 @@ gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
   else
     max = GST_CLOCK_TIME_NONE;
 
-  if (live && min > max) {
-    GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
-        ("%s", "Latency too big"),
-        ("The requested latency value is too big for the current pipeline. "
-            "Limiting to %" G_GINT64_FORMAT, max));
-    min = max;
-    /* FIXME: This could in theory become negative, but in
-     * that case all is lost anyway */
-    self->priv->latency -= min - max;
-    /* FIXME: shouldn't we g_object_notify() the change here? */
-  }
-
   SRC_BROADCAST (self);
 
   GST_DEBUG_OBJECT (self,
       "configured latency live:%s min:%" G_GINT64_FORMAT
@@ -1554,18 +1561,22 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
 
   if (peer) {
-    ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
-    GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
-    gst_object_unref (peer);
+    if (evdata->only_to_active_pads && aggpad->priv->first_buffer) {
+      GST_DEBUG_OBJECT (pad, "not sending event to inactive pad");
+      ret = TRUE;
+    } else {
+      ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
+      GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
+      gst_object_unref (peer);
+    }
   }
 
   if (ret == FALSE) {
-    if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK)
-      GST_ERROR_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
-
     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
 
+      GST_DEBUG_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
+
       if (gst_pad_query (peer, seeking)) {
         gboolean seekable;
 
@@ -1602,7 +1613,7 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
 
 static EventData
 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
-    GstEvent * event, gboolean flush)
+    GstEvent * event, gboolean flush, gboolean only_to_active_pads)
 {
   EventData evdata;
 
@@ -1610,6 +1621,7 @@ gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
   evdata.result = TRUE;
   evdata.flush = flush;
   evdata.one_actually_seeked = FALSE;
+  evdata.only_to_active_pads = only_to_active_pads;
 
   /* We first need to set all pads as flushing in a first pass
    * as flush_start flush_stop is sometimes sent synchronously
@@ -1669,7 +1681,8 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
   GST_OBJECT_UNLOCK (self);
 
   /* forward the seek upstream */
-  evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, flush);
+  evdata =
+      gst_aggregator_forward_event_to_all_sinkpads (self, event, flush, FALSE);
   event = NULL;
 
   if (!evdata.result || !evdata.one_actually_seeked) {
@@ -1712,7 +1725,12 @@ gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
     }
   }
 
-  evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, FALSE);
+  /* Don't forward QOS events to pads that had no active buffer yet. Otherwise
+   * they will receive a QOS event that has earliest_time=0 (because we can't
+   * have negative timestamps), and consider their buffer as too late */
+  evdata =
+      gst_aggregator_forward_event_to_all_sinkpads (self, event, FALSE,
+      GST_EVENT_TYPE (event) == GST_EVENT_QOS);
   res = evdata.result;
 
 done:
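A note on the guard added in gst_aggregator_default_src_event() above: a QOS event carries a (timestamp, jitter) pair, and the receiver treats timestamp + jitter, clamped to zero, as the earliest running time at which a buffer is still useful. A pad that has never pushed a buffer would therefore see earliest_time=0 and consider everything it produces too late. A self-contained sketch of that arithmetic using core GstEvent API; the function name and the numeric values are made up for illustration:

#include <gst/gst.h>

static void
qos_earliest_time_sketch (void)
{
  GstQOSType type;
  gdouble proportion = 0.5;                     /* consuming at half speed */
  GstClockTimeDiff jitter = 20 * GST_MSECOND;   /* buffer was 20ms late */
  GstClockTime timestamp = 1 * GST_SECOND;      /* running time of that buffer */
  GstClockTime ts, earliest;
  GstClockTimeDiff diff;
  GstEvent *event;

  event = gst_event_new_qos (GST_QOS_TYPE_UNDERFLOW, proportion, jitter,
      timestamp);
  gst_event_parse_qos (event, &type, &proportion, &diff, &ts);

  /* clamp at 0: GstClockTime is unsigned, negative times cannot exist */
  if (diff < 0 && ts < (GstClockTime) (-diff))
    earliest = 0;
  else
    earliest = ts + diff;

  GST_INFO ("earliest useful running time: %" GST_TIME_FORMAT,
      GST_TIME_ARGS (earliest));

  gst_event_unref (event);
}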
@@ -1933,6 +1951,8 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
   klass->src_event = gst_aggregator_default_src_event;
   klass->src_query = gst_aggregator_default_src_query;
 
+  klass->create_new_pad = gst_aggregator_default_create_new_pad;
+
   gstelement_class->request_new_pad =
       GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
@@ -2132,8 +2152,11 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
 
   buf_pts = GST_BUFFER_PTS (actual_buf);
 
+  aggpad->priv->first_buffer = FALSE;
+
   for (;;) {
     SRC_LOCK (self);
+    GST_OBJECT_LOCK (self);
     PAD_LOCK (aggpad);
     if (gst_aggregator_pad_has_space (self, aggpad)
         && aggpad->priv->flow_return == GST_FLOW_OK) {
@@ -2150,10 +2173,12 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
 
     flow_return = aggpad->priv->flow_return;
     if (flow_return != GST_FLOW_OK) {
+      GST_OBJECT_UNLOCK (self);
       SRC_UNLOCK (self);
       goto flushing;
     }
     GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
+    GST_OBJECT_UNLOCK (self);
     SRC_UNLOCK (self);
     PAD_WAIT_EVENT (aggpad);
 
@@ -2169,6 +2194,7 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
         start_time = 0;
         break;
       case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
+        GST_OBJECT_LOCK (aggpad);
         if (aggpad->segment.format == GST_FORMAT_TIME) {
           start_time = buf_pts;
           if (start_time != -1) {
@@ -2184,6 +2210,7 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
               "as the segment is a %s segment instead of a time segment",
               gst_format_get_name (aggpad->segment.format));
         }
+        GST_OBJECT_UNLOCK (aggpad);
         break;
       case GST_AGGREGATOR_START_TIME_SELECTION_SET:
         start_time = self->priv->start_time;
@@ -2197,8 +2224,6 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
       self->segment.position = start_time;
     else
       self->segment.position = MIN (start_time, self->segment.position);
-    self->segment.start = MIN (start_time, self->segment.start);
-    self->segment.time = MIN (start_time, self->segment.time);
 
     GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT,
         GST_TIME_ARGS (start_time));
@@ -2206,6 +2231,7 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
   }
 
   PAD_UNLOCK (aggpad);
+  GST_OBJECT_UNLOCK (self);
   SRC_UNLOCK (self);
 
 done:
@@ -2417,6 +2443,8 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
 
   g_mutex_init (&pad->priv->flush_lock);
   g_mutex_init (&pad->priv->lock);
+
+  pad->priv->first_buffer = TRUE;
 }
 
 /**
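Taken together, the start-time hunks let an application choose how the aggregator's output segment starts: at zero, at the running time of the first buffer (the GST_AGGREGATOR_START_TIME_SELECTION_FIRST branch above), or at an explicit value. A usage sketch, assuming the "start-time-selection" and "start-time" properties that GstAggregator exposes (with enum nicks "zero", "first", "set") and audiomixer as an example aggregator-based element:

#include <gst/gst.h>

int
main (int argc, char **argv)
{
  GstElement *mix;

  gst_init (&argc, &argv);

  mix = gst_element_factory_make ("audiomixer", NULL);
  if (mix == NULL)
    return 1;                   /* element not available on this system */

  /* "zero" is the default; "first" takes the running time of the first
   * buffer, as in the chain code above; "set" uses "start-time" below */
  gst_util_set_object_arg (G_OBJECT (mix), "start-time-selection", "set");
  g_object_set (mix, "start-time", (guint64) (5 * GST_SECOND), NULL);

  gst_object_unref (mix);
  return 0;
}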