aggregator: don't leak gap buffer when out of segment
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
1 /* GStreamer aggregator base class
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @title: GstAggregator
25  * @short_description: Base class for mixers and muxers, manages a set of input
26  *     pads and aggregates their streams
27  * @see_also: gstcollectpads for historical reasons.
28  *
29  * Manages a set of pads with the purpose of aggregating their buffers.
30  * Control is given to the subclass when all pads have data.
31  *
32  *  * Base class for mixers and muxers. Subclasses should at least implement
33  *    the #GstAggregatorClass.aggregate() virtual method.
34  *
35  *  * Installs a #GstPadChainFunction, a #GstPadEventFullFunction and a
36  *    #GstPadQueryFunction to queue all serialized data packets per sink pad.
37  *    Subclasses should not overwrite those, but instead implement
38  *    #GstAggregatorClass.sink_event() and #GstAggregatorClass.sink_query() as
39  *    needed.
40  *
41  *  * When data is queued on all pads, the aggregate vmethod is called.
42  *
43  *  * One can peek at the data on any given GstAggregatorPad with the
44  *    gst_aggregator_pad_peek_buffer () method, and remove it from the pad
45  *    with the gst_aggregator_pad_pop_buffer () method. When a buffer
46  *    has been taken with pop_buffer (), a new buffer can be queued
47  *    on that pad.
48  *
49  *  * If the subclass wishes to push a buffer downstream in its aggregate
50  *    implementation, it should do so through the
51  *    gst_aggregator_finish_buffer () method. This method will take care
52  *    of sending and ordering mandatory events such as stream start, caps
53  *    and segment.
54  *
55  *  * Same goes for EOS events, which should not be pushed directly by the
56  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
57  *    implementation.
58  *
59  *  * Note that the aggregator logic regarding gap event handling is to turn
60  *    these into gap buffers with matching PTS and duration. It will also
61  *    flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE
62  *    to ease their identification and subsequent processing.
63  *
64  *  * Subclasses must use (a subclass of) #GstAggregatorPad for both their
65  *    sink and source pads.
66  *    See gst_element_class_add_static_pad_template_with_gtype().
67  *
68  * This class used to live in gst-plugins-bad and was moved to core.
69  *
70  * Since: 1.14
71  */
72
73 /**
74  * SECTION: gstaggregatorpad
75  * @title: GstAggregatorPad
76  * @short_description: #GstPad subclass for pads managed by #GstAggregator
77  * @see_also: gstcollectpads for historical reasons.
78  *
79  * Pads managed by a #GstAggregor subclass.
80  *
81  * This class used to live in gst-plugins-bad and was moved to core.
82  *
83  * Since: 1.14
84  */
85
86 #ifdef HAVE_CONFIG_H
87 #  include "config.h"
88 #endif
89
90 #include <string.h>             /* strlen */
91
92 #include "gstaggregator.h"
93
94 typedef enum
95 {
96   GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
97   GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
98   GST_AGGREGATOR_START_TIME_SELECTION_SET
99 } GstAggregatorStartTimeSelection;
100
101 static GType
102 gst_aggregator_start_time_selection_get_type (void)
103 {
104   static GType gtype = 0;
105
106   if (gtype == 0) {
107     static const GEnumValue values[] = {
108       {GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
109           "Start at 0 running time (default)", "zero"},
110       {GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
111           "Start at first observed input running time", "first"},
112       {GST_AGGREGATOR_START_TIME_SELECTION_SET,
113           "Set start time with start-time property", "set"},
114       {0, NULL, NULL}
115     };
116
117     gtype = g_enum_register_static ("GstAggregatorStartTimeSelection", values);
118   }
119   return gtype;
120 }
121
122 /*  Might become API */
123 #if 0
124 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
125     const GstTagList * tags, GstTagMergeMode mode);
126 #endif
127 static void gst_aggregator_set_latency_property (GstAggregator * agg,
128     GstClockTime latency);
129 static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
130
131 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
132
133 static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad);
134
135 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
136 #define GST_CAT_DEFAULT aggregator_debug
137
138 /* Locking order, locks in this element must always be taken in this order
139  *
140  * standard sink pad stream lock -> GST_PAD_STREAM_LOCK (aggpad)
141  * Aggregator pad flush lock -> PAD_FLUSH_LOCK(aggpad)
142  * standard src pad stream lock -> GST_PAD_STREAM_LOCK (srcpad)
143  * Aggregator src lock -> SRC_LOCK(agg) w/ SRC_WAIT/BROADCAST
144  * standard element object lock -> GST_OBJECT_LOCK(agg)
145  * Aggregator pad lock -> PAD_LOCK (aggpad) w/ PAD_WAIT/BROADCAST_EVENT(aggpad)
146  * standard src pad object lock -> GST_OBJECT_LOCK(srcpad)
147  * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad)
148  */
149
150 /* GstAggregatorPad definitions */
151 #define PAD_LOCK(pad)   G_STMT_START {                                  \
152   GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p",              \
153         g_thread_self());                                               \
154   g_mutex_lock(&pad->priv->lock);                                       \
155   GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p",                \
156         g_thread_self());                                               \
157   } G_STMT_END
158
159 #define PAD_UNLOCK(pad)  G_STMT_START {                                 \
160   GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p",           \
161       g_thread_self());                                                 \
162   g_mutex_unlock(&pad->priv->lock);                                     \
163   GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p",             \
164         g_thread_self());                                               \
165   } G_STMT_END
166
167
168 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
169   GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p",   \
170         g_thread_self());                                               \
171   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),           \
172       (&((GstAggregatorPad*)pad)->priv->lock));                         \
173   GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \
174         g_thread_self());                                               \
175   } G_STMT_END
176
177 #define PAD_BROADCAST_EVENT(pad) G_STMT_START {                        \
178   GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p",     \
179         g_thread_self());                                              \
180   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond));    \
181   } G_STMT_END
182
183
184 #define PAD_FLUSH_LOCK(pad)     G_STMT_START {                          \
185   GST_TRACE_OBJECT (pad, "Taking lock from thread %p",                  \
186         g_thread_self());                                               \
187   g_mutex_lock(&pad->priv->flush_lock);                                 \
188   GST_TRACE_OBJECT (pad, "Took lock from thread %p",                    \
189         g_thread_self());                                               \
190   } G_STMT_END
191
192 #define PAD_FLUSH_UNLOCK(pad)   G_STMT_START {                          \
193   GST_TRACE_OBJECT (pad, "Releasing lock from thread %p",               \
194         g_thread_self());                                               \
195   g_mutex_unlock(&pad->priv->flush_lock);                               \
196   GST_TRACE_OBJECT (pad, "Release lock from thread %p",                 \
197         g_thread_self());                                               \
198   } G_STMT_END
199
200 #define SRC_LOCK(self)   G_STMT_START {                             \
201   GST_TRACE_OBJECT (self, "Taking src lock from thread %p",         \
202       g_thread_self());                                             \
203   g_mutex_lock(&self->priv->src_lock);                              \
204   GST_TRACE_OBJECT (self, "Took src lock from thread %p",           \
205         g_thread_self());                                           \
206   } G_STMT_END
207
208 #define SRC_UNLOCK(self)  G_STMT_START {                            \
209   GST_TRACE_OBJECT (self, "Releasing src lock from thread %p",      \
210         g_thread_self());                                           \
211   g_mutex_unlock(&self->priv->src_lock);                            \
212   GST_TRACE_OBJECT (self, "Released src lock from thread %p",       \
213         g_thread_self());                                           \
214   } G_STMT_END
215
216 #define SRC_WAIT(self) G_STMT_START {                               \
217   GST_LOG_OBJECT (self, "Waiting for src on thread %p",             \
218         g_thread_self());                                           \
219   g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock));    \
220   GST_LOG_OBJECT (self, "DONE Waiting for src on thread %p",        \
221         g_thread_self());                                           \
222   } G_STMT_END
223
224 #define SRC_BROADCAST(self) G_STMT_START {                          \
225     GST_LOG_OBJECT (self, "Signaling src from thread %p",           \
226         g_thread_self());                                           \
227     if (self->priv->aggregate_id)                                   \
228       gst_clock_id_unschedule (self->priv->aggregate_id);           \
229     g_cond_broadcast(&(self->priv->src_cond));                      \
230   } G_STMT_END
231
232 struct _GstAggregatorPadPrivate
233 {
234   /* Following fields are protected by the PAD_LOCK */
235   GstFlowReturn flow_return;
236   gboolean pending_flush_start;
237   gboolean pending_flush_stop;
238
239   gboolean first_buffer;
240
241   GQueue data;                  /* buffers, events and queries */
242   GstBuffer *clipped_buffer;
243   guint num_buffers;
244
245   /* used to track fill state of queues, only used with live-src and when
246    * latency property is set to > 0 */
247   GstClockTime head_position;
248   GstClockTime tail_position;
249   GstClockTime head_time;       /* running time */
250   GstClockTime tail_time;
251   GstClockTime time_level;      /* how much head is ahead of tail */
252   GstSegment head_segment;      /* segment before the queue */
253
254   gboolean negotiated;
255
256   gboolean eos;
257
258   GMutex lock;
259   GCond event_cond;
260   /* This lock prevents a flush start processing happening while
261    * the chain function is also happening.
262    */
263   GMutex flush_lock;
264 };
265
266 /* Must be called with PAD_LOCK held */
267 static void
268 gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
269 {
270   aggpad->priv->eos = FALSE;
271   aggpad->priv->flow_return = GST_FLOW_OK;
272   GST_OBJECT_LOCK (aggpad);
273   gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED);
274   gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED);
275   GST_OBJECT_UNLOCK (aggpad);
276   aggpad->priv->head_position = GST_CLOCK_TIME_NONE;
277   aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
278   aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
279   aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
280   aggpad->priv->time_level = 0;
281   aggpad->priv->first_buffer = TRUE;
282 }
283
284 static gboolean
285 gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
286 {
287   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
288
289   PAD_LOCK (aggpad);
290   gst_aggregator_pad_reset_unlocked (aggpad);
291   PAD_UNLOCK (aggpad);
292
293   if (klass->flush)
294     return klass->flush (aggpad, agg);
295
296   return TRUE;
297 }
298
299 /*************************************
300  * GstAggregator implementation  *
301  *************************************/
302 static GstElementClass *aggregator_parent_class = NULL;
303 static gint aggregator_private_offset = 0;
304
305 /* All members are protected by the object lock unless otherwise noted */
306
307 struct _GstAggregatorPrivate
308 {
309   gint max_padserial;
310
311   /* Our state is >= PAUSED */
312   gboolean running;             /* protected by src_lock */
313
314   /* seqnum from seek or segment,
315    * to be applied to synthetic segment/eos events */
316   gint seqnum;
317   gboolean send_stream_start;   /* protected by srcpad stream lock */
318   gboolean send_segment;
319   gboolean flush_seeking;
320   gboolean pending_flush_start;
321   gboolean send_eos;            /* protected by srcpad stream lock */
322
323   GstCaps *srccaps;             /* protected by the srcpad stream lock */
324
325   GstTagList *tags;
326   gboolean tags_changed;
327
328   gboolean peer_latency_live;   /* protected by src_lock */
329   GstClockTime peer_latency_min;        /* protected by src_lock */
330   GstClockTime peer_latency_max;        /* protected by src_lock */
331   gboolean has_peer_latency;    /* protected by src_lock */
332
333   GstClockTime sub_latency_min; /* protected by src_lock */
334   GstClockTime sub_latency_max; /* protected by src_lock */
335
336   GstClockTime upstream_latency_min;    /* protected by src_lock */
337
338   /* aggregate */
339   GstClockID aggregate_id;      /* protected by src_lock */
340   GMutex src_lock;
341   GCond src_cond;
342
343   gboolean first_buffer;        /* protected by object lock */
344   GstAggregatorStartTimeSelection start_time_selection;
345   GstClockTime start_time;
346
347   /* protected by the object lock */
348   GstQuery *allocation_query;
349   GstAllocator *allocator;
350   GstBufferPool *pool;
351   GstAllocationParams allocation_params;
352
353   /* properties */
354   gint64 latency;               /* protected by both src_lock and all pad locks */
355 };
356
357 /* Seek event forwarding helper */
358 typedef struct
359 {
360   /* parameters */
361   GstEvent *event;
362   gboolean flush;
363   gboolean only_to_active_pads;
364
365   /* results */
366   gboolean result;
367   gboolean one_actually_seeked;
368 } EventData;
369
370 #define DEFAULT_LATENCY              0
371 #define DEFAULT_MIN_UPSTREAM_LATENCY              0
372 #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO
373 #define DEFAULT_START_TIME           (-1)
374
375 enum
376 {
377   PROP_0,
378   PROP_LATENCY,
379   PROP_MIN_UPSTREAM_LATENCY,
380   PROP_START_TIME_SELECTION,
381   PROP_START_TIME,
382   PROP_LAST
383 };
384
385 static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
386     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
387
388 static gboolean
389 gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
390 {
391   return (g_queue_peek_tail (&pad->priv->data) == NULL &&
392       pad->priv->clipped_buffer == NULL);
393 }
394
395 static gboolean
396 gst_aggregator_check_pads_ready (GstAggregator * self)
397 {
398   GstAggregatorPad *pad = NULL;
399   GList *l, *sinkpads;
400   gboolean have_buffer = TRUE;
401   gboolean have_event_or_query = FALSE;
402
403   GST_LOG_OBJECT (self, "checking pads");
404
405   GST_OBJECT_LOCK (self);
406
407   sinkpads = GST_ELEMENT_CAST (self)->sinkpads;
408   if (sinkpads == NULL)
409     goto no_sinkpads;
410
411   for (l = sinkpads; l != NULL; l = l->next) {
412     pad = l->data;
413
414     PAD_LOCK (pad);
415
416     if (pad->priv->num_buffers == 0) {
417       if (!gst_aggregator_pad_queue_is_empty (pad))
418         have_event_or_query = TRUE;
419       if (!pad->priv->eos) {
420         have_buffer = FALSE;
421
422         /* If not live we need data on all pads, so leave the loop */
423         if (!self->priv->peer_latency_live) {
424           PAD_UNLOCK (pad);
425           goto pad_not_ready;
426         }
427       }
428     } else if (self->priv->peer_latency_live) {
429       /* In live mode, having a single pad with buffers is enough to
430        * generate a start time from it. In non-live mode all pads need
431        * to have a buffer
432        */
433       self->priv->first_buffer = FALSE;
434     }
435
436     PAD_UNLOCK (pad);
437   }
438
439   if (!have_buffer && !have_event_or_query)
440     goto pad_not_ready;
441
442   if (have_buffer)
443     self->priv->first_buffer = FALSE;
444
445   GST_OBJECT_UNLOCK (self);
446   GST_LOG_OBJECT (self, "pads are ready");
447   return TRUE;
448
449 no_sinkpads:
450   {
451     GST_LOG_OBJECT (self, "pads not ready: no sink pads");
452     GST_OBJECT_UNLOCK (self);
453     return FALSE;
454   }
455 pad_not_ready:
456   {
457     if (have_event_or_query)
458       GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet,"
459           " but waking up for serialized event");
460     else
461       GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
462     GST_OBJECT_UNLOCK (self);
463     return have_event_or_query;
464   }
465 }
466
467 static void
468 gst_aggregator_reset_flow_values (GstAggregator * self)
469 {
470   GST_OBJECT_LOCK (self);
471   self->priv->send_stream_start = TRUE;
472   self->priv->send_segment = TRUE;
473   gst_segment_init (&GST_AGGREGATOR_PAD (self->srcpad)->segment,
474       GST_FORMAT_TIME);
475   self->priv->first_buffer = TRUE;
476   GST_OBJECT_UNLOCK (self);
477 }
478
479 static inline void
480 gst_aggregator_push_mandatory_events (GstAggregator * self)
481 {
482   GstAggregatorPrivate *priv = self->priv;
483   GstEvent *segment = NULL;
484   GstEvent *tags = NULL;
485
486   if (self->priv->send_stream_start) {
487     gchar s_id[32];
488
489     GST_INFO_OBJECT (self, "pushing stream start");
490     /* stream-start (FIXME: create id based on input ids) */
491     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
492     if (!gst_pad_push_event (GST_PAD (self->srcpad),
493             gst_event_new_stream_start (s_id))) {
494       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
495     }
496     self->priv->send_stream_start = FALSE;
497   }
498
499   if (self->priv->srccaps) {
500
501     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
502         self->priv->srccaps);
503     if (!gst_pad_push_event (GST_PAD (self->srcpad),
504             gst_event_new_caps (self->priv->srccaps))) {
505       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
506     }
507     gst_caps_unref (self->priv->srccaps);
508     self->priv->srccaps = NULL;
509   }
510
511   GST_OBJECT_LOCK (self);
512   if (self->priv->send_segment && !self->priv->flush_seeking) {
513     segment =
514         gst_event_new_segment (&GST_AGGREGATOR_PAD (self->srcpad)->segment);
515
516     if (!self->priv->seqnum)
517       /* This code-path is in preparation to be able to run without a source
518        * connected. Then we won't have a seq-num from a segment event. */
519       self->priv->seqnum = gst_event_get_seqnum (segment);
520     else
521       gst_event_set_seqnum (segment, self->priv->seqnum);
522     self->priv->send_segment = FALSE;
523
524     GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment);
525   }
526
527   if (priv->tags && priv->tags_changed && !self->priv->flush_seeking) {
528     tags = gst_event_new_tag (gst_tag_list_ref (priv->tags));
529     priv->tags_changed = FALSE;
530   }
531   GST_OBJECT_UNLOCK (self);
532
533   if (segment)
534     gst_pad_push_event (self->srcpad, segment);
535   if (tags)
536     gst_pad_push_event (self->srcpad, tags);
537
538 }
539
540 /**
541  * gst_aggregator_set_src_caps:
542  * @self: The #GstAggregator
543  * @caps: The #GstCaps to set on the src pad.
544  *
545  * Sets the caps to be used on the src pad.
546  */
547 void
548 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
549 {
550   GST_PAD_STREAM_LOCK (self->srcpad);
551   gst_caps_replace (&self->priv->srccaps, caps);
552   gst_aggregator_push_mandatory_events (self);
553   GST_PAD_STREAM_UNLOCK (self->srcpad);
554 }
555
556 static GstFlowReturn
557 gst_aggregator_default_finish_buffer (GstAggregator * self, GstBuffer * buffer)
558 {
559   gst_aggregator_push_mandatory_events (self);
560
561   GST_OBJECT_LOCK (self);
562   if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) {
563     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
564     GST_OBJECT_UNLOCK (self);
565     return gst_pad_push (self->srcpad, buffer);
566   } else {
567     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
568         self->priv->flush_seeking, gst_pad_is_active (self->srcpad));
569     GST_OBJECT_UNLOCK (self);
570     gst_buffer_unref (buffer);
571     return GST_FLOW_OK;
572   }
573 }
574
575 /**
576  * gst_aggregator_finish_buffer:
577  * @aggregator: The #GstAggregator
578  * @buffer: (transfer full): the #GstBuffer to push.
579  *
580  * This method will push the provided output buffer downstream. If needed,
581  * mandatory events such as stream-start, caps, and segment events will be
582  * sent before pushing the buffer.
583  */
584 GstFlowReturn
585 gst_aggregator_finish_buffer (GstAggregator * aggregator, GstBuffer * buffer)
586 {
587   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator);
588
589   g_assert (klass->finish_buffer != NULL);
590
591   return klass->finish_buffer (aggregator, buffer);
592 }
593
594 static void
595 gst_aggregator_push_eos (GstAggregator * self)
596 {
597   GstEvent *event;
598   gst_aggregator_push_mandatory_events (self);
599
600   event = gst_event_new_eos ();
601
602   GST_OBJECT_LOCK (self);
603   self->priv->send_eos = FALSE;
604   gst_event_set_seqnum (event, self->priv->seqnum);
605   GST_OBJECT_UNLOCK (self);
606
607   gst_pad_push_event (self->srcpad, event);
608 }
609
610 static GstClockTime
611 gst_aggregator_get_next_time (GstAggregator * self)
612 {
613   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
614
615   if (klass->get_next_time)
616     return klass->get_next_time (self);
617
618   return GST_CLOCK_TIME_NONE;
619 }
620
621 static gboolean
622 gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
623 {
624   GstClockTime latency;
625   GstClockTime start;
626   gboolean res;
627
628   *timeout = FALSE;
629
630   SRC_LOCK (self);
631
632   latency = gst_aggregator_get_latency_unlocked (self);
633
634   if (gst_aggregator_check_pads_ready (self)) {
635     GST_DEBUG_OBJECT (self, "all pads have data");
636     SRC_UNLOCK (self);
637
638     return TRUE;
639   }
640
641   /* Before waiting, check if we're actually still running */
642   if (!self->priv->running || !self->priv->send_eos) {
643     SRC_UNLOCK (self);
644
645     return FALSE;
646   }
647
648   start = gst_aggregator_get_next_time (self);
649
650   /* If we're not live, or if we use the running time
651    * of the first buffer as start time, we wait until
652    * all pads have buffers.
653    * Otherwise (i.e. if we are live!), we wait on the clock
654    * and if a pad does not have a buffer in time we ignore
655    * that pad.
656    */
657   GST_OBJECT_LOCK (self);
658   if (!GST_CLOCK_TIME_IS_VALID (latency) ||
659       !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) ||
660       !GST_CLOCK_TIME_IS_VALID (start) ||
661       (self->priv->first_buffer
662           && self->priv->start_time_selection ==
663           GST_AGGREGATOR_START_TIME_SELECTION_FIRST)) {
664     /* We wake up here when something happened, and below
665      * then check if we're ready now. If we return FALSE,
666      * we will be directly called again.
667      */
668     GST_OBJECT_UNLOCK (self);
669     SRC_WAIT (self);
670   } else {
671     GstClockTime base_time, time;
672     GstClock *clock;
673     GstClockReturn status;
674     GstClockTimeDiff jitter;
675
676     GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
677         GST_TIME_ARGS (start));
678
679     base_time = GST_ELEMENT_CAST (self)->base_time;
680     clock = gst_object_ref (GST_ELEMENT_CLOCK (self));
681     GST_OBJECT_UNLOCK (self);
682
683     time = base_time + start;
684     time += latency;
685
686     GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
687         GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
688         " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")",
689         GST_TIME_ARGS (time),
690         GST_TIME_ARGS (base_time),
691         GST_TIME_ARGS (start), GST_TIME_ARGS (latency),
692         GST_TIME_ARGS (gst_clock_get_time (clock)));
693
694     self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
695     gst_object_unref (clock);
696     SRC_UNLOCK (self);
697
698     jitter = 0;
699     status = gst_clock_id_wait (self->priv->aggregate_id, &jitter);
700
701     SRC_LOCK (self);
702     if (self->priv->aggregate_id) {
703       gst_clock_id_unref (self->priv->aggregate_id);
704       self->priv->aggregate_id = NULL;
705     }
706
707     GST_DEBUG_OBJECT (self,
708         "clock returned %d (jitter: %" GST_STIME_FORMAT ")",
709         status, GST_STIME_ARGS (jitter));
710
711     /* we timed out */
712     if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
713       SRC_UNLOCK (self);
714       *timeout = TRUE;
715       return TRUE;
716     }
717   }
718
719   res = gst_aggregator_check_pads_ready (self);
720   SRC_UNLOCK (self);
721
722   return res;
723 }
724
725 typedef struct
726 {
727   gboolean processed_event;
728   GstFlowReturn flow_ret;
729 } DoHandleEventsAndQueriesData;
730
731 static gboolean
732 gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
733     gpointer user_data)
734 {
735   GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
736   GstAggregator *aggregator = GST_AGGREGATOR_CAST (self);
737   GstEvent *event = NULL;
738   GstQuery *query = NULL;
739   GstAggregatorClass *klass = NULL;
740   DoHandleEventsAndQueriesData *data = user_data;
741
742   do {
743     event = NULL;
744     query = NULL;
745
746     PAD_LOCK (pad);
747     if (pad->priv->clipped_buffer == NULL &&
748         !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
749       if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data)))
750         event = gst_event_ref (g_queue_peek_tail (&pad->priv->data));
751       if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))
752         query = g_queue_peek_tail (&pad->priv->data);
753     }
754     PAD_UNLOCK (pad);
755     if (event || query) {
756       gboolean ret;
757
758       data->processed_event = TRUE;
759       if (klass == NULL)
760         klass = GST_AGGREGATOR_GET_CLASS (self);
761
762       if (event) {
763         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
764         gst_event_ref (event);
765         ret = klass->sink_event (aggregator, pad, event);
766
767         PAD_LOCK (pad);
768         if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) {
769           pad->priv->negotiated = ret;
770           if (!ret)
771             pad->priv->flow_return = data->flow_ret = GST_FLOW_NOT_NEGOTIATED;
772         }
773         if (g_queue_peek_tail (&pad->priv->data) == event)
774           gst_event_unref (g_queue_pop_tail (&pad->priv->data));
775         gst_event_unref (event);
776       } else if (query) {
777         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query);
778         ret = klass->sink_query (aggregator, pad, query);
779
780         PAD_LOCK (pad);
781         if (g_queue_peek_tail (&pad->priv->data) == query) {
782           GstStructure *s;
783
784           s = gst_query_writable_structure (query);
785           gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret,
786               NULL);
787           g_queue_pop_tail (&pad->priv->data);
788         }
789       }
790
791       PAD_BROADCAST_EVENT (pad);
792       PAD_UNLOCK (pad);
793     }
794   } while (event || query);
795
796   return TRUE;
797 }
798
799 static gboolean
800 gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
801     gpointer user_data)
802 {
803   GList *item;
804   GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
805   GstAggregator *agg = (GstAggregator *) self;
806   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
807
808   if (!klass->skip_buffer)
809     return FALSE;
810
811   PAD_LOCK (aggpad);
812
813   item = g_queue_peek_head_link (&aggpad->priv->data);
814   while (item) {
815     GList *next = item->next;
816
817     if (GST_IS_BUFFER (item->data)
818         && klass->skip_buffer (aggpad, agg, item->data)) {
819       GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
820       gst_aggregator_pad_buffer_consumed (aggpad);
821       gst_buffer_unref (item->data);
822       g_queue_delete_link (&aggpad->priv->data, item);
823     } else {
824       break;
825     }
826
827     item = next;
828   }
829
830   PAD_UNLOCK (aggpad);
831
832   return TRUE;
833 }
834
835 static void
836 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
837     GstFlowReturn flow_return, gboolean full)
838 {
839   GList *item;
840
841   PAD_LOCK (aggpad);
842   if (flow_return == GST_FLOW_NOT_LINKED)
843     aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
844   else
845     aggpad->priv->flow_return = flow_return;
846
847   item = g_queue_peek_head_link (&aggpad->priv->data);
848   while (item) {
849     GList *next = item->next;
850
851     /* In partial flush, we do like the pad, we get rid of non-sticky events
852      * and EOS/SEGMENT.
853      */
854     if (full || GST_IS_BUFFER (item->data) ||
855         GST_EVENT_TYPE (item->data) == GST_EVENT_EOS ||
856         GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT ||
857         !GST_EVENT_IS_STICKY (item->data)) {
858       if (!GST_IS_QUERY (item->data))
859         gst_mini_object_unref (item->data);
860       g_queue_delete_link (&aggpad->priv->data, item);
861     }
862     item = next;
863   }
864   aggpad->priv->num_buffers = 0;
865   gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL);
866
867   PAD_BROADCAST_EVENT (aggpad);
868   PAD_UNLOCK (aggpad);
869 }
870
871 static GstFlowReturn
872 gst_aggregator_default_update_src_caps (GstAggregator * agg, GstCaps * caps,
873     GstCaps ** ret)
874 {
875   *ret = gst_caps_ref (caps);
876
877   return GST_FLOW_OK;
878 }
879
880 static GstCaps *
881 gst_aggregator_default_fixate_src_caps (GstAggregator * agg, GstCaps * caps)
882 {
883   caps = gst_caps_fixate (caps);
884
885   return caps;
886 }
887
888 static gboolean
889 gst_aggregator_default_negotiated_src_caps (GstAggregator * agg, GstCaps * caps)
890 {
891   return TRUE;
892 }
893
894
895 /* takes ownership of the pool, allocator and query */
896 static gboolean
897 gst_aggregator_set_allocation (GstAggregator * self,
898     GstBufferPool * pool, GstAllocator * allocator,
899     GstAllocationParams * params, GstQuery * query)
900 {
901   GstAllocator *oldalloc;
902   GstBufferPool *oldpool;
903   GstQuery *oldquery;
904
905   GST_DEBUG ("storing allocation query");
906
907   GST_OBJECT_LOCK (self);
908   oldpool = self->priv->pool;
909   self->priv->pool = pool;
910
911   oldalloc = self->priv->allocator;
912   self->priv->allocator = allocator;
913
914   oldquery = self->priv->allocation_query;
915   self->priv->allocation_query = query;
916
917   if (params)
918     self->priv->allocation_params = *params;
919   else
920     gst_allocation_params_init (&self->priv->allocation_params);
921   GST_OBJECT_UNLOCK (self);
922
923   if (oldpool) {
924     GST_DEBUG_OBJECT (self, "deactivating old pool %p", oldpool);
925     gst_buffer_pool_set_active (oldpool, FALSE);
926     gst_object_unref (oldpool);
927   }
928   if (oldalloc) {
929     gst_object_unref (oldalloc);
930   }
931   if (oldquery) {
932     gst_query_unref (oldquery);
933   }
934   return TRUE;
935 }
936
937
938 static gboolean
939 gst_aggregator_decide_allocation (GstAggregator * self, GstQuery * query)
940 {
941   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
942
943   if (aggclass->decide_allocation)
944     if (!aggclass->decide_allocation (self, query))
945       return FALSE;
946
947   return TRUE;
948 }
949
950 static gboolean
951 gst_aggregator_do_allocation (GstAggregator * self, GstCaps * caps)
952 {
953   GstQuery *query;
954   gboolean result = TRUE;
955   GstBufferPool *pool = NULL;
956   GstAllocator *allocator;
957   GstAllocationParams params;
958
959   /* find a pool for the negotiated caps now */
960   GST_DEBUG_OBJECT (self, "doing allocation query");
961   query = gst_query_new_allocation (caps, TRUE);
962   if (!gst_pad_peer_query (self->srcpad, query)) {
963     /* not a problem, just debug a little */
964     GST_DEBUG_OBJECT (self, "peer ALLOCATION query failed");
965   }
966
967   GST_DEBUG_OBJECT (self, "calling decide_allocation");
968   result = gst_aggregator_decide_allocation (self, query);
969
970   GST_DEBUG_OBJECT (self, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
971       query);
972
973   if (!result)
974     goto no_decide_allocation;
975
976   /* we got configuration from our peer or the decide_allocation method,
977    * parse them */
978   if (gst_query_get_n_allocation_params (query) > 0) {
979     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
980   } else {
981     allocator = NULL;
982     gst_allocation_params_init (&params);
983   }
984
985   if (gst_query_get_n_allocation_pools (query) > 0)
986     gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
987
988   /* now store */
989   result =
990       gst_aggregator_set_allocation (self, pool, allocator, &params, query);
991
992   return result;
993
994   /* Errors */
995 no_decide_allocation:
996   {
997     GST_WARNING_OBJECT (self, "Failed to decide allocation");
998     gst_query_unref (query);
999
1000     return result;
1001   }
1002
1003 }
1004
1005 /* WITH SRC_LOCK held */
1006 static GstFlowReturn
1007 gst_aggregator_update_src_caps (GstAggregator * self)
1008 {
1009   GstAggregatorClass *agg_klass = GST_AGGREGATOR_GET_CLASS (self);
1010   GstCaps *downstream_caps, *template_caps, *caps = NULL;
1011   GstFlowReturn ret = GST_FLOW_OK;
1012
1013   template_caps = gst_pad_get_pad_template_caps (self->srcpad);
1014   downstream_caps = gst_pad_peer_query_caps (self->srcpad, template_caps);
1015
1016   if (gst_caps_is_empty (downstream_caps)) {
1017     GST_INFO_OBJECT (self, "Downstream caps (%"
1018         GST_PTR_FORMAT ") not compatible with pad template caps (%"
1019         GST_PTR_FORMAT ")", downstream_caps, template_caps);
1020     ret = GST_FLOW_NOT_NEGOTIATED;
1021     goto done;
1022   }
1023
1024   g_assert (agg_klass->update_src_caps);
1025   GST_DEBUG_OBJECT (self, "updating caps from %" GST_PTR_FORMAT,
1026       downstream_caps);
1027   ret = agg_klass->update_src_caps (self, downstream_caps, &caps);
1028   if (ret < GST_FLOW_OK) {
1029     GST_WARNING_OBJECT (self, "Subclass failed to update provided caps");
1030     goto done;
1031   }
1032   if ((caps == NULL || gst_caps_is_empty (caps)) && ret >= GST_FLOW_OK) {
1033     ret = GST_FLOW_NOT_NEGOTIATED;
1034     goto done;
1035   }
1036   GST_DEBUG_OBJECT (self, "               to %" GST_PTR_FORMAT, caps);
1037
1038 #ifdef GST_ENABLE_EXTRA_CHECKS
1039   if (!gst_caps_is_subset (caps, template_caps)) {
1040     GstCaps *intersection;
1041
1042     GST_ERROR_OBJECT (self,
1043         "update_src_caps returned caps %" GST_PTR_FORMAT
1044         " which are not a real subset of the template caps %"
1045         GST_PTR_FORMAT, caps, template_caps);
1046     g_warning ("%s: update_src_caps returned caps which are not a real "
1047         "subset of the filter caps", GST_ELEMENT_NAME (self));
1048
1049     intersection =
1050         gst_caps_intersect_full (template_caps, caps, GST_CAPS_INTERSECT_FIRST);
1051     gst_caps_unref (caps);
1052     caps = intersection;
1053   }
1054 #endif
1055
1056   if (gst_caps_is_any (caps)) {
1057     goto done;
1058   }
1059
1060   if (!gst_caps_is_fixed (caps)) {
1061     g_assert (agg_klass->fixate_src_caps);
1062
1063     GST_DEBUG_OBJECT (self, "fixate caps from %" GST_PTR_FORMAT, caps);
1064     if (!(caps = agg_klass->fixate_src_caps (self, caps))) {
1065       GST_WARNING_OBJECT (self, "Subclass failed to fixate provided caps");
1066       ret = GST_FLOW_NOT_NEGOTIATED;
1067       goto done;
1068     }
1069     GST_DEBUG_OBJECT (self, "             to %" GST_PTR_FORMAT, caps);
1070   }
1071
1072   if (agg_klass->negotiated_src_caps) {
1073     if (!agg_klass->negotiated_src_caps (self, caps)) {
1074       GST_WARNING_OBJECT (self, "Subclass failed to accept negotiated caps");
1075       ret = GST_FLOW_NOT_NEGOTIATED;
1076       goto done;
1077     }
1078   }
1079
1080   gst_aggregator_set_src_caps (self, caps);
1081
1082   if (!gst_aggregator_do_allocation (self, caps)) {
1083     GST_WARNING_OBJECT (self, "Allocation negotiation failed");
1084     ret = GST_FLOW_NOT_NEGOTIATED;
1085   }
1086
1087 done:
1088   gst_caps_unref (downstream_caps);
1089   gst_caps_unref (template_caps);
1090
1091   if (caps)
1092     gst_caps_unref (caps);
1093
1094   return ret;
1095 }
1096
1097 static void
1098 gst_aggregator_aggregate_func (GstAggregator * self)
1099 {
1100   GstAggregatorPrivate *priv = self->priv;
1101   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1102   gboolean timeout = FALSE;
1103
1104   if (self->priv->running == FALSE) {
1105     GST_DEBUG_OBJECT (self, "Not running anymore");
1106     return;
1107   }
1108
1109   GST_LOG_OBJECT (self, "Checking aggregate");
1110   while (priv->send_eos && priv->running) {
1111     GstFlowReturn flow_return = GST_FLOW_OK;
1112     DoHandleEventsAndQueriesData events_query_data = { FALSE, GST_FLOW_OK };
1113
1114     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1115         gst_aggregator_do_events_and_queries, &events_query_data);
1116
1117     if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
1118       goto handle_error;
1119
1120     if (self->priv->peer_latency_live)
1121       gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1122           gst_aggregator_pad_skip_buffers, NULL);
1123
1124     /* Ensure we have buffers ready (either in clipped_buffer or at the head of
1125      * the queue */
1126     if (!gst_aggregator_wait_and_check (self, &timeout))
1127       continue;
1128
1129     events_query_data.processed_event = FALSE;
1130     events_query_data.flow_ret = GST_FLOW_OK;
1131     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1132         gst_aggregator_do_events_and_queries, &events_query_data);
1133
1134     if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
1135       goto handle_error;
1136
1137     if (events_query_data.processed_event)
1138       continue;
1139
1140     if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
1141       flow_return = gst_aggregator_update_src_caps (self);
1142       if (flow_return != GST_FLOW_OK)
1143         gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self));
1144     }
1145
1146     if (timeout || flow_return >= GST_FLOW_OK) {
1147       GST_TRACE_OBJECT (self, "Actually aggregating!");
1148       flow_return = klass->aggregate (self, timeout);
1149     }
1150
1151     if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA)
1152       continue;
1153
1154     GST_OBJECT_LOCK (self);
1155     if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) {
1156       /* We don't want to set the pads to flushing, but we want to
1157        * stop the thread, so just break here */
1158       GST_OBJECT_UNLOCK (self);
1159       break;
1160     }
1161     GST_OBJECT_UNLOCK (self);
1162
1163     if (flow_return == GST_FLOW_EOS || flow_return == GST_FLOW_ERROR) {
1164       gst_aggregator_push_eos (self);
1165     }
1166
1167   handle_error:
1168     GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
1169
1170     if (flow_return != GST_FLOW_OK) {
1171       GList *item;
1172
1173       GST_OBJECT_LOCK (self);
1174       for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
1175         GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
1176
1177         gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE);
1178       }
1179       GST_OBJECT_UNLOCK (self);
1180       break;
1181     }
1182   }
1183
1184   /* Pause the task here, the only ways to get here are:
1185    * 1) We're stopping, in which case the task is stopped anyway
1186    * 2) We got a flow error above, in which case it might take
1187    *    some time to forward the flow return upstream and we
1188    *    would otherwise call the task function over and over
1189    *    again without doing anything
1190    */
1191   gst_pad_pause_task (self->srcpad);
1192 }
1193
1194 static gboolean
1195 gst_aggregator_start (GstAggregator * self)
1196 {
1197   GstAggregatorClass *klass;
1198   gboolean result;
1199
1200   self->priv->send_stream_start = TRUE;
1201   self->priv->send_segment = TRUE;
1202   self->priv->send_eos = TRUE;
1203   self->priv->srccaps = NULL;
1204
1205   gst_aggregator_set_allocation (self, NULL, NULL, NULL, NULL);
1206
1207   klass = GST_AGGREGATOR_GET_CLASS (self);
1208
1209   if (klass->start)
1210     result = klass->start (self);
1211   else
1212     result = TRUE;
1213
1214   return result;
1215 }
1216
1217 static gboolean
1218 _check_pending_flush_stop (GstAggregatorPad * pad)
1219 {
1220   gboolean res;
1221
1222   PAD_LOCK (pad);
1223   res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
1224   PAD_UNLOCK (pad);
1225
1226   return res;
1227 }
1228
1229 static gboolean
1230 gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
1231 {
1232   gboolean res = TRUE;
1233
1234   GST_INFO_OBJECT (self, "%s srcpad task",
1235       flush_start ? "Pausing" : "Stopping");
1236
1237   SRC_LOCK (self);
1238   self->priv->running = FALSE;
1239   SRC_BROADCAST (self);
1240   SRC_UNLOCK (self);
1241
1242   if (flush_start) {
1243     res = gst_pad_push_event (self->srcpad, flush_start);
1244   }
1245
1246   gst_pad_stop_task (self->srcpad);
1247
1248   return res;
1249 }
1250
1251 static void
1252 gst_aggregator_start_srcpad_task (GstAggregator * self)
1253 {
1254   GST_INFO_OBJECT (self, "Starting srcpad task");
1255
1256   self->priv->running = TRUE;
1257   gst_pad_start_task (GST_PAD (self->srcpad),
1258       (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL);
1259 }
1260
1261 static GstFlowReturn
1262 gst_aggregator_flush (GstAggregator * self)
1263 {
1264   GstFlowReturn ret = GST_FLOW_OK;
1265   GstAggregatorPrivate *priv = self->priv;
1266   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1267
1268   GST_DEBUG_OBJECT (self, "Flushing everything");
1269   GST_OBJECT_LOCK (self);
1270   priv->send_segment = TRUE;
1271   priv->flush_seeking = FALSE;
1272   priv->tags_changed = FALSE;
1273   GST_OBJECT_UNLOCK (self);
1274   if (klass->flush)
1275     ret = klass->flush (self);
1276
1277   return ret;
1278 }
1279
1280
1281 /* Called with GstAggregator's object lock held */
1282
1283 static gboolean
1284 gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
1285 {
1286   GList *tmp;
1287   GstAggregatorPad *tmppad;
1288
1289   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
1290     tmppad = (GstAggregatorPad *) tmp->data;
1291
1292     if (_check_pending_flush_stop (tmppad) == FALSE) {
1293       GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
1294           tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
1295       return FALSE;
1296     }
1297   }
1298
1299   return TRUE;
1300 }
1301
1302 static void
1303 gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
1304     GstEvent * event)
1305 {
1306   GstAggregatorPrivate *priv = self->priv;
1307   GstAggregatorPadPrivate *padpriv = aggpad->priv;
1308
1309   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE);
1310
1311   PAD_FLUSH_LOCK (aggpad);
1312   PAD_LOCK (aggpad);
1313   if (padpriv->pending_flush_start) {
1314     GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
1315
1316     padpriv->pending_flush_start = FALSE;
1317     padpriv->pending_flush_stop = TRUE;
1318   }
1319   PAD_UNLOCK (aggpad);
1320
1321   GST_OBJECT_LOCK (self);
1322   if (priv->flush_seeking) {
1323     /* If flush_seeking we forward the first FLUSH_START */
1324     if (priv->pending_flush_start) {
1325       priv->pending_flush_start = FALSE;
1326       GST_OBJECT_UNLOCK (self);
1327
1328       GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
1329       gst_aggregator_stop_srcpad_task (self, event);
1330
1331       GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
1332       GST_PAD_STREAM_LOCK (self->srcpad);
1333       GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
1334       event = NULL;
1335     } else {
1336       GST_OBJECT_UNLOCK (self);
1337       gst_event_unref (event);
1338     }
1339   } else {
1340     GST_OBJECT_UNLOCK (self);
1341     gst_event_unref (event);
1342   }
1343   PAD_FLUSH_UNLOCK (aggpad);
1344 }
1345
1346 /* Must be called with the the PAD_LOCK held */
1347 static void
1348 update_time_level (GstAggregatorPad * aggpad, gboolean head)
1349 {
1350   GstAggregatorPadPrivate *priv = aggpad->priv;
1351
1352   if (head) {
1353     if (GST_CLOCK_TIME_IS_VALID (priv->head_position) &&
1354         priv->head_segment.format == GST_FORMAT_TIME)
1355       priv->head_time = gst_segment_to_running_time (&priv->head_segment,
1356           GST_FORMAT_TIME, priv->head_position);
1357     else
1358       priv->head_time = GST_CLOCK_TIME_NONE;
1359
1360     if (!GST_CLOCK_TIME_IS_VALID (priv->tail_time))
1361       priv->tail_time = priv->head_time;
1362   } else {
1363     if (GST_CLOCK_TIME_IS_VALID (priv->tail_position) &&
1364         aggpad->segment.format == GST_FORMAT_TIME)
1365       priv->tail_time = gst_segment_to_running_time (&aggpad->segment,
1366           GST_FORMAT_TIME, priv->tail_position);
1367     else
1368       priv->tail_time = priv->head_time;
1369   }
1370
1371   if (priv->head_time == GST_CLOCK_TIME_NONE ||
1372       priv->tail_time == GST_CLOCK_TIME_NONE) {
1373     priv->time_level = 0;
1374     return;
1375   }
1376
1377   if (priv->tail_time > priv->head_time)
1378     priv->time_level = 0;
1379   else
1380     priv->time_level = priv->head_time - priv->tail_time;
1381 }
1382
1383
1384 /* GstAggregator vmethods default implementations */
1385 static gboolean
1386 gst_aggregator_default_sink_event (GstAggregator * self,
1387     GstAggregatorPad * aggpad, GstEvent * event)
1388 {
1389   gboolean res = TRUE;
1390   GstPad *pad = GST_PAD (aggpad);
1391   GstAggregatorPrivate *priv = self->priv;
1392
1393   GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event);
1394
1395   switch (GST_EVENT_TYPE (event)) {
1396     case GST_EVENT_FLUSH_START:
1397     {
1398       gst_aggregator_flush_start (self, aggpad, event);
1399       /* We forward only in one case: right after flush_seeking */
1400       event = NULL;
1401       goto eat;
1402     }
1403     case GST_EVENT_FLUSH_STOP:
1404     {
1405       gst_aggregator_pad_flush (aggpad, self);
1406       GST_OBJECT_LOCK (self);
1407       if (priv->flush_seeking) {
1408         g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
1409         if (gst_aggregator_all_flush_stop_received_locked (self)) {
1410           GST_OBJECT_UNLOCK (self);
1411           /* That means we received FLUSH_STOP/FLUSH_STOP on
1412            * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
1413           gst_aggregator_flush (self);
1414           gst_pad_push_event (self->srcpad, event);
1415           event = NULL;
1416           SRC_LOCK (self);
1417           priv->send_eos = TRUE;
1418           SRC_BROADCAST (self);
1419           SRC_UNLOCK (self);
1420
1421           GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
1422           GST_PAD_STREAM_UNLOCK (self->srcpad);
1423           gst_aggregator_start_srcpad_task (self);
1424         } else {
1425           GST_OBJECT_UNLOCK (self);
1426         }
1427       } else {
1428         GST_OBJECT_UNLOCK (self);
1429       }
1430
1431       /* We never forward the event */
1432       goto eat;
1433     }
1434     case GST_EVENT_EOS:
1435     {
1436       SRC_LOCK (self);
1437       PAD_LOCK (aggpad);
1438       g_assert (aggpad->priv->num_buffers == 0);
1439       aggpad->priv->eos = TRUE;
1440       PAD_UNLOCK (aggpad);
1441       SRC_BROADCAST (self);
1442       SRC_UNLOCK (self);
1443       goto eat;
1444     }
1445     case GST_EVENT_SEGMENT:
1446     {
1447       PAD_LOCK (aggpad);
1448       GST_OBJECT_LOCK (aggpad);
1449       gst_event_copy_segment (event, &aggpad->segment);
1450       /* We've got a new segment, tail_position is now meaningless
1451        * and may interfere with the time_level calculation
1452        */
1453       aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
1454       update_time_level (aggpad, FALSE);
1455       GST_OBJECT_UNLOCK (aggpad);
1456       PAD_UNLOCK (aggpad);
1457
1458       GST_OBJECT_LOCK (self);
1459       self->priv->seqnum = gst_event_get_seqnum (event);
1460       GST_OBJECT_UNLOCK (self);
1461       goto eat;
1462     }
1463     case GST_EVENT_STREAM_START:
1464     {
1465       goto eat;
1466     }
1467     case GST_EVENT_GAP:
1468     {
1469       GstClockTime pts, endpts;
1470       GstClockTime duration;
1471       GstBuffer *gapbuf;
1472
1473       gst_event_parse_gap (event, &pts, &duration);
1474
1475       if (GST_CLOCK_TIME_IS_VALID (duration))
1476         endpts = pts + duration;
1477       else
1478         endpts = GST_CLOCK_TIME_NONE;
1479
1480       GST_OBJECT_LOCK (aggpad);
1481       res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts,
1482           &pts, &endpts);
1483       GST_OBJECT_UNLOCK (aggpad);
1484
1485       if (!res) {
1486         GST_WARNING_OBJECT (self, "GAP event outside segment, dropping");
1487         goto eat;
1488       }
1489
1490       if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts))
1491         duration = endpts - pts;
1492       else
1493         duration = GST_CLOCK_TIME_NONE;
1494
1495       gapbuf = gst_buffer_new ();
1496       GST_BUFFER_PTS (gapbuf) = pts;
1497       GST_BUFFER_DURATION (gapbuf) = duration;
1498       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
1499       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
1500
1501       /* Remove GAP event so we can replace it with the buffer */
1502       PAD_LOCK (aggpad);
1503       if (g_queue_peek_tail (&aggpad->priv->data) == event)
1504         gst_event_unref (g_queue_pop_tail (&aggpad->priv->data));
1505       PAD_UNLOCK (aggpad);
1506
1507       if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
1508           GST_FLOW_OK) {
1509         GST_WARNING_OBJECT (self, "Failed to chain gap buffer");
1510         res = FALSE;
1511       }
1512
1513       goto eat;
1514     }
1515     case GST_EVENT_TAG:
1516       goto eat;
1517     default:
1518     {
1519       break;
1520     }
1521   }
1522
1523   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
1524   return gst_pad_event_default (pad, GST_OBJECT (self), event);
1525
1526 eat:
1527   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
1528   if (event)
1529     gst_event_unref (event);
1530
1531   return res;
1532 }
1533
1534 static gboolean
1535 gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data)
1536 {
1537   GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
1538   GstAggregator *agg = GST_AGGREGATOR_CAST (self);
1539
1540   gst_aggregator_pad_flush (pad, agg);
1541
1542   PAD_LOCK (pad);
1543   pad->priv->flow_return = GST_FLOW_FLUSHING;
1544   pad->priv->negotiated = FALSE;
1545   PAD_BROADCAST_EVENT (pad);
1546   PAD_UNLOCK (pad);
1547
1548   return TRUE;
1549 }
1550
1551 static gboolean
1552 gst_aggregator_stop (GstAggregator * agg)
1553 {
1554   GstAggregatorClass *klass;
1555   gboolean result;
1556
1557   gst_aggregator_reset_flow_values (agg);
1558
1559   /* Application needs to make sure no pads are added while it shuts us down */
1560   gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg),
1561       gst_aggregator_stop_pad, NULL);
1562
1563   klass = GST_AGGREGATOR_GET_CLASS (agg);
1564
1565   if (klass->stop)
1566     result = klass->stop (agg);
1567   else
1568     result = TRUE;
1569
1570   agg->priv->has_peer_latency = FALSE;
1571   agg->priv->peer_latency_live = FALSE;
1572   agg->priv->peer_latency_min = agg->priv->peer_latency_max = 0;
1573
1574   if (agg->priv->tags)
1575     gst_tag_list_unref (agg->priv->tags);
1576   agg->priv->tags = NULL;
1577
1578   gst_aggregator_set_allocation (agg, NULL, NULL, NULL, NULL);
1579
1580   return result;
1581 }
1582
1583 /* GstElement vmethods implementations */
1584 static GstStateChangeReturn
1585 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
1586 {
1587   GstStateChangeReturn ret;
1588   GstAggregator *self = GST_AGGREGATOR (element);
1589
1590   switch (transition) {
1591     case GST_STATE_CHANGE_READY_TO_PAUSED:
1592       if (!gst_aggregator_start (self))
1593         goto error_start;
1594       break;
1595     default:
1596       break;
1597   }
1598
1599   if ((ret =
1600           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
1601               transition)) == GST_STATE_CHANGE_FAILURE)
1602     goto failure;
1603
1604
1605   switch (transition) {
1606     case GST_STATE_CHANGE_PAUSED_TO_READY:
1607       if (!gst_aggregator_stop (self)) {
1608         /* What to do in this case? Error out? */
1609         GST_ERROR_OBJECT (self, "Subclass failed to stop.");
1610       }
1611       break;
1612     default:
1613       break;
1614   }
1615
1616   return ret;
1617
1618 /* ERRORS */
1619 failure:
1620   {
1621     GST_ERROR_OBJECT (element, "parent failed state change");
1622     return ret;
1623   }
1624 error_start:
1625   {
1626     GST_ERROR_OBJECT (element, "Subclass failed to start");
1627     return GST_STATE_CHANGE_FAILURE;
1628   }
1629 }
1630
1631 static void
1632 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
1633 {
1634   GstAggregator *self = GST_AGGREGATOR (element);
1635   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1636
1637   GST_INFO_OBJECT (pad, "Removing pad");
1638
1639   SRC_LOCK (self);
1640   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
1641   gst_element_remove_pad (element, pad);
1642
1643   self->priv->has_peer_latency = FALSE;
1644   SRC_BROADCAST (self);
1645   SRC_UNLOCK (self);
1646 }
1647
1648 static GstAggregatorPad *
1649 gst_aggregator_default_create_new_pad (GstAggregator * self,
1650     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1651 {
1652   GstAggregatorPad *agg_pad;
1653   GstAggregatorPrivate *priv = self->priv;
1654   gint serial = 0;
1655   gchar *name = NULL;
1656   GType pad_type =
1657       GST_PAD_TEMPLATE_GTYPE (templ) ==
1658       G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : GST_PAD_TEMPLATE_GTYPE (templ);
1659
1660   if (templ->direction != GST_PAD_SINK)
1661     goto not_sink;
1662
1663   if (templ->presence != GST_PAD_REQUEST)
1664     goto not_request;
1665
1666   GST_OBJECT_LOCK (self);
1667   if (req_name == NULL || strlen (req_name) < 6
1668       || !g_str_has_prefix (req_name, "sink_")) {
1669     /* no name given when requesting the pad, use next available int */
1670     serial = ++priv->max_padserial;
1671   } else {
1672     /* parse serial number from requested padname */
1673     serial = g_ascii_strtoull (&req_name[5], NULL, 10);
1674     if (serial > priv->max_padserial)
1675       priv->max_padserial = serial;
1676   }
1677
1678   name = g_strdup_printf ("sink_%u", serial);
1679   agg_pad = g_object_new (pad_type,
1680       "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
1681   g_free (name);
1682
1683   GST_OBJECT_UNLOCK (self);
1684
1685   return agg_pad;
1686
1687   /* errors */
1688 not_sink:
1689   {
1690     GST_WARNING_OBJECT (self, "request new pad that is not a SINK pad");
1691     return NULL;
1692   }
1693 not_request:
1694   {
1695     GST_WARNING_OBJECT (self, "request new pad that is not a REQUEST pad");
1696     return NULL;
1697   }
1698 }
1699
1700 static GstPad *
1701 gst_aggregator_request_new_pad (GstElement * element,
1702     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1703 {
1704   GstAggregator *self;
1705   GstAggregatorPad *agg_pad;
1706   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (element);
1707   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
1708
1709   self = GST_AGGREGATOR (element);
1710
1711   agg_pad = klass->create_new_pad (self, templ, req_name, caps);
1712   if (!agg_pad) {
1713     GST_ERROR_OBJECT (element, "Couldn't create new pad");
1714     return NULL;
1715   }
1716
1717   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
1718
1719   if (priv->running)
1720     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
1721
1722   /* add the pad to the element */
1723   gst_element_add_pad (element, GST_PAD (agg_pad));
1724
1725   return GST_PAD (agg_pad);
1726 }
1727
1728 /* Must be called with SRC_LOCK held */
1729
1730 static gboolean
1731 gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
1732 {
1733   gboolean query_ret, live;
1734   GstClockTime our_latency, min, max;
1735
1736   query_ret = gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1737
1738   if (!query_ret) {
1739     GST_WARNING_OBJECT (self, "Latency query failed");
1740     return FALSE;
1741   }
1742
1743   gst_query_parse_latency (query, &live, &min, &max);
1744
1745   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
1746     GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
1747         ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
1748     return FALSE;
1749   }
1750
1751   if (self->priv->upstream_latency_min > min) {
1752     GstClockTimeDiff diff =
1753         GST_CLOCK_DIFF (min, self->priv->upstream_latency_min);
1754
1755     min += diff;
1756     if (GST_CLOCK_TIME_IS_VALID (max)) {
1757       max += diff;
1758     }
1759   }
1760
1761   if (min > max && GST_CLOCK_TIME_IS_VALID (max)) {
1762     GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL),
1763         ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
1764             GST_TIME_FORMAT ". Add queues or other buffering elements.",
1765             GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
1766     return FALSE;
1767   }
1768
1769   our_latency = self->priv->latency;
1770
1771   self->priv->peer_latency_live = live;
1772   self->priv->peer_latency_min = min;
1773   self->priv->peer_latency_max = max;
1774   self->priv->has_peer_latency = TRUE;
1775
1776   /* add our own */
1777   min += our_latency;
1778   min += self->priv->sub_latency_min;
1779   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1780       && GST_CLOCK_TIME_IS_VALID (max))
1781     max += self->priv->sub_latency_max + our_latency;
1782   else
1783     max = GST_CLOCK_TIME_NONE;
1784
1785   SRC_BROADCAST (self);
1786
1787   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
1788       " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
1789
1790   gst_query_set_latency (query, live, min, max);
1791
1792   return query_ret;
1793 }
1794
1795 /*
1796  * MUST be called with the src_lock held.
1797  *
1798  * See  gst_aggregator_get_latency() for doc
1799  */
1800 static GstClockTime
1801 gst_aggregator_get_latency_unlocked (GstAggregator * self)
1802 {
1803   GstClockTime latency;
1804
1805   g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0);
1806
1807   if (!self->priv->has_peer_latency) {
1808     GstQuery *query = gst_query_new_latency ();
1809     gboolean ret;
1810
1811     ret = gst_aggregator_query_latency_unlocked (self, query);
1812     gst_query_unref (query);
1813     if (!ret)
1814       return GST_CLOCK_TIME_NONE;
1815   }
1816
1817   if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
1818     return GST_CLOCK_TIME_NONE;
1819
1820   /* latency_min is never GST_CLOCK_TIME_NONE by construction */
1821   latency = self->priv->peer_latency_min;
1822
1823   /* add our own */
1824   latency += self->priv->latency;
1825   latency += self->priv->sub_latency_min;
1826
1827   return latency;
1828 }
1829
1830 /**
1831  * gst_aggregator_get_latency:
1832  * @self: a #GstAggregator
1833  *
1834  * Retrieves the latency values reported by @self in response to the latency
1835  * query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element
1836  * will not wait for the clock.
1837  *
1838  * Typically only called by subclasses.
1839  *
1840  * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync
1841  */
1842 GstClockTime
1843 gst_aggregator_get_latency (GstAggregator * self)
1844 {
1845   GstClockTime ret;
1846
1847   SRC_LOCK (self);
1848   ret = gst_aggregator_get_latency_unlocked (self);
1849   SRC_UNLOCK (self);
1850
1851   return ret;
1852 }
1853
1854 static gboolean
1855 gst_aggregator_send_event (GstElement * element, GstEvent * event)
1856 {
1857   GstAggregator *self = GST_AGGREGATOR (element);
1858
1859   GST_STATE_LOCK (element);
1860   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
1861       GST_STATE (element) < GST_STATE_PAUSED) {
1862     gdouble rate;
1863     GstFormat fmt;
1864     GstSeekFlags flags;
1865     GstSeekType start_type, stop_type;
1866     gint64 start, stop;
1867
1868     gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1869         &start, &stop_type, &stop);
1870
1871     GST_OBJECT_LOCK (self);
1872     gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
1873         flags, start_type, start, stop_type, stop, NULL);
1874     self->priv->seqnum = gst_event_get_seqnum (event);
1875     self->priv->first_buffer = FALSE;
1876     GST_OBJECT_UNLOCK (self);
1877
1878     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
1879   }
1880   GST_STATE_UNLOCK (element);
1881
1882
1883   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
1884       event);
1885 }
1886
1887 static gboolean
1888 gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
1889 {
1890   gboolean res = TRUE;
1891
1892   switch (GST_QUERY_TYPE (query)) {
1893     case GST_QUERY_SEEKING:
1894     {
1895       GstFormat format;
1896
1897       /* don't pass it along as some (file)sink might claim it does
1898        * whereas with a collectpads in between that will not likely work */
1899       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1900       gst_query_set_seeking (query, format, FALSE, 0, -1);
1901       res = TRUE;
1902
1903       break;
1904     }
1905     case GST_QUERY_LATENCY:
1906       SRC_LOCK (self);
1907       res = gst_aggregator_query_latency_unlocked (self, query);
1908       SRC_UNLOCK (self);
1909       break;
1910     default:
1911       return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1912   }
1913
1914   return res;
1915 }
1916
1917 static gboolean
1918 gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
1919 {
1920   EventData *evdata = user_data;
1921   gboolean ret = TRUE;
1922   GstPad *peer = gst_pad_get_peer (pad);
1923   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1924
1925   if (peer) {
1926     if (evdata->only_to_active_pads && aggpad->priv->first_buffer) {
1927       GST_DEBUG_OBJECT (pad, "not sending event to inactive pad");
1928       ret = TRUE;
1929     } else {
1930       ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
1931       GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
1932     }
1933   }
1934
1935   if (ret == FALSE) {
1936     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
1937       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
1938
1939       GST_DEBUG_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
1940
1941       if (gst_pad_query (peer, seeking)) {
1942         gboolean seekable;
1943
1944         gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
1945
1946         if (seekable == FALSE) {
1947           GST_INFO_OBJECT (pad,
1948               "Source not seekable, We failed but it does not matter!");
1949
1950           ret = TRUE;
1951         }
1952       } else {
1953         GST_ERROR_OBJECT (pad, "Query seeking FAILED");
1954       }
1955
1956       gst_query_unref (seeking);
1957     }
1958
1959     if (evdata->flush) {
1960       PAD_LOCK (aggpad);
1961       aggpad->priv->pending_flush_start = FALSE;
1962       aggpad->priv->pending_flush_stop = FALSE;
1963       PAD_UNLOCK (aggpad);
1964     }
1965   } else {
1966     evdata->one_actually_seeked = TRUE;
1967   }
1968
1969   evdata->result &= ret;
1970
1971   if (peer)
1972     gst_object_unref (peer);
1973
1974   /* Always send to all pads */
1975   return FALSE;
1976 }
1977
1978 static void
1979 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
1980     EventData * evdata)
1981 {
1982   evdata->result = TRUE;
1983   evdata->one_actually_seeked = FALSE;
1984
1985   /* We first need to set all pads as flushing in a first pass
1986    * as flush_start flush_stop is sometimes sent synchronously
1987    * while we send the seek event */
1988   if (evdata->flush) {
1989     GList *l;
1990
1991     GST_OBJECT_LOCK (self);
1992     for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
1993       GstAggregatorPad *pad = l->data;
1994
1995       PAD_LOCK (pad);
1996       pad->priv->pending_flush_start = TRUE;
1997       pad->priv->pending_flush_stop = FALSE;
1998       PAD_UNLOCK (pad);
1999     }
2000     GST_OBJECT_UNLOCK (self);
2001   }
2002
2003   gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata);
2004
2005   gst_event_unref (evdata->event);
2006 }
2007
2008 static gboolean
2009 gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
2010 {
2011   gdouble rate;
2012   GstFormat fmt;
2013   GstSeekFlags flags;
2014   GstSeekType start_type, stop_type;
2015   gint64 start, stop;
2016   gboolean flush;
2017   EventData evdata = { 0, };
2018   GstAggregatorPrivate *priv = self->priv;
2019
2020   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
2021       &start, &stop_type, &stop);
2022
2023   GST_INFO_OBJECT (self, "starting SEEK");
2024
2025   flush = flags & GST_SEEK_FLAG_FLUSH;
2026
2027   GST_OBJECT_LOCK (self);
2028   if (flush) {
2029     priv->pending_flush_start = TRUE;
2030     priv->flush_seeking = TRUE;
2031   }
2032
2033   gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
2034       flags, start_type, start, stop_type, stop, NULL);
2035
2036   /* Seeking sets a position */
2037   self->priv->first_buffer = FALSE;
2038   GST_OBJECT_UNLOCK (self);
2039
2040   /* forward the seek upstream */
2041   evdata.event = event;
2042   evdata.flush = flush;
2043   evdata.only_to_active_pads = FALSE;
2044   gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2045   event = NULL;
2046
2047   if (!evdata.result || !evdata.one_actually_seeked) {
2048     GST_OBJECT_LOCK (self);
2049     priv->flush_seeking = FALSE;
2050     priv->pending_flush_start = FALSE;
2051     GST_OBJECT_UNLOCK (self);
2052   }
2053
2054   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
2055
2056   return evdata.result;
2057 }
2058
2059 static gboolean
2060 gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
2061 {
2062   EventData evdata = { 0, };
2063
2064   switch (GST_EVENT_TYPE (event)) {
2065     case GST_EVENT_SEEK:
2066       /* _do_seek() unrefs the event. */
2067       return gst_aggregator_do_seek (self, event);
2068     case GST_EVENT_NAVIGATION:
2069       /* navigation is rather pointless. */
2070       gst_event_unref (event);
2071       return FALSE;
2072     default:
2073       break;
2074   }
2075
2076   /* Don't forward QOS events to pads that had no active buffer yet. Otherwise
2077    * they will receive a QOS event that has earliest_time=0 (because we can't
2078    * have negative timestamps), and consider their buffer as too late */
2079   evdata.event = event;
2080   evdata.flush = FALSE;
2081   evdata.only_to_active_pads = GST_EVENT_TYPE (event) == GST_EVENT_QOS;
2082   gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2083   return evdata.result;
2084 }
2085
2086 static gboolean
2087 gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent,
2088     GstEvent * event)
2089 {
2090   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2091
2092   return klass->src_event (GST_AGGREGATOR (parent), event);
2093 }
2094
2095 static gboolean
2096 gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent,
2097     GstQuery * query)
2098 {
2099   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2100
2101   return klass->src_query (GST_AGGREGATOR (parent), query);
2102 }
2103
2104 static gboolean
2105 gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
2106     GstObject * parent, GstPadMode mode, gboolean active)
2107 {
2108   GstAggregator *self = GST_AGGREGATOR (parent);
2109   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2110
2111   if (klass->src_activate) {
2112     if (klass->src_activate (self, mode, active) == FALSE) {
2113       return FALSE;
2114     }
2115   }
2116
2117   if (active == TRUE) {
2118     switch (mode) {
2119       case GST_PAD_MODE_PUSH:
2120       {
2121         GST_INFO_OBJECT (pad, "Activating pad!");
2122         gst_aggregator_start_srcpad_task (self);
2123         return TRUE;
2124       }
2125       default:
2126       {
2127         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
2128         return FALSE;
2129       }
2130     }
2131   }
2132
2133   /* deactivating */
2134   GST_INFO_OBJECT (self, "Deactivating srcpad");
2135   gst_aggregator_stop_srcpad_task (self, FALSE);
2136
2137   return TRUE;
2138 }
2139
2140 static gboolean
2141 gst_aggregator_default_sink_query (GstAggregator * self,
2142     GstAggregatorPad * aggpad, GstQuery * query)
2143 {
2144   GstPad *pad = GST_PAD (aggpad);
2145
2146   if (GST_QUERY_TYPE (query) == GST_QUERY_ALLOCATION) {
2147     GstQuery *decide_query = NULL;
2148     GstAggregatorClass *agg_class;
2149     gboolean ret;
2150
2151     GST_OBJECT_LOCK (self);
2152     PAD_LOCK (aggpad);
2153     if (G_UNLIKELY (!aggpad->priv->negotiated)) {
2154       GST_DEBUG_OBJECT (self,
2155           "not negotiated yet, can't answer ALLOCATION query");
2156       PAD_UNLOCK (aggpad);
2157       GST_OBJECT_UNLOCK (self);
2158
2159       return FALSE;
2160     }
2161
2162     if ((decide_query = self->priv->allocation_query))
2163       gst_query_ref (decide_query);
2164     PAD_UNLOCK (aggpad);
2165     GST_OBJECT_UNLOCK (self);
2166
2167     GST_DEBUG_OBJECT (self,
2168         "calling propose allocation with query %" GST_PTR_FORMAT, decide_query);
2169
2170     agg_class = GST_AGGREGATOR_GET_CLASS (self);
2171
2172     /* pass the query to the propose_allocation vmethod if any */
2173     if (agg_class->propose_allocation)
2174       ret = agg_class->propose_allocation (self, aggpad, decide_query, query);
2175     else
2176       ret = FALSE;
2177
2178     if (decide_query)
2179       gst_query_unref (decide_query);
2180
2181     GST_DEBUG_OBJECT (self, "ALLOCATION ret %d, %" GST_PTR_FORMAT, ret, query);
2182     return ret;
2183   }
2184
2185   return gst_pad_query_default (pad, GST_OBJECT (self), query);
2186 }
2187
2188 static void
2189 gst_aggregator_finalize (GObject * object)
2190 {
2191   GstAggregator *self = (GstAggregator *) object;
2192
2193   g_mutex_clear (&self->priv->src_lock);
2194   g_cond_clear (&self->priv->src_cond);
2195
2196   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
2197 }
2198
2199 /*
2200  * gst_aggregator_set_latency_property:
2201  * @agg: a #GstAggregator
2202  * @latency: the new latency value (in nanoseconds).
2203  *
2204  * Sets the new latency value to @latency. This value is used to limit the
2205  * amount of time a pad waits for data to appear before considering the pad
2206  * as unresponsive.
2207  */
2208 static void
2209 gst_aggregator_set_latency_property (GstAggregator * self, GstClockTime latency)
2210 {
2211   gboolean changed;
2212
2213   g_return_if_fail (GST_IS_AGGREGATOR (self));
2214   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
2215
2216   SRC_LOCK (self);
2217   changed = (self->priv->latency != latency);
2218
2219   if (changed) {
2220     GList *item;
2221
2222     GST_OBJECT_LOCK (self);
2223     /* First lock all the pads */
2224     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2225       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2226       PAD_LOCK (aggpad);
2227     }
2228
2229     self->priv->latency = latency;
2230
2231     SRC_BROADCAST (self);
2232
2233     /* Now wake up the pads */
2234     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2235       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2236       PAD_BROADCAST_EVENT (aggpad);
2237       PAD_UNLOCK (aggpad);
2238     }
2239     GST_OBJECT_UNLOCK (self);
2240   }
2241
2242   SRC_UNLOCK (self);
2243
2244   if (changed)
2245     gst_element_post_message (GST_ELEMENT_CAST (self),
2246         gst_message_new_latency (GST_OBJECT_CAST (self)));
2247 }
2248
2249 /*
2250  * gst_aggregator_get_latency_property:
2251  * @agg: a #GstAggregator
2252  *
2253  * Gets the latency value. See gst_aggregator_set_latency for
2254  * more details.
2255  *
2256  * Returns: The time in nanoseconds to wait for data to arrive on a sink pad
2257  * before a pad is deemed unresponsive. A value of -1 means an
2258  * unlimited time.
2259  */
2260 static GstClockTime
2261 gst_aggregator_get_latency_property (GstAggregator * agg)
2262 {
2263   GstClockTime res;
2264
2265   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), GST_CLOCK_TIME_NONE);
2266
2267   GST_OBJECT_LOCK (agg);
2268   res = agg->priv->latency;
2269   GST_OBJECT_UNLOCK (agg);
2270
2271   return res;
2272 }
2273
2274 static void
2275 gst_aggregator_set_property (GObject * object, guint prop_id,
2276     const GValue * value, GParamSpec * pspec)
2277 {
2278   GstAggregator *agg = GST_AGGREGATOR (object);
2279
2280   switch (prop_id) {
2281     case PROP_LATENCY:
2282       gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value));
2283       break;
2284     case PROP_MIN_UPSTREAM_LATENCY:
2285       SRC_LOCK (agg);
2286       agg->priv->upstream_latency_min = g_value_get_uint64 (value);
2287       SRC_UNLOCK (agg);
2288       break;
2289     case PROP_START_TIME_SELECTION:
2290       agg->priv->start_time_selection = g_value_get_enum (value);
2291       break;
2292     case PROP_START_TIME:
2293       agg->priv->start_time = g_value_get_uint64 (value);
2294       break;
2295     default:
2296       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2297       break;
2298   }
2299 }
2300
2301 static void
2302 gst_aggregator_get_property (GObject * object, guint prop_id,
2303     GValue * value, GParamSpec * pspec)
2304 {
2305   GstAggregator *agg = GST_AGGREGATOR (object);
2306
2307   switch (prop_id) {
2308     case PROP_LATENCY:
2309       g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg));
2310       break;
2311     case PROP_MIN_UPSTREAM_LATENCY:
2312       SRC_LOCK (agg);
2313       g_value_set_uint64 (value, agg->priv->upstream_latency_min);
2314       SRC_UNLOCK (agg);
2315       break;
2316     case PROP_START_TIME_SELECTION:
2317       g_value_set_enum (value, agg->priv->start_time_selection);
2318       break;
2319     case PROP_START_TIME:
2320       g_value_set_uint64 (value, agg->priv->start_time);
2321       break;
2322     default:
2323       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2324       break;
2325   }
2326 }
2327
2328 /* GObject vmethods implementations */
2329 static void
2330 gst_aggregator_class_init (GstAggregatorClass * klass)
2331 {
2332   GObjectClass *gobject_class = (GObjectClass *) klass;
2333   GstElementClass *gstelement_class = (GstElementClass *) klass;
2334
2335   aggregator_parent_class = g_type_class_peek_parent (klass);
2336
2337   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
2338       GST_DEBUG_FG_MAGENTA, "GstAggregator");
2339
2340   if (aggregator_private_offset != 0)
2341     g_type_class_adjust_private_offset (klass, &aggregator_private_offset);
2342
2343   klass->finish_buffer = gst_aggregator_default_finish_buffer;
2344
2345   klass->sink_event = gst_aggregator_default_sink_event;
2346   klass->sink_query = gst_aggregator_default_sink_query;
2347
2348   klass->src_event = gst_aggregator_default_src_event;
2349   klass->src_query = gst_aggregator_default_src_query;
2350
2351   klass->create_new_pad = gst_aggregator_default_create_new_pad;
2352   klass->update_src_caps = gst_aggregator_default_update_src_caps;
2353   klass->fixate_src_caps = gst_aggregator_default_fixate_src_caps;
2354   klass->negotiated_src_caps = gst_aggregator_default_negotiated_src_caps;
2355
2356   gstelement_class->request_new_pad =
2357       GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
2358   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
2359   gstelement_class->release_pad =
2360       GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
2361   gstelement_class->change_state =
2362       GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
2363
2364   gobject_class->set_property = gst_aggregator_set_property;
2365   gobject_class->get_property = gst_aggregator_get_property;
2366   gobject_class->finalize = gst_aggregator_finalize;
2367
2368   g_object_class_install_property (gobject_class, PROP_LATENCY,
2369       g_param_spec_uint64 ("latency", "Buffer latency",
2370           "Additional latency in live mode to allow upstream "
2371           "to take longer to produce buffers for the current "
2372           "position (in nanoseconds)", 0, G_MAXUINT64,
2373           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2374
2375   /**
2376    * GstAggregator:min-upstream-latency:
2377    *
2378    * Force minimum upstream latency (in nanoseconds). When sources with a
2379    * higher latency are expected to be plugged in dynamically after the
2380    * aggregator has started playing, this allows overriding the minimum
2381    * latency reported by the initial source(s). This is only taken into
2382    * account when larger than the actually reported minimum latency.
2383    *
2384    * Since: 1.16
2385    */
2386   g_object_class_install_property (gobject_class, PROP_MIN_UPSTREAM_LATENCY,
2387       g_param_spec_uint64 ("min-upstream-latency", "Buffer latency",
2388           "When sources with a higher latency are expected to be plugged "
2389           "in dynamically after the aggregator has started playing, "
2390           "this allows overriding the minimum latency reported by the "
2391           "initial source(s). This is only taken into account when larger "
2392           "than the actually reported minimum latency. (nanoseconds)",
2393           0, G_MAXUINT64,
2394           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2395
2396   g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION,
2397       g_param_spec_enum ("start-time-selection", "Start Time Selection",
2398           "Decides which start time is output",
2399           gst_aggregator_start_time_selection_get_type (),
2400           DEFAULT_START_TIME_SELECTION,
2401           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2402
2403   g_object_class_install_property (gobject_class, PROP_START_TIME,
2404       g_param_spec_uint64 ("start-time", "Start Time",
2405           "Start time to use if start-time-selection=set", 0,
2406           G_MAXUINT64,
2407           DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2408 }
2409
2410 static inline gpointer
2411 gst_aggregator_get_instance_private (GstAggregator * self)
2412 {
2413   return (G_STRUCT_MEMBER_P (self, aggregator_private_offset));
2414 }
2415
2416 static void
2417 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
2418 {
2419   GstPadTemplate *pad_template;
2420   GstAggregatorPrivate *priv;
2421
2422   g_return_if_fail (klass->aggregate != NULL);
2423
2424   self->priv = gst_aggregator_get_instance_private (self);
2425
2426   priv = self->priv;
2427
2428   pad_template =
2429       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
2430   g_return_if_fail (pad_template != NULL);
2431
2432   priv->max_padserial = -1;
2433   priv->tags_changed = FALSE;
2434
2435   self->priv->peer_latency_live = FALSE;
2436   self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
2437   self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
2438   self->priv->has_peer_latency = FALSE;
2439
2440   self->srcpad = gst_pad_new_from_template (pad_template, "src");
2441
2442   gst_aggregator_reset_flow_values (self);
2443
2444   gst_pad_set_event_function (self->srcpad,
2445       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
2446   gst_pad_set_query_function (self->srcpad,
2447       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func));
2448   gst_pad_set_activatemode_function (self->srcpad,
2449       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func));
2450
2451   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
2452
2453   self->priv->upstream_latency_min = DEFAULT_MIN_UPSTREAM_LATENCY;
2454   self->priv->latency = DEFAULT_LATENCY;
2455   self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION;
2456   self->priv->start_time = DEFAULT_START_TIME;
2457
2458   g_mutex_init (&self->priv->src_lock);
2459   g_cond_init (&self->priv->src_cond);
2460 }
2461
2462 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
2463  * method to get to the padtemplates */
2464 GType
2465 gst_aggregator_get_type (void)
2466 {
2467   static volatile gsize type = 0;
2468
2469   if (g_once_init_enter (&type)) {
2470     GType _type;
2471     static const GTypeInfo info = {
2472       sizeof (GstAggregatorClass),
2473       NULL,
2474       NULL,
2475       (GClassInitFunc) gst_aggregator_class_init,
2476       NULL,
2477       NULL,
2478       sizeof (GstAggregator),
2479       0,
2480       (GInstanceInitFunc) gst_aggregator_init,
2481     };
2482
2483     _type = g_type_register_static (GST_TYPE_ELEMENT,
2484         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
2485
2486     aggregator_private_offset =
2487         g_type_add_instance_private (_type, sizeof (GstAggregatorPrivate));
2488
2489     g_once_init_leave (&type, _type);
2490   }
2491   return type;
2492 }
2493
2494 /* Must be called with SRC lock and PAD lock held */
2495 static gboolean
2496 gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
2497 {
2498   /* Empty queue always has space */
2499   if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL)
2500     return TRUE;
2501
2502   /* We also want at least two buffers, one is being processed and one is ready
2503    * for the next iteration when we operate in live mode. */
2504   if (self->priv->peer_latency_live && aggpad->priv->num_buffers < 2)
2505     return TRUE;
2506
2507   /* zero latency, if there is a buffer, it's full */
2508   if (self->priv->latency == 0)
2509     return FALSE;
2510
2511   /* Allow no more buffers than the latency */
2512   return (aggpad->priv->time_level <= self->priv->latency);
2513 }
2514
2515 /* Must be called with the PAD_LOCK held */
2516 static void
2517 apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
2518 {
2519   GstClockTime timestamp;
2520
2521   if (GST_BUFFER_DTS_IS_VALID (buffer))
2522     timestamp = GST_BUFFER_DTS (buffer);
2523   else
2524     timestamp = GST_BUFFER_PTS (buffer);
2525
2526   if (timestamp == GST_CLOCK_TIME_NONE) {
2527     if (head)
2528       timestamp = aggpad->priv->head_position;
2529     else
2530       timestamp = aggpad->priv->tail_position;
2531   }
2532
2533   /* add duration */
2534   if (GST_BUFFER_DURATION_IS_VALID (buffer))
2535     timestamp += GST_BUFFER_DURATION (buffer);
2536
2537   if (head)
2538     aggpad->priv->head_position = timestamp;
2539   else
2540     aggpad->priv->tail_position = timestamp;
2541
2542   update_time_level (aggpad, head);
2543 }
2544
2545 /*
2546  * Can be called either from the sinkpad's chain function or from the srcpad's
2547  * thread in the case of a buffer synthetized from a GAP event.
2548  * Because of this second case, FLUSH_LOCK can't be used here.
2549  */
2550
2551 static GstFlowReturn
2552 gst_aggregator_pad_chain_internal (GstAggregator * self,
2553     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
2554 {
2555   GstFlowReturn flow_return;
2556   GstClockTime buf_pts;
2557
2558   PAD_LOCK (aggpad);
2559   flow_return = aggpad->priv->flow_return;
2560   if (flow_return != GST_FLOW_OK)
2561     goto flushing;
2562
2563   PAD_UNLOCK (aggpad);
2564
2565   buf_pts = GST_BUFFER_PTS (buffer);
2566
2567   for (;;) {
2568     SRC_LOCK (self);
2569     GST_OBJECT_LOCK (self);
2570     PAD_LOCK (aggpad);
2571
2572     if (aggpad->priv->first_buffer) {
2573       self->priv->has_peer_latency = FALSE;
2574       aggpad->priv->first_buffer = FALSE;
2575     }
2576
2577     if ((gst_aggregator_pad_has_space (self, aggpad) || !head)
2578         && aggpad->priv->flow_return == GST_FLOW_OK) {
2579       if (head)
2580         g_queue_push_head (&aggpad->priv->data, buffer);
2581       else
2582         g_queue_push_tail (&aggpad->priv->data, buffer);
2583       apply_buffer (aggpad, buffer, head);
2584       aggpad->priv->num_buffers++;
2585       buffer = NULL;
2586       SRC_BROADCAST (self);
2587       break;
2588     }
2589
2590     flow_return = aggpad->priv->flow_return;
2591     if (flow_return != GST_FLOW_OK) {
2592       GST_OBJECT_UNLOCK (self);
2593       SRC_UNLOCK (self);
2594       goto flushing;
2595     }
2596     GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2597     GST_OBJECT_UNLOCK (self);
2598     SRC_UNLOCK (self);
2599     PAD_WAIT_EVENT (aggpad);
2600
2601     PAD_UNLOCK (aggpad);
2602   }
2603
2604   if (self->priv->first_buffer) {
2605     GstClockTime start_time;
2606     GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
2607
2608     switch (self->priv->start_time_selection) {
2609       case GST_AGGREGATOR_START_TIME_SELECTION_ZERO:
2610       default:
2611         start_time = 0;
2612         break;
2613       case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
2614         GST_OBJECT_LOCK (aggpad);
2615         if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) {
2616           start_time = buf_pts;
2617           if (start_time != -1) {
2618             start_time = MAX (start_time, aggpad->priv->head_segment.start);
2619             start_time =
2620                 gst_segment_to_running_time (&aggpad->priv->head_segment,
2621                 GST_FORMAT_TIME, start_time);
2622           }
2623         } else {
2624           start_time = 0;
2625           GST_WARNING_OBJECT (aggpad,
2626               "Ignoring request of selecting the first start time "
2627               "as the segment is a %s segment instead of a time segment",
2628               gst_format_get_name (aggpad->segment.format));
2629         }
2630         GST_OBJECT_UNLOCK (aggpad);
2631         break;
2632       case GST_AGGREGATOR_START_TIME_SELECTION_SET:
2633         start_time = self->priv->start_time;
2634         if (start_time == -1)
2635           start_time = 0;
2636         break;
2637     }
2638
2639     if (start_time != -1) {
2640       if (srcpad->segment.position == -1)
2641         srcpad->segment.position = start_time;
2642       else
2643         srcpad->segment.position = MIN (start_time, srcpad->segment.position);
2644
2645       GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT,
2646           GST_TIME_ARGS (start_time));
2647     }
2648   }
2649
2650   PAD_UNLOCK (aggpad);
2651   GST_OBJECT_UNLOCK (self);
2652   SRC_UNLOCK (self);
2653
2654   GST_DEBUG_OBJECT (aggpad, "Done chaining");
2655
2656   return flow_return;
2657
2658 flushing:
2659   PAD_UNLOCK (aggpad);
2660
2661   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
2662       gst_flow_get_name (flow_return));
2663   if (buffer)
2664     gst_buffer_unref (buffer);
2665
2666   return flow_return;
2667 }
2668
2669 static GstFlowReturn
2670 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
2671 {
2672   GstFlowReturn ret;
2673   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2674
2675   PAD_FLUSH_LOCK (aggpad);
2676
2677   ret = gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
2678       aggpad, buffer, TRUE);
2679
2680   PAD_FLUSH_UNLOCK (aggpad);
2681
2682   return ret;
2683 }
2684
2685 static gboolean
2686 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
2687     GstQuery * query)
2688 {
2689   GstAggregator *self = GST_AGGREGATOR (parent);
2690   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2691
2692   if (GST_QUERY_IS_SERIALIZED (query)) {
2693     GstStructure *s;
2694     gboolean ret = FALSE;
2695
2696     SRC_LOCK (self);
2697     PAD_LOCK (aggpad);
2698
2699     if (aggpad->priv->flow_return != GST_FLOW_OK) {
2700       SRC_UNLOCK (self);
2701       goto flushing;
2702     }
2703
2704     g_queue_push_head (&aggpad->priv->data, query);
2705     SRC_BROADCAST (self);
2706     SRC_UNLOCK (self);
2707
2708     while (!gst_aggregator_pad_queue_is_empty (aggpad)
2709         && aggpad->priv->flow_return == GST_FLOW_OK) {
2710       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2711       PAD_WAIT_EVENT (aggpad);
2712     }
2713
2714     s = gst_query_writable_structure (query);
2715     if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret))
2716       gst_structure_remove_field (s, "gst-aggregator-retval");
2717     else
2718       g_queue_remove (&aggpad->priv->data, query);
2719
2720     if (aggpad->priv->flow_return != GST_FLOW_OK)
2721       goto flushing;
2722
2723     PAD_UNLOCK (aggpad);
2724
2725     return ret;
2726   } else {
2727     GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2728
2729     return klass->sink_query (self, aggpad, query);
2730   }
2731
2732 flushing:
2733   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
2734       gst_flow_get_name (aggpad->priv->flow_return));
2735   PAD_UNLOCK (aggpad);
2736
2737   return FALSE;
2738 }
2739
2740 /* Queue serialized events and let the others go through directly.
2741  * The queued events with be handled from the src-pad task in
2742  * gst_aggregator_do_events_and_queries().
2743  */
2744 static GstFlowReturn
2745 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
2746     GstEvent * event)
2747 {
2748   GstFlowReturn ret = GST_FLOW_OK;
2749   GstAggregator *self = GST_AGGREGATOR (parent);
2750   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2751
2752   if (GST_EVENT_IS_SERIALIZED (event)
2753       && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
2754     SRC_LOCK (self);
2755     PAD_LOCK (aggpad);
2756
2757     if (aggpad->priv->flow_return != GST_FLOW_OK)
2758       goto flushing;
2759
2760     if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
2761       GST_OBJECT_LOCK (aggpad);
2762       gst_event_copy_segment (event, &aggpad->priv->head_segment);
2763       aggpad->priv->head_position = aggpad->priv->head_segment.position;
2764       update_time_level (aggpad, TRUE);
2765       GST_OBJECT_UNLOCK (aggpad);
2766     }
2767
2768     GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event);
2769     g_queue_push_head (&aggpad->priv->data, event);
2770     SRC_BROADCAST (self);
2771     PAD_UNLOCK (aggpad);
2772     SRC_UNLOCK (self);
2773   } else {
2774     GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2775
2776     if (!klass->sink_event (self, aggpad, event)) {
2777       /* Copied from GstPad to convert boolean to a GstFlowReturn in
2778        * the event handling func */
2779       ret = GST_FLOW_ERROR;
2780     }
2781   }
2782
2783   return ret;
2784
2785 flushing:
2786   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
2787       gst_flow_get_name (aggpad->priv->flow_return));
2788   PAD_UNLOCK (aggpad);
2789   SRC_UNLOCK (self);
2790   if (GST_EVENT_IS_STICKY (event))
2791     gst_pad_store_sticky_event (pad, event);
2792   gst_event_unref (event);
2793
2794   return aggpad->priv->flow_return;
2795 }
2796
2797 static gboolean
2798 gst_aggregator_pad_activate_mode_func (GstPad * pad,
2799     GstObject * parent, GstPadMode mode, gboolean active)
2800 {
2801   GstAggregator *self = GST_AGGREGATOR (parent);
2802   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2803
2804   if (active == FALSE) {
2805     SRC_LOCK (self);
2806     gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
2807     SRC_BROADCAST (self);
2808     SRC_UNLOCK (self);
2809   } else {
2810     PAD_LOCK (aggpad);
2811     aggpad->priv->flow_return = GST_FLOW_OK;
2812     PAD_BROADCAST_EVENT (aggpad);
2813     PAD_UNLOCK (aggpad);
2814   }
2815
2816   return TRUE;
2817 }
2818
2819 /***********************************
2820  * GstAggregatorPad implementation  *
2821  ************************************/
2822 G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
2823
2824 static void
2825 gst_aggregator_pad_constructed (GObject * object)
2826 {
2827   GstPad *pad = GST_PAD (object);
2828
2829   if (GST_PAD_IS_SINK (pad)) {
2830     gst_pad_set_chain_function (pad,
2831         GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
2832     gst_pad_set_event_full_function_full (pad,
2833         GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
2834     gst_pad_set_query_function (pad,
2835         GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
2836     gst_pad_set_activatemode_function (pad,
2837         GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
2838   }
2839 }
2840
2841 static void
2842 gst_aggregator_pad_finalize (GObject * object)
2843 {
2844   GstAggregatorPad *pad = (GstAggregatorPad *) object;
2845
2846   g_cond_clear (&pad->priv->event_cond);
2847   g_mutex_clear (&pad->priv->flush_lock);
2848   g_mutex_clear (&pad->priv->lock);
2849
2850   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object);
2851 }
2852
2853 static void
2854 gst_aggregator_pad_dispose (GObject * object)
2855 {
2856   GstAggregatorPad *pad = (GstAggregatorPad *) object;
2857
2858   gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE);
2859
2860   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
2861 }
2862
2863 static void
2864 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
2865 {
2866   GObjectClass *gobject_class = (GObjectClass *) klass;
2867
2868   gobject_class->constructed = gst_aggregator_pad_constructed;
2869   gobject_class->finalize = gst_aggregator_pad_finalize;
2870   gobject_class->dispose = gst_aggregator_pad_dispose;
2871 }
2872
2873 static void
2874 gst_aggregator_pad_init (GstAggregatorPad * pad)
2875 {
2876   pad->priv = gst_aggregator_pad_get_instance_private (pad);
2877
2878   g_queue_init (&pad->priv->data);
2879   g_cond_init (&pad->priv->event_cond);
2880
2881   g_mutex_init (&pad->priv->flush_lock);
2882   g_mutex_init (&pad->priv->lock);
2883
2884   gst_aggregator_pad_reset_unlocked (pad);
2885   pad->priv->negotiated = FALSE;
2886 }
2887
2888 /* Must be called with the PAD_LOCK held */
2889 static void
2890 gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad)
2891 {
2892   pad->priv->num_buffers--;
2893   GST_TRACE_OBJECT (pad, "Consuming buffer");
2894   PAD_BROADCAST_EVENT (pad);
2895 }
2896
2897 /* Must be called with the PAD_LOCK held */
2898 static void
2899 gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
2900 {
2901   GstAggregator *self = NULL;
2902   GstAggregatorClass *aggclass = NULL;
2903   GstBuffer *buffer = NULL;
2904
2905   while (pad->priv->clipped_buffer == NULL &&
2906       GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
2907     buffer = g_queue_pop_tail (&pad->priv->data);
2908
2909     apply_buffer (pad, buffer, FALSE);
2910
2911     /* We only take the parent here so that it's not taken if the buffer is
2912      * already clipped or if the queue is empty.
2913      */
2914     if (self == NULL) {
2915       self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
2916       if (self == NULL) {
2917         gst_buffer_unref (buffer);
2918         return;
2919       }
2920
2921       aggclass = GST_AGGREGATOR_GET_CLASS (self);
2922     }
2923
2924     if (aggclass->clip) {
2925       GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer);
2926
2927       buffer = aggclass->clip (self, pad, buffer);
2928
2929       if (buffer == NULL) {
2930         gst_aggregator_pad_buffer_consumed (pad);
2931         GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
2932       }
2933     }
2934
2935     pad->priv->clipped_buffer = buffer;
2936   }
2937
2938   if (self)
2939     gst_object_unref (self);
2940 }
2941
2942 /**
2943  * gst_aggregator_pad_pop_buffer:
2944  * @pad: the pad to get buffer from
2945  *
2946  * Steal the ref to the buffer currently queued in @pad.
2947  *
2948  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
2949  *   queued. You should unref the buffer after usage.
2950  */
2951 GstBuffer *
2952 gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad)
2953 {
2954   GstBuffer *buffer;
2955
2956   PAD_LOCK (pad);
2957
2958   if (pad->priv->flow_return != GST_FLOW_OK) {
2959     PAD_UNLOCK (pad);
2960     return NULL;
2961   }
2962
2963   gst_aggregator_pad_clip_buffer_unlocked (pad);
2964
2965   buffer = pad->priv->clipped_buffer;
2966
2967   if (buffer) {
2968     pad->priv->clipped_buffer = NULL;
2969     gst_aggregator_pad_buffer_consumed (pad);
2970     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
2971   }
2972
2973   PAD_UNLOCK (pad);
2974
2975   return buffer;
2976 }
2977
2978 /**
2979  * gst_aggregator_pad_drop_buffer:
2980  * @pad: the pad where to drop any pending buffer
2981  *
2982  * Drop the buffer currently queued in @pad.
2983  *
2984  * Returns: TRUE if there was a buffer queued in @pad, or FALSE if not.
2985  */
2986 gboolean
2987 gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
2988 {
2989   GstBuffer *buf;
2990
2991   buf = gst_aggregator_pad_pop_buffer (pad);
2992
2993   if (buf == NULL)
2994     return FALSE;
2995
2996   gst_buffer_unref (buf);
2997   return TRUE;
2998 }
2999
3000 /**
3001  * gst_aggregator_pad_peek_buffer:
3002  * @pad: the pad to get buffer from
3003  *
3004  * Returns: (transfer full): A reference to the buffer in @pad or
3005  * NULL if no buffer was queued. You should unref the buffer after
3006  * usage.
3007  */
3008 GstBuffer *
3009 gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
3010 {
3011   GstBuffer *buffer;
3012
3013   PAD_LOCK (pad);
3014
3015   if (pad->priv->flow_return != GST_FLOW_OK) {
3016     PAD_UNLOCK (pad);
3017     return NULL;
3018   }
3019
3020   gst_aggregator_pad_clip_buffer_unlocked (pad);
3021
3022   if (pad->priv->clipped_buffer) {
3023     buffer = gst_buffer_ref (pad->priv->clipped_buffer);
3024   } else {
3025     buffer = NULL;
3026   }
3027   PAD_UNLOCK (pad);
3028
3029   return buffer;
3030 }
3031
3032 /**
3033  * gst_aggregator_pad_has_buffer:
3034  * @pad: the pad to check the buffer on
3035  *
3036  * This checks if a pad has a buffer available that will be returned by
3037  * a call to gst_aggregator_pad_peek_buffer() or
3038  * gst_aggregator_pad_pop_buffer().
3039  *
3040  * Returns: %TRUE if the pad has a buffer available as the next thing.
3041  *
3042  * Since: 1.14.1
3043  */
3044 gboolean
3045 gst_aggregator_pad_has_buffer (GstAggregatorPad * pad)
3046 {
3047   gboolean has_buffer;
3048
3049   PAD_LOCK (pad);
3050   gst_aggregator_pad_clip_buffer_unlocked (pad);
3051   has_buffer = (pad->priv->clipped_buffer != NULL);
3052   PAD_UNLOCK (pad);
3053
3054   return has_buffer;
3055 }
3056
3057 /**
3058  * gst_aggregator_pad_is_eos:
3059  * @pad: an aggregator pad
3060  *
3061  * Returns: %TRUE if the pad is EOS, otherwise %FALSE.
3062  */
3063 gboolean
3064 gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
3065 {
3066   gboolean is_eos;
3067
3068   PAD_LOCK (pad);
3069   is_eos = pad->priv->eos;
3070   PAD_UNLOCK (pad);
3071
3072   return is_eos;
3073 }
3074
3075 #if 0
3076 /*
3077  * gst_aggregator_merge_tags:
3078  * @self: a #GstAggregator
3079  * @tags: a #GstTagList to merge
3080  * @mode: the #GstTagMergeMode to use
3081  *
3082  * Adds tags to so-called pending tags, which will be processed
3083  * before pushing out data downstream.
3084  *
3085  * Note that this is provided for convenience, and the subclass is
3086  * not required to use this and can still do tag handling on its own.
3087  *
3088  * MT safe.
3089  */
3090 void
3091 gst_aggregator_merge_tags (GstAggregator * self,
3092     const GstTagList * tags, GstTagMergeMode mode)
3093 {
3094   GstTagList *otags;
3095
3096   g_return_if_fail (GST_IS_AGGREGATOR (self));
3097   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
3098
3099   /* FIXME Check if we can use OBJECT lock here! */
3100   GST_OBJECT_LOCK (self);
3101   if (tags)
3102     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
3103   otags = self->priv->tags;
3104   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
3105   if (otags)
3106     gst_tag_list_unref (otags);
3107   self->priv->tags_changed = TRUE;
3108   GST_OBJECT_UNLOCK (self);
3109 }
3110 #endif
3111
3112 /**
3113  * gst_aggregator_set_latency:
3114  * @self: a #GstAggregator
3115  * @min_latency: minimum latency
3116  * @max_latency: maximum latency
3117  *
3118  * Lets #GstAggregator sub-classes tell the baseclass what their internal
3119  * latency is. Will also post a LATENCY message on the bus so the pipeline
3120  * can reconfigure its global latency.
3121  */
3122 void
3123 gst_aggregator_set_latency (GstAggregator * self,
3124     GstClockTime min_latency, GstClockTime max_latency)
3125 {
3126   gboolean changed = FALSE;
3127
3128   g_return_if_fail (GST_IS_AGGREGATOR (self));
3129   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
3130   g_return_if_fail (max_latency >= min_latency);
3131
3132   SRC_LOCK (self);
3133   if (self->priv->sub_latency_min != min_latency) {
3134     self->priv->sub_latency_min = min_latency;
3135     changed = TRUE;
3136   }
3137   if (self->priv->sub_latency_max != max_latency) {
3138     self->priv->sub_latency_max = max_latency;
3139     changed = TRUE;
3140   }
3141
3142   if (changed)
3143     SRC_BROADCAST (self);
3144   SRC_UNLOCK (self);
3145
3146   if (changed) {
3147     gst_element_post_message (GST_ELEMENT_CAST (self),
3148         gst_message_new_latency (GST_OBJECT_CAST (self)));
3149   }
3150 }
3151
3152 /**
3153  * gst_aggregator_get_buffer_pool:
3154  * @self: a #GstAggregator
3155  *
3156  * Returns: (transfer full): the instance of the #GstBufferPool used
3157  * by @trans; free it after use it
3158  */
3159 GstBufferPool *
3160 gst_aggregator_get_buffer_pool (GstAggregator * self)
3161 {
3162   GstBufferPool *pool;
3163
3164   g_return_val_if_fail (GST_IS_AGGREGATOR (self), NULL);
3165
3166   GST_OBJECT_LOCK (self);
3167   pool = self->priv->pool;
3168   if (pool)
3169     gst_object_ref (pool);
3170   GST_OBJECT_UNLOCK (self);
3171
3172   return pool;
3173 }
3174
3175 /**
3176  * gst_aggregator_get_allocator:
3177  * @self: a #GstAggregator
3178  * @allocator: (out) (allow-none) (transfer full): the #GstAllocator
3179  * used
3180  * @params: (out) (allow-none) (transfer full): the
3181  * #GstAllocationParams of @allocator
3182  *
3183  * Lets #GstAggregator sub-classes get the memory @allocator
3184  * acquired by the base class and its @params.
3185  *
3186  * Unref the @allocator after use it.
3187  */
3188 void
3189 gst_aggregator_get_allocator (GstAggregator * self,
3190     GstAllocator ** allocator, GstAllocationParams * params)
3191 {
3192   g_return_if_fail (GST_IS_AGGREGATOR (self));
3193
3194   if (allocator)
3195     *allocator = self->priv->allocator ?
3196         gst_object_ref (self->priv->allocator) : NULL;
3197
3198   if (params)
3199     *params = self->priv->allocation_params;
3200 }
3201
3202 /**
3203  * gst_aggregator_simple_get_next_time:
3204  * @self: A #GstAggregator
3205  *
3206  * This is a simple #GstAggregator::get_next_time implementation that
3207  * just looks at the #GstSegment on the srcpad of the aggregator and bases
3208  * the next time on the running time there.
3209  *
3210  * This is the desired behaviour in most cases where you have a live source
3211  * and you have a dead line based aggregator subclass.
3212  *
3213  * Returns: The running time based on the position
3214  *
3215  * Since: 1.16
3216  */
3217 GstClockTime
3218 gst_aggregator_simple_get_next_time (GstAggregator * self)
3219 {
3220   GstClockTime next_time;
3221   GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
3222   GstSegment *segment = &srcpad->segment;
3223
3224   GST_OBJECT_LOCK (self);
3225   if (segment->position == -1 || segment->position < segment->start)
3226     next_time = segment->start;
3227   else
3228     next_time = segment->position;
3229
3230   if (segment->stop != -1 && next_time > segment->stop)
3231     next_time = segment->stop;
3232
3233   next_time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, next_time);
3234   GST_OBJECT_UNLOCK (self);
3235
3236   return next_time;
3237 }