X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=libs%2Fgst%2Fbase%2Fgstaggregator.c;h=26d48a3477187d3929c185bb5281cd0e2ef1c085;hb=3db4239aab91cc7e769d314a93c5cd966fd285ff;hp=788e64bb09007af50812c1cb197182ddfe1781f3;hpb=5f176b724b9363e2bc33bd31821dc6da1b1b1453;p=platform%2Fupstream%2Fgstreamer.git diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index 788e64b..26d48a3 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -54,6 +54,12 @@ * subclass, it should instead return GST_FLOW_EOS in its aggregate * implementation. * + * + * Note that the aggregator logic regarding gap event handling is to turn + * these into gap buffers with matching PTS and duration. It will also + * flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE + * to ease their identification and subsequent processing. + * * */ @@ -65,6 +71,33 @@ #include "gstaggregator.h" +typedef enum +{ + GST_AGGREGATOR_START_TIME_SELECTION_ZERO, + GST_AGGREGATOR_START_TIME_SELECTION_FIRST, + GST_AGGREGATOR_START_TIME_SELECTION_SET +} GstAggregatorStartTimeSelection; + +static GType +gst_aggregator_start_time_selection_get_type (void) +{ + static GType gtype = 0; + + if (gtype == 0) { + static const GEnumValue values[] = { + {GST_AGGREGATOR_START_TIME_SELECTION_ZERO, + "Start at 0 running time (default)", "zero"}, + {GST_AGGREGATOR_START_TIME_SELECTION_FIRST, + "Start at first observed input running time", "first"}, + {GST_AGGREGATOR_START_TIME_SELECTION_SET, + "Set start time with start-time property", "set"}, + {0, NULL, NULL} + }; + + gtype = g_enum_register_static ("GstAggregatorStartTimeSelection", values); + } + return gtype; +} /* Might become API */ static void gst_aggregator_merge_tags (GstAggregator * aggregator, @@ -74,105 +107,132 @@ static void gst_aggregator_set_latency_property (GstAggregator * agg, static gint64 gst_aggregator_get_latency_property (GstAggregator * agg); +/* Locking order, locks in this element must always be taken in this order + * + * standard sink pad stream lock -> GST_PAD_STREAM_LOCK (aggpad) + * Aggregator pad flush lock -> PAD_FLUSH_LOCK(aggpad) + * standard src pad stream lock -> GST_PAD_STREAM_LOCK (srcpad) + * Aggregator src lock -> SRC_LOCK(agg) w/ SRC_WAIT/BROADCAST + * standard element object lock -> GST_OBJECT_LOCK(agg) + * Aggregator pad lock -> PAD_LOCK (aggpad) w/ PAD_WAIT/BROADCAST_EVENT(aggpad) + * standard src pad object lock -> GST_OBJECT_LOCK(srcpad) + * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad) + */ + + +static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self); + GST_DEBUG_CATEGORY_STATIC (aggregator_debug); #define GST_CAT_DEFAULT aggregator_debug /* GstAggregatorPad definitions */ -#define PAD_LOCK(pad) G_STMT_START { \ - GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p", \ +#define PAD_LOCK(pad) G_STMT_START { \ + GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p", \ g_thread_self()); \ - GST_OBJECT_LOCK (pad); \ - GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p", \ + g_mutex_lock(&pad->priv->lock); \ + GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p", \ g_thread_self()); \ } G_STMT_END -#define PAD_UNLOCK(pad) G_STMT_START { \ - GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p", \ - g_thread_self()); \ - GST_OBJECT_UNLOCK (pad); \ - GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p", \ +#define PAD_UNLOCK(pad) G_STMT_START { \ + GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p", \ + g_thread_self()); \ + 
g_mutex_unlock(&pad->priv->lock); \ + GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p", \ g_thread_self()); \ } G_STMT_END #define PAD_WAIT_EVENT(pad) G_STMT_START { \ - GST_LOG_OBJECT (pad, "Waiting for EVENT on thread %p", \ + GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p", \ g_thread_self()); \ g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond), \ - GST_OBJECT_GET_LOCK (pad)); \ - GST_LOG_OBJECT (pad, "DONE Waiting for EVENT on thread %p", \ + (&((GstAggregatorPad*)pad)->priv->lock)); \ + GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \ g_thread_self()); \ } G_STMT_END #define PAD_BROADCAST_EVENT(pad) G_STMT_START { \ - GST_LOG_OBJECT (pad, "Signaling EVENT from thread %p", \ + GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p", \ g_thread_self()); \ g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond)); \ } G_STMT_END -#define PAD_STREAM_LOCK(pad) G_STMT_START { \ +#define PAD_FLUSH_LOCK(pad) G_STMT_START { \ GST_TRACE_OBJECT (pad, "Taking lock from thread %p", \ g_thread_self()); \ - g_mutex_lock(&pad->priv->stream_lock); \ + g_mutex_lock(&pad->priv->flush_lock); \ GST_TRACE_OBJECT (pad, "Took lock from thread %p", \ g_thread_self()); \ } G_STMT_END -#define PAD_STREAM_UNLOCK(pad) G_STMT_START { \ +#define PAD_FLUSH_UNLOCK(pad) G_STMT_START { \ GST_TRACE_OBJECT (pad, "Releasing lock from thread %p", \ g_thread_self()); \ - g_mutex_unlock(&pad->priv->stream_lock); \ + g_mutex_unlock(&pad->priv->flush_lock); \ GST_TRACE_OBJECT (pad, "Release lock from thread %p", \ g_thread_self()); \ } G_STMT_END -#define SRC_STREAM_LOCK(self) G_STMT_START { \ - GST_TRACE_OBJECT (self, "Taking src STREAM lock from thread %p", \ - g_thread_self()); \ - g_mutex_lock(&self->priv->src_lock); \ - GST_TRACE_OBJECT (self, "Took src STREAM lock from thread %p", \ - g_thread_self()); \ +#define SRC_LOCK(self) G_STMT_START { \ + GST_TRACE_OBJECT (self, "Taking src lock from thread %p", \ + g_thread_self()); \ + g_mutex_lock(&self->priv->src_lock); \ + GST_TRACE_OBJECT (self, "Took src lock from thread %p", \ + g_thread_self()); \ } G_STMT_END -#define SRC_STREAM_UNLOCK(self) G_STMT_START { \ - GST_TRACE_OBJECT (self, "Releasing src STREAM lock from thread %p", \ - g_thread_self()); \ - g_mutex_unlock(&self->priv->src_lock); \ - GST_TRACE_OBJECT (self, "Released src STREAM lock from thread %p", \ - g_thread_self()); \ +#define SRC_UNLOCK(self) G_STMT_START { \ + GST_TRACE_OBJECT (self, "Releasing src lock from thread %p", \ + g_thread_self()); \ + g_mutex_unlock(&self->priv->src_lock); \ + GST_TRACE_OBJECT (self, "Released src lock from thread %p", \ + g_thread_self()); \ } G_STMT_END -#define SRC_STREAM_WAIT(self) G_STMT_START { \ - GST_LOG_OBJECT (self, "Waiting for src STREAM on thread %p", \ - g_thread_self()); \ - g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock)); \ - GST_LOG_OBJECT (self, "DONE Waiting for src STREAM on thread %p", \ - g_thread_self()); \ +#define SRC_WAIT(self) G_STMT_START { \ + GST_LOG_OBJECT (self, "Waiting for src on thread %p", \ + g_thread_self()); \ + g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock)); \ + GST_LOG_OBJECT (self, "DONE Waiting for src on thread %p", \ + g_thread_self()); \ } G_STMT_END -#define SRC_STREAM_BROADCAST(self) G_STMT_START { \ - GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p", \ - g_thread_self()); \ - if (self->priv->aggregate_id) \ - gst_clock_id_unschedule (self->priv->aggregate_id); \ - g_cond_broadcast(&(self->priv->src_cond)); \ 
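+/* Editor's illustration (not from the patch): per the locking order
+ * documented above, a thread that needs both the aggregator src lock and
+ * a pad lock must take them as
+ *
+ *   SRC_LOCK (agg);
+ *   PAD_LOCK (aggpad);
+ *   ...
+ *   PAD_UNLOCK (aggpad);
+ *   SRC_UNLOCK (agg);
+ *
+ * gst_aggregator_pad_chain_internal() below nests them exactly this way;
+ * taking them in the reverse order can deadlock against it.
+ */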
+#define SRC_BROADCAST(self) G_STMT_START { \ + GST_LOG_OBJECT (self, "Signaling src from thread %p", \ + g_thread_self()); \ + if (self->priv->aggregate_id) \ + gst_clock_id_unschedule (self->priv->aggregate_id); \ + g_cond_broadcast(&(self->priv->src_cond)); \ } G_STMT_END struct _GstAggregatorPadPrivate { + /* Following fields are protected by the PAD_LOCK */ + GstFlowReturn flow_return; gboolean pending_flush_start; gboolean pending_flush_stop; gboolean pending_eos; - gboolean flushing; - /* Protected by the pad lock */ - GstBuffer *buffer; + gboolean first_buffer; + + GQueue buffers; + guint num_buffers; + GstClockTime head_position; + GstClockTime tail_position; + GstClockTime head_time; + GstClockTime tail_time; + GstClockTime time_level; + gboolean eos; + GMutex lock; GCond event_cond; - - GMutex stream_lock; + /* This lock prevents a flush start processing happening while + * the chain function is also happening. + */ + GMutex flush_lock; }; static gboolean @@ -181,8 +241,18 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg) GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad); PAD_LOCK (aggpad); + aggpad->priv->pending_eos = FALSE; aggpad->priv->eos = FALSE; - aggpad->priv->flushing = FALSE; + aggpad->priv->flow_return = GST_FLOW_OK; + GST_OBJECT_LOCK (aggpad); + gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED); + gst_segment_init (&aggpad->clip_segment, GST_FORMAT_UNDEFINED); + GST_OBJECT_UNLOCK (aggpad); + aggpad->priv->head_position = GST_CLOCK_TIME_NONE; + aggpad->priv->tail_position = GST_CLOCK_TIME_NONE; + aggpad->priv->head_time = GST_CLOCK_TIME_NONE; + aggpad->priv->tail_time = GST_CLOCK_TIME_NONE; + aggpad->priv->time_level = 0; PAD_UNLOCK (aggpad); if (klass->flush) @@ -203,7 +273,7 @@ struct _GstAggregatorPrivate gint padcount; /* Our state is >= PAUSED */ - gboolean running; /* protected by SRC_STREAM_LOCK */ + gboolean running; /* protected by src_lock */ gint seqnum; gboolean send_stream_start; /* protected by srcpad stream lock */ @@ -211,27 +281,31 @@ struct _GstAggregatorPrivate gboolean flush_seeking; gboolean pending_flush_start; gboolean send_eos; /* protected by srcpad stream lock */ - GstFlowReturn flow_return; GstCaps *srccaps; /* protected by the srcpad stream lock */ GstTagList *tags; gboolean tags_changed; - gboolean latency_live; - GstClockTime latency_min; - GstClockTime latency_max; + gboolean peer_latency_live; /* protected by src_lock */ + GstClockTime peer_latency_min; /* protected by src_lock */ + GstClockTime peer_latency_max; /* protected by src_lock */ + gboolean has_peer_latency; - GstClockTime sub_latency_min; - GstClockTime sub_latency_max; + GstClockTime sub_latency_min; /* protected by src_lock */ + GstClockTime sub_latency_max; /* protected by src_lock */ /* aggregate */ GstClockID aggregate_id; /* protected by src_lock */ GMutex src_lock; GCond src_cond; + gboolean first_buffer; + GstAggregatorStartTimeSelection start_time_selection; + GstClockTime start_time; + /* properties */ - gint64 latency; + gint64 latency; /* protected by both src_lock and all pad locks */ }; typedef struct @@ -239,19 +313,27 @@ typedef struct GstEvent *event; gboolean result; gboolean flush; + gboolean only_to_active_pads; gboolean one_actually_seeked; } EventData; -#define DEFAULT_LATENCY 0 +#define DEFAULT_LATENCY 0 +#define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO +#define DEFAULT_START_TIME (-1) enum { PROP_0, PROP_LATENCY, + PROP_START_TIME_SELECTION, + PROP_START_TIME, 
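+  /* keep PROP_LAST as the final entry when adding new properties */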
PROP_LAST }; +static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self, + GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head); + /** * gst_aggregator_iterate_sinkpads: * @self: The #GstAggregator @@ -332,6 +414,12 @@ no_iter: } static gboolean +gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad) +{ + return (g_queue_peek_tail (&pad->priv->buffers) == NULL); +} + +static gboolean gst_aggregator_check_pads_ready (GstAggregator * self) { GstAggregatorPad *pad; @@ -349,7 +437,16 @@ gst_aggregator_check_pads_ready (GstAggregator * self) pad = l->data; PAD_LOCK (pad); - if (pad->priv->buffer == NULL && !pad->priv->eos) { + + /* In live mode, having a single pad with buffers is enough to + * generate a start time from it. In non-live mode all pads need + * to have a buffer + */ + if (self->priv->peer_latency_live && + !gst_aggregator_pad_queue_is_empty (pad)) + self->priv->first_buffer = FALSE; + + if (gst_aggregator_pad_queue_is_empty (pad) && !pad->priv->eos) { PAD_UNLOCK (pad); goto pad_not_ready; } @@ -357,6 +454,8 @@ gst_aggregator_check_pads_ready (GstAggregator * self) } + self->priv->first_buffer = FALSE; + GST_OBJECT_UNLOCK (self); GST_LOG_OBJECT (self, "pads are ready"); return TRUE; @@ -379,10 +478,10 @@ static void gst_aggregator_reset_flow_values (GstAggregator * self) { GST_OBJECT_LOCK (self); - self->priv->flow_return = GST_FLOW_FLUSHING; self->priv->send_stream_start = TRUE; self->priv->send_segment = TRUE; gst_segment_init (&self->segment, GST_FORMAT_TIME); + self->priv->first_buffer = TRUE; GST_OBJECT_UNLOCK (self); } @@ -430,7 +529,7 @@ gst_aggregator_push_mandatory_events (GstAggregator * self) GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment); } - if (priv->tags && priv->tags_changed) { + if (priv->tags && priv->tags_changed && !self->priv->flush_seeking) { tags = gst_event_new_tag (gst_tag_list_ref (priv->tags)); priv->tags_changed = FALSE; } @@ -493,9 +592,13 @@ gst_aggregator_push_eos (GstAggregator * self) GstEvent *event; gst_aggregator_push_mandatory_events (self); - self->priv->send_eos = FALSE; event = gst_event_new_eos (); + + GST_OBJECT_LOCK (self); + self->priv->send_eos = FALSE; gst_event_set_seqnum (event, self->priv->seqnum); + GST_OBJECT_UNLOCK (self); + gst_pad_push_event (self->srcpad, event); } @@ -510,45 +613,53 @@ gst_aggregator_get_next_time (GstAggregator * self) return GST_CLOCK_TIME_NONE; } -/* called with the src STREAM lock */ static gboolean gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) { - GstClockTime latency_max, latency_min; + GstClockTime latency; GstClockTime start; - gboolean live, res; + gboolean res; *timeout = FALSE; - SRC_STREAM_LOCK (self); + SRC_LOCK (self); - GST_OBJECT_LOCK (self); - gst_aggregator_get_latency_unlocked (self, &live, &latency_min, &latency_max); - GST_OBJECT_UNLOCK (self); + latency = gst_aggregator_get_latency_unlocked (self); if (gst_aggregator_check_pads_ready (self)) { GST_DEBUG_OBJECT (self, "all pads have data"); - SRC_STREAM_UNLOCK (self); + SRC_UNLOCK (self); return TRUE; } /* Before waiting, check if we're actually still running */ if (!self->priv->running || !self->priv->send_eos) { - SRC_STREAM_UNLOCK (self); + SRC_UNLOCK (self); return FALSE; } start = gst_aggregator_get_next_time (self); - if (!live || !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) - || !GST_CLOCK_TIME_IS_VALID (start)) { + /* If we're not live, or if we use the running time + * of the first buffer as start time, we wait until + * all pads have buffers. 
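+   * (gst_aggregator_check_pads_ready() above is the per-pad readiness test.)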
+ * Otherwise (i.e. if we are live!), we wait on the clock + * and if a pad does not have a buffer in time we ignore + * that pad. + */ + if (!GST_CLOCK_TIME_IS_VALID (latency) || + !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) || + !GST_CLOCK_TIME_IS_VALID (start) || + (self->priv->first_buffer + && self->priv->start_time_selection == + GST_AGGREGATOR_START_TIME_SELECTION_FIRST)) { /* We wake up here when something happened, and below * then check if we're ready now. If we return FALSE, * we will be directly called again. */ - SRC_STREAM_WAIT (self); + SRC_WAIT (self); } else { GstClockTime base_time, time; GstClock *clock; @@ -563,34 +674,27 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) clock = GST_ELEMENT_CLOCK (self); if (clock) gst_object_ref (clock); + GST_OBJECT_UNLOCK (self); time = base_time + start; - - if (GST_CLOCK_TIME_IS_VALID (latency_min)) { - time += latency_min; - } else { - time += self->priv->latency; - } + time += latency; GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %" GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT - " latency max %" GST_TIME_FORMAT " latency min %" GST_TIME_FORMAT - " current %" GST_TIME_FORMAT ")", GST_TIME_ARGS (time), + " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")", + GST_TIME_ARGS (time), GST_TIME_ARGS (GST_ELEMENT_CAST (self)->base_time), - GST_TIME_ARGS (start), GST_TIME_ARGS (latency_max), - GST_TIME_ARGS (latency_min), + GST_TIME_ARGS (start), GST_TIME_ARGS (latency), GST_TIME_ARGS (gst_clock_get_time (clock))); - GST_OBJECT_UNLOCK (self); - self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time); gst_object_unref (clock); - SRC_STREAM_UNLOCK (self); + SRC_UNLOCK (self); jitter = 0; status = gst_clock_id_wait (self->priv->aggregate_id, &jitter); - SRC_STREAM_LOCK (self); + SRC_LOCK (self); if (self->priv->aggregate_id) { gst_clock_id_unref (self->priv->aggregate_id); self->priv->aggregate_id = NULL; @@ -603,18 +707,86 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) /* we timed out */ if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) { - SRC_STREAM_UNLOCK (self); + SRC_UNLOCK (self); *timeout = TRUE; return TRUE; } } res = gst_aggregator_check_pads_ready (self); - SRC_STREAM_UNLOCK (self); + SRC_UNLOCK (self); return res; } +static gboolean +check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data) +{ + GstEvent *event = NULL; + GstAggregatorClass *klass = NULL; + gboolean *processed_event = user_data; + + do { + event = NULL; + + PAD_LOCK (pad); + if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) { + pad->priv->pending_eos = FALSE; + pad->priv->eos = TRUE; + } + if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) { + event = g_queue_pop_tail (&pad->priv->buffers); + PAD_BROADCAST_EVENT (pad); + } + PAD_UNLOCK (pad); + if (event) { + if (processed_event) + *processed_event = TRUE; + if (klass == NULL) + klass = GST_AGGREGATOR_GET_CLASS (self); + + GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event); + klass->sink_event (self, pad, event); + } + } while (event != NULL); + + return TRUE; +} + +static void +gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, + GstFlowReturn flow_return, gboolean full) +{ + GList *item; + + PAD_LOCK (aggpad); + if (flow_return == GST_FLOW_NOT_LINKED) + aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return); + else + aggpad->priv->flow_return = flow_return; + + item = g_queue_peek_head_link 
(&aggpad->priv->buffers); + while (item) { + GList *next = item->next; + + /* In partial flush, we do like the pad, we get rid of non-sticky events + * and EOS/SEGMENT. + */ + if (full || GST_IS_BUFFER (item->data) || + GST_EVENT_TYPE (item->data) == GST_EVENT_EOS || + GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT || + !GST_EVENT_IS_STICKY (item->data)) { + gst_mini_object_unref (item->data); + g_queue_delete_link (&aggpad->priv->buffers, item); + } + item = next; + } + aggpad->priv->num_buffers = 0; + + PAD_BROADCAST_EVENT (aggpad); + PAD_UNLOCK (aggpad); +} + static void gst_aggregator_aggregate_func (GstAggregator * self) { @@ -630,30 +802,57 @@ gst_aggregator_aggregate_func (GstAggregator * self) GST_LOG_OBJECT (self, "Checking aggregate"); while (priv->send_eos && priv->running) { GstFlowReturn flow_return; + gboolean processed_event = FALSE; + + gst_aggregator_iterate_sinkpads (self, check_events, NULL); if (!gst_aggregator_wait_and_check (self, &timeout)) continue; - GST_TRACE_OBJECT (self, "Actually aggregating!"); + gst_aggregator_iterate_sinkpads (self, check_events, &processed_event); + if (processed_event) + continue; + GST_TRACE_OBJECT (self, "Actually aggregating!"); flow_return = klass->aggregate (self, timeout); GST_OBJECT_LOCK (self); - if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) - priv->flow_return = GST_FLOW_OK; - else - priv->flow_return = flow_return; + if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) { + /* We don't want to set the pads to flushing, but we want to + * stop the thread, so just break here */ + GST_OBJECT_UNLOCK (self); + break; + } GST_OBJECT_UNLOCK (self); - if (flow_return == GST_FLOW_EOS) { + if (flow_return == GST_FLOW_EOS || flow_return == GST_FLOW_ERROR) { gst_aggregator_push_eos (self); } GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return)); - if (flow_return != GST_FLOW_OK) + if (flow_return != GST_FLOW_OK) { + GList *item; + + GST_OBJECT_LOCK (self); + for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) { + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data); + + gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE); + } + GST_OBJECT_UNLOCK (self); break; + } } + + /* Pause the task here, the only ways to get here are: + * 1) We're stopping, in which case the task is stopped anyway + * 2) We got a flow error above, in which case it might take + * some time to forward the flow return upstream and we + * would otherwise call the task function over and over + * again without doing anything + */ + gst_pad_pause_task (self->srcpad); } static gboolean @@ -667,7 +866,6 @@ gst_aggregator_start (GstAggregator * self) self->priv->send_segment = TRUE; self->priv->send_eos = TRUE; self->priv->srccaps = NULL; - self->priv->flow_return = GST_FLOW_OK; klass = GST_AGGREGATOR_GET_CLASS (self); @@ -682,7 +880,13 @@ gst_aggregator_start (GstAggregator * self) static gboolean _check_pending_flush_stop (GstAggregatorPad * pad) { - return (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start); + gboolean res; + + PAD_LOCK (pad); + res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start); + PAD_UNLOCK (pad); + + return res; } static gboolean @@ -693,10 +897,10 @@ gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start) GST_INFO_OBJECT (self, "%s srcpad task", flush_start ? 
"Pausing" : "Stopping"); - SRC_STREAM_LOCK (self); + SRC_LOCK (self); self->priv->running = FALSE; - SRC_STREAM_BROADCAST (self); - SRC_STREAM_UNLOCK (self); + SRC_BROADCAST (self); + SRC_UNLOCK (self); if (flush_start) { res = gst_pad_push_event (self->srcpad, flush_start); @@ -762,20 +966,20 @@ static void gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event) { - GstBuffer *tmpbuf; GstAggregatorPrivate *priv = self->priv; GstAggregatorPadPrivate *padpriv = aggpad->priv; - g_atomic_int_set (&aggpad->priv->flushing, TRUE); - /* Remove pad buffer and wake up the streaming thread */ - tmpbuf = gst_aggregator_pad_steal_buffer (aggpad); - gst_buffer_replace (&tmpbuf, NULL); - PAD_STREAM_LOCK (aggpad); - if (g_atomic_int_compare_and_exchange (&padpriv->pending_flush_start, - TRUE, FALSE) == TRUE) { + gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE); + + PAD_FLUSH_LOCK (aggpad); + PAD_LOCK (aggpad); + if (padpriv->pending_flush_start) { GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now"); - g_atomic_int_set (&padpriv->pending_flush_stop, TRUE); + + padpriv->pending_flush_start = FALSE; + padpriv->pending_flush_stop = TRUE; } + PAD_UNLOCK (aggpad); GST_OBJECT_LOCK (self); if (priv->flush_seeking) { @@ -786,7 +990,6 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, GST_INFO_OBJECT (self, "Flushing, pausing srcpad task"); gst_aggregator_stop_srcpad_task (self, event); - priv->flow_return = GST_FLOW_OK; GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking"); GST_PAD_STREAM_LOCK (self->srcpad); @@ -800,12 +1003,45 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, GST_OBJECT_UNLOCK (self); gst_event_unref (event); } - PAD_STREAM_UNLOCK (aggpad); + PAD_FLUSH_UNLOCK (aggpad); +} + +/* Must be called with the the PAD_LOCK held */ +static void +update_time_level (GstAggregatorPad * aggpad, gboolean head) +{ + if (head) { + if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) && + aggpad->clip_segment.format == GST_FORMAT_TIME) + aggpad->priv->head_time = + gst_segment_to_running_time (&aggpad->clip_segment, + GST_FORMAT_TIME, aggpad->priv->head_position); + else + aggpad->priv->head_time = GST_CLOCK_TIME_NONE; + } else { + if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_position) && + aggpad->segment.format == GST_FORMAT_TIME) + aggpad->priv->tail_time = + gst_segment_to_running_time (&aggpad->segment, + GST_FORMAT_TIME, aggpad->priv->tail_position); + else + aggpad->priv->tail_time = aggpad->priv->head_time; + } + + if (aggpad->priv->head_time == GST_CLOCK_TIME_NONE || + aggpad->priv->tail_time == GST_CLOCK_TIME_NONE) { + aggpad->priv->time_level = 0; + return; + } - tmpbuf = gst_aggregator_pad_steal_buffer (aggpad); - gst_buffer_replace (&tmpbuf, NULL); + if (aggpad->priv->tail_time > aggpad->priv->head_time) + aggpad->priv->time_level = 0; + else + aggpad->priv->time_level = aggpad->priv->head_time - + aggpad->priv->tail_time; } + /* GstAggregator vmethods default implementations */ static gboolean gst_aggregator_default_sink_event (GstAggregator * self, @@ -838,10 +1074,10 @@ gst_aggregator_default_sink_event (GstAggregator * self, gst_aggregator_flush (self); gst_pad_push_event (self->srcpad, event); event = NULL; - SRC_STREAM_LOCK (self); + SRC_LOCK (self); priv->send_eos = TRUE; - SRC_STREAM_BROADCAST (self); - SRC_STREAM_UNLOCK (self); + SRC_BROADCAST (self); + SRC_UNLOCK (self); GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK"); GST_PAD_STREAM_UNLOCK 
(self->srcpad); @@ -853,6 +1089,8 @@ gst_aggregator_default_sink_event (GstAggregator * self, GST_OBJECT_UNLOCK (self); } + aggpad->priv->first_buffer = TRUE; + /* We never forward the event */ goto eat; } @@ -864,31 +1102,79 @@ gst_aggregator_default_sink_event (GstAggregator * self, * check for it. Mark pending_eos, eos will be set when steal_buffer is * called */ - SRC_STREAM_LOCK (self); + SRC_LOCK (self); PAD_LOCK (aggpad); - if (!aggpad->priv->buffer) { + if (gst_aggregator_pad_queue_is_empty (aggpad)) { aggpad->priv->eos = TRUE; } else { aggpad->priv->pending_eos = TRUE; } PAD_UNLOCK (aggpad); - SRC_STREAM_BROADCAST (self); - SRC_STREAM_UNLOCK (self); + SRC_BROADCAST (self); + SRC_UNLOCK (self); goto eat; } case GST_EVENT_SEGMENT: { PAD_LOCK (aggpad); + GST_OBJECT_LOCK (aggpad); gst_event_copy_segment (event, &aggpad->segment); - self->priv->seqnum = gst_event_get_seqnum (event); + update_time_level (aggpad, FALSE); + GST_OBJECT_UNLOCK (aggpad); PAD_UNLOCK (aggpad); + + GST_OBJECT_LOCK (self); + self->priv->seqnum = gst_event_get_seqnum (event); + GST_OBJECT_UNLOCK (self); goto eat; } case GST_EVENT_STREAM_START: { goto eat; } + case GST_EVENT_GAP: + { + GstClockTime pts, endpts; + GstClockTime duration; + GstBuffer *gapbuf; + + gst_event_parse_gap (event, &pts, &duration); + gapbuf = gst_buffer_new (); + + if (GST_CLOCK_TIME_IS_VALID (duration)) + endpts = pts + duration; + else + endpts = GST_CLOCK_TIME_NONE; + + GST_OBJECT_LOCK (aggpad); + res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts, + &pts, &endpts); + GST_OBJECT_UNLOCK (aggpad); + + if (!res) { + GST_WARNING_OBJECT (self, "GAP event outside segment, dropping"); + goto eat; + } + + if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts)) + duration = endpts - pts; + else + duration = GST_CLOCK_TIME_NONE; + + GST_BUFFER_PTS (gapbuf) = pts; + GST_BUFFER_DURATION (gapbuf) = duration; + GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP); + GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE); + + if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) != + GST_FLOW_OK) { + GST_WARNING_OBJECT (self, "Failed to chain gap buffer"); + res = FALSE; + } + + goto eat; + } case GST_EVENT_TAG: { GstTagList *tags; @@ -946,6 +1232,10 @@ gst_aggregator_stop (GstAggregator * agg) else result = TRUE; + agg->priv->has_peer_latency = FALSE; + agg->priv->peer_latency_live = FALSE; + agg->priv->peer_latency_min = agg->priv->peer_latency_max = FALSE; + if (agg->priv->tags) gst_tag_list_unref (agg->priv->tags); agg->priv->tags = NULL; @@ -1005,20 +1295,17 @@ static void gst_aggregator_release_pad (GstElement * element, GstPad * pad) { GstAggregator *self = GST_AGGREGATOR (element); - GstBuffer *tmpbuf; - GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); GST_INFO_OBJECT (pad, "Removing pad"); - SRC_STREAM_LOCK (self); - g_atomic_int_set (&aggpad->priv->flushing, TRUE); - tmpbuf = gst_aggregator_pad_steal_buffer (aggpad); - gst_buffer_replace (&tmpbuf, NULL); + SRC_LOCK (self); + gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE); gst_element_remove_pad (element, pad); - SRC_STREAM_BROADCAST (self); - SRC_STREAM_UNLOCK (self); + self->priv->has_peer_latency = FALSE; + SRC_BROADCAST (self); + SRC_UNLOCK (self); } static GstPad * @@ -1061,6 +1348,7 @@ gst_aggregator_request_new_pad (GstElement * element, } GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad)); + self->priv->has_peer_latency = FALSE; if (priv->running) gst_pad_set_active (GST_PAD (agg_pad), TRUE); @@ 
-1071,162 +1359,132 @@ gst_aggregator_request_new_pad (GstElement * element, return GST_PAD (agg_pad); } -typedef struct -{ - GstClockTime min, max; - gboolean live; -} LatencyData; +/* Must be called with SRC_LOCK held */ static gboolean -gst_aggregator_query_sink_latency_foreach (GstAggregator * self, - GstAggregatorPad * pad, gpointer user_data) +gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query) { - LatencyData *data = user_data; - GstClockTime min, max; - GstQuery *query; - gboolean live, res; + gboolean query_ret, live; + GstClockTime our_latency, min, max; - query = gst_query_new_latency (); - res = gst_pad_peer_query (GST_PAD_CAST (pad), query); + query_ret = gst_pad_query_default (self->srcpad, GST_OBJECT (self), query); - if (res) { - gst_query_parse_latency (query, &live, &min, &max); - - GST_LOG_OBJECT (pad, "got latency live:%s min:%" G_GINT64_FORMAT - " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max); + if (!query_ret) { + GST_WARNING_OBJECT (self, "Latency query failed"); + return FALSE; + } - if (min != GST_CLOCK_TIME_NONE && min > data->min) - data->min = min; + gst_query_parse_latency (query, &live, &min, &max); - if (max != GST_CLOCK_TIME_NONE && - ((data->max != GST_CLOCK_TIME_NONE && max < data->max) || - (data->max == GST_CLOCK_TIME_NONE))) - data->max = max; + our_latency = self->priv->latency; - data->live |= live; + if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) { + GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT + ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min)); + return FALSE; } - gst_query_unref (query); + if (min > max && GST_CLOCK_TIME_IS_VALID (max)) { + GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL), + ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %" + GST_TIME_FORMAT ". Add queues or other buffering elements.", + GST_TIME_ARGS (max), GST_TIME_ARGS (min))); + return FALSE; + } - return TRUE; -} + self->priv->peer_latency_live = live; + self->priv->peer_latency_min = min; + self->priv->peer_latency_max = max; + self->priv->has_peer_latency = TRUE; -/** - * gst_aggregator_get_latency_unlocked: - * @self: a #GstAggregator - * @live: (out) (allow-none): whether @self is live - * @min_latency: (out) (allow-none): the configured minimum latency of @self - * @max_latency: (out) (allow-none): the configured maximum latency of @self - * - * Retreives the latency values reported by @self in response to the latency - * query. - * - * Typically only called by subclasses. - * - * MUST be called with the object lock held. - */ -void -gst_aggregator_get_latency_unlocked (GstAggregator * self, gboolean * live, - GstClockTime * min_latency, GstClockTime * max_latency) -{ - GstClockTime our_latency; - GstClockTime min, max; + /* add our own */ + min += our_latency; + min += self->priv->sub_latency_min; + if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max) + && GST_CLOCK_TIME_IS_VALID (max)) + max += self->priv->sub_latency_max + our_latency; + else + max = GST_CLOCK_TIME_NONE; - g_return_if_fail (GST_IS_AGGREGATOR (self)); + if (live && min > max) { + GST_ELEMENT_WARNING (self, CORE, NEGOTIATION, + ("%s", "Latency too big"), + ("The requested latency value is too big for the current pipeline. " + "Limiting to %" G_GINT64_FORMAT, max)); + min = max; + /* FIXME: This could in theory become negative, but in + * that case all is lost anyway */ + self->priv->latency -= min - max; + /* FIXME: shouldn't we g_object_notify() the change here? 
*/ + } - /* latency_min is never GST_CLOCK_TIME_NONE by construction */ - min = self->priv->latency_min; - max = self->priv->latency_max; + SRC_BROADCAST (self); - min += self->priv->sub_latency_min; - if (GST_CLOCK_TIME_IS_VALID (max) - && GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)) - max += self->priv->sub_latency_max; + GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT + " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max); - our_latency = self->priv->latency; - if (GST_CLOCK_TIME_IS_VALID (our_latency)) { - min += our_latency; - if (GST_CLOCK_TIME_IS_VALID (max)) - max += our_latency; - } + gst_query_set_latency (query, live, min, max); - if (live) - *live = self->priv->latency_live; - if (min_latency) - *min_latency = min; - if (max_latency) - *max_latency = max; + return query_ret; } -static gboolean -gst_aggregator_query_latency (GstAggregator * self, GstQuery * query) +/* + * MUST be called with the src_lock held. + * + * See gst_aggregator_get_latency() for doc + */ +static GstClockTime +gst_aggregator_get_latency_unlocked (GstAggregator * self) { - GstClockTime our_latency; - LatencyData data; - - data.min = 0; - data.max = GST_CLOCK_TIME_NONE; - data.live = FALSE; + GstClockTime latency; - /* query upstream's latency */ - SRC_STREAM_LOCK (self); - gst_aggregator_iterate_sinkpads (self, - gst_aggregator_query_sink_latency_foreach, &data); - SRC_STREAM_UNLOCK (self); + g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0); - GST_OBJECT_LOCK (self); - our_latency = self->priv->latency; + if (!self->priv->has_peer_latency) { + GstQuery *query = gst_query_new_latency (); + gboolean ret; - if (data.live && GST_CLOCK_TIME_IS_VALID (our_latency) && - our_latency > data.max) { - GST_ELEMENT_WARNING (self, CORE, NEGOTIATION, - ("%s", "Latency too big"), - ("The requested latency value is too big for the current pipeline. " - "Limiting to %" G_GINT64_FORMAT, data.max)); - self->priv->latency = data.max; - /* FIXME: shouldn't we g_object_notify() the change here? */ + ret = gst_aggregator_query_latency_unlocked (self, query); + gst_query_unref (query); + if (!ret) + return GST_CLOCK_TIME_NONE; } - if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (data.min))) { - GST_WARNING_OBJECT (self, "Invalid minimum latency, using 0"); - data.min = 0; - } + if (!self->priv->has_peer_latency || !self->priv->peer_latency_live) + return GST_CLOCK_TIME_NONE; - if (G_UNLIKELY (data.min > data.max)) { - GST_WARNING_OBJECT (self, "Minimum latency is greater than maximum latency " - "(%" G_GINT64_FORMAT " > %" G_GINT64_FORMAT "). 
" - "Clamping it at the maximum latency", data.min, data.max); - data.min = data.max; - } - - self->priv->latency_live = data.live; - self->priv->latency_min = data.min; - self->priv->latency_max = data.max; + /* latency_min is never GST_CLOCK_TIME_NONE by construction */ + latency = self->priv->peer_latency_min; /* add our own */ - if (GST_CLOCK_TIME_IS_VALID (our_latency)) { - if (GST_CLOCK_TIME_IS_VALID (data.min)) - data.min += our_latency; - if (GST_CLOCK_TIME_IS_VALID (data.max)) - data.max += our_latency; - } - - if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_min) - && GST_CLOCK_TIME_IS_VALID (data.min)) - data.min += self->priv->sub_latency_min; - if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max) - && GST_CLOCK_TIME_IS_VALID (data.max)) - data.max += self->priv->sub_latency_max; + latency += self->priv->latency; + latency += self->priv->sub_latency_min; - GST_OBJECT_UNLOCK (self); + return latency; +} - GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT - " max:%" G_GINT64_FORMAT, data.live ? "true" : "false", data.min, - data.max); +/** + * gst_aggregator_get_latency: + * @self: a #GstAggregator + * + * Retrieves the latency values reported by @self in response to the latency + * query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element + * will not wait for the clock. + * + * Typically only called by subclasses. + * + * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync + */ +GstClockTime +gst_aggregator_get_latency (GstAggregator * self) +{ + GstClockTime ret; - gst_query_set_latency (query, data.live, data.min, data.max); + SRC_LOCK (self); + ret = gst_aggregator_get_latency_unlocked (self); + SRC_UNLOCK (self); - return TRUE; + return ret; } static gboolean @@ -1245,10 +1503,14 @@ gst_aggregator_send_event (GstElement * element, GstEvent * event) gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type, &start, &stop_type, &stop); + + GST_OBJECT_LOCK (self); gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start, stop_type, stop, NULL); - self->priv->seqnum = gst_event_get_seqnum (event); + self->priv->first_buffer = FALSE; + GST_OBJECT_UNLOCK (self); + GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event); } GST_STATE_UNLOCK (element); @@ -1274,31 +1536,17 @@ gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query) gst_query_set_seeking (query, format, FALSE, 0, -1); res = TRUE; - goto discard; + break; } case GST_QUERY_LATENCY: - { - gboolean ret; - - ret = gst_aggregator_query_latency (self, query); - /* Wake up the src thread again, due to changed latencies - * or changed live-ness we might have to adjust if we wait - * on a deadline at all and how long. - * This is only to unschedule the clock id, we don't really care - * about the GCond here. 
- */ - SRC_STREAM_LOCK (self); - SRC_STREAM_BROADCAST (self); - SRC_STREAM_UNLOCK (self); - return ret; - } - default: + SRC_LOCK (self); + res = gst_aggregator_query_latency_unlocked (self, query); + SRC_UNLOCK (self); break; + default: + return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query); } - return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query); - -discard: return res; } @@ -1308,12 +1556,17 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data) EventData *evdata = user_data; gboolean ret = TRUE; GstPad *peer = gst_pad_get_peer (pad); - GstAggregatorPadPrivate *padpriv = GST_AGGREGATOR_PAD (pad)->priv; + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); if (peer) { - ret = gst_pad_send_event (peer, gst_event_ref (evdata->event)); - GST_DEBUG_OBJECT (pad, "return of event push is %d", ret); - gst_object_unref (peer); + if (evdata->only_to_active_pads && aggpad->priv->first_buffer) { + GST_DEBUG_OBJECT (pad, "not sending event to inactive pad"); + ret = TRUE; + } else { + ret = gst_pad_send_event (peer, gst_event_ref (evdata->event)); + GST_DEBUG_OBJECT (pad, "return of event push is %d", ret); + gst_object_unref (peer); + } } if (ret == FALSE) { @@ -1342,8 +1595,10 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data) } if (evdata->flush) { - padpriv->pending_flush_start = FALSE; - padpriv->pending_flush_stop = FALSE; + PAD_LOCK (aggpad); + aggpad->priv->pending_flush_start = FALSE; + aggpad->priv->pending_flush_stop = FALSE; + PAD_UNLOCK (aggpad); } } else { evdata->one_actually_seeked = TRUE; @@ -1357,7 +1612,7 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data) static EventData gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self, - GstEvent * event, gboolean flush) + GstEvent * event, gboolean flush, gboolean only_to_active_pads) { EventData evdata; @@ -1365,6 +1620,7 @@ gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self, evdata.result = TRUE; evdata.flush = flush; evdata.one_actually_seeked = FALSE; + evdata.only_to_active_pads = only_to_active_pads; /* We first need to set all pads as flushing in a first pass * as flush_start flush_stop is sometimes sent synchronously @@ -1376,8 +1632,10 @@ gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self, for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) { GstAggregatorPad *pad = l->data; + PAD_LOCK (pad); pad->priv->pending_flush_start = TRUE; pad->priv->pending_flush_stop = FALSE; + PAD_UNLOCK (pad); } GST_OBJECT_UNLOCK (self); } @@ -1416,10 +1674,14 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event) gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start, stop_type, stop, NULL); + + /* Seeking sets a position */ + self->priv->first_buffer = FALSE; GST_OBJECT_UNLOCK (self); /* forward the seek upstream */ - evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, flush); + evdata = + gst_aggregator_forward_event_to_all_sinkpads (self, event, flush, FALSE); event = NULL; if (!evdata.result || !evdata.one_actually_seeked) { @@ -1462,7 +1724,12 @@ gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event) } } - evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, FALSE); + /* Don't forward QOS events to pads that had no active buffer yet. 
Otherwise + * they will receive a QOS event that has earliest_time=0 (because we can't + * have negative timestamps), and consider their buffer as too late */ + evdata = + gst_aggregator_forward_event_to_all_sinkpads (self, event, FALSE, + GST_EVENT_TYPE (event) == GST_EVENT_QOS); res = evdata.result; done: @@ -1546,7 +1813,7 @@ gst_aggregator_finalize (GObject * object) /* * gst_aggregator_set_latency_property: * @agg: a #GstAggregator - * @latency: the new latency value. + * @latency: the new latency value (in nanoseconds). * * Sets the new latency value to @latency. This value is used to limit the * amount of time a pad waits for data to appear before considering the pad @@ -1558,22 +1825,35 @@ gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency) gboolean changed; g_return_if_fail (GST_IS_AGGREGATOR (self)); + g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency)); - GST_OBJECT_LOCK (self); + SRC_LOCK (self); + changed = (self->priv->latency != latency); - if (self->priv->latency_live && self->priv->latency_max != 0 && - GST_CLOCK_TIME_IS_VALID (latency) && latency > self->priv->latency_max) { - GST_ELEMENT_WARNING (self, CORE, NEGOTIATION, - ("%s", "Latency too big"), - ("The requested latency value is too big for the latency in the " - "current pipeline. Limiting to %" G_GINT64_FORMAT, - self->priv->latency_max)); - latency = self->priv->latency_max; + if (changed) { + GList *item; + + GST_OBJECT_LOCK (self); + /* First lock all the pads */ + for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) { + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data); + PAD_LOCK (aggpad); + } + + self->priv->latency = latency; + + SRC_BROADCAST (self); + + /* Now wake up the pads */ + for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) { + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data); + PAD_BROADCAST_EVENT (aggpad); + PAD_UNLOCK (aggpad); + } + GST_OBJECT_UNLOCK (self); } - changed = (self->priv->latency != latency); - self->priv->latency = latency; - GST_OBJECT_UNLOCK (self); + SRC_UNLOCK (self); if (changed) gst_element_post_message (GST_ELEMENT_CAST (self), @@ -1615,6 +1895,12 @@ gst_aggregator_set_property (GObject * object, guint prop_id, case PROP_LATENCY: gst_aggregator_set_latency_property (agg, g_value_get_int64 (value)); break; + case PROP_START_TIME_SELECTION: + agg->priv->start_time_selection = g_value_get_enum (value); + break; + case PROP_START_TIME: + agg->priv->start_time = g_value_get_uint64 (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -1631,6 +1917,12 @@ gst_aggregator_get_property (GObject * object, guint prop_id, case PROP_LATENCY: g_value_set_int64 (value, gst_aggregator_get_latency_property (agg)); break; + case PROP_START_TIME_SELECTION: + g_value_set_enum (value, agg->priv->start_time_selection); + break; + case PROP_START_TIME: + g_value_set_uint64 (value, agg->priv->start_time); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -1674,12 +1966,24 @@ gst_aggregator_class_init (GstAggregatorClass * klass) g_param_spec_int64 ("latency", "Buffer latency", "Additional latency in live mode to allow upstream " "to take longer to produce buffers for the current " - "position", 0, + "position (in nanoseconds)", 0, (G_MAXLONG == G_MAXINT64) ? 
G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1), DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION, + g_param_spec_enum ("start-time-selection", "Start Time Selection", + "Decides which start time is output", + gst_aggregator_start_time_selection_get_type (), + DEFAULT_START_TIME_SELECTION, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_START_TIME, + g_param_spec_uint64 ("start-time", "Start Time", + "Start time to use if start-time-selection=set", 0, + G_MAXUINT64, + DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad); - GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_query_sink_latency_foreach); } static void @@ -1703,9 +2007,10 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) priv->padcount = -1; priv->tags_changed = FALSE; - self->priv->latency_live = FALSE; - self->priv->latency_min = self->priv->sub_latency_min = 0; - self->priv->latency_max = self->priv->sub_latency_max = GST_CLOCK_TIME_NONE; + self->priv->peer_latency_live = FALSE; + self->priv->peer_latency_min = self->priv->sub_latency_min = 0; + self->priv->peer_latency_max = self->priv->sub_latency_max = 0; + self->priv->has_peer_latency = FALSE; gst_aggregator_reset_flow_values (self); self->srcpad = gst_pad_new_from_template (pad_template, "src"); @@ -1720,6 +2025,8 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) gst_element_add_pad (GST_ELEMENT (self), self->srcpad); self->priv->latency = DEFAULT_LATENCY; + self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION; + self->priv->start_time = DEFAULT_START_TIME; g_mutex_init (&self->priv->src_lock); g_cond_init (&self->priv->src_cond); @@ -1753,78 +2060,207 @@ gst_aggregator_get_type (void) return type; } +/* Must be called with SRC lock and PAD lock held */ +static gboolean +gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad) +{ + /* Empty queue always has space */ + if (g_queue_get_length (&aggpad->priv->buffers) == 0) + return TRUE; + + /* We also want at least two buffers, one is being processed and one is ready + * for the next iteration when we operate in live mode. 
 */
+  if (self->priv->peer_latency_live && aggpad->priv->num_buffers < 2)
+    return TRUE;
+
+  /* zero latency, if there is a buffer, it's full */
+  if (self->priv->latency == 0)
+    return FALSE;
+
+  /* Allow no more buffers than the latency */
+  return (aggpad->priv->time_level <= self->priv->latency);
+}
+
+/* Must be called with the PAD_LOCK held */
+static void
+apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
+{
+  GstClockTime timestamp;
+
+  if (GST_BUFFER_DTS_IS_VALID (buffer))
+    timestamp = GST_BUFFER_DTS (buffer);
+  else
+    timestamp = GST_BUFFER_PTS (buffer);
+
+  if (timestamp == GST_CLOCK_TIME_NONE) {
+    if (head)
+      timestamp = aggpad->priv->head_position;
+    else
+      timestamp = aggpad->priv->tail_position;
+  }
+
+  /* add duration */
+  if (GST_BUFFER_DURATION_IS_VALID (buffer))
+    timestamp += GST_BUFFER_DURATION (buffer);
+
+  if (head)
+    aggpad->priv->head_position = timestamp;
+  else
+    aggpad->priv->tail_position = timestamp;
+
+  update_time_level (aggpad, head);
+}
+
 static GstFlowReturn
-gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
+gst_aggregator_pad_chain_internal (GstAggregator * self,
+    GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
 {
   GstBuffer *actual_buf = buffer;
-  GstAggregator *self = GST_AGGREGATOR (object);
-  GstAggregatorPrivate *priv = self->priv;
-  GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
-  GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
+  GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
   GstFlowReturn flow_return;
+  GstClockTime buf_pts;
 
   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
 
-  PAD_STREAM_LOCK (aggpad);
+  PAD_FLUSH_LOCK (aggpad);
 
-  if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
+  PAD_LOCK (aggpad);
+  flow_return = aggpad->priv->flow_return;
+  if (flow_return != GST_FLOW_OK)
     goto flushing;
 
-  if (g_atomic_int_get (&aggpad->priv->pending_eos) == TRUE)
+  if (aggpad->priv->pending_eos == TRUE)
     goto eos;
 
-  PAD_LOCK (aggpad);
-  while (aggpad->priv->buffer
-      && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
+  flow_return = aggpad->priv->flow_return;
+  if (flow_return != GST_FLOW_OK)
+    goto flushing;
+
+  PAD_UNLOCK (aggpad);
+
+  if (aggclass->clip && head) {
+    aggclass->clip (self, aggpad, buffer, &actual_buf);
+  }
+
+  if (actual_buf == NULL) {
+    GST_LOG_OBJECT (aggpad, "Buffer dropped by clip function");
+    goto done;
+  }
+
+  buf_pts = GST_BUFFER_PTS (actual_buf);
+
+  aggpad->priv->first_buffer = FALSE;
+
+  for (;;) {
+    SRC_LOCK (self);
+    PAD_LOCK (aggpad);
+    if (gst_aggregator_pad_has_space (self, aggpad)
+        && aggpad->priv->flow_return == GST_FLOW_OK) {
+      if (head)
+        g_queue_push_head (&aggpad->priv->buffers, actual_buf);
+      else
+        g_queue_push_tail (&aggpad->priv->buffers, actual_buf);
+      apply_buffer (aggpad, actual_buf, head);
+      aggpad->priv->num_buffers++;
+      actual_buf = buffer = NULL;
+      SRC_BROADCAST (self);
+      break;
+    }
+
+    flow_return = aggpad->priv->flow_return;
+    if (flow_return != GST_FLOW_OK) {
+      SRC_UNLOCK (self);
+      goto flushing;
+    }
     GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
+    SRC_UNLOCK (self);
     PAD_WAIT_EVENT (aggpad);
+
+    PAD_UNLOCK (aggpad);
   }
-  PAD_UNLOCK (aggpad);
 
-  if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
-    goto flushing;
+  if (self->priv->first_buffer) {
+    GstClockTime start_time;
 
-  if (aggclass->clip) {
-    aggclass->clip (self, aggpad, buffer, &actual_buf);
+    switch (self->priv->start_time_selection) {
+      case
GST_AGGREGATOR_START_TIME_SELECTION_ZERO: + default: + start_time = 0; + break; + case GST_AGGREGATOR_START_TIME_SELECTION_FIRST: + if (aggpad->segment.format == GST_FORMAT_TIME) { + start_time = buf_pts; + if (start_time != -1) { + start_time = MAX (start_time, aggpad->segment.start); + start_time = + gst_segment_to_running_time (&aggpad->segment, GST_FORMAT_TIME, + start_time); + } + } else { + start_time = 0; + GST_WARNING_OBJECT (aggpad, + "Ignoring request of selecting the first start time " + "as the segment is a %s segment instead of a time segment", + gst_format_get_name (aggpad->segment.format)); + } + break; + case GST_AGGREGATOR_START_TIME_SELECTION_SET: + start_time = self->priv->start_time; + if (start_time == -1) + start_time = 0; + break; + } + + if (start_time != -1) { + if (self->segment.position == -1) + self->segment.position = start_time; + else + self->segment.position = MIN (start_time, self->segment.position); + + GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT, + GST_TIME_ARGS (start_time)); + } } - SRC_STREAM_LOCK (self); - PAD_LOCK (aggpad); - if (aggpad->priv->buffer) - gst_buffer_unref (aggpad->priv->buffer); - aggpad->priv->buffer = actual_buf; PAD_UNLOCK (aggpad); - PAD_STREAM_UNLOCK (aggpad); + SRC_UNLOCK (self); - if (gst_aggregator_check_pads_ready (self)) - SRC_STREAM_BROADCAST (self); - SRC_STREAM_UNLOCK (self); +done: - GST_DEBUG_OBJECT (aggpad, "Done chaining"); + PAD_FLUSH_UNLOCK (aggpad); - GST_OBJECT_LOCK (self); - flow_return = priv->flow_return; - GST_OBJECT_UNLOCK (self); + GST_DEBUG_OBJECT (aggpad, "Done chaining"); return flow_return; flushing: - PAD_STREAM_UNLOCK (aggpad); + PAD_UNLOCK (aggpad); + PAD_FLUSH_UNLOCK (aggpad); - gst_buffer_unref (buffer); - GST_DEBUG_OBJECT (aggpad, "We are flushing"); + GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer", + gst_flow_get_name (flow_return)); + if (buffer) + gst_buffer_unref (buffer); - return GST_FLOW_FLUSHING; + return flow_return; eos: - PAD_STREAM_UNLOCK (aggpad); + PAD_UNLOCK (aggpad); + PAD_FLUSH_UNLOCK (aggpad); gst_buffer_unref (buffer); - GST_DEBUG_OBJECT (pad, "We are EOS already..."); + GST_DEBUG_OBJECT (aggpad, "We are EOS already..."); return GST_FLOW_EOS; } +static GstFlowReturn +gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) +{ + return gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object), + GST_AGGREGATOR_PAD_CAST (pad), buffer, TRUE); +} + static gboolean gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, GstQuery * query) @@ -1835,27 +2271,25 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, if (GST_QUERY_IS_SERIALIZED (query)) { PAD_LOCK (aggpad); - if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) { - PAD_UNLOCK (aggpad); - goto flushing; - } - - while (aggpad->priv->buffer - && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) { + while (!gst_aggregator_pad_queue_is_empty (aggpad) + && aggpad->priv->flow_return == GST_FLOW_OK) { GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); PAD_WAIT_EVENT (aggpad); } - PAD_UNLOCK (aggpad); - if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) + if (aggpad->priv->flow_return != GST_FLOW_OK) goto flushing; + + PAD_UNLOCK (aggpad); } return klass->sink_query (GST_AGGREGATOR (parent), GST_AGGREGATOR_PAD (pad), query); flushing: - GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping query"); + GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query", + gst_flow_get_name (aggpad->priv->flow_return)); + PAD_UNLOCK (aggpad); 
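+  /* refuse the serialized query so the peer sees it was dropped */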
return FALSE; } @@ -1863,36 +2297,49 @@ static gboolean gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, GstEvent * event) { + GstAggregator *self = GST_AGGREGATOR (parent); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS - && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) { + /* && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE */ ) { + SRC_LOCK (self); PAD_LOCK (aggpad); - if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE - && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { - PAD_UNLOCK (aggpad); + if (aggpad->priv->flow_return != GST_FLOW_OK + && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) goto flushing; + + if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) { + GST_OBJECT_LOCK (aggpad); + gst_event_copy_segment (event, &aggpad->clip_segment); + aggpad->priv->head_position = aggpad->clip_segment.position; + update_time_level (aggpad, TRUE); + GST_OBJECT_UNLOCK (aggpad); } - while (aggpad->priv->buffer - && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) { - GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); - PAD_WAIT_EVENT (aggpad); + if (!gst_aggregator_pad_queue_is_empty (aggpad) && + GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { + GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, + event); + g_queue_push_head (&aggpad->priv->buffers, event); + event = NULL; + SRC_BROADCAST (self); } PAD_UNLOCK (aggpad); - - if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE - && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) - goto flushing; + SRC_UNLOCK (self); } - return klass->sink_event (GST_AGGREGATOR (parent), - GST_AGGREGATOR_PAD (pad), event); + if (event) + return klass->sink_event (self, aggpad, event); + else + return TRUE; flushing: - GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping event"); + GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event", + gst_flow_get_name (aggpad->priv->flow_return)); + PAD_UNLOCK (aggpad); + SRC_UNLOCK (self); if (GST_EVENT_IS_STICKY (event)) gst_pad_store_sticky_event (pad, event); gst_event_unref (event); @@ -1903,17 +2350,17 @@ static gboolean gst_aggregator_pad_activate_mode_func (GstPad * pad, GstObject * parent, GstPadMode mode, gboolean active) { + GstAggregator *self = GST_AGGREGATOR (parent); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); if (active == FALSE) { - PAD_LOCK (aggpad); - g_atomic_int_set (&aggpad->priv->flushing, TRUE); - gst_buffer_replace (&aggpad->priv->buffer, NULL); - PAD_BROADCAST_EVENT (aggpad); - PAD_UNLOCK (aggpad); + SRC_LOCK (self); + gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE); + SRC_BROADCAST (self); + SRC_UNLOCK (self); } else { PAD_LOCK (aggpad); - g_atomic_int_set (&aggpad->priv->flushing, FALSE); + aggpad->priv->flow_return = GST_FLOW_OK; PAD_BROADCAST_EVENT (aggpad); PAD_UNLOCK (aggpad); } @@ -1947,7 +2394,8 @@ gst_aggregator_pad_finalize (GObject * object) GstAggregatorPad *pad = (GstAggregatorPad *) object; g_cond_clear (&pad->priv->event_cond); - g_mutex_clear (&pad->priv->stream_lock); + g_mutex_clear (&pad->priv->flush_lock); + g_mutex_clear (&pad->priv->lock); G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object); } @@ -1956,11 +2404,8 @@ static void gst_aggregator_pad_dispose (GObject * object) { GstAggregatorPad *pad = (GstAggregatorPad *) object; - GstBuffer *buf; - buf = gst_aggregator_pad_steal_buffer (pad); - if (buf) - gst_buffer_unref (buf); 
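+  /* a full flush drops everything still queued and wakes any thread
+   * blocked in PAD_WAIT_EVENT() before the pad is freed */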
+ gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE); G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object); } @@ -1984,62 +2429,69 @@ gst_aggregator_pad_init (GstAggregatorPad * pad) G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD, GstAggregatorPadPrivate); - pad->priv->buffer = NULL; + g_queue_init (&pad->priv->buffers); g_cond_init (&pad->priv->event_cond); - g_mutex_init (&pad->priv->stream_lock); + g_mutex_init (&pad->priv->flush_lock); + g_mutex_init (&pad->priv->lock); + + pad->priv->first_buffer = TRUE; } /** - * gst_aggregator_pad_steal_buffer_unlocked: + * gst_aggregator_pad_steal_buffer: * @pad: the pad to get buffer from * * Steal the ref to the buffer currently queued in @pad. * - * MUST be called with the pad's object lock held. - * * Returns: (transfer full): The buffer in @pad or NULL if no buffer was * queued. You should unref the buffer after usage. */ GstBuffer * -gst_aggregator_pad_steal_buffer_unlocked (GstAggregatorPad * pad) +gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad) { GstBuffer *buffer = NULL; - if (pad->priv->buffer) { + PAD_LOCK (pad); + if (GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) + buffer = g_queue_pop_tail (&pad->priv->buffers); + + if (buffer) { + apply_buffer (pad, buffer, FALSE); + pad->priv->num_buffers--; GST_TRACE_OBJECT (pad, "Consuming buffer"); - buffer = pad->priv->buffer; - pad->priv->buffer = NULL; - if (pad->priv->pending_eos) { + if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) { pad->priv->pending_eos = FALSE; pad->priv->eos = TRUE; } PAD_BROADCAST_EVENT (pad); GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer); } + PAD_UNLOCK (pad); return buffer; } /** - * gst_aggregator_pad_steal_buffer: - * @pad: the pad to get buffer from + * gst_aggregator_pad_drop_buffer: + * @pad: the pad where to drop any pending buffer * - * Steal the ref to the buffer currently queued in @pad. + * Drop the buffer currently queued in @pad. * - * Returns: (transfer full): The buffer in @pad or NULL if no buffer was - * queued. You should unref the buffer after usage. + * Returns: TRUE if there was a buffer queued in @pad, or FALSE if not. 
 */
-GstBuffer *
-gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
+gboolean
+gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
 {
-  GstBuffer *buffer = NULL;
+  GstBuffer *buf;
 
-  PAD_LOCK (pad);
-  buffer = gst_aggregator_pad_steal_buffer_unlocked (pad);
-  PAD_UNLOCK (pad);
+  buf = gst_aggregator_pad_steal_buffer (pad);
 
-  return buffer;
+  if (buf == NULL)
+    return FALSE;
+
+  gst_buffer_unref (buf);
+  return TRUE;
 }
 
 /**
@@ -2056,8 +2508,14 @@ gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
   GstBuffer *buffer = NULL;
 
   PAD_LOCK (pad);
-  if (pad->priv->buffer)
-    buffer = gst_buffer_ref (pad->priv->buffer);
+  buffer = g_queue_peek_tail (&pad->priv->buffers);
+  /* The tail should always be a buffer, because if it is an event,
+   * it will be consumed immediately in gst_aggregator_pad_steal_buffer */
+
+  if (GST_IS_BUFFER (buffer))
+    gst_buffer_ref (buffer);
+  else
+    buffer = NULL;
   PAD_UNLOCK (pad);
 
   return buffer;
@@ -2124,15 +2582,28 @@ void
 gst_aggregator_set_latency (GstAggregator * self,
     GstClockTime min_latency, GstClockTime max_latency)
 {
+  gboolean changed = FALSE;
+
   g_return_if_fail (GST_IS_AGGREGATOR (self));
   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
   g_return_if_fail (max_latency >= min_latency);
 
-  GST_OBJECT_LOCK (self);
-  self->priv->sub_latency_min = min_latency;
-  self->priv->sub_latency_max = max_latency;
-  GST_OBJECT_UNLOCK (self);
+  SRC_LOCK (self);
+  if (self->priv->sub_latency_min != min_latency) {
+    self->priv->sub_latency_min = min_latency;
+    changed = TRUE;
+  }
+  if (self->priv->sub_latency_max != max_latency) {
+    self->priv->sub_latency_max = max_latency;
+    changed = TRUE;
+  }
 
-  gst_element_post_message (GST_ELEMENT_CAST (self),
-      gst_message_new_latency (GST_OBJECT_CAST (self)));
+  if (changed)
+    SRC_BROADCAST (self);
+  SRC_UNLOCK (self);
+
+  if (changed) {
+    gst_element_post_message (GST_ELEMENT_CAST (self),
+        gst_message_new_latency (GST_OBJECT_CAST (self)));
+  }
 }
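Editor's sketch (not part of the patch): with the per-pad queues introduced
above, a subclass's aggregate() implementation typically drains one buffer
per pad and skips the gap buffers the base class now synthesizes from GAP
events (flagged GST_BUFFER_FLAG_GAP / GST_BUFFER_FLAG_DROPPABLE, as noted at
the top of the file). The function name my_agg_aggregate and the
"drain everything" strategy are hypothetical; real subclasses such as
audiomixer are more involved.

    static GstFlowReturn
    my_agg_aggregate (GstAggregator * agg, gboolean timeout)
    {
      GList *l;

      /* "timeout" is TRUE when we were woken by the clock rather than by
       * all pads becoming ready; this sketch treats both cases the same.
       * GST_OBJECT_LOCK (agg) may be taken before any PAD_LOCK, per the
       * locking order documented at the top of this file. */
      GST_OBJECT_LOCK (agg);
      for (l = GST_ELEMENT_CAST (agg)->sinkpads; l; l = l->next) {
        GstAggregatorPad *pad = GST_AGGREGATOR_PAD (l->data);
        GstBuffer *buf = gst_aggregator_pad_steal_buffer (pad);

        if (buf == NULL)
          continue;             /* empty queue: the pad timed out or is EOS */

        if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_GAP)) {
          /* gap buffer synthesized from a GAP event: no data to mix */
          gst_buffer_unref (buf);
          continue;
        }

        /* ... mix/mux the buffer into an output buffer here ... */
        gst_buffer_unref (buf);
      }
      GST_OBJECT_UNLOCK (agg);

      /* ... push the finished output on agg->srcpad ... */
      return GST_FLOW_OK;
    }

When the pipeline is live, gst_aggregator_wait_and_check() wakes this
function on the clock deadline instead of waiting for every pad, so a pad
whose queue is still empty simply yields NULL from
gst_aggregator_pad_steal_buffer() and is skipped for that cycle.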