aggregator: add min-upstream-latency property.
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
1 /* GStreamer aggregator base class
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @title: GstAggregator
25  * @short_description: Base class for mixers and muxers, manages a set of input
26  *     pads and aggregates their streams
27  * @see_also: gstcollectpads for historical reasons.
28  *
29  * Manages a set of pads with the purpose of aggregating their buffers.
30  * Control is given to the subclass when all pads have data.
31  *
32  *  * Base class for mixers and muxers. Subclasses should at least implement
33  *    the #GstAggregatorClass.aggregate() virtual method.
34  *
35  *  * Installs a #GstPadChainFunction, a #GstPadEventFullFunction and a
36  *    #GstPadQueryFunction to queue all serialized data packets per sink pad.
37  *    Subclasses should not overwrite those, but instead implement
38  *    #GstAggregatorClass.sink_event() and #GstAggregatorClass.sink_query() as
39  *    needed.
40  *
41  *  * When data is queued on all pads, the aggregate vmethod is called.
42  *
43  *  * One can peek at the data on any given GstAggregatorPad with the
44  *    gst_aggregator_pad_peek_buffer () method, and remove it from the pad
45  *    with the gst_aggregator_pad_pop_buffer () method. When a buffer
46  *    has been taken with pop_buffer (), a new buffer can be queued
47  *    on that pad.
48  *
49  *  * If the subclass wishes to push a buffer downstream in its aggregate
50  *    implementation, it should do so through the
51  *    gst_aggregator_finish_buffer () method. This method will take care
52  *    of sending and ordering mandatory events such as stream start, caps
53  *    and segment.
54  *
55  *  * Same goes for EOS events, which should not be pushed directly by the
56  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
57  *    implementation.
58  *
59  *  * Note that the aggregator logic regarding gap event handling is to turn
60  *    these into gap buffers with matching PTS and duration. It will also
61  *    flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE
62  *    to ease their identification and subsequent processing.
63  *
64  *  * Subclasses must use (a subclass of) #GstAggregatorPad for both their
65  *    sink and source pads.
66  *    See gst_element_class_add_static_pad_template_with_gtype().
67  *
68  * This class used to live in gst-plugins-bad and was moved to core.
69  *
70  * Since: 1.14
71  */
72
73 /**
74  * SECTION: gstaggregatorpad
75  * @title: GstAggregatorPad
76  * @short_description: #GstPad subclass for pads managed by #GstAggregator
77  * @see_also: gstcollectpads for historical reasons.
78  *
79  * Pads managed by a #GstAggregor subclass.
80  *
81  * This class used to live in gst-plugins-bad and was moved to core.
82  *
83  * Since: 1.14
84  */
85
86 #ifdef HAVE_CONFIG_H
87 #  include "config.h"
88 #endif
89
90 #include <string.h>             /* strlen */
91
92 #include "gstaggregator.h"
93
94 typedef enum
95 {
96   GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
97   GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
98   GST_AGGREGATOR_START_TIME_SELECTION_SET
99 } GstAggregatorStartTimeSelection;
100
101 static GType
102 gst_aggregator_start_time_selection_get_type (void)
103 {
104   static GType gtype = 0;
105
106   if (gtype == 0) {
107     static const GEnumValue values[] = {
108       {GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
109           "Start at 0 running time (default)", "zero"},
110       {GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
111           "Start at first observed input running time", "first"},
112       {GST_AGGREGATOR_START_TIME_SELECTION_SET,
113           "Set start time with start-time property", "set"},
114       {0, NULL, NULL}
115     };
116
117     gtype = g_enum_register_static ("GstAggregatorStartTimeSelection", values);
118   }
119   return gtype;
120 }
121
122 /*  Might become API */
123 #if 0
124 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
125     const GstTagList * tags, GstTagMergeMode mode);
126 #endif
127 static void gst_aggregator_set_latency_property (GstAggregator * agg,
128     GstClockTime latency);
129 static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
130
131 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
132
133 static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad);
134
135 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
136 #define GST_CAT_DEFAULT aggregator_debug
137
138 /* Locking order, locks in this element must always be taken in this order
139  *
140  * standard sink pad stream lock -> GST_PAD_STREAM_LOCK (aggpad)
141  * Aggregator pad flush lock -> PAD_FLUSH_LOCK(aggpad)
142  * standard src pad stream lock -> GST_PAD_STREAM_LOCK (srcpad)
143  * Aggregator src lock -> SRC_LOCK(agg) w/ SRC_WAIT/BROADCAST
144  * standard element object lock -> GST_OBJECT_LOCK(agg)
145  * Aggregator pad lock -> PAD_LOCK (aggpad) w/ PAD_WAIT/BROADCAST_EVENT(aggpad)
146  * standard src pad object lock -> GST_OBJECT_LOCK(srcpad)
147  * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad)
148  */
149
150 /* GstAggregatorPad definitions */
151 #define PAD_LOCK(pad)   G_STMT_START {                                  \
152   GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p",              \
153         g_thread_self());                                               \
154   g_mutex_lock(&pad->priv->lock);                                       \
155   GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p",                \
156         g_thread_self());                                               \
157   } G_STMT_END
158
159 #define PAD_UNLOCK(pad)  G_STMT_START {                                 \
160   GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p",           \
161       g_thread_self());                                                 \
162   g_mutex_unlock(&pad->priv->lock);                                     \
163   GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p",             \
164         g_thread_self());                                               \
165   } G_STMT_END
166
167
168 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
169   GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p",   \
170         g_thread_self());                                               \
171   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),           \
172       (&((GstAggregatorPad*)pad)->priv->lock));                         \
173   GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \
174         g_thread_self());                                               \
175   } G_STMT_END
176
177 #define PAD_BROADCAST_EVENT(pad) G_STMT_START {                        \
178   GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p",     \
179         g_thread_self());                                              \
180   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond));    \
181   } G_STMT_END
182
183
184 #define PAD_FLUSH_LOCK(pad)     G_STMT_START {                          \
185   GST_TRACE_OBJECT (pad, "Taking lock from thread %p",                  \
186         g_thread_self());                                               \
187   g_mutex_lock(&pad->priv->flush_lock);                                 \
188   GST_TRACE_OBJECT (pad, "Took lock from thread %p",                    \
189         g_thread_self());                                               \
190   } G_STMT_END
191
192 #define PAD_FLUSH_UNLOCK(pad)   G_STMT_START {                          \
193   GST_TRACE_OBJECT (pad, "Releasing lock from thread %p",               \
194         g_thread_self());                                               \
195   g_mutex_unlock(&pad->priv->flush_lock);                               \
196   GST_TRACE_OBJECT (pad, "Release lock from thread %p",                 \
197         g_thread_self());                                               \
198   } G_STMT_END
199
200 #define SRC_LOCK(self)   G_STMT_START {                             \
201   GST_TRACE_OBJECT (self, "Taking src lock from thread %p",         \
202       g_thread_self());                                             \
203   g_mutex_lock(&self->priv->src_lock);                              \
204   GST_TRACE_OBJECT (self, "Took src lock from thread %p",           \
205         g_thread_self());                                           \
206   } G_STMT_END
207
208 #define SRC_UNLOCK(self)  G_STMT_START {                            \
209   GST_TRACE_OBJECT (self, "Releasing src lock from thread %p",      \
210         g_thread_self());                                           \
211   g_mutex_unlock(&self->priv->src_lock);                            \
212   GST_TRACE_OBJECT (self, "Released src lock from thread %p",       \
213         g_thread_self());                                           \
214   } G_STMT_END
215
216 #define SRC_WAIT(self) G_STMT_START {                               \
217   GST_LOG_OBJECT (self, "Waiting for src on thread %p",             \
218         g_thread_self());                                           \
219   g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock));    \
220   GST_LOG_OBJECT (self, "DONE Waiting for src on thread %p",        \
221         g_thread_self());                                           \
222   } G_STMT_END
223
224 #define SRC_BROADCAST(self) G_STMT_START {                          \
225     GST_LOG_OBJECT (self, "Signaling src from thread %p",           \
226         g_thread_self());                                           \
227     if (self->priv->aggregate_id)                                   \
228       gst_clock_id_unschedule (self->priv->aggregate_id);           \
229     g_cond_broadcast(&(self->priv->src_cond));                      \
230   } G_STMT_END
231
232 struct _GstAggregatorPadPrivate
233 {
234   /* Following fields are protected by the PAD_LOCK */
235   GstFlowReturn flow_return;
236   gboolean pending_flush_start;
237   gboolean pending_flush_stop;
238
239   gboolean first_buffer;
240
241   GQueue data;                  /* buffers, events and queries */
242   GstBuffer *clipped_buffer;
243   guint num_buffers;
244
245   /* used to track fill state of queues, only used with live-src and when
246    * latency property is set to > 0 */
247   GstClockTime head_position;
248   GstClockTime tail_position;
249   GstClockTime head_time;       /* running time */
250   GstClockTime tail_time;
251   GstClockTime time_level;      /* how much head is ahead of tail */
252   GstSegment head_segment;      /* segment before the queue */
253
254   gboolean negotiated;
255
256   gboolean eos;
257
258   GMutex lock;
259   GCond event_cond;
260   /* This lock prevents a flush start processing happening while
261    * the chain function is also happening.
262    */
263   GMutex flush_lock;
264 };
265
266 /* Must be called with PAD_LOCK held */
267 static void
268 gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
269 {
270   aggpad->priv->eos = FALSE;
271   aggpad->priv->flow_return = GST_FLOW_OK;
272   GST_OBJECT_LOCK (aggpad);
273   gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED);
274   gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED);
275   GST_OBJECT_UNLOCK (aggpad);
276   aggpad->priv->head_position = GST_CLOCK_TIME_NONE;
277   aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
278   aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
279   aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
280   aggpad->priv->time_level = 0;
281   aggpad->priv->first_buffer = TRUE;
282 }
283
284 static gboolean
285 gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
286 {
287   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
288
289   PAD_LOCK (aggpad);
290   gst_aggregator_pad_reset_unlocked (aggpad);
291   PAD_UNLOCK (aggpad);
292
293   if (klass->flush)
294     return klass->flush (aggpad, agg);
295
296   return TRUE;
297 }
298
299 /*************************************
300  * GstAggregator implementation  *
301  *************************************/
302 static GstElementClass *aggregator_parent_class = NULL;
303 static gint aggregator_private_offset = 0;
304
305 /* All members are protected by the object lock unless otherwise noted */
306
307 struct _GstAggregatorPrivate
308 {
309   gint max_padserial;
310
311   /* Our state is >= PAUSED */
312   gboolean running;             /* protected by src_lock */
313
314   /* seqnum from seek or segment,
315    * to be applied to synthetic segment/eos events */
316   gint seqnum;
317   gboolean send_stream_start;   /* protected by srcpad stream lock */
318   gboolean send_segment;
319   gboolean flush_seeking;
320   gboolean pending_flush_start;
321   gboolean send_eos;            /* protected by srcpad stream lock */
322
323   GstCaps *srccaps;             /* protected by the srcpad stream lock */
324
325   GstTagList *tags;
326   gboolean tags_changed;
327
328   gboolean peer_latency_live;   /* protected by src_lock */
329   GstClockTime peer_latency_min;        /* protected by src_lock */
330   GstClockTime peer_latency_max;        /* protected by src_lock */
331   gboolean has_peer_latency;    /* protected by src_lock */
332
333   GstClockTime sub_latency_min; /* protected by src_lock */
334   GstClockTime sub_latency_max; /* protected by src_lock */
335
336   GstClockTime upstream_latency_min;    /* protected by src_lock */
337
338   /* aggregate */
339   GstClockID aggregate_id;      /* protected by src_lock */
340   GMutex src_lock;
341   GCond src_cond;
342
343   gboolean first_buffer;        /* protected by object lock */
344   GstAggregatorStartTimeSelection start_time_selection;
345   GstClockTime start_time;
346
347   /* protected by the object lock */
348   GstQuery *allocation_query;
349   GstAllocator *allocator;
350   GstBufferPool *pool;
351   GstAllocationParams allocation_params;
352
353   /* properties */
354   gint64 latency;               /* protected by both src_lock and all pad locks */
355 };
356
357 /* Seek event forwarding helper */
358 typedef struct
359 {
360   /* parameters */
361   GstEvent *event;
362   gboolean flush;
363   gboolean only_to_active_pads;
364
365   /* results */
366   gboolean result;
367   gboolean one_actually_seeked;
368 } EventData;
369
370 #define DEFAULT_LATENCY              0
371 #define DEFAULT_MIN_UPSTREAM_LATENCY              0
372 #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO
373 #define DEFAULT_START_TIME           (-1)
374
375 enum
376 {
377   PROP_0,
378   PROP_LATENCY,
379   PROP_MIN_UPSTREAM_LATENCY,
380   PROP_START_TIME_SELECTION,
381   PROP_START_TIME,
382   PROP_LAST
383 };
384
385 static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
386     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
387
388 static gboolean
389 gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
390 {
391   return (g_queue_peek_tail (&pad->priv->data) == NULL &&
392       pad->priv->clipped_buffer == NULL);
393 }
394
395 static gboolean
396 gst_aggregator_check_pads_ready (GstAggregator * self)
397 {
398   GstAggregatorPad *pad = NULL;
399   GList *l, *sinkpads;
400   gboolean have_buffer = TRUE;
401   gboolean have_event_or_query = FALSE;
402
403   GST_LOG_OBJECT (self, "checking pads");
404
405   GST_OBJECT_LOCK (self);
406
407   sinkpads = GST_ELEMENT_CAST (self)->sinkpads;
408   if (sinkpads == NULL)
409     goto no_sinkpads;
410
411   for (l = sinkpads; l != NULL; l = l->next) {
412     pad = l->data;
413
414     PAD_LOCK (pad);
415
416     if (pad->priv->num_buffers == 0) {
417       if (!gst_aggregator_pad_queue_is_empty (pad))
418         have_event_or_query = TRUE;
419       if (!pad->priv->eos) {
420         have_buffer = FALSE;
421
422         /* If not live we need data on all pads, so leave the loop */
423         if (!self->priv->peer_latency_live) {
424           PAD_UNLOCK (pad);
425           goto pad_not_ready;
426         }
427       }
428     } else if (self->priv->peer_latency_live) {
429       /* In live mode, having a single pad with buffers is enough to
430        * generate a start time from it. In non-live mode all pads need
431        * to have a buffer
432        */
433       self->priv->first_buffer = FALSE;
434     }
435
436     PAD_UNLOCK (pad);
437   }
438
439   if (!have_buffer && !have_event_or_query)
440     goto pad_not_ready;
441
442   if (have_buffer)
443     self->priv->first_buffer = FALSE;
444
445   GST_OBJECT_UNLOCK (self);
446   GST_LOG_OBJECT (self, "pads are ready");
447   return TRUE;
448
449 no_sinkpads:
450   {
451     GST_LOG_OBJECT (self, "pads not ready: no sink pads");
452     GST_OBJECT_UNLOCK (self);
453     return FALSE;
454   }
455 pad_not_ready:
456   {
457     if (have_event_or_query)
458       GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet,"
459           " but waking up for serialized event");
460     else
461       GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
462     GST_OBJECT_UNLOCK (self);
463     return have_event_or_query;
464   }
465 }
466
467 static void
468 gst_aggregator_reset_flow_values (GstAggregator * self)
469 {
470   GST_OBJECT_LOCK (self);
471   self->priv->send_stream_start = TRUE;
472   self->priv->send_segment = TRUE;
473   gst_segment_init (&GST_AGGREGATOR_PAD (self->srcpad)->segment,
474       GST_FORMAT_TIME);
475   self->priv->first_buffer = TRUE;
476   GST_OBJECT_UNLOCK (self);
477 }
478
479 static inline void
480 gst_aggregator_push_mandatory_events (GstAggregator * self)
481 {
482   GstAggregatorPrivate *priv = self->priv;
483   GstEvent *segment = NULL;
484   GstEvent *tags = NULL;
485
486   if (self->priv->send_stream_start) {
487     gchar s_id[32];
488
489     GST_INFO_OBJECT (self, "pushing stream start");
490     /* stream-start (FIXME: create id based on input ids) */
491     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
492     if (!gst_pad_push_event (GST_PAD (self->srcpad),
493             gst_event_new_stream_start (s_id))) {
494       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
495     }
496     self->priv->send_stream_start = FALSE;
497   }
498
499   if (self->priv->srccaps) {
500
501     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
502         self->priv->srccaps);
503     if (!gst_pad_push_event (GST_PAD (self->srcpad),
504             gst_event_new_caps (self->priv->srccaps))) {
505       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
506     }
507     gst_caps_unref (self->priv->srccaps);
508     self->priv->srccaps = NULL;
509   }
510
511   GST_OBJECT_LOCK (self);
512   if (self->priv->send_segment && !self->priv->flush_seeking) {
513     segment =
514         gst_event_new_segment (&GST_AGGREGATOR_PAD (self->srcpad)->segment);
515
516     if (!self->priv->seqnum)
517       /* This code-path is in preparation to be able to run without a source
518        * connected. Then we won't have a seq-num from a segment event. */
519       self->priv->seqnum = gst_event_get_seqnum (segment);
520     else
521       gst_event_set_seqnum (segment, self->priv->seqnum);
522     self->priv->send_segment = FALSE;
523
524     GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment);
525   }
526
527   if (priv->tags && priv->tags_changed && !self->priv->flush_seeking) {
528     tags = gst_event_new_tag (gst_tag_list_ref (priv->tags));
529     priv->tags_changed = FALSE;
530   }
531   GST_OBJECT_UNLOCK (self);
532
533   if (segment)
534     gst_pad_push_event (self->srcpad, segment);
535   if (tags)
536     gst_pad_push_event (self->srcpad, tags);
537
538 }
539
540 /**
541  * gst_aggregator_set_src_caps:
542  * @self: The #GstAggregator
543  * @caps: The #GstCaps to set on the src pad.
544  *
545  * Sets the caps to be used on the src pad.
546  */
547 void
548 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
549 {
550   GST_PAD_STREAM_LOCK (self->srcpad);
551   gst_caps_replace (&self->priv->srccaps, caps);
552   gst_aggregator_push_mandatory_events (self);
553   GST_PAD_STREAM_UNLOCK (self->srcpad);
554 }
555
556 static GstFlowReturn
557 gst_aggregator_default_finish_buffer (GstAggregator * self, GstBuffer * buffer)
558 {
559   gst_aggregator_push_mandatory_events (self);
560
561   GST_OBJECT_LOCK (self);
562   if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) {
563     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
564     GST_OBJECT_UNLOCK (self);
565     return gst_pad_push (self->srcpad, buffer);
566   } else {
567     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
568         self->priv->flush_seeking, gst_pad_is_active (self->srcpad));
569     GST_OBJECT_UNLOCK (self);
570     gst_buffer_unref (buffer);
571     return GST_FLOW_OK;
572   }
573 }
574
575 /**
576  * gst_aggregator_finish_buffer:
577  * @aggregator: The #GstAggregator
578  * @buffer: (transfer full): the #GstBuffer to push.
579  *
580  * This method will push the provided output buffer downstream. If needed,
581  * mandatory events such as stream-start, caps, and segment events will be
582  * sent before pushing the buffer.
583  */
584 GstFlowReturn
585 gst_aggregator_finish_buffer (GstAggregator * aggregator, GstBuffer * buffer)
586 {
587   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator);
588
589   g_assert (klass->finish_buffer != NULL);
590
591   return klass->finish_buffer (aggregator, buffer);
592 }
593
594 static void
595 gst_aggregator_push_eos (GstAggregator * self)
596 {
597   GstEvent *event;
598   gst_aggregator_push_mandatory_events (self);
599
600   event = gst_event_new_eos ();
601
602   GST_OBJECT_LOCK (self);
603   self->priv->send_eos = FALSE;
604   gst_event_set_seqnum (event, self->priv->seqnum);
605   GST_OBJECT_UNLOCK (self);
606
607   gst_pad_push_event (self->srcpad, event);
608 }
609
610 static GstClockTime
611 gst_aggregator_get_next_time (GstAggregator * self)
612 {
613   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
614
615   if (klass->get_next_time)
616     return klass->get_next_time (self);
617
618   return GST_CLOCK_TIME_NONE;
619 }
620
621 static gboolean
622 gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
623 {
624   GstClockTime latency;
625   GstClockTime start;
626   gboolean res;
627
628   *timeout = FALSE;
629
630   SRC_LOCK (self);
631
632   latency = gst_aggregator_get_latency_unlocked (self);
633
634   if (gst_aggregator_check_pads_ready (self)) {
635     GST_DEBUG_OBJECT (self, "all pads have data");
636     SRC_UNLOCK (self);
637
638     return TRUE;
639   }
640
641   /* Before waiting, check if we're actually still running */
642   if (!self->priv->running || !self->priv->send_eos) {
643     SRC_UNLOCK (self);
644
645     return FALSE;
646   }
647
648   start = gst_aggregator_get_next_time (self);
649
650   /* If we're not live, or if we use the running time
651    * of the first buffer as start time, we wait until
652    * all pads have buffers.
653    * Otherwise (i.e. if we are live!), we wait on the clock
654    * and if a pad does not have a buffer in time we ignore
655    * that pad.
656    */
657   GST_OBJECT_LOCK (self);
658   if (!GST_CLOCK_TIME_IS_VALID (latency) ||
659       !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) ||
660       !GST_CLOCK_TIME_IS_VALID (start) ||
661       (self->priv->first_buffer
662           && self->priv->start_time_selection ==
663           GST_AGGREGATOR_START_TIME_SELECTION_FIRST)) {
664     /* We wake up here when something happened, and below
665      * then check if we're ready now. If we return FALSE,
666      * we will be directly called again.
667      */
668     GST_OBJECT_UNLOCK (self);
669     SRC_WAIT (self);
670   } else {
671     GstClockTime base_time, time;
672     GstClock *clock;
673     GstClockReturn status;
674     GstClockTimeDiff jitter;
675
676     GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
677         GST_TIME_ARGS (start));
678
679     base_time = GST_ELEMENT_CAST (self)->base_time;
680     clock = gst_object_ref (GST_ELEMENT_CLOCK (self));
681     GST_OBJECT_UNLOCK (self);
682
683     time = base_time + start;
684     time += latency;
685
686     GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
687         GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
688         " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")",
689         GST_TIME_ARGS (time),
690         GST_TIME_ARGS (base_time),
691         GST_TIME_ARGS (start), GST_TIME_ARGS (latency),
692         GST_TIME_ARGS (gst_clock_get_time (clock)));
693
694     self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
695     gst_object_unref (clock);
696     SRC_UNLOCK (self);
697
698     jitter = 0;
699     status = gst_clock_id_wait (self->priv->aggregate_id, &jitter);
700
701     SRC_LOCK (self);
702     if (self->priv->aggregate_id) {
703       gst_clock_id_unref (self->priv->aggregate_id);
704       self->priv->aggregate_id = NULL;
705     }
706
707     GST_DEBUG_OBJECT (self,
708         "clock returned %d (jitter: %" GST_STIME_FORMAT ")",
709         status, GST_STIME_ARGS (jitter));
710
711     /* we timed out */
712     if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
713       SRC_UNLOCK (self);
714       *timeout = TRUE;
715       return TRUE;
716     }
717   }
718
719   res = gst_aggregator_check_pads_ready (self);
720   SRC_UNLOCK (self);
721
722   return res;
723 }
724
725 typedef struct
726 {
727   gboolean processed_event;
728   GstFlowReturn flow_ret;
729 } DoHandleEventsAndQueriesData;
730
731 static gboolean
732 gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
733     gpointer user_data)
734 {
735   GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
736   GstAggregator *aggregator = GST_AGGREGATOR_CAST (self);
737   GstEvent *event = NULL;
738   GstQuery *query = NULL;
739   GstAggregatorClass *klass = NULL;
740   DoHandleEventsAndQueriesData *data = user_data;
741
742   do {
743     event = NULL;
744     query = NULL;
745
746     PAD_LOCK (pad);
747     if (pad->priv->clipped_buffer == NULL &&
748         !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
749       if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data)))
750         event = gst_event_ref (g_queue_peek_tail (&pad->priv->data));
751       if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))
752         query = g_queue_peek_tail (&pad->priv->data);
753     }
754     PAD_UNLOCK (pad);
755     if (event || query) {
756       gboolean ret;
757
758       data->processed_event = TRUE;
759       if (klass == NULL)
760         klass = GST_AGGREGATOR_GET_CLASS (self);
761
762       if (event) {
763         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
764         gst_event_ref (event);
765         ret = klass->sink_event (aggregator, pad, event);
766
767         PAD_LOCK (pad);
768         if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) {
769           pad->priv->negotiated = ret;
770           if (!ret)
771             pad->priv->flow_return = data->flow_ret = GST_FLOW_NOT_NEGOTIATED;
772         }
773         if (g_queue_peek_tail (&pad->priv->data) == event)
774           gst_event_unref (g_queue_pop_tail (&pad->priv->data));
775         gst_event_unref (event);
776       } else if (query) {
777         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query);
778         ret = klass->sink_query (aggregator, pad, query);
779
780         PAD_LOCK (pad);
781         if (g_queue_peek_tail (&pad->priv->data) == query) {
782           GstStructure *s;
783
784           s = gst_query_writable_structure (query);
785           gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret,
786               NULL);
787           g_queue_pop_tail (&pad->priv->data);
788         }
789       }
790
791       PAD_BROADCAST_EVENT (pad);
792       PAD_UNLOCK (pad);
793     }
794   } while (event || query);
795
796   return TRUE;
797 }
798
799 static gboolean
800 gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
801     gpointer user_data)
802 {
803   GList *item;
804   GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
805   GstAggregator *agg = (GstAggregator *) self;
806   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
807
808   if (!klass->skip_buffer)
809     return FALSE;
810
811   PAD_LOCK (aggpad);
812
813   item = g_queue_peek_head_link (&aggpad->priv->data);
814   while (item) {
815     GList *next = item->next;
816
817     if (GST_IS_BUFFER (item->data)
818         && klass->skip_buffer (aggpad, agg, item->data)) {
819       GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
820       gst_aggregator_pad_buffer_consumed (aggpad);
821       gst_buffer_unref (item->data);
822       g_queue_delete_link (&aggpad->priv->data, item);
823     } else {
824       break;
825     }
826
827     item = next;
828   }
829
830   PAD_UNLOCK (aggpad);
831
832   return TRUE;
833 }
834
835 static void
836 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
837     GstFlowReturn flow_return, gboolean full)
838 {
839   GList *item;
840
841   PAD_LOCK (aggpad);
842   if (flow_return == GST_FLOW_NOT_LINKED)
843     aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
844   else
845     aggpad->priv->flow_return = flow_return;
846
847   item = g_queue_peek_head_link (&aggpad->priv->data);
848   while (item) {
849     GList *next = item->next;
850
851     /* In partial flush, we do like the pad, we get rid of non-sticky events
852      * and EOS/SEGMENT.
853      */
854     if (full || GST_IS_BUFFER (item->data) ||
855         GST_EVENT_TYPE (item->data) == GST_EVENT_EOS ||
856         GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT ||
857         !GST_EVENT_IS_STICKY (item->data)) {
858       if (!GST_IS_QUERY (item->data))
859         gst_mini_object_unref (item->data);
860       g_queue_delete_link (&aggpad->priv->data, item);
861     }
862     item = next;
863   }
864   aggpad->priv->num_buffers = 0;
865   gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL);
866
867   PAD_BROADCAST_EVENT (aggpad);
868   PAD_UNLOCK (aggpad);
869 }
870
871 static GstFlowReturn
872 gst_aggregator_default_update_src_caps (GstAggregator * agg, GstCaps * caps,
873     GstCaps ** ret)
874 {
875   *ret = gst_caps_ref (caps);
876
877   return GST_FLOW_OK;
878 }
879
880 static GstCaps *
881 gst_aggregator_default_fixate_src_caps (GstAggregator * agg, GstCaps * caps)
882 {
883   caps = gst_caps_fixate (caps);
884
885   return caps;
886 }
887
888 static gboolean
889 gst_aggregator_default_negotiated_src_caps (GstAggregator * agg, GstCaps * caps)
890 {
891   return TRUE;
892 }
893
894
895 /* takes ownership of the pool, allocator and query */
896 static gboolean
897 gst_aggregator_set_allocation (GstAggregator * self,
898     GstBufferPool * pool, GstAllocator * allocator,
899     GstAllocationParams * params, GstQuery * query)
900 {
901   GstAllocator *oldalloc;
902   GstBufferPool *oldpool;
903   GstQuery *oldquery;
904
905   GST_DEBUG ("storing allocation query");
906
907   GST_OBJECT_LOCK (self);
908   oldpool = self->priv->pool;
909   self->priv->pool = pool;
910
911   oldalloc = self->priv->allocator;
912   self->priv->allocator = allocator;
913
914   oldquery = self->priv->allocation_query;
915   self->priv->allocation_query = query;
916
917   if (params)
918     self->priv->allocation_params = *params;
919   else
920     gst_allocation_params_init (&self->priv->allocation_params);
921   GST_OBJECT_UNLOCK (self);
922
923   if (oldpool) {
924     GST_DEBUG_OBJECT (self, "deactivating old pool %p", oldpool);
925     gst_buffer_pool_set_active (oldpool, FALSE);
926     gst_object_unref (oldpool);
927   }
928   if (oldalloc) {
929     gst_object_unref (oldalloc);
930   }
931   if (oldquery) {
932     gst_query_unref (oldquery);
933   }
934   return TRUE;
935 }
936
937
938 static gboolean
939 gst_aggregator_decide_allocation (GstAggregator * self, GstQuery * query)
940 {
941   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
942
943   if (aggclass->decide_allocation)
944     if (!aggclass->decide_allocation (self, query))
945       return FALSE;
946
947   return TRUE;
948 }
949
950 static gboolean
951 gst_aggregator_do_allocation (GstAggregator * self, GstCaps * caps)
952 {
953   GstQuery *query;
954   gboolean result = TRUE;
955   GstBufferPool *pool = NULL;
956   GstAllocator *allocator;
957   GstAllocationParams params;
958
959   /* find a pool for the negotiated caps now */
960   GST_DEBUG_OBJECT (self, "doing allocation query");
961   query = gst_query_new_allocation (caps, TRUE);
962   if (!gst_pad_peer_query (self->srcpad, query)) {
963     /* not a problem, just debug a little */
964     GST_DEBUG_OBJECT (self, "peer ALLOCATION query failed");
965   }
966
967   GST_DEBUG_OBJECT (self, "calling decide_allocation");
968   result = gst_aggregator_decide_allocation (self, query);
969
970   GST_DEBUG_OBJECT (self, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
971       query);
972
973   if (!result)
974     goto no_decide_allocation;
975
976   /* we got configuration from our peer or the decide_allocation method,
977    * parse them */
978   if (gst_query_get_n_allocation_params (query) > 0) {
979     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
980   } else {
981     allocator = NULL;
982     gst_allocation_params_init (&params);
983   }
984
985   if (gst_query_get_n_allocation_pools (query) > 0)
986     gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
987
988   /* now store */
989   result =
990       gst_aggregator_set_allocation (self, pool, allocator, &params, query);
991
992   return result;
993
994   /* Errors */
995 no_decide_allocation:
996   {
997     GST_WARNING_OBJECT (self, "Failed to decide allocation");
998     gst_query_unref (query);
999
1000     return result;
1001   }
1002
1003 }
1004
1005 /* WITH SRC_LOCK held */
1006 static GstFlowReturn
1007 gst_aggregator_update_src_caps (GstAggregator * self)
1008 {
1009   GstAggregatorClass *agg_klass = GST_AGGREGATOR_GET_CLASS (self);
1010   GstCaps *downstream_caps, *template_caps, *caps = NULL;
1011   GstFlowReturn ret = GST_FLOW_OK;
1012
1013   template_caps = gst_pad_get_pad_template_caps (self->srcpad);
1014   downstream_caps = gst_pad_peer_query_caps (self->srcpad, template_caps);
1015
1016   if (gst_caps_is_empty (downstream_caps)) {
1017     GST_INFO_OBJECT (self, "Downstream caps (%"
1018         GST_PTR_FORMAT ") not compatible with pad template caps (%"
1019         GST_PTR_FORMAT ")", downstream_caps, template_caps);
1020     ret = GST_FLOW_NOT_NEGOTIATED;
1021     goto done;
1022   }
1023
1024   g_assert (agg_klass->update_src_caps);
1025   GST_DEBUG_OBJECT (self, "updating caps from %" GST_PTR_FORMAT,
1026       downstream_caps);
1027   ret = agg_klass->update_src_caps (self, downstream_caps, &caps);
1028   if (ret < GST_FLOW_OK) {
1029     GST_WARNING_OBJECT (self, "Subclass failed to update provided caps");
1030     goto done;
1031   }
1032   if ((caps == NULL || gst_caps_is_empty (caps)) && ret >= GST_FLOW_OK) {
1033     ret = GST_FLOW_NOT_NEGOTIATED;
1034     goto done;
1035   }
1036   GST_DEBUG_OBJECT (self, "               to %" GST_PTR_FORMAT, caps);
1037
1038 #ifdef GST_ENABLE_EXTRA_CHECKS
1039   if (!gst_caps_is_subset (caps, template_caps)) {
1040     GstCaps *intersection;
1041
1042     GST_ERROR_OBJECT (self,
1043         "update_src_caps returned caps %" GST_PTR_FORMAT
1044         " which are not a real subset of the template caps %"
1045         GST_PTR_FORMAT, caps, template_caps);
1046     g_warning ("%s: update_src_caps returned caps which are not a real "
1047         "subset of the filter caps", GST_ELEMENT_NAME (self));
1048
1049     intersection =
1050         gst_caps_intersect_full (template_caps, caps, GST_CAPS_INTERSECT_FIRST);
1051     gst_caps_unref (caps);
1052     caps = intersection;
1053   }
1054 #endif
1055
1056   if (gst_caps_is_any (caps)) {
1057     goto done;
1058   }
1059
1060   if (!gst_caps_is_fixed (caps)) {
1061     g_assert (agg_klass->fixate_src_caps);
1062
1063     GST_DEBUG_OBJECT (self, "fixate caps from %" GST_PTR_FORMAT, caps);
1064     if (!(caps = agg_klass->fixate_src_caps (self, caps))) {
1065       GST_WARNING_OBJECT (self, "Subclass failed to fixate provided caps");
1066       ret = GST_FLOW_NOT_NEGOTIATED;
1067       goto done;
1068     }
1069     GST_DEBUG_OBJECT (self, "             to %" GST_PTR_FORMAT, caps);
1070   }
1071
1072   if (agg_klass->negotiated_src_caps) {
1073     if (!agg_klass->negotiated_src_caps (self, caps)) {
1074       GST_WARNING_OBJECT (self, "Subclass failed to accept negotiated caps");
1075       ret = GST_FLOW_NOT_NEGOTIATED;
1076       goto done;
1077     }
1078   }
1079
1080   gst_aggregator_set_src_caps (self, caps);
1081
1082   if (!gst_aggregator_do_allocation (self, caps)) {
1083     GST_WARNING_OBJECT (self, "Allocation negotiation failed");
1084     ret = GST_FLOW_NOT_NEGOTIATED;
1085   }
1086
1087 done:
1088   gst_caps_unref (downstream_caps);
1089   gst_caps_unref (template_caps);
1090
1091   if (caps)
1092     gst_caps_unref (caps);
1093
1094   return ret;
1095 }
1096
1097 static void
1098 gst_aggregator_aggregate_func (GstAggregator * self)
1099 {
1100   GstAggregatorPrivate *priv = self->priv;
1101   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1102   gboolean timeout = FALSE;
1103
1104   if (self->priv->running == FALSE) {
1105     GST_DEBUG_OBJECT (self, "Not running anymore");
1106     return;
1107   }
1108
1109   GST_LOG_OBJECT (self, "Checking aggregate");
1110   while (priv->send_eos && priv->running) {
1111     GstFlowReturn flow_return = GST_FLOW_OK;
1112     DoHandleEventsAndQueriesData events_query_data = { FALSE, GST_FLOW_OK };
1113
1114     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1115         gst_aggregator_do_events_and_queries, &events_query_data);
1116
1117     if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
1118       goto handle_error;
1119
1120     if (self->priv->peer_latency_live)
1121       gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1122           gst_aggregator_pad_skip_buffers, NULL);
1123
1124     /* Ensure we have buffers ready (either in clipped_buffer or at the head of
1125      * the queue */
1126     if (!gst_aggregator_wait_and_check (self, &timeout))
1127       continue;
1128
1129     events_query_data.processed_event = FALSE;
1130     events_query_data.flow_ret = GST_FLOW_OK;
1131     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1132         gst_aggregator_do_events_and_queries, &events_query_data);
1133
1134     if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
1135       goto handle_error;
1136
1137     if (events_query_data.processed_event)
1138       continue;
1139
1140     if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
1141       flow_return = gst_aggregator_update_src_caps (self);
1142       if (flow_return != GST_FLOW_OK)
1143         gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self));
1144     }
1145
1146     if (timeout || flow_return >= GST_FLOW_OK) {
1147       GST_TRACE_OBJECT (self, "Actually aggregating!");
1148       flow_return = klass->aggregate (self, timeout);
1149     }
1150
1151     if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA)
1152       continue;
1153
1154     GST_OBJECT_LOCK (self);
1155     if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) {
1156       /* We don't want to set the pads to flushing, but we want to
1157        * stop the thread, so just break here */
1158       GST_OBJECT_UNLOCK (self);
1159       break;
1160     }
1161     GST_OBJECT_UNLOCK (self);
1162
1163     if (flow_return == GST_FLOW_EOS || flow_return == GST_FLOW_ERROR) {
1164       gst_aggregator_push_eos (self);
1165     }
1166
1167   handle_error:
1168     GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
1169
1170     if (flow_return != GST_FLOW_OK) {
1171       GList *item;
1172
1173       GST_OBJECT_LOCK (self);
1174       for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
1175         GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
1176
1177         gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE);
1178       }
1179       GST_OBJECT_UNLOCK (self);
1180       break;
1181     }
1182   }
1183
1184   /* Pause the task here, the only ways to get here are:
1185    * 1) We're stopping, in which case the task is stopped anyway
1186    * 2) We got a flow error above, in which case it might take
1187    *    some time to forward the flow return upstream and we
1188    *    would otherwise call the task function over and over
1189    *    again without doing anything
1190    */
1191   gst_pad_pause_task (self->srcpad);
1192 }
1193
1194 static gboolean
1195 gst_aggregator_start (GstAggregator * self)
1196 {
1197   GstAggregatorClass *klass;
1198   gboolean result;
1199
1200   self->priv->send_stream_start = TRUE;
1201   self->priv->send_segment = TRUE;
1202   self->priv->send_eos = TRUE;
1203   self->priv->srccaps = NULL;
1204
1205   gst_aggregator_set_allocation (self, NULL, NULL, NULL, NULL);
1206
1207   klass = GST_AGGREGATOR_GET_CLASS (self);
1208
1209   if (klass->start)
1210     result = klass->start (self);
1211   else
1212     result = TRUE;
1213
1214   return result;
1215 }
1216
1217 static gboolean
1218 _check_pending_flush_stop (GstAggregatorPad * pad)
1219 {
1220   gboolean res;
1221
1222   PAD_LOCK (pad);
1223   res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
1224   PAD_UNLOCK (pad);
1225
1226   return res;
1227 }
1228
1229 static gboolean
1230 gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
1231 {
1232   gboolean res = TRUE;
1233
1234   GST_INFO_OBJECT (self, "%s srcpad task",
1235       flush_start ? "Pausing" : "Stopping");
1236
1237   SRC_LOCK (self);
1238   self->priv->running = FALSE;
1239   SRC_BROADCAST (self);
1240   SRC_UNLOCK (self);
1241
1242   if (flush_start) {
1243     res = gst_pad_push_event (self->srcpad, flush_start);
1244   }
1245
1246   gst_pad_stop_task (self->srcpad);
1247
1248   return res;
1249 }
1250
1251 static void
1252 gst_aggregator_start_srcpad_task (GstAggregator * self)
1253 {
1254   GST_INFO_OBJECT (self, "Starting srcpad task");
1255
1256   self->priv->running = TRUE;
1257   gst_pad_start_task (GST_PAD (self->srcpad),
1258       (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL);
1259 }
1260
1261 static GstFlowReturn
1262 gst_aggregator_flush (GstAggregator * self)
1263 {
1264   GstFlowReturn ret = GST_FLOW_OK;
1265   GstAggregatorPrivate *priv = self->priv;
1266   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1267
1268   GST_DEBUG_OBJECT (self, "Flushing everything");
1269   GST_OBJECT_LOCK (self);
1270   priv->send_segment = TRUE;
1271   priv->flush_seeking = FALSE;
1272   priv->tags_changed = FALSE;
1273   GST_OBJECT_UNLOCK (self);
1274   if (klass->flush)
1275     ret = klass->flush (self);
1276
1277   return ret;
1278 }
1279
1280
1281 /* Called with GstAggregator's object lock held */
1282
1283 static gboolean
1284 gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
1285 {
1286   GList *tmp;
1287   GstAggregatorPad *tmppad;
1288
1289   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
1290     tmppad = (GstAggregatorPad *) tmp->data;
1291
1292     if (_check_pending_flush_stop (tmppad) == FALSE) {
1293       GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
1294           tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
1295       return FALSE;
1296     }
1297   }
1298
1299   return TRUE;
1300 }
1301
1302 static void
1303 gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
1304     GstEvent * event)
1305 {
1306   GstAggregatorPrivate *priv = self->priv;
1307   GstAggregatorPadPrivate *padpriv = aggpad->priv;
1308
1309   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE);
1310
1311   PAD_FLUSH_LOCK (aggpad);
1312   PAD_LOCK (aggpad);
1313   if (padpriv->pending_flush_start) {
1314     GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
1315
1316     padpriv->pending_flush_start = FALSE;
1317     padpriv->pending_flush_stop = TRUE;
1318   }
1319   PAD_UNLOCK (aggpad);
1320
1321   GST_OBJECT_LOCK (self);
1322   if (priv->flush_seeking) {
1323     /* If flush_seeking we forward the first FLUSH_START */
1324     if (priv->pending_flush_start) {
1325       priv->pending_flush_start = FALSE;
1326       GST_OBJECT_UNLOCK (self);
1327
1328       GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
1329       gst_aggregator_stop_srcpad_task (self, event);
1330
1331       GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
1332       GST_PAD_STREAM_LOCK (self->srcpad);
1333       GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
1334       event = NULL;
1335     } else {
1336       GST_OBJECT_UNLOCK (self);
1337       gst_event_unref (event);
1338     }
1339   } else {
1340     GST_OBJECT_UNLOCK (self);
1341     gst_event_unref (event);
1342   }
1343   PAD_FLUSH_UNLOCK (aggpad);
1344 }
1345
1346 /* Must be called with the the PAD_LOCK held */
1347 static void
1348 update_time_level (GstAggregatorPad * aggpad, gboolean head)
1349 {
1350   GstAggregatorPadPrivate *priv = aggpad->priv;
1351
1352   if (head) {
1353     if (GST_CLOCK_TIME_IS_VALID (priv->head_position) &&
1354         priv->head_segment.format == GST_FORMAT_TIME)
1355       priv->head_time = gst_segment_to_running_time (&priv->head_segment,
1356           GST_FORMAT_TIME, priv->head_position);
1357     else
1358       priv->head_time = GST_CLOCK_TIME_NONE;
1359
1360     if (!GST_CLOCK_TIME_IS_VALID (priv->tail_time))
1361       priv->tail_time = priv->head_time;
1362   } else {
1363     if (GST_CLOCK_TIME_IS_VALID (priv->tail_position) &&
1364         aggpad->segment.format == GST_FORMAT_TIME)
1365       priv->tail_time = gst_segment_to_running_time (&aggpad->segment,
1366           GST_FORMAT_TIME, priv->tail_position);
1367     else
1368       priv->tail_time = priv->head_time;
1369   }
1370
1371   if (priv->head_time == GST_CLOCK_TIME_NONE ||
1372       priv->tail_time == GST_CLOCK_TIME_NONE) {
1373     priv->time_level = 0;
1374     return;
1375   }
1376
1377   if (priv->tail_time > priv->head_time)
1378     priv->time_level = 0;
1379   else
1380     priv->time_level = priv->head_time - priv->tail_time;
1381 }
1382
1383
1384 /* GstAggregator vmethods default implementations */
1385 static gboolean
1386 gst_aggregator_default_sink_event (GstAggregator * self,
1387     GstAggregatorPad * aggpad, GstEvent * event)
1388 {
1389   gboolean res = TRUE;
1390   GstPad *pad = GST_PAD (aggpad);
1391   GstAggregatorPrivate *priv = self->priv;
1392
1393   GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event);
1394
1395   switch (GST_EVENT_TYPE (event)) {
1396     case GST_EVENT_FLUSH_START:
1397     {
1398       gst_aggregator_flush_start (self, aggpad, event);
1399       /* We forward only in one case: right after flush_seeking */
1400       event = NULL;
1401       goto eat;
1402     }
1403     case GST_EVENT_FLUSH_STOP:
1404     {
1405       gst_aggregator_pad_flush (aggpad, self);
1406       GST_OBJECT_LOCK (self);
1407       if (priv->flush_seeking) {
1408         g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
1409         if (gst_aggregator_all_flush_stop_received_locked (self)) {
1410           GST_OBJECT_UNLOCK (self);
1411           /* That means we received FLUSH_STOP/FLUSH_STOP on
1412            * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
1413           gst_aggregator_flush (self);
1414           gst_pad_push_event (self->srcpad, event);
1415           event = NULL;
1416           SRC_LOCK (self);
1417           priv->send_eos = TRUE;
1418           SRC_BROADCAST (self);
1419           SRC_UNLOCK (self);
1420
1421           GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
1422           GST_PAD_STREAM_UNLOCK (self->srcpad);
1423           gst_aggregator_start_srcpad_task (self);
1424         } else {
1425           GST_OBJECT_UNLOCK (self);
1426         }
1427       } else {
1428         GST_OBJECT_UNLOCK (self);
1429       }
1430
1431       /* We never forward the event */
1432       goto eat;
1433     }
1434     case GST_EVENT_EOS:
1435     {
1436       SRC_LOCK (self);
1437       PAD_LOCK (aggpad);
1438       g_assert (aggpad->priv->num_buffers == 0);
1439       aggpad->priv->eos = TRUE;
1440       PAD_UNLOCK (aggpad);
1441       SRC_BROADCAST (self);
1442       SRC_UNLOCK (self);
1443       goto eat;
1444     }
1445     case GST_EVENT_SEGMENT:
1446     {
1447       PAD_LOCK (aggpad);
1448       GST_OBJECT_LOCK (aggpad);
1449       gst_event_copy_segment (event, &aggpad->segment);
1450       /* We've got a new segment, tail_position is now meaningless
1451        * and may interfere with the time_level calculation
1452        */
1453       aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
1454       update_time_level (aggpad, FALSE);
1455       GST_OBJECT_UNLOCK (aggpad);
1456       PAD_UNLOCK (aggpad);
1457
1458       GST_OBJECT_LOCK (self);
1459       self->priv->seqnum = gst_event_get_seqnum (event);
1460       GST_OBJECT_UNLOCK (self);
1461       goto eat;
1462     }
1463     case GST_EVENT_STREAM_START:
1464     {
1465       goto eat;
1466     }
1467     case GST_EVENT_GAP:
1468     {
1469       GstClockTime pts, endpts;
1470       GstClockTime duration;
1471       GstBuffer *gapbuf;
1472
1473       gst_event_parse_gap (event, &pts, &duration);
1474       gapbuf = gst_buffer_new ();
1475
1476       if (GST_CLOCK_TIME_IS_VALID (duration))
1477         endpts = pts + duration;
1478       else
1479         endpts = GST_CLOCK_TIME_NONE;
1480
1481       GST_OBJECT_LOCK (aggpad);
1482       res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts,
1483           &pts, &endpts);
1484       GST_OBJECT_UNLOCK (aggpad);
1485
1486       if (!res) {
1487         GST_WARNING_OBJECT (self, "GAP event outside segment, dropping");
1488         goto eat;
1489       }
1490
1491       if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts))
1492         duration = endpts - pts;
1493       else
1494         duration = GST_CLOCK_TIME_NONE;
1495
1496       GST_BUFFER_PTS (gapbuf) = pts;
1497       GST_BUFFER_DURATION (gapbuf) = duration;
1498       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
1499       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
1500
1501       /* Remove GAP event so we can replace it with the buffer */
1502       if (g_queue_peek_tail (&aggpad->priv->data) == event)
1503         gst_event_unref (g_queue_pop_tail (&aggpad->priv->data));
1504
1505       if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
1506           GST_FLOW_OK) {
1507         GST_WARNING_OBJECT (self, "Failed to chain gap buffer");
1508         res = FALSE;
1509       }
1510
1511       goto eat;
1512     }
1513     case GST_EVENT_TAG:
1514       goto eat;
1515     default:
1516     {
1517       break;
1518     }
1519   }
1520
1521   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
1522   return gst_pad_event_default (pad, GST_OBJECT (self), event);
1523
1524 eat:
1525   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
1526   if (event)
1527     gst_event_unref (event);
1528
1529   return res;
1530 }
1531
1532 static gboolean
1533 gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data)
1534 {
1535   GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
1536   GstAggregator *agg = GST_AGGREGATOR_CAST (self);
1537
1538   gst_aggregator_pad_flush (pad, agg);
1539
1540   PAD_LOCK (pad);
1541   pad->priv->flow_return = GST_FLOW_FLUSHING;
1542   pad->priv->negotiated = FALSE;
1543   PAD_BROADCAST_EVENT (pad);
1544   PAD_UNLOCK (pad);
1545
1546   return TRUE;
1547 }
1548
1549 static gboolean
1550 gst_aggregator_stop (GstAggregator * agg)
1551 {
1552   GstAggregatorClass *klass;
1553   gboolean result;
1554
1555   gst_aggregator_reset_flow_values (agg);
1556
1557   /* Application needs to make sure no pads are added while it shuts us down */
1558   gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg),
1559       gst_aggregator_stop_pad, NULL);
1560
1561   klass = GST_AGGREGATOR_GET_CLASS (agg);
1562
1563   if (klass->stop)
1564     result = klass->stop (agg);
1565   else
1566     result = TRUE;
1567
1568   agg->priv->has_peer_latency = FALSE;
1569   agg->priv->peer_latency_live = FALSE;
1570   agg->priv->peer_latency_min = agg->priv->peer_latency_max = 0;
1571
1572   if (agg->priv->tags)
1573     gst_tag_list_unref (agg->priv->tags);
1574   agg->priv->tags = NULL;
1575
1576   gst_aggregator_set_allocation (agg, NULL, NULL, NULL, NULL);
1577
1578   return result;
1579 }
1580
1581 /* GstElement vmethods implementations */
1582 static GstStateChangeReturn
1583 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
1584 {
1585   GstStateChangeReturn ret;
1586   GstAggregator *self = GST_AGGREGATOR (element);
1587
1588   switch (transition) {
1589     case GST_STATE_CHANGE_READY_TO_PAUSED:
1590       if (!gst_aggregator_start (self))
1591         goto error_start;
1592       break;
1593     default:
1594       break;
1595   }
1596
1597   if ((ret =
1598           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
1599               transition)) == GST_STATE_CHANGE_FAILURE)
1600     goto failure;
1601
1602
1603   switch (transition) {
1604     case GST_STATE_CHANGE_PAUSED_TO_READY:
1605       if (!gst_aggregator_stop (self)) {
1606         /* What to do in this case? Error out? */
1607         GST_ERROR_OBJECT (self, "Subclass failed to stop.");
1608       }
1609       break;
1610     default:
1611       break;
1612   }
1613
1614   return ret;
1615
1616 /* ERRORS */
1617 failure:
1618   {
1619     GST_ERROR_OBJECT (element, "parent failed state change");
1620     return ret;
1621   }
1622 error_start:
1623   {
1624     GST_ERROR_OBJECT (element, "Subclass failed to start");
1625     return GST_STATE_CHANGE_FAILURE;
1626   }
1627 }
1628
1629 static void
1630 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
1631 {
1632   GstAggregator *self = GST_AGGREGATOR (element);
1633   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1634
1635   GST_INFO_OBJECT (pad, "Removing pad");
1636
1637   SRC_LOCK (self);
1638   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
1639   gst_element_remove_pad (element, pad);
1640
1641   self->priv->has_peer_latency = FALSE;
1642   SRC_BROADCAST (self);
1643   SRC_UNLOCK (self);
1644 }
1645
1646 static GstAggregatorPad *
1647 gst_aggregator_default_create_new_pad (GstAggregator * self,
1648     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1649 {
1650   GstAggregatorPad *agg_pad;
1651   GstAggregatorPrivate *priv = self->priv;
1652   gint serial = 0;
1653   gchar *name = NULL;
1654   GType pad_type =
1655       GST_PAD_TEMPLATE_GTYPE (templ) ==
1656       G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : GST_PAD_TEMPLATE_GTYPE (templ);
1657
1658   if (templ->direction != GST_PAD_SINK)
1659     goto not_sink;
1660
1661   if (templ->presence != GST_PAD_REQUEST)
1662     goto not_request;
1663
1664   GST_OBJECT_LOCK (self);
1665   if (req_name == NULL || strlen (req_name) < 6
1666       || !g_str_has_prefix (req_name, "sink_")) {
1667     /* no name given when requesting the pad, use next available int */
1668     serial = ++priv->max_padserial;
1669   } else {
1670     /* parse serial number from requested padname */
1671     serial = g_ascii_strtoull (&req_name[5], NULL, 10);
1672     if (serial > priv->max_padserial)
1673       priv->max_padserial = serial;
1674   }
1675
1676   name = g_strdup_printf ("sink_%u", serial);
1677   agg_pad = g_object_new (pad_type,
1678       "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
1679   g_free (name);
1680
1681   GST_OBJECT_UNLOCK (self);
1682
1683   return agg_pad;
1684
1685   /* errors */
1686 not_sink:
1687   {
1688     GST_WARNING_OBJECT (self, "request new pad that is not a SINK pad");
1689     return NULL;
1690   }
1691 not_request:
1692   {
1693     GST_WARNING_OBJECT (self, "request new pad that is not a REQUEST pad");
1694     return NULL;
1695   }
1696 }
1697
1698 static GstPad *
1699 gst_aggregator_request_new_pad (GstElement * element,
1700     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1701 {
1702   GstAggregator *self;
1703   GstAggregatorPad *agg_pad;
1704   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (element);
1705   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
1706
1707   self = GST_AGGREGATOR (element);
1708
1709   agg_pad = klass->create_new_pad (self, templ, req_name, caps);
1710   if (!agg_pad) {
1711     GST_ERROR_OBJECT (element, "Couldn't create new pad");
1712     return NULL;
1713   }
1714
1715   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
1716
1717   if (priv->running)
1718     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
1719
1720   /* add the pad to the element */
1721   gst_element_add_pad (element, GST_PAD (agg_pad));
1722
1723   return GST_PAD (agg_pad);
1724 }
1725
1726 /* Must be called with SRC_LOCK held */
1727
1728 static gboolean
1729 gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
1730 {
1731   gboolean query_ret, live;
1732   GstClockTime our_latency, min, max;
1733
1734   query_ret = gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1735
1736   if (!query_ret) {
1737     GST_WARNING_OBJECT (self, "Latency query failed");
1738     return FALSE;
1739   }
1740
1741   gst_query_parse_latency (query, &live, &min, &max);
1742
1743   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
1744     GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
1745         ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
1746     return FALSE;
1747   }
1748
1749   if (self->priv->upstream_latency_min > min) {
1750     GstClockTimeDiff diff =
1751         GST_CLOCK_DIFF (min, self->priv->upstream_latency_min);
1752
1753     min += diff;
1754     if (GST_CLOCK_TIME_IS_VALID (max)) {
1755       max += diff;
1756     }
1757   }
1758
1759   if (min > max && GST_CLOCK_TIME_IS_VALID (max)) {
1760     GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL),
1761         ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
1762             GST_TIME_FORMAT ". Add queues or other buffering elements.",
1763             GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
1764     return FALSE;
1765   }
1766
1767   our_latency = self->priv->latency;
1768
1769   self->priv->peer_latency_live = live;
1770   self->priv->peer_latency_min = min;
1771   self->priv->peer_latency_max = max;
1772   self->priv->has_peer_latency = TRUE;
1773
1774   /* add our own */
1775   min += our_latency;
1776   min += self->priv->sub_latency_min;
1777   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1778       && GST_CLOCK_TIME_IS_VALID (max))
1779     max += self->priv->sub_latency_max + our_latency;
1780   else
1781     max = GST_CLOCK_TIME_NONE;
1782
1783   SRC_BROADCAST (self);
1784
1785   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
1786       " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
1787
1788   gst_query_set_latency (query, live, min, max);
1789
1790   return query_ret;
1791 }
1792
1793 /*
1794  * MUST be called with the src_lock held.
1795  *
1796  * See  gst_aggregator_get_latency() for doc
1797  */
1798 static GstClockTime
1799 gst_aggregator_get_latency_unlocked (GstAggregator * self)
1800 {
1801   GstClockTime latency;
1802
1803   g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0);
1804
1805   if (!self->priv->has_peer_latency) {
1806     GstQuery *query = gst_query_new_latency ();
1807     gboolean ret;
1808
1809     ret = gst_aggregator_query_latency_unlocked (self, query);
1810     gst_query_unref (query);
1811     if (!ret)
1812       return GST_CLOCK_TIME_NONE;
1813   }
1814
1815   if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
1816     return GST_CLOCK_TIME_NONE;
1817
1818   /* latency_min is never GST_CLOCK_TIME_NONE by construction */
1819   latency = self->priv->peer_latency_min;
1820
1821   /* add our own */
1822   latency += self->priv->latency;
1823   latency += self->priv->sub_latency_min;
1824
1825   return latency;
1826 }
1827
1828 /**
1829  * gst_aggregator_get_latency:
1830  * @self: a #GstAggregator
1831  *
1832  * Retrieves the latency values reported by @self in response to the latency
1833  * query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element
1834  * will not wait for the clock.
1835  *
1836  * Typically only called by subclasses.
1837  *
1838  * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync
1839  */
1840 GstClockTime
1841 gst_aggregator_get_latency (GstAggregator * self)
1842 {
1843   GstClockTime ret;
1844
1845   SRC_LOCK (self);
1846   ret = gst_aggregator_get_latency_unlocked (self);
1847   SRC_UNLOCK (self);
1848
1849   return ret;
1850 }
1851
1852 static gboolean
1853 gst_aggregator_send_event (GstElement * element, GstEvent * event)
1854 {
1855   GstAggregator *self = GST_AGGREGATOR (element);
1856
1857   GST_STATE_LOCK (element);
1858   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
1859       GST_STATE (element) < GST_STATE_PAUSED) {
1860     gdouble rate;
1861     GstFormat fmt;
1862     GstSeekFlags flags;
1863     GstSeekType start_type, stop_type;
1864     gint64 start, stop;
1865
1866     gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1867         &start, &stop_type, &stop);
1868
1869     GST_OBJECT_LOCK (self);
1870     gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
1871         flags, start_type, start, stop_type, stop, NULL);
1872     self->priv->seqnum = gst_event_get_seqnum (event);
1873     self->priv->first_buffer = FALSE;
1874     GST_OBJECT_UNLOCK (self);
1875
1876     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
1877   }
1878   GST_STATE_UNLOCK (element);
1879
1880
1881   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
1882       event);
1883 }
1884
1885 static gboolean
1886 gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
1887 {
1888   gboolean res = TRUE;
1889
1890   switch (GST_QUERY_TYPE (query)) {
1891     case GST_QUERY_SEEKING:
1892     {
1893       GstFormat format;
1894
1895       /* don't pass it along as some (file)sink might claim it does
1896        * whereas with a collectpads in between that will not likely work */
1897       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1898       gst_query_set_seeking (query, format, FALSE, 0, -1);
1899       res = TRUE;
1900
1901       break;
1902     }
1903     case GST_QUERY_LATENCY:
1904       SRC_LOCK (self);
1905       res = gst_aggregator_query_latency_unlocked (self, query);
1906       SRC_UNLOCK (self);
1907       break;
1908     default:
1909       return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1910   }
1911
1912   return res;
1913 }
1914
1915 static gboolean
1916 gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
1917 {
1918   EventData *evdata = user_data;
1919   gboolean ret = TRUE;
1920   GstPad *peer = gst_pad_get_peer (pad);
1921   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1922
1923   if (peer) {
1924     if (evdata->only_to_active_pads && aggpad->priv->first_buffer) {
1925       GST_DEBUG_OBJECT (pad, "not sending event to inactive pad");
1926       ret = TRUE;
1927     } else {
1928       ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
1929       GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
1930     }
1931   }
1932
1933   if (ret == FALSE) {
1934     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
1935       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
1936
1937       GST_DEBUG_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
1938
1939       if (gst_pad_query (peer, seeking)) {
1940         gboolean seekable;
1941
1942         gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
1943
1944         if (seekable == FALSE) {
1945           GST_INFO_OBJECT (pad,
1946               "Source not seekable, We failed but it does not matter!");
1947
1948           ret = TRUE;
1949         }
1950       } else {
1951         GST_ERROR_OBJECT (pad, "Query seeking FAILED");
1952       }
1953
1954       gst_query_unref (seeking);
1955     }
1956
1957     if (evdata->flush) {
1958       PAD_LOCK (aggpad);
1959       aggpad->priv->pending_flush_start = FALSE;
1960       aggpad->priv->pending_flush_stop = FALSE;
1961       PAD_UNLOCK (aggpad);
1962     }
1963   } else {
1964     evdata->one_actually_seeked = TRUE;
1965   }
1966
1967   evdata->result &= ret;
1968
1969   if (peer)
1970     gst_object_unref (peer);
1971
1972   /* Always send to all pads */
1973   return FALSE;
1974 }
1975
1976 static void
1977 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
1978     EventData * evdata)
1979 {
1980   evdata->result = TRUE;
1981   evdata->one_actually_seeked = FALSE;
1982
1983   /* We first need to set all pads as flushing in a first pass
1984    * as flush_start flush_stop is sometimes sent synchronously
1985    * while we send the seek event */
1986   if (evdata->flush) {
1987     GList *l;
1988
1989     GST_OBJECT_LOCK (self);
1990     for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
1991       GstAggregatorPad *pad = l->data;
1992
1993       PAD_LOCK (pad);
1994       pad->priv->pending_flush_start = TRUE;
1995       pad->priv->pending_flush_stop = FALSE;
1996       PAD_UNLOCK (pad);
1997     }
1998     GST_OBJECT_UNLOCK (self);
1999   }
2000
2001   gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata);
2002
2003   gst_event_unref (evdata->event);
2004 }
2005
2006 static gboolean
2007 gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
2008 {
2009   gdouble rate;
2010   GstFormat fmt;
2011   GstSeekFlags flags;
2012   GstSeekType start_type, stop_type;
2013   gint64 start, stop;
2014   gboolean flush;
2015   EventData evdata = { 0, };
2016   GstAggregatorPrivate *priv = self->priv;
2017
2018   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
2019       &start, &stop_type, &stop);
2020
2021   GST_INFO_OBJECT (self, "starting SEEK");
2022
2023   flush = flags & GST_SEEK_FLAG_FLUSH;
2024
2025   GST_OBJECT_LOCK (self);
2026   if (flush) {
2027     priv->pending_flush_start = TRUE;
2028     priv->flush_seeking = TRUE;
2029   }
2030
2031   gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
2032       flags, start_type, start, stop_type, stop, NULL);
2033
2034   /* Seeking sets a position */
2035   self->priv->first_buffer = FALSE;
2036   GST_OBJECT_UNLOCK (self);
2037
2038   /* forward the seek upstream */
2039   evdata.event = event;
2040   evdata.flush = flush;
2041   evdata.only_to_active_pads = FALSE;
2042   gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2043   event = NULL;
2044
2045   if (!evdata.result || !evdata.one_actually_seeked) {
2046     GST_OBJECT_LOCK (self);
2047     priv->flush_seeking = FALSE;
2048     priv->pending_flush_start = FALSE;
2049     GST_OBJECT_UNLOCK (self);
2050   }
2051
2052   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
2053
2054   return evdata.result;
2055 }
2056
2057 static gboolean
2058 gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
2059 {
2060   EventData evdata = { 0, };
2061
2062   switch (GST_EVENT_TYPE (event)) {
2063     case GST_EVENT_SEEK:
2064       /* _do_seek() unrefs the event. */
2065       return gst_aggregator_do_seek (self, event);
2066     case GST_EVENT_NAVIGATION:
2067       /* navigation is rather pointless. */
2068       gst_event_unref (event);
2069       return FALSE;
2070     default:
2071       break;
2072   }
2073
2074   /* Don't forward QOS events to pads that had no active buffer yet. Otherwise
2075    * they will receive a QOS event that has earliest_time=0 (because we can't
2076    * have negative timestamps), and consider their buffer as too late */
2077   evdata.event = event;
2078   evdata.flush = FALSE;
2079   evdata.only_to_active_pads = GST_EVENT_TYPE (event) == GST_EVENT_QOS;
2080   gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2081   return evdata.result;
2082 }
2083
2084 static gboolean
2085 gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent,
2086     GstEvent * event)
2087 {
2088   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2089
2090   return klass->src_event (GST_AGGREGATOR (parent), event);
2091 }
2092
2093 static gboolean
2094 gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent,
2095     GstQuery * query)
2096 {
2097   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2098
2099   return klass->src_query (GST_AGGREGATOR (parent), query);
2100 }
2101
2102 static gboolean
2103 gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
2104     GstObject * parent, GstPadMode mode, gboolean active)
2105 {
2106   GstAggregator *self = GST_AGGREGATOR (parent);
2107   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2108
2109   if (klass->src_activate) {
2110     if (klass->src_activate (self, mode, active) == FALSE) {
2111       return FALSE;
2112     }
2113   }
2114
2115   if (active == TRUE) {
2116     switch (mode) {
2117       case GST_PAD_MODE_PUSH:
2118       {
2119         GST_INFO_OBJECT (pad, "Activating pad!");
2120         gst_aggregator_start_srcpad_task (self);
2121         return TRUE;
2122       }
2123       default:
2124       {
2125         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
2126         return FALSE;
2127       }
2128     }
2129   }
2130
2131   /* deactivating */
2132   GST_INFO_OBJECT (self, "Deactivating srcpad");
2133   gst_aggregator_stop_srcpad_task (self, FALSE);
2134
2135   return TRUE;
2136 }
2137
2138 static gboolean
2139 gst_aggregator_default_sink_query (GstAggregator * self,
2140     GstAggregatorPad * aggpad, GstQuery * query)
2141 {
2142   GstPad *pad = GST_PAD (aggpad);
2143
2144   if (GST_QUERY_TYPE (query) == GST_QUERY_ALLOCATION) {
2145     GstQuery *decide_query = NULL;
2146     GstAggregatorClass *agg_class;
2147     gboolean ret;
2148
2149     GST_OBJECT_LOCK (self);
2150     PAD_LOCK (aggpad);
2151     if (G_UNLIKELY (!aggpad->priv->negotiated)) {
2152       GST_DEBUG_OBJECT (self,
2153           "not negotiated yet, can't answer ALLOCATION query");
2154       PAD_UNLOCK (aggpad);
2155       GST_OBJECT_UNLOCK (self);
2156
2157       return FALSE;
2158     }
2159
2160     if ((decide_query = self->priv->allocation_query))
2161       gst_query_ref (decide_query);
2162     PAD_UNLOCK (aggpad);
2163     GST_OBJECT_UNLOCK (self);
2164
2165     GST_DEBUG_OBJECT (self,
2166         "calling propose allocation with query %" GST_PTR_FORMAT, decide_query);
2167
2168     agg_class = GST_AGGREGATOR_GET_CLASS (self);
2169
2170     /* pass the query to the propose_allocation vmethod if any */
2171     if (agg_class->propose_allocation)
2172       ret = agg_class->propose_allocation (self, aggpad, decide_query, query);
2173     else
2174       ret = FALSE;
2175
2176     if (decide_query)
2177       gst_query_unref (decide_query);
2178
2179     GST_DEBUG_OBJECT (self, "ALLOCATION ret %d, %" GST_PTR_FORMAT, ret, query);
2180     return ret;
2181   }
2182
2183   return gst_pad_query_default (pad, GST_OBJECT (self), query);
2184 }
2185
2186 static void
2187 gst_aggregator_finalize (GObject * object)
2188 {
2189   GstAggregator *self = (GstAggregator *) object;
2190
2191   g_mutex_clear (&self->priv->src_lock);
2192   g_cond_clear (&self->priv->src_cond);
2193
2194   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
2195 }
2196
2197 /*
2198  * gst_aggregator_set_latency_property:
2199  * @agg: a #GstAggregator
2200  * @latency: the new latency value (in nanoseconds).
2201  *
2202  * Sets the new latency value to @latency. This value is used to limit the
2203  * amount of time a pad waits for data to appear before considering the pad
2204  * as unresponsive.
2205  */
2206 static void
2207 gst_aggregator_set_latency_property (GstAggregator * self, GstClockTime latency)
2208 {
2209   gboolean changed;
2210
2211   g_return_if_fail (GST_IS_AGGREGATOR (self));
2212   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
2213
2214   SRC_LOCK (self);
2215   changed = (self->priv->latency != latency);
2216
2217   if (changed) {
2218     GList *item;
2219
2220     GST_OBJECT_LOCK (self);
2221     /* First lock all the pads */
2222     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2223       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2224       PAD_LOCK (aggpad);
2225     }
2226
2227     self->priv->latency = latency;
2228
2229     SRC_BROADCAST (self);
2230
2231     /* Now wake up the pads */
2232     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2233       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2234       PAD_BROADCAST_EVENT (aggpad);
2235       PAD_UNLOCK (aggpad);
2236     }
2237     GST_OBJECT_UNLOCK (self);
2238   }
2239
2240   SRC_UNLOCK (self);
2241
2242   if (changed)
2243     gst_element_post_message (GST_ELEMENT_CAST (self),
2244         gst_message_new_latency (GST_OBJECT_CAST (self)));
2245 }
2246
2247 /*
2248  * gst_aggregator_get_latency_property:
2249  * @agg: a #GstAggregator
2250  *
2251  * Gets the latency value. See gst_aggregator_set_latency for
2252  * more details.
2253  *
2254  * Returns: The time in nanoseconds to wait for data to arrive on a sink pad
2255  * before a pad is deemed unresponsive. A value of -1 means an
2256  * unlimited time.
2257  */
2258 static GstClockTime
2259 gst_aggregator_get_latency_property (GstAggregator * agg)
2260 {
2261   GstClockTime res;
2262
2263   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), GST_CLOCK_TIME_NONE);
2264
2265   GST_OBJECT_LOCK (agg);
2266   res = agg->priv->latency;
2267   GST_OBJECT_UNLOCK (agg);
2268
2269   return res;
2270 }
2271
2272 static void
2273 gst_aggregator_set_property (GObject * object, guint prop_id,
2274     const GValue * value, GParamSpec * pspec)
2275 {
2276   GstAggregator *agg = GST_AGGREGATOR (object);
2277
2278   switch (prop_id) {
2279     case PROP_LATENCY:
2280       gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value));
2281       break;
2282     case PROP_MIN_UPSTREAM_LATENCY:
2283       SRC_LOCK (agg);
2284       agg->priv->upstream_latency_min = g_value_get_uint64 (value);
2285       SRC_UNLOCK (agg);
2286       break;
2287     case PROP_START_TIME_SELECTION:
2288       agg->priv->start_time_selection = g_value_get_enum (value);
2289       break;
2290     case PROP_START_TIME:
2291       agg->priv->start_time = g_value_get_uint64 (value);
2292       break;
2293     default:
2294       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2295       break;
2296   }
2297 }
2298
2299 static void
2300 gst_aggregator_get_property (GObject * object, guint prop_id,
2301     GValue * value, GParamSpec * pspec)
2302 {
2303   GstAggregator *agg = GST_AGGREGATOR (object);
2304
2305   switch (prop_id) {
2306     case PROP_LATENCY:
2307       g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg));
2308       break;
2309     case PROP_MIN_UPSTREAM_LATENCY:
2310       SRC_LOCK (agg);
2311       g_value_set_uint64 (value, agg->priv->upstream_latency_min);
2312       SRC_UNLOCK (agg);
2313       break;
2314     case PROP_START_TIME_SELECTION:
2315       g_value_set_enum (value, agg->priv->start_time_selection);
2316       break;
2317     case PROP_START_TIME:
2318       g_value_set_uint64 (value, agg->priv->start_time);
2319       break;
2320     default:
2321       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2322       break;
2323   }
2324 }
2325
2326 /* GObject vmethods implementations */
2327 static void
2328 gst_aggregator_class_init (GstAggregatorClass * klass)
2329 {
2330   GObjectClass *gobject_class = (GObjectClass *) klass;
2331   GstElementClass *gstelement_class = (GstElementClass *) klass;
2332
2333   aggregator_parent_class = g_type_class_peek_parent (klass);
2334
2335   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
2336       GST_DEBUG_FG_MAGENTA, "GstAggregator");
2337
2338   if (aggregator_private_offset != 0)
2339     g_type_class_adjust_private_offset (klass, &aggregator_private_offset);
2340
2341   klass->finish_buffer = gst_aggregator_default_finish_buffer;
2342
2343   klass->sink_event = gst_aggregator_default_sink_event;
2344   klass->sink_query = gst_aggregator_default_sink_query;
2345
2346   klass->src_event = gst_aggregator_default_src_event;
2347   klass->src_query = gst_aggregator_default_src_query;
2348
2349   klass->create_new_pad = gst_aggregator_default_create_new_pad;
2350   klass->update_src_caps = gst_aggregator_default_update_src_caps;
2351   klass->fixate_src_caps = gst_aggregator_default_fixate_src_caps;
2352   klass->negotiated_src_caps = gst_aggregator_default_negotiated_src_caps;
2353
2354   gstelement_class->request_new_pad =
2355       GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
2356   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
2357   gstelement_class->release_pad =
2358       GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
2359   gstelement_class->change_state =
2360       GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
2361
2362   gobject_class->set_property = gst_aggregator_set_property;
2363   gobject_class->get_property = gst_aggregator_get_property;
2364   gobject_class->finalize = gst_aggregator_finalize;
2365
2366   g_object_class_install_property (gobject_class, PROP_LATENCY,
2367       g_param_spec_uint64 ("latency", "Buffer latency",
2368           "Additional latency in live mode to allow upstream "
2369           "to take longer to produce buffers for the current "
2370           "position (in nanoseconds)", 0, G_MAXUINT64,
2371           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2372
2373   g_object_class_install_property (gobject_class, PROP_MIN_UPSTREAM_LATENCY,
2374       g_param_spec_uint64 ("min-upstream-latency", "Buffer latency",
2375           "When sources with a higher latency are expected to be plugged "
2376           "in dynamically after the aggregator has started playing, "
2377           "this allows overriding the minimum latency reported by the "
2378           "initial source(s). This is only taken into account when superior "
2379           "to the reported minimum latency.",
2380           0, G_MAXUINT64,
2381           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2382
2383   g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION,
2384       g_param_spec_enum ("start-time-selection", "Start Time Selection",
2385           "Decides which start time is output",
2386           gst_aggregator_start_time_selection_get_type (),
2387           DEFAULT_START_TIME_SELECTION,
2388           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2389
2390   g_object_class_install_property (gobject_class, PROP_START_TIME,
2391       g_param_spec_uint64 ("start-time", "Start Time",
2392           "Start time to use if start-time-selection=set", 0,
2393           G_MAXUINT64,
2394           DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2395 }
2396
2397 static inline gpointer
2398 gst_aggregator_get_instance_private (GstAggregator * self)
2399 {
2400   return (G_STRUCT_MEMBER_P (self, aggregator_private_offset));
2401 }
2402
2403 static void
2404 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
2405 {
2406   GstPadTemplate *pad_template;
2407   GstAggregatorPrivate *priv;
2408
2409   g_return_if_fail (klass->aggregate != NULL);
2410
2411   self->priv = gst_aggregator_get_instance_private (self);
2412
2413   priv = self->priv;
2414
2415   pad_template =
2416       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
2417   g_return_if_fail (pad_template != NULL);
2418
2419   priv->max_padserial = -1;
2420   priv->tags_changed = FALSE;
2421
2422   self->priv->peer_latency_live = FALSE;
2423   self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
2424   self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
2425   self->priv->has_peer_latency = FALSE;
2426
2427   self->srcpad = gst_pad_new_from_template (pad_template, "src");
2428
2429   gst_aggregator_reset_flow_values (self);
2430
2431   gst_pad_set_event_function (self->srcpad,
2432       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
2433   gst_pad_set_query_function (self->srcpad,
2434       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func));
2435   gst_pad_set_activatemode_function (self->srcpad,
2436       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func));
2437
2438   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
2439
2440   self->priv->upstream_latency_min = DEFAULT_MIN_UPSTREAM_LATENCY;
2441   self->priv->latency = DEFAULT_LATENCY;
2442   self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION;
2443   self->priv->start_time = DEFAULT_START_TIME;
2444
2445   g_mutex_init (&self->priv->src_lock);
2446   g_cond_init (&self->priv->src_cond);
2447 }
2448
2449 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
2450  * method to get to the padtemplates */
2451 GType
2452 gst_aggregator_get_type (void)
2453 {
2454   static volatile gsize type = 0;
2455
2456   if (g_once_init_enter (&type)) {
2457     GType _type;
2458     static const GTypeInfo info = {
2459       sizeof (GstAggregatorClass),
2460       NULL,
2461       NULL,
2462       (GClassInitFunc) gst_aggregator_class_init,
2463       NULL,
2464       NULL,
2465       sizeof (GstAggregator),
2466       0,
2467       (GInstanceInitFunc) gst_aggregator_init,
2468     };
2469
2470     _type = g_type_register_static (GST_TYPE_ELEMENT,
2471         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
2472
2473     aggregator_private_offset =
2474         g_type_add_instance_private (_type, sizeof (GstAggregatorPrivate));
2475
2476     g_once_init_leave (&type, _type);
2477   }
2478   return type;
2479 }
2480
2481 /* Must be called with SRC lock and PAD lock held */
2482 static gboolean
2483 gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
2484 {
2485   /* Empty queue always has space */
2486   if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL)
2487     return TRUE;
2488
2489   /* We also want at least two buffers, one is being processed and one is ready
2490    * for the next iteration when we operate in live mode. */
2491   if (self->priv->peer_latency_live && aggpad->priv->num_buffers < 2)
2492     return TRUE;
2493
2494   /* zero latency, if there is a buffer, it's full */
2495   if (self->priv->latency == 0)
2496     return FALSE;
2497
2498   /* Allow no more buffers than the latency */
2499   return (aggpad->priv->time_level <= self->priv->latency);
2500 }
2501
2502 /* Must be called with the PAD_LOCK held */
2503 static void
2504 apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
2505 {
2506   GstClockTime timestamp;
2507
2508   if (GST_BUFFER_DTS_IS_VALID (buffer))
2509     timestamp = GST_BUFFER_DTS (buffer);
2510   else
2511     timestamp = GST_BUFFER_PTS (buffer);
2512
2513   if (timestamp == GST_CLOCK_TIME_NONE) {
2514     if (head)
2515       timestamp = aggpad->priv->head_position;
2516     else
2517       timestamp = aggpad->priv->tail_position;
2518   }
2519
2520   /* add duration */
2521   if (GST_BUFFER_DURATION_IS_VALID (buffer))
2522     timestamp += GST_BUFFER_DURATION (buffer);
2523
2524   if (head)
2525     aggpad->priv->head_position = timestamp;
2526   else
2527     aggpad->priv->tail_position = timestamp;
2528
2529   update_time_level (aggpad, head);
2530 }
2531
2532 /*
2533  * Can be called either from the sinkpad's chain function or from the srcpad's
2534  * thread in the case of a buffer synthetized from a GAP event.
2535  * Because of this second case, FLUSH_LOCK can't be used here.
2536  */
2537
2538 static GstFlowReturn
2539 gst_aggregator_pad_chain_internal (GstAggregator * self,
2540     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
2541 {
2542   GstFlowReturn flow_return;
2543   GstClockTime buf_pts;
2544
2545   PAD_LOCK (aggpad);
2546   flow_return = aggpad->priv->flow_return;
2547   if (flow_return != GST_FLOW_OK)
2548     goto flushing;
2549
2550   PAD_UNLOCK (aggpad);
2551
2552   buf_pts = GST_BUFFER_PTS (buffer);
2553
2554   for (;;) {
2555     SRC_LOCK (self);
2556     GST_OBJECT_LOCK (self);
2557     PAD_LOCK (aggpad);
2558
2559     if (aggpad->priv->first_buffer) {
2560       self->priv->has_peer_latency = FALSE;
2561       aggpad->priv->first_buffer = FALSE;
2562     }
2563
2564     if ((gst_aggregator_pad_has_space (self, aggpad) || !head)
2565         && aggpad->priv->flow_return == GST_FLOW_OK) {
2566       if (head)
2567         g_queue_push_head (&aggpad->priv->data, buffer);
2568       else
2569         g_queue_push_tail (&aggpad->priv->data, buffer);
2570       apply_buffer (aggpad, buffer, head);
2571       aggpad->priv->num_buffers++;
2572       buffer = NULL;
2573       SRC_BROADCAST (self);
2574       break;
2575     }
2576
2577     flow_return = aggpad->priv->flow_return;
2578     if (flow_return != GST_FLOW_OK) {
2579       GST_OBJECT_UNLOCK (self);
2580       SRC_UNLOCK (self);
2581       goto flushing;
2582     }
2583     GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2584     GST_OBJECT_UNLOCK (self);
2585     SRC_UNLOCK (self);
2586     PAD_WAIT_EVENT (aggpad);
2587
2588     PAD_UNLOCK (aggpad);
2589   }
2590
2591   if (self->priv->first_buffer) {
2592     GstClockTime start_time;
2593     GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
2594
2595     switch (self->priv->start_time_selection) {
2596       case GST_AGGREGATOR_START_TIME_SELECTION_ZERO:
2597       default:
2598         start_time = 0;
2599         break;
2600       case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
2601         GST_OBJECT_LOCK (aggpad);
2602         if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) {
2603           start_time = buf_pts;
2604           if (start_time != -1) {
2605             start_time = MAX (start_time, aggpad->priv->head_segment.start);
2606             start_time =
2607                 gst_segment_to_running_time (&aggpad->priv->head_segment,
2608                 GST_FORMAT_TIME, start_time);
2609           }
2610         } else {
2611           start_time = 0;
2612           GST_WARNING_OBJECT (aggpad,
2613               "Ignoring request of selecting the first start time "
2614               "as the segment is a %s segment instead of a time segment",
2615               gst_format_get_name (aggpad->segment.format));
2616         }
2617         GST_OBJECT_UNLOCK (aggpad);
2618         break;
2619       case GST_AGGREGATOR_START_TIME_SELECTION_SET:
2620         start_time = self->priv->start_time;
2621         if (start_time == -1)
2622           start_time = 0;
2623         break;
2624     }
2625
2626     if (start_time != -1) {
2627       if (srcpad->segment.position == -1)
2628         srcpad->segment.position = start_time;
2629       else
2630         srcpad->segment.position = MIN (start_time, srcpad->segment.position);
2631
2632       GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT,
2633           GST_TIME_ARGS (start_time));
2634     }
2635   }
2636
2637   PAD_UNLOCK (aggpad);
2638   GST_OBJECT_UNLOCK (self);
2639   SRC_UNLOCK (self);
2640
2641   GST_DEBUG_OBJECT (aggpad, "Done chaining");
2642
2643   return flow_return;
2644
2645 flushing:
2646   PAD_UNLOCK (aggpad);
2647
2648   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
2649       gst_flow_get_name (flow_return));
2650   if (buffer)
2651     gst_buffer_unref (buffer);
2652
2653   return flow_return;
2654 }
2655
2656 static GstFlowReturn
2657 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
2658 {
2659   GstFlowReturn ret;
2660   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2661
2662   PAD_FLUSH_LOCK (aggpad);
2663
2664   ret = gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
2665       aggpad, buffer, TRUE);
2666
2667   PAD_FLUSH_UNLOCK (aggpad);
2668
2669   return ret;
2670 }
2671
2672 static gboolean
2673 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
2674     GstQuery * query)
2675 {
2676   GstAggregator *self = GST_AGGREGATOR (parent);
2677   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2678
2679   if (GST_QUERY_IS_SERIALIZED (query)) {
2680     GstStructure *s;
2681     gboolean ret = FALSE;
2682
2683     SRC_LOCK (self);
2684     PAD_LOCK (aggpad);
2685
2686     if (aggpad->priv->flow_return != GST_FLOW_OK) {
2687       SRC_UNLOCK (self);
2688       goto flushing;
2689     }
2690
2691     g_queue_push_head (&aggpad->priv->data, query);
2692     SRC_BROADCAST (self);
2693     SRC_UNLOCK (self);
2694
2695     while (!gst_aggregator_pad_queue_is_empty (aggpad)
2696         && aggpad->priv->flow_return == GST_FLOW_OK) {
2697       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2698       PAD_WAIT_EVENT (aggpad);
2699     }
2700
2701     s = gst_query_writable_structure (query);
2702     if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret))
2703       gst_structure_remove_field (s, "gst-aggregator-retval");
2704     else
2705       g_queue_remove (&aggpad->priv->data, query);
2706
2707     if (aggpad->priv->flow_return != GST_FLOW_OK)
2708       goto flushing;
2709
2710     PAD_UNLOCK (aggpad);
2711
2712     return ret;
2713   } else {
2714     GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2715
2716     return klass->sink_query (self, aggpad, query);
2717   }
2718
2719 flushing:
2720   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
2721       gst_flow_get_name (aggpad->priv->flow_return));
2722   PAD_UNLOCK (aggpad);
2723
2724   return FALSE;
2725 }
2726
2727 /* Queue serialized events and let the others go through directly.
2728  * The queued events with be handled from the src-pad task in
2729  * gst_aggregator_do_events_and_queries().
2730  */
2731 static GstFlowReturn
2732 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
2733     GstEvent * event)
2734 {
2735   GstFlowReturn ret = GST_FLOW_OK;
2736   GstAggregator *self = GST_AGGREGATOR (parent);
2737   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2738
2739   if (GST_EVENT_IS_SERIALIZED (event)
2740       && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
2741     SRC_LOCK (self);
2742     PAD_LOCK (aggpad);
2743
2744     if (aggpad->priv->flow_return != GST_FLOW_OK)
2745       goto flushing;
2746
2747     if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
2748       GST_OBJECT_LOCK (aggpad);
2749       gst_event_copy_segment (event, &aggpad->priv->head_segment);
2750       aggpad->priv->head_position = aggpad->priv->head_segment.position;
2751       update_time_level (aggpad, TRUE);
2752       GST_OBJECT_UNLOCK (aggpad);
2753     }
2754
2755     GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event);
2756     g_queue_push_head (&aggpad->priv->data, event);
2757     SRC_BROADCAST (self);
2758     PAD_UNLOCK (aggpad);
2759     SRC_UNLOCK (self);
2760   } else {
2761     GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2762
2763     if (!klass->sink_event (self, aggpad, event)) {
2764       /* Copied from GstPad to convert boolean to a GstFlowReturn in
2765        * the event handling func */
2766       ret = GST_FLOW_ERROR;
2767     }
2768   }
2769
2770   return ret;
2771
2772 flushing:
2773   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
2774       gst_flow_get_name (aggpad->priv->flow_return));
2775   PAD_UNLOCK (aggpad);
2776   SRC_UNLOCK (self);
2777   if (GST_EVENT_IS_STICKY (event))
2778     gst_pad_store_sticky_event (pad, event);
2779   gst_event_unref (event);
2780
2781   return aggpad->priv->flow_return;
2782 }
2783
2784 static gboolean
2785 gst_aggregator_pad_activate_mode_func (GstPad * pad,
2786     GstObject * parent, GstPadMode mode, gboolean active)
2787 {
2788   GstAggregator *self = GST_AGGREGATOR (parent);
2789   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2790
2791   if (active == FALSE) {
2792     SRC_LOCK (self);
2793     gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
2794     SRC_BROADCAST (self);
2795     SRC_UNLOCK (self);
2796   } else {
2797     PAD_LOCK (aggpad);
2798     aggpad->priv->flow_return = GST_FLOW_OK;
2799     PAD_BROADCAST_EVENT (aggpad);
2800     PAD_UNLOCK (aggpad);
2801   }
2802
2803   return TRUE;
2804 }
2805
2806 /***********************************
2807  * GstAggregatorPad implementation  *
2808  ************************************/
2809 G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
2810
2811 static void
2812 gst_aggregator_pad_constructed (GObject * object)
2813 {
2814   GstPad *pad = GST_PAD (object);
2815
2816   if (GST_PAD_IS_SINK (pad)) {
2817     gst_pad_set_chain_function (pad,
2818         GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
2819     gst_pad_set_event_full_function_full (pad,
2820         GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
2821     gst_pad_set_query_function (pad,
2822         GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
2823     gst_pad_set_activatemode_function (pad,
2824         GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
2825   }
2826 }
2827
2828 static void
2829 gst_aggregator_pad_finalize (GObject * object)
2830 {
2831   GstAggregatorPad *pad = (GstAggregatorPad *) object;
2832
2833   g_cond_clear (&pad->priv->event_cond);
2834   g_mutex_clear (&pad->priv->flush_lock);
2835   g_mutex_clear (&pad->priv->lock);
2836
2837   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object);
2838 }
2839
2840 static void
2841 gst_aggregator_pad_dispose (GObject * object)
2842 {
2843   GstAggregatorPad *pad = (GstAggregatorPad *) object;
2844
2845   gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE);
2846
2847   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
2848 }
2849
2850 static void
2851 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
2852 {
2853   GObjectClass *gobject_class = (GObjectClass *) klass;
2854
2855   gobject_class->constructed = gst_aggregator_pad_constructed;
2856   gobject_class->finalize = gst_aggregator_pad_finalize;
2857   gobject_class->dispose = gst_aggregator_pad_dispose;
2858 }
2859
2860 static void
2861 gst_aggregator_pad_init (GstAggregatorPad * pad)
2862 {
2863   pad->priv = gst_aggregator_pad_get_instance_private (pad);
2864
2865   g_queue_init (&pad->priv->data);
2866   g_cond_init (&pad->priv->event_cond);
2867
2868   g_mutex_init (&pad->priv->flush_lock);
2869   g_mutex_init (&pad->priv->lock);
2870
2871   gst_aggregator_pad_reset_unlocked (pad);
2872   pad->priv->negotiated = FALSE;
2873 }
2874
2875 /* Must be called with the PAD_LOCK held */
2876 static void
2877 gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad)
2878 {
2879   pad->priv->num_buffers--;
2880   GST_TRACE_OBJECT (pad, "Consuming buffer");
2881   PAD_BROADCAST_EVENT (pad);
2882 }
2883
2884 /* Must be called with the PAD_LOCK held */
2885 static void
2886 gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
2887 {
2888   GstAggregator *self = NULL;
2889   GstAggregatorClass *aggclass = NULL;
2890   GstBuffer *buffer = NULL;
2891
2892   while (pad->priv->clipped_buffer == NULL &&
2893       GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
2894     buffer = g_queue_pop_tail (&pad->priv->data);
2895
2896     apply_buffer (pad, buffer, FALSE);
2897
2898     /* We only take the parent here so that it's not taken if the buffer is
2899      * already clipped or if the queue is empty.
2900      */
2901     if (self == NULL) {
2902       self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
2903       if (self == NULL) {
2904         gst_buffer_unref (buffer);
2905         return;
2906       }
2907
2908       aggclass = GST_AGGREGATOR_GET_CLASS (self);
2909     }
2910
2911     if (aggclass->clip) {
2912       GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer);
2913
2914       buffer = aggclass->clip (self, pad, buffer);
2915
2916       if (buffer == NULL) {
2917         gst_aggregator_pad_buffer_consumed (pad);
2918         GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
2919       }
2920     }
2921
2922     pad->priv->clipped_buffer = buffer;
2923   }
2924
2925   if (self)
2926     gst_object_unref (self);
2927 }
2928
2929 /**
2930  * gst_aggregator_pad_pop_buffer:
2931  * @pad: the pad to get buffer from
2932  *
2933  * Steal the ref to the buffer currently queued in @pad.
2934  *
2935  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
2936  *   queued. You should unref the buffer after usage.
2937  */
2938 GstBuffer *
2939 gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad)
2940 {
2941   GstBuffer *buffer;
2942
2943   PAD_LOCK (pad);
2944
2945   if (pad->priv->flow_return != GST_FLOW_OK) {
2946     PAD_UNLOCK (pad);
2947     return NULL;
2948   }
2949
2950   gst_aggregator_pad_clip_buffer_unlocked (pad);
2951
2952   buffer = pad->priv->clipped_buffer;
2953
2954   if (buffer) {
2955     pad->priv->clipped_buffer = NULL;
2956     gst_aggregator_pad_buffer_consumed (pad);
2957     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
2958   }
2959
2960   PAD_UNLOCK (pad);
2961
2962   return buffer;
2963 }
2964
2965 /**
2966  * gst_aggregator_pad_drop_buffer:
2967  * @pad: the pad where to drop any pending buffer
2968  *
2969  * Drop the buffer currently queued in @pad.
2970  *
2971  * Returns: TRUE if there was a buffer queued in @pad, or FALSE if not.
2972  */
2973 gboolean
2974 gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
2975 {
2976   GstBuffer *buf;
2977
2978   buf = gst_aggregator_pad_pop_buffer (pad);
2979
2980   if (buf == NULL)
2981     return FALSE;
2982
2983   gst_buffer_unref (buf);
2984   return TRUE;
2985 }
2986
2987 /**
2988  * gst_aggregator_pad_peek_buffer:
2989  * @pad: the pad to get buffer from
2990  *
2991  * Returns: (transfer full): A reference to the buffer in @pad or
2992  * NULL if no buffer was queued. You should unref the buffer after
2993  * usage.
2994  */
2995 GstBuffer *
2996 gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
2997 {
2998   GstBuffer *buffer;
2999
3000   PAD_LOCK (pad);
3001
3002   if (pad->priv->flow_return != GST_FLOW_OK) {
3003     PAD_UNLOCK (pad);
3004     return NULL;
3005   }
3006
3007   gst_aggregator_pad_clip_buffer_unlocked (pad);
3008
3009   if (pad->priv->clipped_buffer) {
3010     buffer = gst_buffer_ref (pad->priv->clipped_buffer);
3011   } else {
3012     buffer = NULL;
3013   }
3014   PAD_UNLOCK (pad);
3015
3016   return buffer;
3017 }
3018
3019 /**
3020  * gst_aggregator_pad_has_buffer:
3021  * @pad: the pad to check the buffer on
3022  *
3023  * This checks if a pad has a buffer available that will be returned by
3024  * a call to gst_aggregator_pad_peek_buffer() or
3025  * gst_aggregator_pad_pop_buffer().
3026  *
3027  * Returns: %TRUE if the pad has a buffer available as the next thing.
3028  *
3029  * Since: 1.14.1
3030  */
3031 gboolean
3032 gst_aggregator_pad_has_buffer (GstAggregatorPad * pad)
3033 {
3034   gboolean has_buffer;
3035
3036   PAD_LOCK (pad);
3037   gst_aggregator_pad_clip_buffer_unlocked (pad);
3038   has_buffer = (pad->priv->clipped_buffer != NULL);
3039   PAD_UNLOCK (pad);
3040
3041   return has_buffer;
3042 }
3043
3044 /**
3045  * gst_aggregator_pad_is_eos:
3046  * @pad: an aggregator pad
3047  *
3048  * Returns: %TRUE if the pad is EOS, otherwise %FALSE.
3049  */
3050 gboolean
3051 gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
3052 {
3053   gboolean is_eos;
3054
3055   PAD_LOCK (pad);
3056   is_eos = pad->priv->eos;
3057   PAD_UNLOCK (pad);
3058
3059   return is_eos;
3060 }
3061
3062 #if 0
3063 /*
3064  * gst_aggregator_merge_tags:
3065  * @self: a #GstAggregator
3066  * @tags: a #GstTagList to merge
3067  * @mode: the #GstTagMergeMode to use
3068  *
3069  * Adds tags to so-called pending tags, which will be processed
3070  * before pushing out data downstream.
3071  *
3072  * Note that this is provided for convenience, and the subclass is
3073  * not required to use this and can still do tag handling on its own.
3074  *
3075  * MT safe.
3076  */
3077 void
3078 gst_aggregator_merge_tags (GstAggregator * self,
3079     const GstTagList * tags, GstTagMergeMode mode)
3080 {
3081   GstTagList *otags;
3082
3083   g_return_if_fail (GST_IS_AGGREGATOR (self));
3084   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
3085
3086   /* FIXME Check if we can use OBJECT lock here! */
3087   GST_OBJECT_LOCK (self);
3088   if (tags)
3089     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
3090   otags = self->priv->tags;
3091   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
3092   if (otags)
3093     gst_tag_list_unref (otags);
3094   self->priv->tags_changed = TRUE;
3095   GST_OBJECT_UNLOCK (self);
3096 }
3097 #endif
3098
3099 /**
3100  * gst_aggregator_set_latency:
3101  * @self: a #GstAggregator
3102  * @min_latency: minimum latency
3103  * @max_latency: maximum latency
3104  *
3105  * Lets #GstAggregator sub-classes tell the baseclass what their internal
3106  * latency is. Will also post a LATENCY message on the bus so the pipeline
3107  * can reconfigure its global latency.
3108  */
3109 void
3110 gst_aggregator_set_latency (GstAggregator * self,
3111     GstClockTime min_latency, GstClockTime max_latency)
3112 {
3113   gboolean changed = FALSE;
3114
3115   g_return_if_fail (GST_IS_AGGREGATOR (self));
3116   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
3117   g_return_if_fail (max_latency >= min_latency);
3118
3119   SRC_LOCK (self);
3120   if (self->priv->sub_latency_min != min_latency) {
3121     self->priv->sub_latency_min = min_latency;
3122     changed = TRUE;
3123   }
3124   if (self->priv->sub_latency_max != max_latency) {
3125     self->priv->sub_latency_max = max_latency;
3126     changed = TRUE;
3127   }
3128
3129   if (changed)
3130     SRC_BROADCAST (self);
3131   SRC_UNLOCK (self);
3132
3133   if (changed) {
3134     gst_element_post_message (GST_ELEMENT_CAST (self),
3135         gst_message_new_latency (GST_OBJECT_CAST (self)));
3136   }
3137 }
3138
3139 /**
3140  * gst_aggregator_get_buffer_pool:
3141  * @self: a #GstAggregator
3142  *
3143  * Returns: (transfer full): the instance of the #GstBufferPool used
3144  * by @trans; free it after use it
3145  */
3146 GstBufferPool *
3147 gst_aggregator_get_buffer_pool (GstAggregator * self)
3148 {
3149   GstBufferPool *pool;
3150
3151   g_return_val_if_fail (GST_IS_AGGREGATOR (self), NULL);
3152
3153   GST_OBJECT_LOCK (self);
3154   pool = self->priv->pool;
3155   if (pool)
3156     gst_object_ref (pool);
3157   GST_OBJECT_UNLOCK (self);
3158
3159   return pool;
3160 }
3161
3162 /**
3163  * gst_aggregator_get_allocator:
3164  * @self: a #GstAggregator
3165  * @allocator: (out) (allow-none) (transfer full): the #GstAllocator
3166  * used
3167  * @params: (out) (allow-none) (transfer full): the
3168  * #GstAllocationParams of @allocator
3169  *
3170  * Lets #GstAggregator sub-classes get the memory @allocator
3171  * acquired by the base class and its @params.
3172  *
3173  * Unref the @allocator after use it.
3174  */
3175 void
3176 gst_aggregator_get_allocator (GstAggregator * self,
3177     GstAllocator ** allocator, GstAllocationParams * params)
3178 {
3179   g_return_if_fail (GST_IS_AGGREGATOR (self));
3180
3181   if (allocator)
3182     *allocator = self->priv->allocator ?
3183         gst_object_ref (self->priv->allocator) : NULL;
3184
3185   if (params)
3186     *params = self->priv->allocation_params;
3187 }
3188
3189 /**
3190  * gst_aggregator_simple_get_next_time:
3191  * @self: A #GstAggregator
3192  *
3193  * This is a simple #GstAggregator::get_next_time implementation that
3194  * just looks at the #GstSegment on the srcpad of the aggregator and bases
3195  * the next time on the running there there.
3196  *
3197  * This is the desired behaviour in most cases where you have a live source
3198  * and you have a dead line based aggregator subclass.
3199  *
3200  * Returns: The running time based on the position
3201  *
3202  * Since: 1.16
3203  */
3204 GstClockTime
3205 gst_aggregator_simple_get_next_time (GstAggregator * self)
3206 {
3207   GstClockTime next_time;
3208   GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
3209   GstSegment *segment = &srcpad->segment;
3210
3211   GST_OBJECT_LOCK (self);
3212   if (segment->position == -1 || segment->position < segment->start)
3213     next_time = segment->start;
3214   else
3215     next_time = segment->position;
3216
3217   if (segment->stop != -1 && next_time > segment->stop)
3218     next_time = segment->stop;
3219
3220   next_time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, next_time);
3221   GST_OBJECT_UNLOCK (self);
3222
3223   return next_time;
3224 }