aggregator: add gtk-doc blurb for new min-upstream-latency prop
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
1 /* GStreamer aggregator base class
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @title: GstAggregator
25  * @short_description: Base class for mixers and muxers, manages a set of input
26  *     pads and aggregates their streams
27  * @see_also: gstcollectpads for historical reasons.
28  *
29  * Manages a set of pads with the purpose of aggregating their buffers.
30  * Control is given to the subclass when all pads have data.
31  *
32  *  * Base class for mixers and muxers. Subclasses should at least implement
33  *    the #GstAggregatorClass.aggregate() virtual method.
34  *
35  *  * Installs a #GstPadChainFunction, a #GstPadEventFullFunction and a
36  *    #GstPadQueryFunction to queue all serialized data packets per sink pad.
37  *    Subclasses should not overwrite those, but instead implement
38  *    #GstAggregatorClass.sink_event() and #GstAggregatorClass.sink_query() as
39  *    needed.
40  *
41  *  * When data is queued on all pads, the aggregate vmethod is called.
42  *
43  *  * One can peek at the data on any given GstAggregatorPad with the
44  *    gst_aggregator_pad_peek_buffer () method, and remove it from the pad
45  *    with the gst_aggregator_pad_pop_buffer () method. When a buffer
46  *    has been taken with pop_buffer (), a new buffer can be queued
47  *    on that pad.
48  *
49  *  * If the subclass wishes to push a buffer downstream in its aggregate
50  *    implementation, it should do so through the
51  *    gst_aggregator_finish_buffer () method. This method will take care
52  *    of sending and ordering mandatory events such as stream start, caps
53  *    and segment.
54  *
55  *  * Same goes for EOS events, which should not be pushed directly by the
56  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
57  *    implementation.
58  *
59  *  * Note that the aggregator logic regarding gap event handling is to turn
60  *    these into gap buffers with matching PTS and duration. It will also
61  *    flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE
62  *    to ease their identification and subsequent processing.
63  *
64  *  * Subclasses must use (a subclass of) #GstAggregatorPad for both their
65  *    sink and source pads.
66  *    See gst_element_class_add_static_pad_template_with_gtype().
67  *
68  * This class used to live in gst-plugins-bad and was moved to core.
69  *
70  * Since: 1.14
71  */
72
73 /**
74  * SECTION: gstaggregatorpad
75  * @title: GstAggregatorPad
76  * @short_description: #GstPad subclass for pads managed by #GstAggregator
77  * @see_also: gstcollectpads for historical reasons.
78  *
79  * Pads managed by a #GstAggregor subclass.
80  *
81  * This class used to live in gst-plugins-bad and was moved to core.
82  *
83  * Since: 1.14
84  */
85
86 #ifdef HAVE_CONFIG_H
87 #  include "config.h"
88 #endif
89
90 #include <string.h>             /* strlen */
91
92 #include "gstaggregator.h"
93
94 typedef enum
95 {
96   GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
97   GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
98   GST_AGGREGATOR_START_TIME_SELECTION_SET
99 } GstAggregatorStartTimeSelection;
100
101 static GType
102 gst_aggregator_start_time_selection_get_type (void)
103 {
104   static GType gtype = 0;
105
106   if (gtype == 0) {
107     static const GEnumValue values[] = {
108       {GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
109           "Start at 0 running time (default)", "zero"},
110       {GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
111           "Start at first observed input running time", "first"},
112       {GST_AGGREGATOR_START_TIME_SELECTION_SET,
113           "Set start time with start-time property", "set"},
114       {0, NULL, NULL}
115     };
116
117     gtype = g_enum_register_static ("GstAggregatorStartTimeSelection", values);
118   }
119   return gtype;
120 }
121
122 /*  Might become API */
123 #if 0
124 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
125     const GstTagList * tags, GstTagMergeMode mode);
126 #endif
127 static void gst_aggregator_set_latency_property (GstAggregator * agg,
128     GstClockTime latency);
129 static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
130
131 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
132
133 static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad);
134
135 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
136 #define GST_CAT_DEFAULT aggregator_debug
137
138 /* Locking order, locks in this element must always be taken in this order
139  *
140  * standard sink pad stream lock -> GST_PAD_STREAM_LOCK (aggpad)
141  * Aggregator pad flush lock -> PAD_FLUSH_LOCK(aggpad)
142  * standard src pad stream lock -> GST_PAD_STREAM_LOCK (srcpad)
143  * Aggregator src lock -> SRC_LOCK(agg) w/ SRC_WAIT/BROADCAST
144  * standard element object lock -> GST_OBJECT_LOCK(agg)
145  * Aggregator pad lock -> PAD_LOCK (aggpad) w/ PAD_WAIT/BROADCAST_EVENT(aggpad)
146  * standard src pad object lock -> GST_OBJECT_LOCK(srcpad)
147  * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad)
148  */
149
150 /* GstAggregatorPad definitions */
151 #define PAD_LOCK(pad)   G_STMT_START {                                  \
152   GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p",              \
153         g_thread_self());                                               \
154   g_mutex_lock(&pad->priv->lock);                                       \
155   GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p",                \
156         g_thread_self());                                               \
157   } G_STMT_END
158
159 #define PAD_UNLOCK(pad)  G_STMT_START {                                 \
160   GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p",           \
161       g_thread_self());                                                 \
162   g_mutex_unlock(&pad->priv->lock);                                     \
163   GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p",             \
164         g_thread_self());                                               \
165   } G_STMT_END
166
167
168 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
169   GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p",   \
170         g_thread_self());                                               \
171   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),           \
172       (&((GstAggregatorPad*)pad)->priv->lock));                         \
173   GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \
174         g_thread_self());                                               \
175   } G_STMT_END
176
177 #define PAD_BROADCAST_EVENT(pad) G_STMT_START {                        \
178   GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p",     \
179         g_thread_self());                                              \
180   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond));    \
181   } G_STMT_END
182
183
184 #define PAD_FLUSH_LOCK(pad)     G_STMT_START {                          \
185   GST_TRACE_OBJECT (pad, "Taking lock from thread %p",                  \
186         g_thread_self());                                               \
187   g_mutex_lock(&pad->priv->flush_lock);                                 \
188   GST_TRACE_OBJECT (pad, "Took lock from thread %p",                    \
189         g_thread_self());                                               \
190   } G_STMT_END
191
192 #define PAD_FLUSH_UNLOCK(pad)   G_STMT_START {                          \
193   GST_TRACE_OBJECT (pad, "Releasing lock from thread %p",               \
194         g_thread_self());                                               \
195   g_mutex_unlock(&pad->priv->flush_lock);                               \
196   GST_TRACE_OBJECT (pad, "Release lock from thread %p",                 \
197         g_thread_self());                                               \
198   } G_STMT_END
199
200 #define SRC_LOCK(self)   G_STMT_START {                             \
201   GST_TRACE_OBJECT (self, "Taking src lock from thread %p",         \
202       g_thread_self());                                             \
203   g_mutex_lock(&self->priv->src_lock);                              \
204   GST_TRACE_OBJECT (self, "Took src lock from thread %p",           \
205         g_thread_self());                                           \
206   } G_STMT_END
207
208 #define SRC_UNLOCK(self)  G_STMT_START {                            \
209   GST_TRACE_OBJECT (self, "Releasing src lock from thread %p",      \
210         g_thread_self());                                           \
211   g_mutex_unlock(&self->priv->src_lock);                            \
212   GST_TRACE_OBJECT (self, "Released src lock from thread %p",       \
213         g_thread_self());                                           \
214   } G_STMT_END
215
216 #define SRC_WAIT(self) G_STMT_START {                               \
217   GST_LOG_OBJECT (self, "Waiting for src on thread %p",             \
218         g_thread_self());                                           \
219   g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock));    \
220   GST_LOG_OBJECT (self, "DONE Waiting for src on thread %p",        \
221         g_thread_self());                                           \
222   } G_STMT_END
223
224 #define SRC_BROADCAST(self) G_STMT_START {                          \
225     GST_LOG_OBJECT (self, "Signaling src from thread %p",           \
226         g_thread_self());                                           \
227     if (self->priv->aggregate_id)                                   \
228       gst_clock_id_unschedule (self->priv->aggregate_id);           \
229     g_cond_broadcast(&(self->priv->src_cond));                      \
230   } G_STMT_END
231
232 struct _GstAggregatorPadPrivate
233 {
234   /* Following fields are protected by the PAD_LOCK */
235   GstFlowReturn flow_return;
236   gboolean pending_flush_start;
237   gboolean pending_flush_stop;
238
239   gboolean first_buffer;
240
241   GQueue data;                  /* buffers, events and queries */
242   GstBuffer *clipped_buffer;
243   guint num_buffers;
244
245   /* used to track fill state of queues, only used with live-src and when
246    * latency property is set to > 0 */
247   GstClockTime head_position;
248   GstClockTime tail_position;
249   GstClockTime head_time;       /* running time */
250   GstClockTime tail_time;
251   GstClockTime time_level;      /* how much head is ahead of tail */
252   GstSegment head_segment;      /* segment before the queue */
253
254   gboolean negotiated;
255
256   gboolean eos;
257
258   GMutex lock;
259   GCond event_cond;
260   /* This lock prevents a flush start processing happening while
261    * the chain function is also happening.
262    */
263   GMutex flush_lock;
264 };
265
266 /* Must be called with PAD_LOCK held */
267 static void
268 gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
269 {
270   aggpad->priv->eos = FALSE;
271   aggpad->priv->flow_return = GST_FLOW_OK;
272   GST_OBJECT_LOCK (aggpad);
273   gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED);
274   gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED);
275   GST_OBJECT_UNLOCK (aggpad);
276   aggpad->priv->head_position = GST_CLOCK_TIME_NONE;
277   aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
278   aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
279   aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
280   aggpad->priv->time_level = 0;
281   aggpad->priv->first_buffer = TRUE;
282 }
283
284 static gboolean
285 gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
286 {
287   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
288
289   PAD_LOCK (aggpad);
290   gst_aggregator_pad_reset_unlocked (aggpad);
291   PAD_UNLOCK (aggpad);
292
293   if (klass->flush)
294     return klass->flush (aggpad, agg);
295
296   return TRUE;
297 }
298
299 /*************************************
300  * GstAggregator implementation  *
301  *************************************/
302 static GstElementClass *aggregator_parent_class = NULL;
303 static gint aggregator_private_offset = 0;
304
305 /* All members are protected by the object lock unless otherwise noted */
306
307 struct _GstAggregatorPrivate
308 {
309   gint max_padserial;
310
311   /* Our state is >= PAUSED */
312   gboolean running;             /* protected by src_lock */
313
314   /* seqnum from seek or segment,
315    * to be applied to synthetic segment/eos events */
316   gint seqnum;
317   gboolean send_stream_start;   /* protected by srcpad stream lock */
318   gboolean send_segment;
319   gboolean flush_seeking;
320   gboolean pending_flush_start;
321   gboolean send_eos;            /* protected by srcpad stream lock */
322
323   GstCaps *srccaps;             /* protected by the srcpad stream lock */
324
325   GstTagList *tags;
326   gboolean tags_changed;
327
328   gboolean peer_latency_live;   /* protected by src_lock */
329   GstClockTime peer_latency_min;        /* protected by src_lock */
330   GstClockTime peer_latency_max;        /* protected by src_lock */
331   gboolean has_peer_latency;    /* protected by src_lock */
332
333   GstClockTime sub_latency_min; /* protected by src_lock */
334   GstClockTime sub_latency_max; /* protected by src_lock */
335
336   GstClockTime upstream_latency_min;    /* protected by src_lock */
337
338   /* aggregate */
339   GstClockID aggregate_id;      /* protected by src_lock */
340   GMutex src_lock;
341   GCond src_cond;
342
343   gboolean first_buffer;        /* protected by object lock */
344   GstAggregatorStartTimeSelection start_time_selection;
345   GstClockTime start_time;
346
347   /* protected by the object lock */
348   GstQuery *allocation_query;
349   GstAllocator *allocator;
350   GstBufferPool *pool;
351   GstAllocationParams allocation_params;
352
353   /* properties */
354   gint64 latency;               /* protected by both src_lock and all pad locks */
355 };
356
357 /* Seek event forwarding helper */
358 typedef struct
359 {
360   /* parameters */
361   GstEvent *event;
362   gboolean flush;
363   gboolean only_to_active_pads;
364
365   /* results */
366   gboolean result;
367   gboolean one_actually_seeked;
368 } EventData;
369
370 #define DEFAULT_LATENCY              0
371 #define DEFAULT_MIN_UPSTREAM_LATENCY              0
372 #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO
373 #define DEFAULT_START_TIME           (-1)
374
375 enum
376 {
377   PROP_0,
378   PROP_LATENCY,
379   PROP_MIN_UPSTREAM_LATENCY,
380   PROP_START_TIME_SELECTION,
381   PROP_START_TIME,
382   PROP_LAST
383 };
384
385 static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
386     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
387
388 static gboolean
389 gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
390 {
391   return (g_queue_peek_tail (&pad->priv->data) == NULL &&
392       pad->priv->clipped_buffer == NULL);
393 }
394
395 static gboolean
396 gst_aggregator_check_pads_ready (GstAggregator * self)
397 {
398   GstAggregatorPad *pad = NULL;
399   GList *l, *sinkpads;
400   gboolean have_buffer = TRUE;
401   gboolean have_event_or_query = FALSE;
402
403   GST_LOG_OBJECT (self, "checking pads");
404
405   GST_OBJECT_LOCK (self);
406
407   sinkpads = GST_ELEMENT_CAST (self)->sinkpads;
408   if (sinkpads == NULL)
409     goto no_sinkpads;
410
411   for (l = sinkpads; l != NULL; l = l->next) {
412     pad = l->data;
413
414     PAD_LOCK (pad);
415
416     if (pad->priv->num_buffers == 0) {
417       if (!gst_aggregator_pad_queue_is_empty (pad))
418         have_event_or_query = TRUE;
419       if (!pad->priv->eos) {
420         have_buffer = FALSE;
421
422         /* If not live we need data on all pads, so leave the loop */
423         if (!self->priv->peer_latency_live) {
424           PAD_UNLOCK (pad);
425           goto pad_not_ready;
426         }
427       }
428     } else if (self->priv->peer_latency_live) {
429       /* In live mode, having a single pad with buffers is enough to
430        * generate a start time from it. In non-live mode all pads need
431        * to have a buffer
432        */
433       self->priv->first_buffer = FALSE;
434     }
435
436     PAD_UNLOCK (pad);
437   }
438
439   if (!have_buffer && !have_event_or_query)
440     goto pad_not_ready;
441
442   if (have_buffer)
443     self->priv->first_buffer = FALSE;
444
445   GST_OBJECT_UNLOCK (self);
446   GST_LOG_OBJECT (self, "pads are ready");
447   return TRUE;
448
449 no_sinkpads:
450   {
451     GST_LOG_OBJECT (self, "pads not ready: no sink pads");
452     GST_OBJECT_UNLOCK (self);
453     return FALSE;
454   }
455 pad_not_ready:
456   {
457     if (have_event_or_query)
458       GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet,"
459           " but waking up for serialized event");
460     else
461       GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
462     GST_OBJECT_UNLOCK (self);
463     return have_event_or_query;
464   }
465 }
466
467 static void
468 gst_aggregator_reset_flow_values (GstAggregator * self)
469 {
470   GST_OBJECT_LOCK (self);
471   self->priv->send_stream_start = TRUE;
472   self->priv->send_segment = TRUE;
473   gst_segment_init (&GST_AGGREGATOR_PAD (self->srcpad)->segment,
474       GST_FORMAT_TIME);
475   self->priv->first_buffer = TRUE;
476   GST_OBJECT_UNLOCK (self);
477 }
478
479 static inline void
480 gst_aggregator_push_mandatory_events (GstAggregator * self)
481 {
482   GstAggregatorPrivate *priv = self->priv;
483   GstEvent *segment = NULL;
484   GstEvent *tags = NULL;
485
486   if (self->priv->send_stream_start) {
487     gchar s_id[32];
488
489     GST_INFO_OBJECT (self, "pushing stream start");
490     /* stream-start (FIXME: create id based on input ids) */
491     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
492     if (!gst_pad_push_event (GST_PAD (self->srcpad),
493             gst_event_new_stream_start (s_id))) {
494       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
495     }
496     self->priv->send_stream_start = FALSE;
497   }
498
499   if (self->priv->srccaps) {
500
501     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
502         self->priv->srccaps);
503     if (!gst_pad_push_event (GST_PAD (self->srcpad),
504             gst_event_new_caps (self->priv->srccaps))) {
505       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
506     }
507     gst_caps_unref (self->priv->srccaps);
508     self->priv->srccaps = NULL;
509   }
510
511   GST_OBJECT_LOCK (self);
512   if (self->priv->send_segment && !self->priv->flush_seeking) {
513     segment =
514         gst_event_new_segment (&GST_AGGREGATOR_PAD (self->srcpad)->segment);
515
516     if (!self->priv->seqnum)
517       /* This code-path is in preparation to be able to run without a source
518        * connected. Then we won't have a seq-num from a segment event. */
519       self->priv->seqnum = gst_event_get_seqnum (segment);
520     else
521       gst_event_set_seqnum (segment, self->priv->seqnum);
522     self->priv->send_segment = FALSE;
523
524     GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment);
525   }
526
527   if (priv->tags && priv->tags_changed && !self->priv->flush_seeking) {
528     tags = gst_event_new_tag (gst_tag_list_ref (priv->tags));
529     priv->tags_changed = FALSE;
530   }
531   GST_OBJECT_UNLOCK (self);
532
533   if (segment)
534     gst_pad_push_event (self->srcpad, segment);
535   if (tags)
536     gst_pad_push_event (self->srcpad, tags);
537
538 }
539
540 /**
541  * gst_aggregator_set_src_caps:
542  * @self: The #GstAggregator
543  * @caps: The #GstCaps to set on the src pad.
544  *
545  * Sets the caps to be used on the src pad.
546  */
547 void
548 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
549 {
550   GST_PAD_STREAM_LOCK (self->srcpad);
551   gst_caps_replace (&self->priv->srccaps, caps);
552   gst_aggregator_push_mandatory_events (self);
553   GST_PAD_STREAM_UNLOCK (self->srcpad);
554 }
555
556 static GstFlowReturn
557 gst_aggregator_default_finish_buffer (GstAggregator * self, GstBuffer * buffer)
558 {
559   gst_aggregator_push_mandatory_events (self);
560
561   GST_OBJECT_LOCK (self);
562   if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) {
563     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
564     GST_OBJECT_UNLOCK (self);
565     return gst_pad_push (self->srcpad, buffer);
566   } else {
567     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
568         self->priv->flush_seeking, gst_pad_is_active (self->srcpad));
569     GST_OBJECT_UNLOCK (self);
570     gst_buffer_unref (buffer);
571     return GST_FLOW_OK;
572   }
573 }
574
575 /**
576  * gst_aggregator_finish_buffer:
577  * @aggregator: The #GstAggregator
578  * @buffer: (transfer full): the #GstBuffer to push.
579  *
580  * This method will push the provided output buffer downstream. If needed,
581  * mandatory events such as stream-start, caps, and segment events will be
582  * sent before pushing the buffer.
583  */
584 GstFlowReturn
585 gst_aggregator_finish_buffer (GstAggregator * aggregator, GstBuffer * buffer)
586 {
587   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator);
588
589   g_assert (klass->finish_buffer != NULL);
590
591   return klass->finish_buffer (aggregator, buffer);
592 }
593
594 static void
595 gst_aggregator_push_eos (GstAggregator * self)
596 {
597   GstEvent *event;
598   gst_aggregator_push_mandatory_events (self);
599
600   event = gst_event_new_eos ();
601
602   GST_OBJECT_LOCK (self);
603   self->priv->send_eos = FALSE;
604   gst_event_set_seqnum (event, self->priv->seqnum);
605   GST_OBJECT_UNLOCK (self);
606
607   gst_pad_push_event (self->srcpad, event);
608 }
609
610 static GstClockTime
611 gst_aggregator_get_next_time (GstAggregator * self)
612 {
613   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
614
615   if (klass->get_next_time)
616     return klass->get_next_time (self);
617
618   return GST_CLOCK_TIME_NONE;
619 }
620
621 static gboolean
622 gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
623 {
624   GstClockTime latency;
625   GstClockTime start;
626   gboolean res;
627
628   *timeout = FALSE;
629
630   SRC_LOCK (self);
631
632   latency = gst_aggregator_get_latency_unlocked (self);
633
634   if (gst_aggregator_check_pads_ready (self)) {
635     GST_DEBUG_OBJECT (self, "all pads have data");
636     SRC_UNLOCK (self);
637
638     return TRUE;
639   }
640
641   /* Before waiting, check if we're actually still running */
642   if (!self->priv->running || !self->priv->send_eos) {
643     SRC_UNLOCK (self);
644
645     return FALSE;
646   }
647
648   start = gst_aggregator_get_next_time (self);
649
650   /* If we're not live, or if we use the running time
651    * of the first buffer as start time, we wait until
652    * all pads have buffers.
653    * Otherwise (i.e. if we are live!), we wait on the clock
654    * and if a pad does not have a buffer in time we ignore
655    * that pad.
656    */
657   GST_OBJECT_LOCK (self);
658   if (!GST_CLOCK_TIME_IS_VALID (latency) ||
659       !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) ||
660       !GST_CLOCK_TIME_IS_VALID (start) ||
661       (self->priv->first_buffer
662           && self->priv->start_time_selection ==
663           GST_AGGREGATOR_START_TIME_SELECTION_FIRST)) {
664     /* We wake up here when something happened, and below
665      * then check if we're ready now. If we return FALSE,
666      * we will be directly called again.
667      */
668     GST_OBJECT_UNLOCK (self);
669     SRC_WAIT (self);
670   } else {
671     GstClockTime base_time, time;
672     GstClock *clock;
673     GstClockReturn status;
674     GstClockTimeDiff jitter;
675
676     GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
677         GST_TIME_ARGS (start));
678
679     base_time = GST_ELEMENT_CAST (self)->base_time;
680     clock = gst_object_ref (GST_ELEMENT_CLOCK (self));
681     GST_OBJECT_UNLOCK (self);
682
683     time = base_time + start;
684     time += latency;
685
686     GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
687         GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
688         " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")",
689         GST_TIME_ARGS (time),
690         GST_TIME_ARGS (base_time),
691         GST_TIME_ARGS (start), GST_TIME_ARGS (latency),
692         GST_TIME_ARGS (gst_clock_get_time (clock)));
693
694     self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
695     gst_object_unref (clock);
696     SRC_UNLOCK (self);
697
698     jitter = 0;
699     status = gst_clock_id_wait (self->priv->aggregate_id, &jitter);
700
701     SRC_LOCK (self);
702     if (self->priv->aggregate_id) {
703       gst_clock_id_unref (self->priv->aggregate_id);
704       self->priv->aggregate_id = NULL;
705     }
706
707     GST_DEBUG_OBJECT (self,
708         "clock returned %d (jitter: %" GST_STIME_FORMAT ")",
709         status, GST_STIME_ARGS (jitter));
710
711     /* we timed out */
712     if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
713       SRC_UNLOCK (self);
714       *timeout = TRUE;
715       return TRUE;
716     }
717   }
718
719   res = gst_aggregator_check_pads_ready (self);
720   SRC_UNLOCK (self);
721
722   return res;
723 }
724
725 typedef struct
726 {
727   gboolean processed_event;
728   GstFlowReturn flow_ret;
729 } DoHandleEventsAndQueriesData;
730
731 static gboolean
732 gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
733     gpointer user_data)
734 {
735   GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
736   GstAggregator *aggregator = GST_AGGREGATOR_CAST (self);
737   GstEvent *event = NULL;
738   GstQuery *query = NULL;
739   GstAggregatorClass *klass = NULL;
740   DoHandleEventsAndQueriesData *data = user_data;
741
742   do {
743     event = NULL;
744     query = NULL;
745
746     PAD_LOCK (pad);
747     if (pad->priv->clipped_buffer == NULL &&
748         !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
749       if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data)))
750         event = gst_event_ref (g_queue_peek_tail (&pad->priv->data));
751       if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))
752         query = g_queue_peek_tail (&pad->priv->data);
753     }
754     PAD_UNLOCK (pad);
755     if (event || query) {
756       gboolean ret;
757
758       data->processed_event = TRUE;
759       if (klass == NULL)
760         klass = GST_AGGREGATOR_GET_CLASS (self);
761
762       if (event) {
763         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
764         gst_event_ref (event);
765         ret = klass->sink_event (aggregator, pad, event);
766
767         PAD_LOCK (pad);
768         if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) {
769           pad->priv->negotiated = ret;
770           if (!ret)
771             pad->priv->flow_return = data->flow_ret = GST_FLOW_NOT_NEGOTIATED;
772         }
773         if (g_queue_peek_tail (&pad->priv->data) == event)
774           gst_event_unref (g_queue_pop_tail (&pad->priv->data));
775         gst_event_unref (event);
776       } else if (query) {
777         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query);
778         ret = klass->sink_query (aggregator, pad, query);
779
780         PAD_LOCK (pad);
781         if (g_queue_peek_tail (&pad->priv->data) == query) {
782           GstStructure *s;
783
784           s = gst_query_writable_structure (query);
785           gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret,
786               NULL);
787           g_queue_pop_tail (&pad->priv->data);
788         }
789       }
790
791       PAD_BROADCAST_EVENT (pad);
792       PAD_UNLOCK (pad);
793     }
794   } while (event || query);
795
796   return TRUE;
797 }
798
799 static gboolean
800 gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
801     gpointer user_data)
802 {
803   GList *item;
804   GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
805   GstAggregator *agg = (GstAggregator *) self;
806   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
807
808   if (!klass->skip_buffer)
809     return FALSE;
810
811   PAD_LOCK (aggpad);
812
813   item = g_queue_peek_head_link (&aggpad->priv->data);
814   while (item) {
815     GList *next = item->next;
816
817     if (GST_IS_BUFFER (item->data)
818         && klass->skip_buffer (aggpad, agg, item->data)) {
819       GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
820       gst_aggregator_pad_buffer_consumed (aggpad);
821       gst_buffer_unref (item->data);
822       g_queue_delete_link (&aggpad->priv->data, item);
823     } else {
824       break;
825     }
826
827     item = next;
828   }
829
830   PAD_UNLOCK (aggpad);
831
832   return TRUE;
833 }
834
835 static void
836 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
837     GstFlowReturn flow_return, gboolean full)
838 {
839   GList *item;
840
841   PAD_LOCK (aggpad);
842   if (flow_return == GST_FLOW_NOT_LINKED)
843     aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
844   else
845     aggpad->priv->flow_return = flow_return;
846
847   item = g_queue_peek_head_link (&aggpad->priv->data);
848   while (item) {
849     GList *next = item->next;
850
851     /* In partial flush, we do like the pad, we get rid of non-sticky events
852      * and EOS/SEGMENT.
853      */
854     if (full || GST_IS_BUFFER (item->data) ||
855         GST_EVENT_TYPE (item->data) == GST_EVENT_EOS ||
856         GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT ||
857         !GST_EVENT_IS_STICKY (item->data)) {
858       if (!GST_IS_QUERY (item->data))
859         gst_mini_object_unref (item->data);
860       g_queue_delete_link (&aggpad->priv->data, item);
861     }
862     item = next;
863   }
864   aggpad->priv->num_buffers = 0;
865   gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL);
866
867   PAD_BROADCAST_EVENT (aggpad);
868   PAD_UNLOCK (aggpad);
869 }
870
871 static GstFlowReturn
872 gst_aggregator_default_update_src_caps (GstAggregator * agg, GstCaps * caps,
873     GstCaps ** ret)
874 {
875   *ret = gst_caps_ref (caps);
876
877   return GST_FLOW_OK;
878 }
879
880 static GstCaps *
881 gst_aggregator_default_fixate_src_caps (GstAggregator * agg, GstCaps * caps)
882 {
883   caps = gst_caps_fixate (caps);
884
885   return caps;
886 }
887
888 static gboolean
889 gst_aggregator_default_negotiated_src_caps (GstAggregator * agg, GstCaps * caps)
890 {
891   return TRUE;
892 }
893
894
895 /* takes ownership of the pool, allocator and query */
896 static gboolean
897 gst_aggregator_set_allocation (GstAggregator * self,
898     GstBufferPool * pool, GstAllocator * allocator,
899     GstAllocationParams * params, GstQuery * query)
900 {
901   GstAllocator *oldalloc;
902   GstBufferPool *oldpool;
903   GstQuery *oldquery;
904
905   GST_DEBUG ("storing allocation query");
906
907   GST_OBJECT_LOCK (self);
908   oldpool = self->priv->pool;
909   self->priv->pool = pool;
910
911   oldalloc = self->priv->allocator;
912   self->priv->allocator = allocator;
913
914   oldquery = self->priv->allocation_query;
915   self->priv->allocation_query = query;
916
917   if (params)
918     self->priv->allocation_params = *params;
919   else
920     gst_allocation_params_init (&self->priv->allocation_params);
921   GST_OBJECT_UNLOCK (self);
922
923   if (oldpool) {
924     GST_DEBUG_OBJECT (self, "deactivating old pool %p", oldpool);
925     gst_buffer_pool_set_active (oldpool, FALSE);
926     gst_object_unref (oldpool);
927   }
928   if (oldalloc) {
929     gst_object_unref (oldalloc);
930   }
931   if (oldquery) {
932     gst_query_unref (oldquery);
933   }
934   return TRUE;
935 }
936
937
938 static gboolean
939 gst_aggregator_decide_allocation (GstAggregator * self, GstQuery * query)
940 {
941   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
942
943   if (aggclass->decide_allocation)
944     if (!aggclass->decide_allocation (self, query))
945       return FALSE;
946
947   return TRUE;
948 }
949
950 static gboolean
951 gst_aggregator_do_allocation (GstAggregator * self, GstCaps * caps)
952 {
953   GstQuery *query;
954   gboolean result = TRUE;
955   GstBufferPool *pool = NULL;
956   GstAllocator *allocator;
957   GstAllocationParams params;
958
959   /* find a pool for the negotiated caps now */
960   GST_DEBUG_OBJECT (self, "doing allocation query");
961   query = gst_query_new_allocation (caps, TRUE);
962   if (!gst_pad_peer_query (self->srcpad, query)) {
963     /* not a problem, just debug a little */
964     GST_DEBUG_OBJECT (self, "peer ALLOCATION query failed");
965   }
966
967   GST_DEBUG_OBJECT (self, "calling decide_allocation");
968   result = gst_aggregator_decide_allocation (self, query);
969
970   GST_DEBUG_OBJECT (self, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
971       query);
972
973   if (!result)
974     goto no_decide_allocation;
975
976   /* we got configuration from our peer or the decide_allocation method,
977    * parse them */
978   if (gst_query_get_n_allocation_params (query) > 0) {
979     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
980   } else {
981     allocator = NULL;
982     gst_allocation_params_init (&params);
983   }
984
985   if (gst_query_get_n_allocation_pools (query) > 0)
986     gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
987
988   /* now store */
989   result =
990       gst_aggregator_set_allocation (self, pool, allocator, &params, query);
991
992   return result;
993
994   /* Errors */
995 no_decide_allocation:
996   {
997     GST_WARNING_OBJECT (self, "Failed to decide allocation");
998     gst_query_unref (query);
999
1000     return result;
1001   }
1002
1003 }
1004
1005 /* WITH SRC_LOCK held */
1006 static GstFlowReturn
1007 gst_aggregator_update_src_caps (GstAggregator * self)
1008 {
1009   GstAggregatorClass *agg_klass = GST_AGGREGATOR_GET_CLASS (self);
1010   GstCaps *downstream_caps, *template_caps, *caps = NULL;
1011   GstFlowReturn ret = GST_FLOW_OK;
1012
1013   template_caps = gst_pad_get_pad_template_caps (self->srcpad);
1014   downstream_caps = gst_pad_peer_query_caps (self->srcpad, template_caps);
1015
1016   if (gst_caps_is_empty (downstream_caps)) {
1017     GST_INFO_OBJECT (self, "Downstream caps (%"
1018         GST_PTR_FORMAT ") not compatible with pad template caps (%"
1019         GST_PTR_FORMAT ")", downstream_caps, template_caps);
1020     ret = GST_FLOW_NOT_NEGOTIATED;
1021     goto done;
1022   }
1023
1024   g_assert (agg_klass->update_src_caps);
1025   GST_DEBUG_OBJECT (self, "updating caps from %" GST_PTR_FORMAT,
1026       downstream_caps);
1027   ret = agg_klass->update_src_caps (self, downstream_caps, &caps);
1028   if (ret < GST_FLOW_OK) {
1029     GST_WARNING_OBJECT (self, "Subclass failed to update provided caps");
1030     goto done;
1031   }
1032   if ((caps == NULL || gst_caps_is_empty (caps)) && ret >= GST_FLOW_OK) {
1033     ret = GST_FLOW_NOT_NEGOTIATED;
1034     goto done;
1035   }
1036   GST_DEBUG_OBJECT (self, "               to %" GST_PTR_FORMAT, caps);
1037
1038 #ifdef GST_ENABLE_EXTRA_CHECKS
1039   if (!gst_caps_is_subset (caps, template_caps)) {
1040     GstCaps *intersection;
1041
1042     GST_ERROR_OBJECT (self,
1043         "update_src_caps returned caps %" GST_PTR_FORMAT
1044         " which are not a real subset of the template caps %"
1045         GST_PTR_FORMAT, caps, template_caps);
1046     g_warning ("%s: update_src_caps returned caps which are not a real "
1047         "subset of the filter caps", GST_ELEMENT_NAME (self));
1048
1049     intersection =
1050         gst_caps_intersect_full (template_caps, caps, GST_CAPS_INTERSECT_FIRST);
1051     gst_caps_unref (caps);
1052     caps = intersection;
1053   }
1054 #endif
1055
1056   if (gst_caps_is_any (caps)) {
1057     goto done;
1058   }
1059
1060   if (!gst_caps_is_fixed (caps)) {
1061     g_assert (agg_klass->fixate_src_caps);
1062
1063     GST_DEBUG_OBJECT (self, "fixate caps from %" GST_PTR_FORMAT, caps);
1064     if (!(caps = agg_klass->fixate_src_caps (self, caps))) {
1065       GST_WARNING_OBJECT (self, "Subclass failed to fixate provided caps");
1066       ret = GST_FLOW_NOT_NEGOTIATED;
1067       goto done;
1068     }
1069     GST_DEBUG_OBJECT (self, "             to %" GST_PTR_FORMAT, caps);
1070   }
1071
1072   if (agg_klass->negotiated_src_caps) {
1073     if (!agg_klass->negotiated_src_caps (self, caps)) {
1074       GST_WARNING_OBJECT (self, "Subclass failed to accept negotiated caps");
1075       ret = GST_FLOW_NOT_NEGOTIATED;
1076       goto done;
1077     }
1078   }
1079
1080   gst_aggregator_set_src_caps (self, caps);
1081
1082   if (!gst_aggregator_do_allocation (self, caps)) {
1083     GST_WARNING_OBJECT (self, "Allocation negotiation failed");
1084     ret = GST_FLOW_NOT_NEGOTIATED;
1085   }
1086
1087 done:
1088   gst_caps_unref (downstream_caps);
1089   gst_caps_unref (template_caps);
1090
1091   if (caps)
1092     gst_caps_unref (caps);
1093
1094   return ret;
1095 }
1096
1097 static void
1098 gst_aggregator_aggregate_func (GstAggregator * self)
1099 {
1100   GstAggregatorPrivate *priv = self->priv;
1101   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1102   gboolean timeout = FALSE;
1103
1104   if (self->priv->running == FALSE) {
1105     GST_DEBUG_OBJECT (self, "Not running anymore");
1106     return;
1107   }
1108
1109   GST_LOG_OBJECT (self, "Checking aggregate");
1110   while (priv->send_eos && priv->running) {
1111     GstFlowReturn flow_return = GST_FLOW_OK;
1112     DoHandleEventsAndQueriesData events_query_data = { FALSE, GST_FLOW_OK };
1113
1114     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1115         gst_aggregator_do_events_and_queries, &events_query_data);
1116
1117     if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
1118       goto handle_error;
1119
1120     if (self->priv->peer_latency_live)
1121       gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1122           gst_aggregator_pad_skip_buffers, NULL);
1123
1124     /* Ensure we have buffers ready (either in clipped_buffer or at the head of
1125      * the queue */
1126     if (!gst_aggregator_wait_and_check (self, &timeout))
1127       continue;
1128
1129     events_query_data.processed_event = FALSE;
1130     events_query_data.flow_ret = GST_FLOW_OK;
1131     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1132         gst_aggregator_do_events_and_queries, &events_query_data);
1133
1134     if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
1135       goto handle_error;
1136
1137     if (events_query_data.processed_event)
1138       continue;
1139
1140     if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
1141       flow_return = gst_aggregator_update_src_caps (self);
1142       if (flow_return != GST_FLOW_OK)
1143         gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self));
1144     }
1145
1146     if (timeout || flow_return >= GST_FLOW_OK) {
1147       GST_TRACE_OBJECT (self, "Actually aggregating!");
1148       flow_return = klass->aggregate (self, timeout);
1149     }
1150
1151     if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA)
1152       continue;
1153
1154     GST_OBJECT_LOCK (self);
1155     if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) {
1156       /* We don't want to set the pads to flushing, but we want to
1157        * stop the thread, so just break here */
1158       GST_OBJECT_UNLOCK (self);
1159       break;
1160     }
1161     GST_OBJECT_UNLOCK (self);
1162
1163     if (flow_return == GST_FLOW_EOS || flow_return == GST_FLOW_ERROR) {
1164       gst_aggregator_push_eos (self);
1165     }
1166
1167   handle_error:
1168     GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
1169
1170     if (flow_return != GST_FLOW_OK) {
1171       GList *item;
1172
1173       GST_OBJECT_LOCK (self);
1174       for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
1175         GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
1176
1177         gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE);
1178       }
1179       GST_OBJECT_UNLOCK (self);
1180       break;
1181     }
1182   }
1183
1184   /* Pause the task here, the only ways to get here are:
1185    * 1) We're stopping, in which case the task is stopped anyway
1186    * 2) We got a flow error above, in which case it might take
1187    *    some time to forward the flow return upstream and we
1188    *    would otherwise call the task function over and over
1189    *    again without doing anything
1190    */
1191   gst_pad_pause_task (self->srcpad);
1192 }
1193
1194 static gboolean
1195 gst_aggregator_start (GstAggregator * self)
1196 {
1197   GstAggregatorClass *klass;
1198   gboolean result;
1199
1200   self->priv->send_stream_start = TRUE;
1201   self->priv->send_segment = TRUE;
1202   self->priv->send_eos = TRUE;
1203   self->priv->srccaps = NULL;
1204
1205   gst_aggregator_set_allocation (self, NULL, NULL, NULL, NULL);
1206
1207   klass = GST_AGGREGATOR_GET_CLASS (self);
1208
1209   if (klass->start)
1210     result = klass->start (self);
1211   else
1212     result = TRUE;
1213
1214   return result;
1215 }
1216
1217 static gboolean
1218 _check_pending_flush_stop (GstAggregatorPad * pad)
1219 {
1220   gboolean res;
1221
1222   PAD_LOCK (pad);
1223   res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
1224   PAD_UNLOCK (pad);
1225
1226   return res;
1227 }
1228
1229 static gboolean
1230 gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
1231 {
1232   gboolean res = TRUE;
1233
1234   GST_INFO_OBJECT (self, "%s srcpad task",
1235       flush_start ? "Pausing" : "Stopping");
1236
1237   SRC_LOCK (self);
1238   self->priv->running = FALSE;
1239   SRC_BROADCAST (self);
1240   SRC_UNLOCK (self);
1241
1242   if (flush_start) {
1243     res = gst_pad_push_event (self->srcpad, flush_start);
1244   }
1245
1246   gst_pad_stop_task (self->srcpad);
1247
1248   return res;
1249 }
1250
1251 static void
1252 gst_aggregator_start_srcpad_task (GstAggregator * self)
1253 {
1254   GST_INFO_OBJECT (self, "Starting srcpad task");
1255
1256   self->priv->running = TRUE;
1257   gst_pad_start_task (GST_PAD (self->srcpad),
1258       (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL);
1259 }
1260
1261 static GstFlowReturn
1262 gst_aggregator_flush (GstAggregator * self)
1263 {
1264   GstFlowReturn ret = GST_FLOW_OK;
1265   GstAggregatorPrivate *priv = self->priv;
1266   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1267
1268   GST_DEBUG_OBJECT (self, "Flushing everything");
1269   GST_OBJECT_LOCK (self);
1270   priv->send_segment = TRUE;
1271   priv->flush_seeking = FALSE;
1272   priv->tags_changed = FALSE;
1273   GST_OBJECT_UNLOCK (self);
1274   if (klass->flush)
1275     ret = klass->flush (self);
1276
1277   return ret;
1278 }
1279
1280
1281 /* Called with GstAggregator's object lock held */
1282
1283 static gboolean
1284 gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
1285 {
1286   GList *tmp;
1287   GstAggregatorPad *tmppad;
1288
1289   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
1290     tmppad = (GstAggregatorPad *) tmp->data;
1291
1292     if (_check_pending_flush_stop (tmppad) == FALSE) {
1293       GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
1294           tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
1295       return FALSE;
1296     }
1297   }
1298
1299   return TRUE;
1300 }
1301
1302 static void
1303 gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
1304     GstEvent * event)
1305 {
1306   GstAggregatorPrivate *priv = self->priv;
1307   GstAggregatorPadPrivate *padpriv = aggpad->priv;
1308
1309   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE);
1310
1311   PAD_FLUSH_LOCK (aggpad);
1312   PAD_LOCK (aggpad);
1313   if (padpriv->pending_flush_start) {
1314     GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
1315
1316     padpriv->pending_flush_start = FALSE;
1317     padpriv->pending_flush_stop = TRUE;
1318   }
1319   PAD_UNLOCK (aggpad);
1320
1321   GST_OBJECT_LOCK (self);
1322   if (priv->flush_seeking) {
1323     /* If flush_seeking we forward the first FLUSH_START */
1324     if (priv->pending_flush_start) {
1325       priv->pending_flush_start = FALSE;
1326       GST_OBJECT_UNLOCK (self);
1327
1328       GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
1329       gst_aggregator_stop_srcpad_task (self, event);
1330
1331       GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
1332       GST_PAD_STREAM_LOCK (self->srcpad);
1333       GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
1334       event = NULL;
1335     } else {
1336       GST_OBJECT_UNLOCK (self);
1337       gst_event_unref (event);
1338     }
1339   } else {
1340     GST_OBJECT_UNLOCK (self);
1341     gst_event_unref (event);
1342   }
1343   PAD_FLUSH_UNLOCK (aggpad);
1344 }
1345
1346 /* Must be called with the the PAD_LOCK held */
1347 static void
1348 update_time_level (GstAggregatorPad * aggpad, gboolean head)
1349 {
1350   GstAggregatorPadPrivate *priv = aggpad->priv;
1351
1352   if (head) {
1353     if (GST_CLOCK_TIME_IS_VALID (priv->head_position) &&
1354         priv->head_segment.format == GST_FORMAT_TIME)
1355       priv->head_time = gst_segment_to_running_time (&priv->head_segment,
1356           GST_FORMAT_TIME, priv->head_position);
1357     else
1358       priv->head_time = GST_CLOCK_TIME_NONE;
1359
1360     if (!GST_CLOCK_TIME_IS_VALID (priv->tail_time))
1361       priv->tail_time = priv->head_time;
1362   } else {
1363     if (GST_CLOCK_TIME_IS_VALID (priv->tail_position) &&
1364         aggpad->segment.format == GST_FORMAT_TIME)
1365       priv->tail_time = gst_segment_to_running_time (&aggpad->segment,
1366           GST_FORMAT_TIME, priv->tail_position);
1367     else
1368       priv->tail_time = priv->head_time;
1369   }
1370
1371   if (priv->head_time == GST_CLOCK_TIME_NONE ||
1372       priv->tail_time == GST_CLOCK_TIME_NONE) {
1373     priv->time_level = 0;
1374     return;
1375   }
1376
1377   if (priv->tail_time > priv->head_time)
1378     priv->time_level = 0;
1379   else
1380     priv->time_level = priv->head_time - priv->tail_time;
1381 }
1382
1383
1384 /* GstAggregator vmethods default implementations */
1385 static gboolean
1386 gst_aggregator_default_sink_event (GstAggregator * self,
1387     GstAggregatorPad * aggpad, GstEvent * event)
1388 {
1389   gboolean res = TRUE;
1390   GstPad *pad = GST_PAD (aggpad);
1391   GstAggregatorPrivate *priv = self->priv;
1392
1393   GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event);
1394
1395   switch (GST_EVENT_TYPE (event)) {
1396     case GST_EVENT_FLUSH_START:
1397     {
1398       gst_aggregator_flush_start (self, aggpad, event);
1399       /* We forward only in one case: right after flush_seeking */
1400       event = NULL;
1401       goto eat;
1402     }
1403     case GST_EVENT_FLUSH_STOP:
1404     {
1405       gst_aggregator_pad_flush (aggpad, self);
1406       GST_OBJECT_LOCK (self);
1407       if (priv->flush_seeking) {
1408         g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
1409         if (gst_aggregator_all_flush_stop_received_locked (self)) {
1410           GST_OBJECT_UNLOCK (self);
1411           /* That means we received FLUSH_STOP/FLUSH_STOP on
1412            * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
1413           gst_aggregator_flush (self);
1414           gst_pad_push_event (self->srcpad, event);
1415           event = NULL;
1416           SRC_LOCK (self);
1417           priv->send_eos = TRUE;
1418           SRC_BROADCAST (self);
1419           SRC_UNLOCK (self);
1420
1421           GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
1422           GST_PAD_STREAM_UNLOCK (self->srcpad);
1423           gst_aggregator_start_srcpad_task (self);
1424         } else {
1425           GST_OBJECT_UNLOCK (self);
1426         }
1427       } else {
1428         GST_OBJECT_UNLOCK (self);
1429       }
1430
1431       /* We never forward the event */
1432       goto eat;
1433     }
1434     case GST_EVENT_EOS:
1435     {
1436       SRC_LOCK (self);
1437       PAD_LOCK (aggpad);
1438       g_assert (aggpad->priv->num_buffers == 0);
1439       aggpad->priv->eos = TRUE;
1440       PAD_UNLOCK (aggpad);
1441       SRC_BROADCAST (self);
1442       SRC_UNLOCK (self);
1443       goto eat;
1444     }
1445     case GST_EVENT_SEGMENT:
1446     {
1447       PAD_LOCK (aggpad);
1448       GST_OBJECT_LOCK (aggpad);
1449       gst_event_copy_segment (event, &aggpad->segment);
1450       /* We've got a new segment, tail_position is now meaningless
1451        * and may interfere with the time_level calculation
1452        */
1453       aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
1454       update_time_level (aggpad, FALSE);
1455       GST_OBJECT_UNLOCK (aggpad);
1456       PAD_UNLOCK (aggpad);
1457
1458       GST_OBJECT_LOCK (self);
1459       self->priv->seqnum = gst_event_get_seqnum (event);
1460       GST_OBJECT_UNLOCK (self);
1461       goto eat;
1462     }
1463     case GST_EVENT_STREAM_START:
1464     {
1465       goto eat;
1466     }
1467     case GST_EVENT_GAP:
1468     {
1469       GstClockTime pts, endpts;
1470       GstClockTime duration;
1471       GstBuffer *gapbuf;
1472
1473       gst_event_parse_gap (event, &pts, &duration);
1474       gapbuf = gst_buffer_new ();
1475
1476       if (GST_CLOCK_TIME_IS_VALID (duration))
1477         endpts = pts + duration;
1478       else
1479         endpts = GST_CLOCK_TIME_NONE;
1480
1481       GST_OBJECT_LOCK (aggpad);
1482       res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts,
1483           &pts, &endpts);
1484       GST_OBJECT_UNLOCK (aggpad);
1485
1486       if (!res) {
1487         GST_WARNING_OBJECT (self, "GAP event outside segment, dropping");
1488         goto eat;
1489       }
1490
1491       if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts))
1492         duration = endpts - pts;
1493       else
1494         duration = GST_CLOCK_TIME_NONE;
1495
1496       GST_BUFFER_PTS (gapbuf) = pts;
1497       GST_BUFFER_DURATION (gapbuf) = duration;
1498       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
1499       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
1500
1501       /* Remove GAP event so we can replace it with the buffer */
1502       if (g_queue_peek_tail (&aggpad->priv->data) == event)
1503         gst_event_unref (g_queue_pop_tail (&aggpad->priv->data));
1504
1505       if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
1506           GST_FLOW_OK) {
1507         GST_WARNING_OBJECT (self, "Failed to chain gap buffer");
1508         res = FALSE;
1509       }
1510
1511       goto eat;
1512     }
1513     case GST_EVENT_TAG:
1514       goto eat;
1515     default:
1516     {
1517       break;
1518     }
1519   }
1520
1521   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
1522   return gst_pad_event_default (pad, GST_OBJECT (self), event);
1523
1524 eat:
1525   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
1526   if (event)
1527     gst_event_unref (event);
1528
1529   return res;
1530 }
1531
1532 static gboolean
1533 gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data)
1534 {
1535   GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
1536   GstAggregator *agg = GST_AGGREGATOR_CAST (self);
1537
1538   gst_aggregator_pad_flush (pad, agg);
1539
1540   PAD_LOCK (pad);
1541   pad->priv->flow_return = GST_FLOW_FLUSHING;
1542   pad->priv->negotiated = FALSE;
1543   PAD_BROADCAST_EVENT (pad);
1544   PAD_UNLOCK (pad);
1545
1546   return TRUE;
1547 }
1548
1549 static gboolean
1550 gst_aggregator_stop (GstAggregator * agg)
1551 {
1552   GstAggregatorClass *klass;
1553   gboolean result;
1554
1555   gst_aggregator_reset_flow_values (agg);
1556
1557   /* Application needs to make sure no pads are added while it shuts us down */
1558   gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg),
1559       gst_aggregator_stop_pad, NULL);
1560
1561   klass = GST_AGGREGATOR_GET_CLASS (agg);
1562
1563   if (klass->stop)
1564     result = klass->stop (agg);
1565   else
1566     result = TRUE;
1567
1568   agg->priv->has_peer_latency = FALSE;
1569   agg->priv->peer_latency_live = FALSE;
1570   agg->priv->peer_latency_min = agg->priv->peer_latency_max = 0;
1571
1572   if (agg->priv->tags)
1573     gst_tag_list_unref (agg->priv->tags);
1574   agg->priv->tags = NULL;
1575
1576   gst_aggregator_set_allocation (agg, NULL, NULL, NULL, NULL);
1577
1578   return result;
1579 }
1580
1581 /* GstElement vmethods implementations */
1582 static GstStateChangeReturn
1583 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
1584 {
1585   GstStateChangeReturn ret;
1586   GstAggregator *self = GST_AGGREGATOR (element);
1587
1588   switch (transition) {
1589     case GST_STATE_CHANGE_READY_TO_PAUSED:
1590       if (!gst_aggregator_start (self))
1591         goto error_start;
1592       break;
1593     default:
1594       break;
1595   }
1596
1597   if ((ret =
1598           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
1599               transition)) == GST_STATE_CHANGE_FAILURE)
1600     goto failure;
1601
1602
1603   switch (transition) {
1604     case GST_STATE_CHANGE_PAUSED_TO_READY:
1605       if (!gst_aggregator_stop (self)) {
1606         /* What to do in this case? Error out? */
1607         GST_ERROR_OBJECT (self, "Subclass failed to stop.");
1608       }
1609       break;
1610     default:
1611       break;
1612   }
1613
1614   return ret;
1615
1616 /* ERRORS */
1617 failure:
1618   {
1619     GST_ERROR_OBJECT (element, "parent failed state change");
1620     return ret;
1621   }
1622 error_start:
1623   {
1624     GST_ERROR_OBJECT (element, "Subclass failed to start");
1625     return GST_STATE_CHANGE_FAILURE;
1626   }
1627 }
1628
1629 static void
1630 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
1631 {
1632   GstAggregator *self = GST_AGGREGATOR (element);
1633   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1634
1635   GST_INFO_OBJECT (pad, "Removing pad");
1636
1637   SRC_LOCK (self);
1638   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
1639   gst_element_remove_pad (element, pad);
1640
1641   self->priv->has_peer_latency = FALSE;
1642   SRC_BROADCAST (self);
1643   SRC_UNLOCK (self);
1644 }
1645
1646 static GstAggregatorPad *
1647 gst_aggregator_default_create_new_pad (GstAggregator * self,
1648     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1649 {
1650   GstAggregatorPad *agg_pad;
1651   GstAggregatorPrivate *priv = self->priv;
1652   gint serial = 0;
1653   gchar *name = NULL;
1654   GType pad_type =
1655       GST_PAD_TEMPLATE_GTYPE (templ) ==
1656       G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : GST_PAD_TEMPLATE_GTYPE (templ);
1657
1658   if (templ->direction != GST_PAD_SINK)
1659     goto not_sink;
1660
1661   if (templ->presence != GST_PAD_REQUEST)
1662     goto not_request;
1663
1664   GST_OBJECT_LOCK (self);
1665   if (req_name == NULL || strlen (req_name) < 6
1666       || !g_str_has_prefix (req_name, "sink_")) {
1667     /* no name given when requesting the pad, use next available int */
1668     serial = ++priv->max_padserial;
1669   } else {
1670     /* parse serial number from requested padname */
1671     serial = g_ascii_strtoull (&req_name[5], NULL, 10);
1672     if (serial > priv->max_padserial)
1673       priv->max_padserial = serial;
1674   }
1675
1676   name = g_strdup_printf ("sink_%u", serial);
1677   agg_pad = g_object_new (pad_type,
1678       "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
1679   g_free (name);
1680
1681   GST_OBJECT_UNLOCK (self);
1682
1683   return agg_pad;
1684
1685   /* errors */
1686 not_sink:
1687   {
1688     GST_WARNING_OBJECT (self, "request new pad that is not a SINK pad");
1689     return NULL;
1690   }
1691 not_request:
1692   {
1693     GST_WARNING_OBJECT (self, "request new pad that is not a REQUEST pad");
1694     return NULL;
1695   }
1696 }
1697
1698 static GstPad *
1699 gst_aggregator_request_new_pad (GstElement * element,
1700     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1701 {
1702   GstAggregator *self;
1703   GstAggregatorPad *agg_pad;
1704   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (element);
1705   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
1706
1707   self = GST_AGGREGATOR (element);
1708
1709   agg_pad = klass->create_new_pad (self, templ, req_name, caps);
1710   if (!agg_pad) {
1711     GST_ERROR_OBJECT (element, "Couldn't create new pad");
1712     return NULL;
1713   }
1714
1715   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
1716
1717   if (priv->running)
1718     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
1719
1720   /* add the pad to the element */
1721   gst_element_add_pad (element, GST_PAD (agg_pad));
1722
1723   return GST_PAD (agg_pad);
1724 }
1725
1726 /* Must be called with SRC_LOCK held */
1727
1728 static gboolean
1729 gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
1730 {
1731   gboolean query_ret, live;
1732   GstClockTime our_latency, min, max;
1733
1734   query_ret = gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1735
1736   if (!query_ret) {
1737     GST_WARNING_OBJECT (self, "Latency query failed");
1738     return FALSE;
1739   }
1740
1741   gst_query_parse_latency (query, &live, &min, &max);
1742
1743   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
1744     GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
1745         ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
1746     return FALSE;
1747   }
1748
1749   if (self->priv->upstream_latency_min > min) {
1750     GstClockTimeDiff diff =
1751         GST_CLOCK_DIFF (min, self->priv->upstream_latency_min);
1752
1753     min += diff;
1754     if (GST_CLOCK_TIME_IS_VALID (max)) {
1755       max += diff;
1756     }
1757   }
1758
1759   if (min > max && GST_CLOCK_TIME_IS_VALID (max)) {
1760     GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL),
1761         ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
1762             GST_TIME_FORMAT ". Add queues or other buffering elements.",
1763             GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
1764     return FALSE;
1765   }
1766
1767   our_latency = self->priv->latency;
1768
1769   self->priv->peer_latency_live = live;
1770   self->priv->peer_latency_min = min;
1771   self->priv->peer_latency_max = max;
1772   self->priv->has_peer_latency = TRUE;
1773
1774   /* add our own */
1775   min += our_latency;
1776   min += self->priv->sub_latency_min;
1777   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1778       && GST_CLOCK_TIME_IS_VALID (max))
1779     max += self->priv->sub_latency_max + our_latency;
1780   else
1781     max = GST_CLOCK_TIME_NONE;
1782
1783   SRC_BROADCAST (self);
1784
1785   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
1786       " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
1787
1788   gst_query_set_latency (query, live, min, max);
1789
1790   return query_ret;
1791 }
1792
1793 /*
1794  * MUST be called with the src_lock held.
1795  *
1796  * See  gst_aggregator_get_latency() for doc
1797  */
1798 static GstClockTime
1799 gst_aggregator_get_latency_unlocked (GstAggregator * self)
1800 {
1801   GstClockTime latency;
1802
1803   g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0);
1804
1805   if (!self->priv->has_peer_latency) {
1806     GstQuery *query = gst_query_new_latency ();
1807     gboolean ret;
1808
1809     ret = gst_aggregator_query_latency_unlocked (self, query);
1810     gst_query_unref (query);
1811     if (!ret)
1812       return GST_CLOCK_TIME_NONE;
1813   }
1814
1815   if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
1816     return GST_CLOCK_TIME_NONE;
1817
1818   /* latency_min is never GST_CLOCK_TIME_NONE by construction */
1819   latency = self->priv->peer_latency_min;
1820
1821   /* add our own */
1822   latency += self->priv->latency;
1823   latency += self->priv->sub_latency_min;
1824
1825   return latency;
1826 }
1827
1828 /**
1829  * gst_aggregator_get_latency:
1830  * @self: a #GstAggregator
1831  *
1832  * Retrieves the latency values reported by @self in response to the latency
1833  * query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element
1834  * will not wait for the clock.
1835  *
1836  * Typically only called by subclasses.
1837  *
1838  * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync
1839  */
1840 GstClockTime
1841 gst_aggregator_get_latency (GstAggregator * self)
1842 {
1843   GstClockTime ret;
1844
1845   SRC_LOCK (self);
1846   ret = gst_aggregator_get_latency_unlocked (self);
1847   SRC_UNLOCK (self);
1848
1849   return ret;
1850 }
1851
1852 static gboolean
1853 gst_aggregator_send_event (GstElement * element, GstEvent * event)
1854 {
1855   GstAggregator *self = GST_AGGREGATOR (element);
1856
1857   GST_STATE_LOCK (element);
1858   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
1859       GST_STATE (element) < GST_STATE_PAUSED) {
1860     gdouble rate;
1861     GstFormat fmt;
1862     GstSeekFlags flags;
1863     GstSeekType start_type, stop_type;
1864     gint64 start, stop;
1865
1866     gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1867         &start, &stop_type, &stop);
1868
1869     GST_OBJECT_LOCK (self);
1870     gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
1871         flags, start_type, start, stop_type, stop, NULL);
1872     self->priv->seqnum = gst_event_get_seqnum (event);
1873     self->priv->first_buffer = FALSE;
1874     GST_OBJECT_UNLOCK (self);
1875
1876     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
1877   }
1878   GST_STATE_UNLOCK (element);
1879
1880
1881   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
1882       event);
1883 }
1884
1885 static gboolean
1886 gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
1887 {
1888   gboolean res = TRUE;
1889
1890   switch (GST_QUERY_TYPE (query)) {
1891     case GST_QUERY_SEEKING:
1892     {
1893       GstFormat format;
1894
1895       /* don't pass it along as some (file)sink might claim it does
1896        * whereas with a collectpads in between that will not likely work */
1897       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1898       gst_query_set_seeking (query, format, FALSE, 0, -1);
1899       res = TRUE;
1900
1901       break;
1902     }
1903     case GST_QUERY_LATENCY:
1904       SRC_LOCK (self);
1905       res = gst_aggregator_query_latency_unlocked (self, query);
1906       SRC_UNLOCK (self);
1907       break;
1908     default:
1909       return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1910   }
1911
1912   return res;
1913 }
1914
1915 static gboolean
1916 gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
1917 {
1918   EventData *evdata = user_data;
1919   gboolean ret = TRUE;
1920   GstPad *peer = gst_pad_get_peer (pad);
1921   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1922
1923   if (peer) {
1924     if (evdata->only_to_active_pads && aggpad->priv->first_buffer) {
1925       GST_DEBUG_OBJECT (pad, "not sending event to inactive pad");
1926       ret = TRUE;
1927     } else {
1928       ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
1929       GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
1930     }
1931   }
1932
1933   if (ret == FALSE) {
1934     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
1935       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
1936
1937       GST_DEBUG_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
1938
1939       if (gst_pad_query (peer, seeking)) {
1940         gboolean seekable;
1941
1942         gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
1943
1944         if (seekable == FALSE) {
1945           GST_INFO_OBJECT (pad,
1946               "Source not seekable, We failed but it does not matter!");
1947
1948           ret = TRUE;
1949         }
1950       } else {
1951         GST_ERROR_OBJECT (pad, "Query seeking FAILED");
1952       }
1953
1954       gst_query_unref (seeking);
1955     }
1956
1957     if (evdata->flush) {
1958       PAD_LOCK (aggpad);
1959       aggpad->priv->pending_flush_start = FALSE;
1960       aggpad->priv->pending_flush_stop = FALSE;
1961       PAD_UNLOCK (aggpad);
1962     }
1963   } else {
1964     evdata->one_actually_seeked = TRUE;
1965   }
1966
1967   evdata->result &= ret;
1968
1969   if (peer)
1970     gst_object_unref (peer);
1971
1972   /* Always send to all pads */
1973   return FALSE;
1974 }
1975
1976 static void
1977 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
1978     EventData * evdata)
1979 {
1980   evdata->result = TRUE;
1981   evdata->one_actually_seeked = FALSE;
1982
1983   /* We first need to set all pads as flushing in a first pass
1984    * as flush_start flush_stop is sometimes sent synchronously
1985    * while we send the seek event */
1986   if (evdata->flush) {
1987     GList *l;
1988
1989     GST_OBJECT_LOCK (self);
1990     for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
1991       GstAggregatorPad *pad = l->data;
1992
1993       PAD_LOCK (pad);
1994       pad->priv->pending_flush_start = TRUE;
1995       pad->priv->pending_flush_stop = FALSE;
1996       PAD_UNLOCK (pad);
1997     }
1998     GST_OBJECT_UNLOCK (self);
1999   }
2000
2001   gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata);
2002
2003   gst_event_unref (evdata->event);
2004 }
2005
2006 static gboolean
2007 gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
2008 {
2009   gdouble rate;
2010   GstFormat fmt;
2011   GstSeekFlags flags;
2012   GstSeekType start_type, stop_type;
2013   gint64 start, stop;
2014   gboolean flush;
2015   EventData evdata = { 0, };
2016   GstAggregatorPrivate *priv = self->priv;
2017
2018   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
2019       &start, &stop_type, &stop);
2020
2021   GST_INFO_OBJECT (self, "starting SEEK");
2022
2023   flush = flags & GST_SEEK_FLAG_FLUSH;
2024
2025   GST_OBJECT_LOCK (self);
2026   if (flush) {
2027     priv->pending_flush_start = TRUE;
2028     priv->flush_seeking = TRUE;
2029   }
2030
2031   gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
2032       flags, start_type, start, stop_type, stop, NULL);
2033
2034   /* Seeking sets a position */
2035   self->priv->first_buffer = FALSE;
2036   GST_OBJECT_UNLOCK (self);
2037
2038   /* forward the seek upstream */
2039   evdata.event = event;
2040   evdata.flush = flush;
2041   evdata.only_to_active_pads = FALSE;
2042   gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2043   event = NULL;
2044
2045   if (!evdata.result || !evdata.one_actually_seeked) {
2046     GST_OBJECT_LOCK (self);
2047     priv->flush_seeking = FALSE;
2048     priv->pending_flush_start = FALSE;
2049     GST_OBJECT_UNLOCK (self);
2050   }
2051
2052   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
2053
2054   return evdata.result;
2055 }
2056
2057 static gboolean
2058 gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
2059 {
2060   EventData evdata = { 0, };
2061
2062   switch (GST_EVENT_TYPE (event)) {
2063     case GST_EVENT_SEEK:
2064       /* _do_seek() unrefs the event. */
2065       return gst_aggregator_do_seek (self, event);
2066     case GST_EVENT_NAVIGATION:
2067       /* navigation is rather pointless. */
2068       gst_event_unref (event);
2069       return FALSE;
2070     default:
2071       break;
2072   }
2073
2074   /* Don't forward QOS events to pads that had no active buffer yet. Otherwise
2075    * they will receive a QOS event that has earliest_time=0 (because we can't
2076    * have negative timestamps), and consider their buffer as too late */
2077   evdata.event = event;
2078   evdata.flush = FALSE;
2079   evdata.only_to_active_pads = GST_EVENT_TYPE (event) == GST_EVENT_QOS;
2080   gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2081   return evdata.result;
2082 }
2083
2084 static gboolean
2085 gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent,
2086     GstEvent * event)
2087 {
2088   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2089
2090   return klass->src_event (GST_AGGREGATOR (parent), event);
2091 }
2092
2093 static gboolean
2094 gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent,
2095     GstQuery * query)
2096 {
2097   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2098
2099   return klass->src_query (GST_AGGREGATOR (parent), query);
2100 }
2101
2102 static gboolean
2103 gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
2104     GstObject * parent, GstPadMode mode, gboolean active)
2105 {
2106   GstAggregator *self = GST_AGGREGATOR (parent);
2107   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2108
2109   if (klass->src_activate) {
2110     if (klass->src_activate (self, mode, active) == FALSE) {
2111       return FALSE;
2112     }
2113   }
2114
2115   if (active == TRUE) {
2116     switch (mode) {
2117       case GST_PAD_MODE_PUSH:
2118       {
2119         GST_INFO_OBJECT (pad, "Activating pad!");
2120         gst_aggregator_start_srcpad_task (self);
2121         return TRUE;
2122       }
2123       default:
2124       {
2125         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
2126         return FALSE;
2127       }
2128     }
2129   }
2130
2131   /* deactivating */
2132   GST_INFO_OBJECT (self, "Deactivating srcpad");
2133   gst_aggregator_stop_srcpad_task (self, FALSE);
2134
2135   return TRUE;
2136 }
2137
2138 static gboolean
2139 gst_aggregator_default_sink_query (GstAggregator * self,
2140     GstAggregatorPad * aggpad, GstQuery * query)
2141 {
2142   GstPad *pad = GST_PAD (aggpad);
2143
2144   if (GST_QUERY_TYPE (query) == GST_QUERY_ALLOCATION) {
2145     GstQuery *decide_query = NULL;
2146     GstAggregatorClass *agg_class;
2147     gboolean ret;
2148
2149     GST_OBJECT_LOCK (self);
2150     PAD_LOCK (aggpad);
2151     if (G_UNLIKELY (!aggpad->priv->negotiated)) {
2152       GST_DEBUG_OBJECT (self,
2153           "not negotiated yet, can't answer ALLOCATION query");
2154       PAD_UNLOCK (aggpad);
2155       GST_OBJECT_UNLOCK (self);
2156
2157       return FALSE;
2158     }
2159
2160     if ((decide_query = self->priv->allocation_query))
2161       gst_query_ref (decide_query);
2162     PAD_UNLOCK (aggpad);
2163     GST_OBJECT_UNLOCK (self);
2164
2165     GST_DEBUG_OBJECT (self,
2166         "calling propose allocation with query %" GST_PTR_FORMAT, decide_query);
2167
2168     agg_class = GST_AGGREGATOR_GET_CLASS (self);
2169
2170     /* pass the query to the propose_allocation vmethod if any */
2171     if (agg_class->propose_allocation)
2172       ret = agg_class->propose_allocation (self, aggpad, decide_query, query);
2173     else
2174       ret = FALSE;
2175
2176     if (decide_query)
2177       gst_query_unref (decide_query);
2178
2179     GST_DEBUG_OBJECT (self, "ALLOCATION ret %d, %" GST_PTR_FORMAT, ret, query);
2180     return ret;
2181   }
2182
2183   return gst_pad_query_default (pad, GST_OBJECT (self), query);
2184 }
2185
2186 static void
2187 gst_aggregator_finalize (GObject * object)
2188 {
2189   GstAggregator *self = (GstAggregator *) object;
2190
2191   g_mutex_clear (&self->priv->src_lock);
2192   g_cond_clear (&self->priv->src_cond);
2193
2194   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
2195 }
2196
2197 /*
2198  * gst_aggregator_set_latency_property:
2199  * @agg: a #GstAggregator
2200  * @latency: the new latency value (in nanoseconds).
2201  *
2202  * Sets the new latency value to @latency. This value is used to limit the
2203  * amount of time a pad waits for data to appear before considering the pad
2204  * as unresponsive.
2205  */
2206 static void
2207 gst_aggregator_set_latency_property (GstAggregator * self, GstClockTime latency)
2208 {
2209   gboolean changed;
2210
2211   g_return_if_fail (GST_IS_AGGREGATOR (self));
2212   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
2213
2214   SRC_LOCK (self);
2215   changed = (self->priv->latency != latency);
2216
2217   if (changed) {
2218     GList *item;
2219
2220     GST_OBJECT_LOCK (self);
2221     /* First lock all the pads */
2222     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2223       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2224       PAD_LOCK (aggpad);
2225     }
2226
2227     self->priv->latency = latency;
2228
2229     SRC_BROADCAST (self);
2230
2231     /* Now wake up the pads */
2232     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2233       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2234       PAD_BROADCAST_EVENT (aggpad);
2235       PAD_UNLOCK (aggpad);
2236     }
2237     GST_OBJECT_UNLOCK (self);
2238   }
2239
2240   SRC_UNLOCK (self);
2241
2242   if (changed)
2243     gst_element_post_message (GST_ELEMENT_CAST (self),
2244         gst_message_new_latency (GST_OBJECT_CAST (self)));
2245 }
2246
2247 /*
2248  * gst_aggregator_get_latency_property:
2249  * @agg: a #GstAggregator
2250  *
2251  * Gets the latency value. See gst_aggregator_set_latency for
2252  * more details.
2253  *
2254  * Returns: The time in nanoseconds to wait for data to arrive on a sink pad
2255  * before a pad is deemed unresponsive. A value of -1 means an
2256  * unlimited time.
2257  */
2258 static GstClockTime
2259 gst_aggregator_get_latency_property (GstAggregator * agg)
2260 {
2261   GstClockTime res;
2262
2263   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), GST_CLOCK_TIME_NONE);
2264
2265   GST_OBJECT_LOCK (agg);
2266   res = agg->priv->latency;
2267   GST_OBJECT_UNLOCK (agg);
2268
2269   return res;
2270 }
2271
2272 static void
2273 gst_aggregator_set_property (GObject * object, guint prop_id,
2274     const GValue * value, GParamSpec * pspec)
2275 {
2276   GstAggregator *agg = GST_AGGREGATOR (object);
2277
2278   switch (prop_id) {
2279     case PROP_LATENCY:
2280       gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value));
2281       break;
2282     case PROP_MIN_UPSTREAM_LATENCY:
2283       SRC_LOCK (agg);
2284       agg->priv->upstream_latency_min = g_value_get_uint64 (value);
2285       SRC_UNLOCK (agg);
2286       break;
2287     case PROP_START_TIME_SELECTION:
2288       agg->priv->start_time_selection = g_value_get_enum (value);
2289       break;
2290     case PROP_START_TIME:
2291       agg->priv->start_time = g_value_get_uint64 (value);
2292       break;
2293     default:
2294       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2295       break;
2296   }
2297 }
2298
2299 static void
2300 gst_aggregator_get_property (GObject * object, guint prop_id,
2301     GValue * value, GParamSpec * pspec)
2302 {
2303   GstAggregator *agg = GST_AGGREGATOR (object);
2304
2305   switch (prop_id) {
2306     case PROP_LATENCY:
2307       g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg));
2308       break;
2309     case PROP_MIN_UPSTREAM_LATENCY:
2310       SRC_LOCK (agg);
2311       g_value_set_uint64 (value, agg->priv->upstream_latency_min);
2312       SRC_UNLOCK (agg);
2313       break;
2314     case PROP_START_TIME_SELECTION:
2315       g_value_set_enum (value, agg->priv->start_time_selection);
2316       break;
2317     case PROP_START_TIME:
2318       g_value_set_uint64 (value, agg->priv->start_time);
2319       break;
2320     default:
2321       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2322       break;
2323   }
2324 }
2325
2326 /* GObject vmethods implementations */
2327 static void
2328 gst_aggregator_class_init (GstAggregatorClass * klass)
2329 {
2330   GObjectClass *gobject_class = (GObjectClass *) klass;
2331   GstElementClass *gstelement_class = (GstElementClass *) klass;
2332
2333   aggregator_parent_class = g_type_class_peek_parent (klass);
2334
2335   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
2336       GST_DEBUG_FG_MAGENTA, "GstAggregator");
2337
2338   if (aggregator_private_offset != 0)
2339     g_type_class_adjust_private_offset (klass, &aggregator_private_offset);
2340
2341   klass->finish_buffer = gst_aggregator_default_finish_buffer;
2342
2343   klass->sink_event = gst_aggregator_default_sink_event;
2344   klass->sink_query = gst_aggregator_default_sink_query;
2345
2346   klass->src_event = gst_aggregator_default_src_event;
2347   klass->src_query = gst_aggregator_default_src_query;
2348
2349   klass->create_new_pad = gst_aggregator_default_create_new_pad;
2350   klass->update_src_caps = gst_aggregator_default_update_src_caps;
2351   klass->fixate_src_caps = gst_aggregator_default_fixate_src_caps;
2352   klass->negotiated_src_caps = gst_aggregator_default_negotiated_src_caps;
2353
2354   gstelement_class->request_new_pad =
2355       GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
2356   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
2357   gstelement_class->release_pad =
2358       GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
2359   gstelement_class->change_state =
2360       GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
2361
2362   gobject_class->set_property = gst_aggregator_set_property;
2363   gobject_class->get_property = gst_aggregator_get_property;
2364   gobject_class->finalize = gst_aggregator_finalize;
2365
2366   g_object_class_install_property (gobject_class, PROP_LATENCY,
2367       g_param_spec_uint64 ("latency", "Buffer latency",
2368           "Additional latency in live mode to allow upstream "
2369           "to take longer to produce buffers for the current "
2370           "position (in nanoseconds)", 0, G_MAXUINT64,
2371           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2372
2373   /**
2374    * GstAggregator:min-upstream-latency:
2375    *
2376    * Since: 1.16
2377    */
2378   g_object_class_install_property (gobject_class, PROP_MIN_UPSTREAM_LATENCY,
2379       g_param_spec_uint64 ("min-upstream-latency", "Buffer latency",
2380           "When sources with a higher latency are expected to be plugged "
2381           "in dynamically after the aggregator has started playing, "
2382           "this allows overriding the minimum latency reported by the "
2383           "initial source(s). This is only taken into account when superior "
2384           "to the reported minimum latency.",
2385           0, G_MAXUINT64,
2386           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2387
2388   g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION,
2389       g_param_spec_enum ("start-time-selection", "Start Time Selection",
2390           "Decides which start time is output",
2391           gst_aggregator_start_time_selection_get_type (),
2392           DEFAULT_START_TIME_SELECTION,
2393           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2394
2395   g_object_class_install_property (gobject_class, PROP_START_TIME,
2396       g_param_spec_uint64 ("start-time", "Start Time",
2397           "Start time to use if start-time-selection=set", 0,
2398           G_MAXUINT64,
2399           DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2400 }
2401
2402 static inline gpointer
2403 gst_aggregator_get_instance_private (GstAggregator * self)
2404 {
2405   return (G_STRUCT_MEMBER_P (self, aggregator_private_offset));
2406 }
2407
2408 static void
2409 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
2410 {
2411   GstPadTemplate *pad_template;
2412   GstAggregatorPrivate *priv;
2413
2414   g_return_if_fail (klass->aggregate != NULL);
2415
2416   self->priv = gst_aggregator_get_instance_private (self);
2417
2418   priv = self->priv;
2419
2420   pad_template =
2421       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
2422   g_return_if_fail (pad_template != NULL);
2423
2424   priv->max_padserial = -1;
2425   priv->tags_changed = FALSE;
2426
2427   self->priv->peer_latency_live = FALSE;
2428   self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
2429   self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
2430   self->priv->has_peer_latency = FALSE;
2431
2432   self->srcpad = gst_pad_new_from_template (pad_template, "src");
2433
2434   gst_aggregator_reset_flow_values (self);
2435
2436   gst_pad_set_event_function (self->srcpad,
2437       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
2438   gst_pad_set_query_function (self->srcpad,
2439       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func));
2440   gst_pad_set_activatemode_function (self->srcpad,
2441       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func));
2442
2443   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
2444
2445   self->priv->upstream_latency_min = DEFAULT_MIN_UPSTREAM_LATENCY;
2446   self->priv->latency = DEFAULT_LATENCY;
2447   self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION;
2448   self->priv->start_time = DEFAULT_START_TIME;
2449
2450   g_mutex_init (&self->priv->src_lock);
2451   g_cond_init (&self->priv->src_cond);
2452 }
2453
2454 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
2455  * method to get to the padtemplates */
2456 GType
2457 gst_aggregator_get_type (void)
2458 {
2459   static volatile gsize type = 0;
2460
2461   if (g_once_init_enter (&type)) {
2462     GType _type;
2463     static const GTypeInfo info = {
2464       sizeof (GstAggregatorClass),
2465       NULL,
2466       NULL,
2467       (GClassInitFunc) gst_aggregator_class_init,
2468       NULL,
2469       NULL,
2470       sizeof (GstAggregator),
2471       0,
2472       (GInstanceInitFunc) gst_aggregator_init,
2473     };
2474
2475     _type = g_type_register_static (GST_TYPE_ELEMENT,
2476         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
2477
2478     aggregator_private_offset =
2479         g_type_add_instance_private (_type, sizeof (GstAggregatorPrivate));
2480
2481     g_once_init_leave (&type, _type);
2482   }
2483   return type;
2484 }
2485
2486 /* Must be called with SRC lock and PAD lock held */
2487 static gboolean
2488 gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
2489 {
2490   /* Empty queue always has space */
2491   if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL)
2492     return TRUE;
2493
2494   /* We also want at least two buffers, one is being processed and one is ready
2495    * for the next iteration when we operate in live mode. */
2496   if (self->priv->peer_latency_live && aggpad->priv->num_buffers < 2)
2497     return TRUE;
2498
2499   /* zero latency, if there is a buffer, it's full */
2500   if (self->priv->latency == 0)
2501     return FALSE;
2502
2503   /* Allow no more buffers than the latency */
2504   return (aggpad->priv->time_level <= self->priv->latency);
2505 }
2506
2507 /* Must be called with the PAD_LOCK held */
2508 static void
2509 apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
2510 {
2511   GstClockTime timestamp;
2512
2513   if (GST_BUFFER_DTS_IS_VALID (buffer))
2514     timestamp = GST_BUFFER_DTS (buffer);
2515   else
2516     timestamp = GST_BUFFER_PTS (buffer);
2517
2518   if (timestamp == GST_CLOCK_TIME_NONE) {
2519     if (head)
2520       timestamp = aggpad->priv->head_position;
2521     else
2522       timestamp = aggpad->priv->tail_position;
2523   }
2524
2525   /* add duration */
2526   if (GST_BUFFER_DURATION_IS_VALID (buffer))
2527     timestamp += GST_BUFFER_DURATION (buffer);
2528
2529   if (head)
2530     aggpad->priv->head_position = timestamp;
2531   else
2532     aggpad->priv->tail_position = timestamp;
2533
2534   update_time_level (aggpad, head);
2535 }
2536
2537 /*
2538  * Can be called either from the sinkpad's chain function or from the srcpad's
2539  * thread in the case of a buffer synthetized from a GAP event.
2540  * Because of this second case, FLUSH_LOCK can't be used here.
2541  */
2542
2543 static GstFlowReturn
2544 gst_aggregator_pad_chain_internal (GstAggregator * self,
2545     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
2546 {
2547   GstFlowReturn flow_return;
2548   GstClockTime buf_pts;
2549
2550   PAD_LOCK (aggpad);
2551   flow_return = aggpad->priv->flow_return;
2552   if (flow_return != GST_FLOW_OK)
2553     goto flushing;
2554
2555   PAD_UNLOCK (aggpad);
2556
2557   buf_pts = GST_BUFFER_PTS (buffer);
2558
2559   for (;;) {
2560     SRC_LOCK (self);
2561     GST_OBJECT_LOCK (self);
2562     PAD_LOCK (aggpad);
2563
2564     if (aggpad->priv->first_buffer) {
2565       self->priv->has_peer_latency = FALSE;
2566       aggpad->priv->first_buffer = FALSE;
2567     }
2568
2569     if ((gst_aggregator_pad_has_space (self, aggpad) || !head)
2570         && aggpad->priv->flow_return == GST_FLOW_OK) {
2571       if (head)
2572         g_queue_push_head (&aggpad->priv->data, buffer);
2573       else
2574         g_queue_push_tail (&aggpad->priv->data, buffer);
2575       apply_buffer (aggpad, buffer, head);
2576       aggpad->priv->num_buffers++;
2577       buffer = NULL;
2578       SRC_BROADCAST (self);
2579       break;
2580     }
2581
2582     flow_return = aggpad->priv->flow_return;
2583     if (flow_return != GST_FLOW_OK) {
2584       GST_OBJECT_UNLOCK (self);
2585       SRC_UNLOCK (self);
2586       goto flushing;
2587     }
2588     GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2589     GST_OBJECT_UNLOCK (self);
2590     SRC_UNLOCK (self);
2591     PAD_WAIT_EVENT (aggpad);
2592
2593     PAD_UNLOCK (aggpad);
2594   }
2595
2596   if (self->priv->first_buffer) {
2597     GstClockTime start_time;
2598     GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
2599
2600     switch (self->priv->start_time_selection) {
2601       case GST_AGGREGATOR_START_TIME_SELECTION_ZERO:
2602       default:
2603         start_time = 0;
2604         break;
2605       case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
2606         GST_OBJECT_LOCK (aggpad);
2607         if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) {
2608           start_time = buf_pts;
2609           if (start_time != -1) {
2610             start_time = MAX (start_time, aggpad->priv->head_segment.start);
2611             start_time =
2612                 gst_segment_to_running_time (&aggpad->priv->head_segment,
2613                 GST_FORMAT_TIME, start_time);
2614           }
2615         } else {
2616           start_time = 0;
2617           GST_WARNING_OBJECT (aggpad,
2618               "Ignoring request of selecting the first start time "
2619               "as the segment is a %s segment instead of a time segment",
2620               gst_format_get_name (aggpad->segment.format));
2621         }
2622         GST_OBJECT_UNLOCK (aggpad);
2623         break;
2624       case GST_AGGREGATOR_START_TIME_SELECTION_SET:
2625         start_time = self->priv->start_time;
2626         if (start_time == -1)
2627           start_time = 0;
2628         break;
2629     }
2630
2631     if (start_time != -1) {
2632       if (srcpad->segment.position == -1)
2633         srcpad->segment.position = start_time;
2634       else
2635         srcpad->segment.position = MIN (start_time, srcpad->segment.position);
2636
2637       GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT,
2638           GST_TIME_ARGS (start_time));
2639     }
2640   }
2641
2642   PAD_UNLOCK (aggpad);
2643   GST_OBJECT_UNLOCK (self);
2644   SRC_UNLOCK (self);
2645
2646   GST_DEBUG_OBJECT (aggpad, "Done chaining");
2647
2648   return flow_return;
2649
2650 flushing:
2651   PAD_UNLOCK (aggpad);
2652
2653   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
2654       gst_flow_get_name (flow_return));
2655   if (buffer)
2656     gst_buffer_unref (buffer);
2657
2658   return flow_return;
2659 }
2660
2661 static GstFlowReturn
2662 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
2663 {
2664   GstFlowReturn ret;
2665   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2666
2667   PAD_FLUSH_LOCK (aggpad);
2668
2669   ret = gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
2670       aggpad, buffer, TRUE);
2671
2672   PAD_FLUSH_UNLOCK (aggpad);
2673
2674   return ret;
2675 }
2676
2677 static gboolean
2678 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
2679     GstQuery * query)
2680 {
2681   GstAggregator *self = GST_AGGREGATOR (parent);
2682   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2683
2684   if (GST_QUERY_IS_SERIALIZED (query)) {
2685     GstStructure *s;
2686     gboolean ret = FALSE;
2687
2688     SRC_LOCK (self);
2689     PAD_LOCK (aggpad);
2690
2691     if (aggpad->priv->flow_return != GST_FLOW_OK) {
2692       SRC_UNLOCK (self);
2693       goto flushing;
2694     }
2695
2696     g_queue_push_head (&aggpad->priv->data, query);
2697     SRC_BROADCAST (self);
2698     SRC_UNLOCK (self);
2699
2700     while (!gst_aggregator_pad_queue_is_empty (aggpad)
2701         && aggpad->priv->flow_return == GST_FLOW_OK) {
2702       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2703       PAD_WAIT_EVENT (aggpad);
2704     }
2705
2706     s = gst_query_writable_structure (query);
2707     if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret))
2708       gst_structure_remove_field (s, "gst-aggregator-retval");
2709     else
2710       g_queue_remove (&aggpad->priv->data, query);
2711
2712     if (aggpad->priv->flow_return != GST_FLOW_OK)
2713       goto flushing;
2714
2715     PAD_UNLOCK (aggpad);
2716
2717     return ret;
2718   } else {
2719     GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2720
2721     return klass->sink_query (self, aggpad, query);
2722   }
2723
2724 flushing:
2725   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
2726       gst_flow_get_name (aggpad->priv->flow_return));
2727   PAD_UNLOCK (aggpad);
2728
2729   return FALSE;
2730 }
2731
2732 /* Queue serialized events and let the others go through directly.
2733  * The queued events with be handled from the src-pad task in
2734  * gst_aggregator_do_events_and_queries().
2735  */
2736 static GstFlowReturn
2737 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
2738     GstEvent * event)
2739 {
2740   GstFlowReturn ret = GST_FLOW_OK;
2741   GstAggregator *self = GST_AGGREGATOR (parent);
2742   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2743
2744   if (GST_EVENT_IS_SERIALIZED (event)
2745       && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
2746     SRC_LOCK (self);
2747     PAD_LOCK (aggpad);
2748
2749     if (aggpad->priv->flow_return != GST_FLOW_OK)
2750       goto flushing;
2751
2752     if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
2753       GST_OBJECT_LOCK (aggpad);
2754       gst_event_copy_segment (event, &aggpad->priv->head_segment);
2755       aggpad->priv->head_position = aggpad->priv->head_segment.position;
2756       update_time_level (aggpad, TRUE);
2757       GST_OBJECT_UNLOCK (aggpad);
2758     }
2759
2760     GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event);
2761     g_queue_push_head (&aggpad->priv->data, event);
2762     SRC_BROADCAST (self);
2763     PAD_UNLOCK (aggpad);
2764     SRC_UNLOCK (self);
2765   } else {
2766     GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2767
2768     if (!klass->sink_event (self, aggpad, event)) {
2769       /* Copied from GstPad to convert boolean to a GstFlowReturn in
2770        * the event handling func */
2771       ret = GST_FLOW_ERROR;
2772     }
2773   }
2774
2775   return ret;
2776
2777 flushing:
2778   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
2779       gst_flow_get_name (aggpad->priv->flow_return));
2780   PAD_UNLOCK (aggpad);
2781   SRC_UNLOCK (self);
2782   if (GST_EVENT_IS_STICKY (event))
2783     gst_pad_store_sticky_event (pad, event);
2784   gst_event_unref (event);
2785
2786   return aggpad->priv->flow_return;
2787 }
2788
2789 static gboolean
2790 gst_aggregator_pad_activate_mode_func (GstPad * pad,
2791     GstObject * parent, GstPadMode mode, gboolean active)
2792 {
2793   GstAggregator *self = GST_AGGREGATOR (parent);
2794   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2795
2796   if (active == FALSE) {
2797     SRC_LOCK (self);
2798     gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
2799     SRC_BROADCAST (self);
2800     SRC_UNLOCK (self);
2801   } else {
2802     PAD_LOCK (aggpad);
2803     aggpad->priv->flow_return = GST_FLOW_OK;
2804     PAD_BROADCAST_EVENT (aggpad);
2805     PAD_UNLOCK (aggpad);
2806   }
2807
2808   return TRUE;
2809 }
2810
2811 /***********************************
2812  * GstAggregatorPad implementation  *
2813  ************************************/
2814 G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
2815
2816 static void
2817 gst_aggregator_pad_constructed (GObject * object)
2818 {
2819   GstPad *pad = GST_PAD (object);
2820
2821   if (GST_PAD_IS_SINK (pad)) {
2822     gst_pad_set_chain_function (pad,
2823         GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
2824     gst_pad_set_event_full_function_full (pad,
2825         GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
2826     gst_pad_set_query_function (pad,
2827         GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
2828     gst_pad_set_activatemode_function (pad,
2829         GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
2830   }
2831 }
2832
2833 static void
2834 gst_aggregator_pad_finalize (GObject * object)
2835 {
2836   GstAggregatorPad *pad = (GstAggregatorPad *) object;
2837
2838   g_cond_clear (&pad->priv->event_cond);
2839   g_mutex_clear (&pad->priv->flush_lock);
2840   g_mutex_clear (&pad->priv->lock);
2841
2842   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object);
2843 }
2844
2845 static void
2846 gst_aggregator_pad_dispose (GObject * object)
2847 {
2848   GstAggregatorPad *pad = (GstAggregatorPad *) object;
2849
2850   gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE);
2851
2852   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
2853 }
2854
2855 static void
2856 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
2857 {
2858   GObjectClass *gobject_class = (GObjectClass *) klass;
2859
2860   gobject_class->constructed = gst_aggregator_pad_constructed;
2861   gobject_class->finalize = gst_aggregator_pad_finalize;
2862   gobject_class->dispose = gst_aggregator_pad_dispose;
2863 }
2864
2865 static void
2866 gst_aggregator_pad_init (GstAggregatorPad * pad)
2867 {
2868   pad->priv = gst_aggregator_pad_get_instance_private (pad);
2869
2870   g_queue_init (&pad->priv->data);
2871   g_cond_init (&pad->priv->event_cond);
2872
2873   g_mutex_init (&pad->priv->flush_lock);
2874   g_mutex_init (&pad->priv->lock);
2875
2876   gst_aggregator_pad_reset_unlocked (pad);
2877   pad->priv->negotiated = FALSE;
2878 }
2879
2880 /* Must be called with the PAD_LOCK held */
2881 static void
2882 gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad)
2883 {
2884   pad->priv->num_buffers--;
2885   GST_TRACE_OBJECT (pad, "Consuming buffer");
2886   PAD_BROADCAST_EVENT (pad);
2887 }
2888
2889 /* Must be called with the PAD_LOCK held */
2890 static void
2891 gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
2892 {
2893   GstAggregator *self = NULL;
2894   GstAggregatorClass *aggclass = NULL;
2895   GstBuffer *buffer = NULL;
2896
2897   while (pad->priv->clipped_buffer == NULL &&
2898       GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
2899     buffer = g_queue_pop_tail (&pad->priv->data);
2900
2901     apply_buffer (pad, buffer, FALSE);
2902
2903     /* We only take the parent here so that it's not taken if the buffer is
2904      * already clipped or if the queue is empty.
2905      */
2906     if (self == NULL) {
2907       self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
2908       if (self == NULL) {
2909         gst_buffer_unref (buffer);
2910         return;
2911       }
2912
2913       aggclass = GST_AGGREGATOR_GET_CLASS (self);
2914     }
2915
2916     if (aggclass->clip) {
2917       GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer);
2918
2919       buffer = aggclass->clip (self, pad, buffer);
2920
2921       if (buffer == NULL) {
2922         gst_aggregator_pad_buffer_consumed (pad);
2923         GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
2924       }
2925     }
2926
2927     pad->priv->clipped_buffer = buffer;
2928   }
2929
2930   if (self)
2931     gst_object_unref (self);
2932 }
2933
2934 /**
2935  * gst_aggregator_pad_pop_buffer:
2936  * @pad: the pad to get buffer from
2937  *
2938  * Steal the ref to the buffer currently queued in @pad.
2939  *
2940  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
2941  *   queued. You should unref the buffer after usage.
2942  */
2943 GstBuffer *
2944 gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad)
2945 {
2946   GstBuffer *buffer;
2947
2948   PAD_LOCK (pad);
2949
2950   if (pad->priv->flow_return != GST_FLOW_OK) {
2951     PAD_UNLOCK (pad);
2952     return NULL;
2953   }
2954
2955   gst_aggregator_pad_clip_buffer_unlocked (pad);
2956
2957   buffer = pad->priv->clipped_buffer;
2958
2959   if (buffer) {
2960     pad->priv->clipped_buffer = NULL;
2961     gst_aggregator_pad_buffer_consumed (pad);
2962     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
2963   }
2964
2965   PAD_UNLOCK (pad);
2966
2967   return buffer;
2968 }
2969
2970 /**
2971  * gst_aggregator_pad_drop_buffer:
2972  * @pad: the pad where to drop any pending buffer
2973  *
2974  * Drop the buffer currently queued in @pad.
2975  *
2976  * Returns: TRUE if there was a buffer queued in @pad, or FALSE if not.
2977  */
2978 gboolean
2979 gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
2980 {
2981   GstBuffer *buf;
2982
2983   buf = gst_aggregator_pad_pop_buffer (pad);
2984
2985   if (buf == NULL)
2986     return FALSE;
2987
2988   gst_buffer_unref (buf);
2989   return TRUE;
2990 }
2991
2992 /**
2993  * gst_aggregator_pad_peek_buffer:
2994  * @pad: the pad to get buffer from
2995  *
2996  * Returns: (transfer full): A reference to the buffer in @pad or
2997  * NULL if no buffer was queued. You should unref the buffer after
2998  * usage.
2999  */
3000 GstBuffer *
3001 gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
3002 {
3003   GstBuffer *buffer;
3004
3005   PAD_LOCK (pad);
3006
3007   if (pad->priv->flow_return != GST_FLOW_OK) {
3008     PAD_UNLOCK (pad);
3009     return NULL;
3010   }
3011
3012   gst_aggregator_pad_clip_buffer_unlocked (pad);
3013
3014   if (pad->priv->clipped_buffer) {
3015     buffer = gst_buffer_ref (pad->priv->clipped_buffer);
3016   } else {
3017     buffer = NULL;
3018   }
3019   PAD_UNLOCK (pad);
3020
3021   return buffer;
3022 }
3023
3024 /**
3025  * gst_aggregator_pad_has_buffer:
3026  * @pad: the pad to check the buffer on
3027  *
3028  * This checks if a pad has a buffer available that will be returned by
3029  * a call to gst_aggregator_pad_peek_buffer() or
3030  * gst_aggregator_pad_pop_buffer().
3031  *
3032  * Returns: %TRUE if the pad has a buffer available as the next thing.
3033  *
3034  * Since: 1.14.1
3035  */
3036 gboolean
3037 gst_aggregator_pad_has_buffer (GstAggregatorPad * pad)
3038 {
3039   gboolean has_buffer;
3040
3041   PAD_LOCK (pad);
3042   gst_aggregator_pad_clip_buffer_unlocked (pad);
3043   has_buffer = (pad->priv->clipped_buffer != NULL);
3044   PAD_UNLOCK (pad);
3045
3046   return has_buffer;
3047 }
3048
3049 /**
3050  * gst_aggregator_pad_is_eos:
3051  * @pad: an aggregator pad
3052  *
3053  * Returns: %TRUE if the pad is EOS, otherwise %FALSE.
3054  */
3055 gboolean
3056 gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
3057 {
3058   gboolean is_eos;
3059
3060   PAD_LOCK (pad);
3061   is_eos = pad->priv->eos;
3062   PAD_UNLOCK (pad);
3063
3064   return is_eos;
3065 }
3066
3067 #if 0
3068 /*
3069  * gst_aggregator_merge_tags:
3070  * @self: a #GstAggregator
3071  * @tags: a #GstTagList to merge
3072  * @mode: the #GstTagMergeMode to use
3073  *
3074  * Adds tags to so-called pending tags, which will be processed
3075  * before pushing out data downstream.
3076  *
3077  * Note that this is provided for convenience, and the subclass is
3078  * not required to use this and can still do tag handling on its own.
3079  *
3080  * MT safe.
3081  */
3082 void
3083 gst_aggregator_merge_tags (GstAggregator * self,
3084     const GstTagList * tags, GstTagMergeMode mode)
3085 {
3086   GstTagList *otags;
3087
3088   g_return_if_fail (GST_IS_AGGREGATOR (self));
3089   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
3090
3091   /* FIXME Check if we can use OBJECT lock here! */
3092   GST_OBJECT_LOCK (self);
3093   if (tags)
3094     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
3095   otags = self->priv->tags;
3096   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
3097   if (otags)
3098     gst_tag_list_unref (otags);
3099   self->priv->tags_changed = TRUE;
3100   GST_OBJECT_UNLOCK (self);
3101 }
3102 #endif
3103
3104 /**
3105  * gst_aggregator_set_latency:
3106  * @self: a #GstAggregator
3107  * @min_latency: minimum latency
3108  * @max_latency: maximum latency
3109  *
3110  * Lets #GstAggregator sub-classes tell the baseclass what their internal
3111  * latency is. Will also post a LATENCY message on the bus so the pipeline
3112  * can reconfigure its global latency.
3113  */
3114 void
3115 gst_aggregator_set_latency (GstAggregator * self,
3116     GstClockTime min_latency, GstClockTime max_latency)
3117 {
3118   gboolean changed = FALSE;
3119
3120   g_return_if_fail (GST_IS_AGGREGATOR (self));
3121   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
3122   g_return_if_fail (max_latency >= min_latency);
3123
3124   SRC_LOCK (self);
3125   if (self->priv->sub_latency_min != min_latency) {
3126     self->priv->sub_latency_min = min_latency;
3127     changed = TRUE;
3128   }
3129   if (self->priv->sub_latency_max != max_latency) {
3130     self->priv->sub_latency_max = max_latency;
3131     changed = TRUE;
3132   }
3133
3134   if (changed)
3135     SRC_BROADCAST (self);
3136   SRC_UNLOCK (self);
3137
3138   if (changed) {
3139     gst_element_post_message (GST_ELEMENT_CAST (self),
3140         gst_message_new_latency (GST_OBJECT_CAST (self)));
3141   }
3142 }
3143
3144 /**
3145  * gst_aggregator_get_buffer_pool:
3146  * @self: a #GstAggregator
3147  *
3148  * Returns: (transfer full): the instance of the #GstBufferPool used
3149  * by @trans; free it after use it
3150  */
3151 GstBufferPool *
3152 gst_aggregator_get_buffer_pool (GstAggregator * self)
3153 {
3154   GstBufferPool *pool;
3155
3156   g_return_val_if_fail (GST_IS_AGGREGATOR (self), NULL);
3157
3158   GST_OBJECT_LOCK (self);
3159   pool = self->priv->pool;
3160   if (pool)
3161     gst_object_ref (pool);
3162   GST_OBJECT_UNLOCK (self);
3163
3164   return pool;
3165 }
3166
3167 /**
3168  * gst_aggregator_get_allocator:
3169  * @self: a #GstAggregator
3170  * @allocator: (out) (allow-none) (transfer full): the #GstAllocator
3171  * used
3172  * @params: (out) (allow-none) (transfer full): the
3173  * #GstAllocationParams of @allocator
3174  *
3175  * Lets #GstAggregator sub-classes get the memory @allocator
3176  * acquired by the base class and its @params.
3177  *
3178  * Unref the @allocator after use it.
3179  */
3180 void
3181 gst_aggregator_get_allocator (GstAggregator * self,
3182     GstAllocator ** allocator, GstAllocationParams * params)
3183 {
3184   g_return_if_fail (GST_IS_AGGREGATOR (self));
3185
3186   if (allocator)
3187     *allocator = self->priv->allocator ?
3188         gst_object_ref (self->priv->allocator) : NULL;
3189
3190   if (params)
3191     *params = self->priv->allocation_params;
3192 }
3193
3194 /**
3195  * gst_aggregator_simple_get_next_time:
3196  * @self: A #GstAggregator
3197  *
3198  * This is a simple #GstAggregator::get_next_time implementation that
3199  * just looks at the #GstSegment on the srcpad of the aggregator and bases
3200  * the next time on the running there there.
3201  *
3202  * This is the desired behaviour in most cases where you have a live source
3203  * and you have a dead line based aggregator subclass.
3204  *
3205  * Returns: The running time based on the position
3206  *
3207  * Since: 1.16
3208  */
3209 GstClockTime
3210 gst_aggregator_simple_get_next_time (GstAggregator * self)
3211 {
3212   GstClockTime next_time;
3213   GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
3214   GstSegment *segment = &srcpad->segment;
3215
3216   GST_OBJECT_LOCK (self);
3217   if (segment->position == -1 || segment->position < segment->start)
3218     next_time = segment->start;
3219   else
3220     next_time = segment->position;
3221
3222   if (segment->stop != -1 && next_time > segment->stop)
3223     next_time = segment->stop;
3224
3225   next_time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, next_time);
3226   GST_OBJECT_UNLOCK (self);
3227
3228   return next_time;
3229 }