aggregator: Also release clipped buffer when releasing an aggregator pad
[platform/upstream/gstreamer.git] / subprojects / gstreamer / libs / gst / base / gstaggregator.c
1 /* GStreamer aggregator base class
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @title: GstAggregator
25  * @short_description: Base class for mixers and muxers, manages a set of input
26  *     pads and aggregates their streams
27  * @see_also: gstcollectpads for historical reasons.
28  *
29  * Manages a set of pads with the purpose of aggregating their buffers.
30  * Control is given to the subclass when all pads have data.
31  *
32  *  * Base class for mixers and muxers. Subclasses should at least implement
33  *    the #GstAggregatorClass::aggregate virtual method.
34  *
35  *  * Installs a #GstPadChainFunction, a #GstPadEventFullFunction and a
36  *    #GstPadQueryFunction to queue all serialized data packets per sink pad.
37  *    Subclasses should not overwrite those, but instead implement
38  *    #GstAggregatorClass::sink_event and #GstAggregatorClass::sink_query as
39  *    needed.
40  *
41  *  * When data is queued on all pads, the aggregate vmethod is called.
42  *
43  *  * One can peek at the data on any given GstAggregatorPad with the
44  *    gst_aggregator_pad_peek_buffer() method, and remove it from the pad
45  *    with the gst_aggregator_pad_pop_buffer () method. When a buffer
46  *    has been taken with pop_buffer (), a new buffer can be queued
47  *    on that pad.
48  *
49  *  * When gst_aggregator_pad_peek_buffer() or gst_aggregator_pad_has_buffer()
50  *    are called, a reference is taken to the returned buffer, which stays
51  *    valid until either:
52  *
53  *      - gst_aggregator_pad_pop_buffer() is called, in which case the caller
54  *        is guaranteed that the buffer they receive is the same as the peeked
55  *        buffer.
56  *      - gst_aggregator_pad_drop_buffer() is called, in which case the caller
57  *        is guaranteed that the dropped buffer is the one that was peeked.
58  *      - the subclass implementation of #GstAggregatorClass.aggregate returns.
59  *
60  *    Subsequent calls to gst_aggregator_pad_peek_buffer() or
61  *    gst_aggregator_pad_has_buffer() return / check the same buffer that was
62  *    returned / checked, until one of the conditions listed above is met.
63  *
64  *    Subclasses are only allowed to call these methods from the aggregate
65  *    thread.
66  *
67  *  * If the subclass wishes to push a buffer downstream in its aggregate
68  *    implementation, it should do so through the
69  *    gst_aggregator_finish_buffer() method. This method will take care
70  *    of sending and ordering mandatory events such as stream start, caps
71  *    and segment. Buffer lists can also be pushed out with
72  *    gst_aggregator_finish_buffer_list().
73  *
74  *  * Same goes for EOS events, which should not be pushed directly by the
75  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
76  *    implementation.
77  *
78  *  * Note that the aggregator logic regarding gap event handling is to turn
79  *    these into gap buffers with matching PTS and duration. It will also
80  *    flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE
81  *    to ease their identification and subsequent processing.
82  *    In addition, if the gap event was flagged with GST_GAP_FLAG_MISSING_DATA,
83  *    a custom meta is added to the resulting gap buffer (GstAggregatorMissingDataMeta).
84  *
85  *  * Subclasses must use (a subclass of) #GstAggregatorPad for both their
86  *    sink and source pads.
87  *    See gst_element_class_add_static_pad_template_with_gtype().
88  *
89  * This class used to live in gst-plugins-bad and was moved to core.
90  *
91  * Since: 1.14
92  */
93
94 /**
95  * SECTION: gstaggregatorpad
96  * @title: GstAggregatorPad
97  * @short_description: #GstPad subclass for pads managed by #GstAggregator
98  * @see_also: gstcollectpads for historical reasons.
99  *
100  * Pads managed by a #GstAggregator subclass.
101  *
102  * This class used to live in gst-plugins-bad and was moved to core.
103  *
104  * Since: 1.14
105  */
106
107 #ifdef HAVE_CONFIG_H
108 #  include "config.h"
109 #endif
110
111 #include <string.h>             /* strlen */
112
113 #include "gstaggregator.h"
114
115 GType
116 gst_aggregator_start_time_selection_get_type (void)
117 {
118   static GType gtype = 0;
119
120   if (g_once_init_enter (&gtype)) {
121     static const GEnumValue values[] = {
122       {GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
123           "GST_AGGREGATOR_START_TIME_SELECTION_ZERO", "zero"},
124       {GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
125           "GST_AGGREGATOR_START_TIME_SELECTION_FIRST", "first"},
126       {GST_AGGREGATOR_START_TIME_SELECTION_SET,
127           "GST_AGGREGATOR_START_TIME_SELECTION_SET", "set"},
128       {0, NULL, NULL}
129     };
130     GType new_type =
131         g_enum_register_static ("GstAggregatorStartTimeSelection", values);
132
133     g_once_init_leave (&gtype, new_type);
134   }
135   return gtype;
136 }
137
138 /*  Might become API */
139 #if 0
140 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
141     const GstTagList * tags, GstTagMergeMode mode);
142 #endif
143 static void gst_aggregator_set_latency_property (GstAggregator * agg,
144     GstClockTime latency);
145 static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
146
147 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
148
149 static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad,
150     GstBuffer * buffer, gboolean dequeued);
151
152 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
153 #define GST_CAT_DEFAULT aggregator_debug
154
155 /* Locking order, locks in this element must always be taken in this order
156  *
157  * standard sink pad stream lock -> GST_PAD_STREAM_LOCK (aggpad)
158  * Aggregator pad flush lock -> PAD_FLUSH_LOCK(aggpad)
159  * standard src pad stream lock -> GST_PAD_STREAM_LOCK (srcpad)
160  * Aggregator src lock -> SRC_LOCK(agg) w/ SRC_WAIT/BROADCAST
161  * standard element object lock -> GST_OBJECT_LOCK(agg)
162  * Aggregator pad lock -> PAD_LOCK (aggpad) w/ PAD_WAIT/BROADCAST_EVENT(aggpad)
163  * standard src pad object lock -> GST_OBJECT_LOCK(srcpad)
164  * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad)
165  */
166
167 /* GstAggregatorPad definitions */
168 #define PAD_LOCK(pad)   G_STMT_START {                                  \
169   GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p",              \
170         g_thread_self());                                               \
171   g_mutex_lock(&pad->priv->lock);                                       \
172   GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p",                \
173         g_thread_self());                                               \
174   } G_STMT_END
175
176 #define PAD_UNLOCK(pad)  G_STMT_START {                                 \
177   GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p",           \
178       g_thread_self());                                                 \
179   g_mutex_unlock(&pad->priv->lock);                                     \
180   GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p",             \
181         g_thread_self());                                               \
182   } G_STMT_END
183
184
185 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
186   GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p",   \
187         g_thread_self());                                               \
188   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),           \
189       (&((GstAggregatorPad*)pad)->priv->lock));                         \
190   GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \
191         g_thread_self());                                               \
192   } G_STMT_END
193
194 #define PAD_BROADCAST_EVENT(pad) G_STMT_START {                        \
195   GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p",     \
196         g_thread_self());                                              \
197   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond));    \
198   } G_STMT_END
199
200
201 #define PAD_FLUSH_LOCK(pad)     G_STMT_START {                          \
202   GST_TRACE_OBJECT (pad, "Taking lock from thread %p",                  \
203         g_thread_self());                                               \
204   g_mutex_lock(&pad->priv->flush_lock);                                 \
205   GST_TRACE_OBJECT (pad, "Took lock from thread %p",                    \
206         g_thread_self());                                               \
207   } G_STMT_END
208
209 #define PAD_FLUSH_UNLOCK(pad)   G_STMT_START {                          \
210   GST_TRACE_OBJECT (pad, "Releasing lock from thread %p",               \
211         g_thread_self());                                               \
212   g_mutex_unlock(&pad->priv->flush_lock);                               \
213   GST_TRACE_OBJECT (pad, "Release lock from thread %p",                 \
214         g_thread_self());                                               \
215   } G_STMT_END
216
217 #define SRC_LOCK(self)   G_STMT_START {                             \
218   GST_TRACE_OBJECT (self, "Taking src lock from thread %p",         \
219       g_thread_self());                                             \
220   g_mutex_lock(&self->priv->src_lock);                              \
221   GST_TRACE_OBJECT (self, "Took src lock from thread %p",           \
222         g_thread_self());                                           \
223   } G_STMT_END
224
225 #define SRC_UNLOCK(self)  G_STMT_START {                            \
226   GST_TRACE_OBJECT (self, "Releasing src lock from thread %p",      \
227         g_thread_self());                                           \
228   g_mutex_unlock(&self->priv->src_lock);                            \
229   GST_TRACE_OBJECT (self, "Released src lock from thread %p",       \
230         g_thread_self());                                           \
231   } G_STMT_END
232
233 #define SRC_WAIT(self) G_STMT_START {                               \
234   GST_LOG_OBJECT (self, "Waiting for src on thread %p",             \
235         g_thread_self());                                           \
236   g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock));    \
237   GST_LOG_OBJECT (self, "DONE Waiting for src on thread %p",        \
238         g_thread_self());                                           \
239   } G_STMT_END
240
241 #define SRC_BROADCAST(self) G_STMT_START {                          \
242     GST_LOG_OBJECT (self, "Signaling src from thread %p",           \
243         g_thread_self());                                           \
244     if (self->priv->aggregate_id)                                   \
245       gst_clock_id_unschedule (self->priv->aggregate_id);           \
246     g_cond_broadcast(&(self->priv->src_cond));                      \
247   } G_STMT_END
248
249 struct _GstAggregatorPadPrivate
250 {
251   /* Following fields are protected by the PAD_LOCK */
252   GstFlowReturn flow_return;
253
254   guint32 last_flush_start_seqnum;
255   guint32 last_flush_stop_seqnum;
256
257   /* Whether the pad hasn't received a first buffer yet */
258   gboolean first_buffer;
259   /* Whether we waited once for the pad's first buffer */
260   gboolean waited_once;
261
262   GQueue data;                  /* buffers, events and queries */
263   GstBuffer *clipped_buffer;
264   guint num_buffers;
265   GstBuffer *peeked_buffer;
266
267   /* used to track fill state of queues, only used with live-src and when
268    * latency property is set to > 0 */
269   GstClockTime head_position;
270   GstClockTime tail_position;
271   GstClockTime head_time;       /* running time */
272   GstClockTime tail_time;
273   GstClockTime time_level;      /* how much head is ahead of tail */
274   GstSegment head_segment;      /* segment before the queue */
275
276   gboolean negotiated;
277
278   gboolean eos;
279
280   GMutex lock;
281   GCond event_cond;
282   /* This lock prevents a flush start processing happening while
283    * the chain function is also happening.
284    */
285   GMutex flush_lock;
286
287   /* properties */
288   gboolean emit_signals;
289 };
290
291 /* Must be called with PAD_LOCK held */
292 static void
293 gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
294 {
295   aggpad->priv->eos = FALSE;
296   aggpad->priv->flow_return = GST_FLOW_OK;
297   GST_OBJECT_LOCK (aggpad);
298   gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED);
299   gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED);
300   GST_OBJECT_UNLOCK (aggpad);
301   aggpad->priv->head_position = GST_CLOCK_TIME_NONE;
302   aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
303   aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
304   aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
305   aggpad->priv->time_level = 0;
306   aggpad->priv->first_buffer = TRUE;
307   aggpad->priv->waited_once = FALSE;
308 }
309
310 static gboolean
311 gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
312 {
313   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
314
315   PAD_LOCK (aggpad);
316   gst_aggregator_pad_reset_unlocked (aggpad);
317   PAD_UNLOCK (aggpad);
318
319   if (klass->flush)
320     return (klass->flush (aggpad, agg) == GST_FLOW_OK);
321
322   return TRUE;
323 }
324
325 /**
326  * gst_aggregator_peek_next_sample:
327  *
328  * Use this function to determine what input buffers will be aggregated
329  * to produce the next output buffer. This should only be called from
330  * a #GstAggregator::samples-selected handler, and can be used to precisely
331  * control aggregating parameters for a given set of input samples.
332  *
333  * Returns: (nullable) (transfer full): The sample that is about to be aggregated. It may hold a #GstBuffer
334  *   or a #GstBufferList. The contents of its info structure is subclass-dependent,
335  *   and documented on a subclass basis. The buffers held by the sample are
336  *   not writable.
337  * Since: 1.18
338  */
339 GstSample *
340 gst_aggregator_peek_next_sample (GstAggregator * agg, GstAggregatorPad * aggpad)
341 {
342   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (agg);
343
344   if (klass->peek_next_sample)
345     return (klass->peek_next_sample (agg, aggpad));
346
347   return NULL;
348 }
349
350 /*************************************
351  * GstAggregator implementation  *
352  *************************************/
353 static GstElementClass *aggregator_parent_class = NULL;
354 static gint aggregator_private_offset = 0;
355
356 /* All members are protected by the object lock unless otherwise noted */
357
358 struct _GstAggregatorPrivate
359 {
360   gint max_padserial;
361
362   /* Our state is >= PAUSED */
363   gboolean running;             /* protected by src_lock */
364
365   /* seqnum from last seek or common seqnum to flush start events received
366    * on all pads, for flushing without a seek */
367   guint32 next_seqnum;
368   /* seqnum to apply to synthetic segment/eos events */
369   guint32 seqnum;
370   gboolean send_stream_start;   /* protected by srcpad stream lock */
371   gboolean send_segment;
372   gboolean flushing;
373   gboolean send_eos;            /* protected by srcpad stream lock */
374   gboolean got_eos_event;       /* protected by srcpad stream lock */
375
376   GstCaps *srccaps;             /* protected by the srcpad stream lock */
377
378   GstTagList *tags;
379   gboolean tags_changed;
380
381   gboolean peer_latency_live;   /* protected by src_lock */
382   GstClockTime peer_latency_min;        /* protected by src_lock */
383   GstClockTime peer_latency_max;        /* protected by src_lock */
384   gboolean has_peer_latency;    /* protected by src_lock */
385
386   GstClockTime sub_latency_min; /* protected by src_lock */
387   GstClockTime sub_latency_max; /* protected by src_lock */
388   /* Tracks whether the latency message was posted at least once */
389   gboolean posted_latency_msg;
390
391   GstClockTime upstream_latency_min;    /* protected by src_lock */
392
393   /* aggregate */
394   GstClockID aggregate_id;      /* protected by src_lock */
395   gboolean selected_samples_called_or_warned;   /* protected by src_lock */
396   GMutex src_lock;
397   GCond src_cond;
398
399   gboolean first_buffer;        /* protected by object lock */
400   GstAggregatorStartTimeSelection start_time_selection;
401   GstClockTime start_time;
402
403   /* protected by the object lock */
404   GstQuery *allocation_query;
405   GstAllocator *allocator;
406   GstBufferPool *pool;
407   GstAllocationParams allocation_params;
408
409   /* properties */
410   gint64 latency;               /* protected by both src_lock and all pad locks */
411   gboolean emit_signals;
412   gboolean ignore_inactive_pads;
413   gboolean force_live;          /* Construct only, doesn't need any locking */
414 };
415
416 /* With SRC_LOCK */
417 static gboolean
418 is_live_unlocked (GstAggregator * self)
419 {
420   return self->priv->peer_latency_live || self->priv->force_live;
421 }
422
423 /* Seek event forwarding helper */
424 typedef struct
425 {
426   /* parameters */
427   GstEvent *event;
428   gboolean flush;
429   gboolean only_to_active_pads;
430
431   /* results */
432   gboolean result;
433   gboolean one_actually_seeked;
434 } EventData;
435
436 #define DEFAULT_LATENCY              0
437 #define DEFAULT_MIN_UPSTREAM_LATENCY              0
438 #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO
439 #define DEFAULT_START_TIME           (-1)
440 #define DEFAULT_EMIT_SIGNALS         FALSE
441 #define DEFAULT_FORCE_LIVE           FALSE
442
443 enum
444 {
445   PROP_0,
446   PROP_LATENCY,
447   PROP_MIN_UPSTREAM_LATENCY,
448   PROP_START_TIME_SELECTION,
449   PROP_START_TIME,
450   PROP_EMIT_SIGNALS,
451   PROP_LAST
452 };
453
454 enum
455 {
456   SIGNAL_SAMPLES_SELECTED,
457   LAST_SIGNAL,
458 };
459
460 static guint gst_aggregator_signals[LAST_SIGNAL] = { 0 };
461
462 static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
463     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
464
465 static gboolean
466 gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
467 {
468   return (g_queue_peek_tail (&pad->priv->data) == NULL &&
469       pad->priv->clipped_buffer == NULL);
470 }
471
472 /* Will return FALSE if there's no buffer available on every non-EOS pad, or
473  * if at least one of the pads has an event or query at the top of its queue.
474  *
475  * Only returns TRUE if all non-EOS pads have a buffer available at the top of
476  * their queue or a clipped buffer already.
477  */
478 static gboolean
479 gst_aggregator_check_pads_ready (GstAggregator * self,
480     gboolean * have_event_or_query_ret)
481 {
482   GstAggregatorPad *pad = NULL;
483   GList *l, *sinkpads;
484   gboolean have_buffer = TRUE;
485   gboolean have_event_or_query = FALSE;
486   guint n_ready = 0;
487
488   GST_LOG_OBJECT (self, "checking pads");
489
490   GST_OBJECT_LOCK (self);
491
492   sinkpads = GST_ELEMENT_CAST (self)->sinkpads;
493   if (sinkpads == NULL)
494     goto no_sinkpads;
495
496   for (l = sinkpads; l != NULL; l = l->next) {
497     pad = l->data;
498
499     PAD_LOCK (pad);
500
501     /* If there's an event or query at the top of the queue and we don't yet
502      * have taken the top buffer out and stored it as clip_buffer, remember
503      * that and exit the loop. We first have to handle all events/queries
504      * before we handle any buffers. */
505     if (!pad->priv->clipped_buffer
506         && (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data))
507             || GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))) {
508       PAD_UNLOCK (pad);
509       have_event_or_query = TRUE;
510       GST_LOG_OBJECT (pad, "Have pending serialized query/event");
511       break;
512     }
513
514     if (self->priv->ignore_inactive_pads && is_live_unlocked (self) &&
515         pad->priv->waited_once && pad->priv->first_buffer && !pad->priv->eos) {
516       PAD_UNLOCK (pad);
517       GST_LOG_OBJECT (pad, "Ignoring inactive pad");
518       continue;
519     }
520
521     /* Otherwise check if we have a clipped buffer or a buffer at the top of
522      * the queue, and if not then this pad is not ready unless it is also EOS */
523     if (!pad->priv->clipped_buffer
524         && !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
525       /* We must not have any buffers at all in this pad then as otherwise we
526        * would've had an event/query at the top of the queue */
527       g_assert (pad->priv->num_buffers == 0);
528
529       /* Only consider this pad as worth waiting for if it's not already EOS.
530        * There's no point in waiting for buffers on EOS pads */
531       if (!pad->priv->eos) {
532         GST_LOG_OBJECT (pad, "Have no buffer and not EOS yet");
533         have_buffer = FALSE;
534       } else {
535         GST_LOG_OBJECT (pad, "Have no buffer and already EOS");
536         n_ready++;
537       }
538     } else {
539       GST_TRACE_OBJECT (pad, "Have %" GST_TIME_FORMAT " queued in %u buffers",
540           GST_TIME_ARGS (pad->priv->time_level), pad->priv->num_buffers);
541       if (is_live_unlocked (self)) {
542         /* In live mode, having a single pad with buffers is enough to
543          * generate a start time from it. In non-live mode all pads need
544          * to have a buffer
545          */
546         self->priv->first_buffer = FALSE;
547         n_ready++;
548       }
549     }
550
551     PAD_UNLOCK (pad);
552   }
553
554   if (self->priv->ignore_inactive_pads && is_live_unlocked (self)
555       && n_ready == 0)
556     goto no_sinkpads;
557
558   if (have_event_or_query)
559     goto pad_not_ready_but_event_or_query;
560
561   if (!have_buffer)
562     goto pad_not_ready;
563
564   if (have_buffer)
565     self->priv->first_buffer = FALSE;
566
567   GST_OBJECT_UNLOCK (self);
568   GST_LOG_OBJECT (self, "pads are ready");
569
570   if (have_event_or_query_ret)
571     *have_event_or_query_ret = have_event_or_query;
572
573   return TRUE;
574
575 no_sinkpads:
576   {
577     GST_LOG_OBJECT (self, "pads not ready: no sink pads");
578     GST_OBJECT_UNLOCK (self);
579
580     if (have_event_or_query_ret)
581       *have_event_or_query_ret = have_event_or_query;
582
583     return FALSE;
584   }
585 pad_not_ready:
586   {
587     GST_LOG_OBJECT (self, "pad not ready to be aggregated yet");
588     GST_OBJECT_UNLOCK (self);
589
590     if (have_event_or_query_ret)
591       *have_event_or_query_ret = have_event_or_query;
592
593     return FALSE;
594   }
595 pad_not_ready_but_event_or_query:
596   {
597     GST_LOG_OBJECT (self,
598         "pad not ready to be aggregated yet, need to handle serialized event or query first");
599     GST_OBJECT_UNLOCK (self);
600
601     if (have_event_or_query_ret)
602       *have_event_or_query_ret = have_event_or_query;
603
604     return FALSE;
605   }
606 }
607
608 static void
609 gst_aggregator_reset_flow_values (GstAggregator * self)
610 {
611   GST_OBJECT_LOCK (self);
612   self->priv->send_stream_start = TRUE;
613   self->priv->send_segment = TRUE;
614   gst_segment_init (&GST_AGGREGATOR_PAD (self->srcpad)->segment,
615       GST_FORMAT_TIME);
616   /* Initialize to -1 so we set it to the start position once the first buffer
617    * is handled in gst_aggregator_pad_chain_internal() */
618   GST_AGGREGATOR_PAD (self->srcpad)->segment.position = -1;
619   self->priv->first_buffer = TRUE;
620   GST_OBJECT_UNLOCK (self);
621 }
622
623 static inline void
624 gst_aggregator_push_mandatory_events (GstAggregator * self, gboolean up_to_caps)
625 {
626   GstAggregatorPrivate *priv = self->priv;
627   GstEvent *segment = NULL;
628   GstEvent *tags = NULL;
629
630   if (self->priv->send_stream_start) {
631     gchar s_id[32];
632
633     GST_INFO_OBJECT (self, "pushing stream start");
634     /* stream-start (FIXME: create id based on input ids) */
635     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
636     if (!gst_pad_push_event (GST_PAD (self->srcpad),
637             gst_event_new_stream_start (s_id))) {
638       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
639     }
640     self->priv->send_stream_start = FALSE;
641   }
642
643   if (self->priv->srccaps) {
644     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
645         self->priv->srccaps);
646     if (!gst_pad_push_event (GST_PAD (self->srcpad),
647             gst_event_new_caps (self->priv->srccaps))) {
648       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
649     }
650     gst_caps_unref (self->priv->srccaps);
651     self->priv->srccaps = NULL;
652   }
653
654   if (up_to_caps)
655     return;
656
657   GST_OBJECT_LOCK (self);
658   if (self->priv->send_segment && !self->priv->flushing) {
659     segment =
660         gst_event_new_segment (&GST_AGGREGATOR_PAD (self->srcpad)->segment);
661
662     if (!self->priv->seqnum)
663       /* This code-path is in preparation to be able to run without a source
664        * connected. Then we won't have a seq-num from a segment event. */
665       self->priv->seqnum = gst_event_get_seqnum (segment);
666     else
667       gst_event_set_seqnum (segment, self->priv->seqnum);
668     self->priv->send_segment = FALSE;
669
670     GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment);
671   }
672
673   if (priv->tags && priv->tags_changed && !self->priv->flushing) {
674     tags = gst_event_new_tag (gst_tag_list_ref (priv->tags));
675     priv->tags_changed = FALSE;
676   }
677   GST_OBJECT_UNLOCK (self);
678
679   if (segment)
680     gst_pad_push_event (self->srcpad, segment);
681   if (tags)
682     gst_pad_push_event (self->srcpad, tags);
683 }
684
685 /**
686  * gst_aggregator_set_src_caps:
687  * @self: The #GstAggregator
688  * @caps: The #GstCaps to set on the src pad.
689  *
690  * Sets the caps to be used on the src pad.
691  */
692 void
693 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
694 {
695   GstCaps *old_caps;
696
697   GST_PAD_STREAM_LOCK (self->srcpad);
698
699   if (caps && (old_caps = gst_pad_get_current_caps (self->srcpad))) {
700     if (gst_caps_is_equal (caps, old_caps)) {
701       GST_DEBUG_OBJECT (self,
702           "New caps are the same as the previously set caps %" GST_PTR_FORMAT,
703           old_caps);
704       gst_caps_unref (old_caps);
705       GST_PAD_STREAM_UNLOCK (self->srcpad);
706       return;
707     }
708     gst_caps_unref (old_caps);
709   }
710
711   gst_caps_replace (&self->priv->srccaps, caps);
712   gst_aggregator_push_mandatory_events (self, TRUE);
713   GST_PAD_STREAM_UNLOCK (self->srcpad);
714 }
715
716 static GstFlowReturn
717 gst_aggregator_default_finish_buffer (GstAggregator * self, GstBuffer * buffer)
718 {
719   gst_aggregator_push_mandatory_events (self, FALSE);
720
721   GST_OBJECT_LOCK (self);
722   if (!self->priv->flushing && gst_pad_is_active (self->srcpad)) {
723     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
724     GST_OBJECT_UNLOCK (self);
725     return gst_pad_push (self->srcpad, buffer);
726   } else {
727     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
728         gst_pad_is_active (self->srcpad), self->priv->flushing);
729     GST_OBJECT_UNLOCK (self);
730     gst_buffer_unref (buffer);
731     return GST_FLOW_OK;
732   }
733 }
734
735 /**
736  * gst_aggregator_finish_buffer:
737  * @aggregator: The #GstAggregator
738  * @buffer: (transfer full): the #GstBuffer to push.
739  *
740  * This method will push the provided output buffer downstream. If needed,
741  * mandatory events such as stream-start, caps, and segment events will be
742  * sent before pushing the buffer.
743  */
744 GstFlowReturn
745 gst_aggregator_finish_buffer (GstAggregator * aggregator, GstBuffer * buffer)
746 {
747   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator);
748
749   g_assert (klass->finish_buffer != NULL);
750
751   return klass->finish_buffer (aggregator, buffer);
752 }
753
754 static GstFlowReturn
755 gst_aggregator_default_finish_buffer_list (GstAggregator * self,
756     GstBufferList * bufferlist)
757 {
758   gst_aggregator_push_mandatory_events (self, FALSE);
759
760   GST_OBJECT_LOCK (self);
761   if (!self->priv->flushing && gst_pad_is_active (self->srcpad)) {
762     GST_TRACE_OBJECT (self, "pushing bufferlist%" GST_PTR_FORMAT, bufferlist);
763     GST_OBJECT_UNLOCK (self);
764     return gst_pad_push_list (self->srcpad, bufferlist);
765   } else {
766     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
767         gst_pad_is_active (self->srcpad), self->priv->flushing);
768     GST_OBJECT_UNLOCK (self);
769     gst_buffer_list_unref (bufferlist);
770     return GST_FLOW_OK;
771   }
772 }
773
774 /**
775  * gst_aggregator_finish_buffer_list:
776  * @aggregator: The #GstAggregator
777  * @bufferlist: (transfer full): the #GstBufferList to push.
778  *
779  * This method will push the provided output buffer list downstream. If needed,
780  * mandatory events such as stream-start, caps, and segment events will be
781  * sent before pushing the buffer.
782  *
783  * Since: 1.18
784  */
785 GstFlowReturn
786 gst_aggregator_finish_buffer_list (GstAggregator * aggregator,
787     GstBufferList * bufferlist)
788 {
789   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator);
790
791   g_assert (klass->finish_buffer_list != NULL);
792
793   return klass->finish_buffer_list (aggregator, bufferlist);
794 }
795
796 static void
797 gst_aggregator_push_eos (GstAggregator * self)
798 {
799   GstEvent *event;
800   gst_aggregator_push_mandatory_events (self, FALSE);
801
802   event = gst_event_new_eos ();
803
804   GST_OBJECT_LOCK (self);
805   self->priv->send_eos = FALSE;
806   gst_event_set_seqnum (event, self->priv->seqnum);
807   GST_OBJECT_UNLOCK (self);
808
809   gst_pad_push_event (self->srcpad, event);
810 }
811
812 static GstClockTime
813 gst_aggregator_get_next_time (GstAggregator * self)
814 {
815   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
816
817   if (klass->get_next_time)
818     return klass->get_next_time (self);
819
820   return GST_CLOCK_TIME_NONE;
821 }
822
823 static gboolean
824 gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
825 {
826   GstClockTime latency;
827   GstClockTime start;
828   gboolean res;
829   gboolean have_event_or_query = FALSE;
830
831   *timeout = FALSE;
832
833   SRC_LOCK (self);
834
835   latency = gst_aggregator_get_latency_unlocked (self);
836
837   if (gst_aggregator_check_pads_ready (self, &have_event_or_query)) {
838     GST_DEBUG_OBJECT (self, "all pads have data");
839     SRC_UNLOCK (self);
840
841     return TRUE;
842   }
843
844   /* If we have an event or query, immediately return FALSE instead of waiting
845    * and handle it immediately */
846   if (have_event_or_query) {
847     GST_DEBUG_OBJECT (self, "Have serialized event or query to handle first");
848     SRC_UNLOCK (self);
849     return FALSE;
850   }
851
852   /* Before waiting, check if we're actually still running */
853   if (!self->priv->running || !self->priv->send_eos) {
854     SRC_UNLOCK (self);
855
856     return FALSE;
857   }
858
859   start = gst_aggregator_get_next_time (self);
860
861   /* If we're not live, or if we use the running time
862    * of the first buffer as start time, we wait until
863    * all pads have buffers.
864    * Otherwise (i.e. if we are live!), we wait on the clock
865    * and if a pad does not have a buffer in time we ignore
866    * that pad.
867    */
868   GST_OBJECT_LOCK (self);
869   if (!GST_CLOCK_TIME_IS_VALID (latency) ||
870       !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) ||
871       !GST_CLOCK_TIME_IS_VALID (start) ||
872       (self->priv->first_buffer
873           && self->priv->start_time_selection ==
874           GST_AGGREGATOR_START_TIME_SELECTION_FIRST)) {
875     /* We wake up here when something happened, and below
876      * then check if we're ready now. If we return FALSE,
877      * we will be directly called again.
878      */
879     GST_OBJECT_UNLOCK (self);
880     SRC_WAIT (self);
881   } else {
882     GstClockTime base_time, time;
883     GstClock *clock;
884     GstClockReturn status;
885     GstClockTimeDiff jitter;
886
887     GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
888         GST_TIME_ARGS (start));
889
890     base_time = GST_ELEMENT_CAST (self)->base_time;
891     clock = gst_object_ref (GST_ELEMENT_CLOCK (self));
892     GST_OBJECT_UNLOCK (self);
893
894     time = base_time + start;
895     time += latency;
896
897     GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
898         GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
899         " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")",
900         GST_TIME_ARGS (time),
901         GST_TIME_ARGS (base_time),
902         GST_TIME_ARGS (start), GST_TIME_ARGS (latency),
903         GST_TIME_ARGS (gst_clock_get_time (clock)));
904
905     self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
906     gst_object_unref (clock);
907     SRC_UNLOCK (self);
908
909     jitter = 0;
910     status = gst_clock_id_wait (self->priv->aggregate_id, &jitter);
911
912     SRC_LOCK (self);
913     if (self->priv->aggregate_id) {
914       gst_clock_id_unref (self->priv->aggregate_id);
915       self->priv->aggregate_id = NULL;
916     }
917
918     GST_DEBUG_OBJECT (self,
919         "clock returned %d (jitter: %" GST_STIME_FORMAT ")",
920         status, GST_STIME_ARGS (jitter));
921
922     /* we timed out */
923     if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
924       GList *l;
925
926       GST_OBJECT_LOCK (self);
927       for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
928         GstAggregatorPad *pad = GST_AGGREGATOR_PAD (l->data);
929
930         PAD_LOCK (pad);
931         pad->priv->waited_once = TRUE;
932         PAD_UNLOCK (pad);
933       }
934       GST_OBJECT_UNLOCK (self);
935
936       SRC_UNLOCK (self);
937       *timeout = TRUE;
938       return TRUE;
939     }
940   }
941
942   res = gst_aggregator_check_pads_ready (self, NULL);
943   SRC_UNLOCK (self);
944
945   return res;
946 }
947
948 typedef struct
949 {
950   gboolean processed_event;
951   GstFlowReturn flow_ret;
952 } DoHandleEventsAndQueriesData;
953
954 static gboolean
955 gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
956     gpointer user_data)
957 {
958   GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
959   GstAggregator *aggregator = GST_AGGREGATOR_CAST (self);
960   GstEvent *event = NULL;
961   GstQuery *query = NULL;
962   GstAggregatorClass *klass = NULL;
963   DoHandleEventsAndQueriesData *data = user_data;
964
965   do {
966     event = NULL;
967     query = NULL;
968
969     PAD_LOCK (pad);
970     if (pad->priv->clipped_buffer == NULL &&
971         !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
972       if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data)))
973         event = gst_event_ref (g_queue_peek_tail (&pad->priv->data));
974       if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))
975         query = g_queue_peek_tail (&pad->priv->data);
976     }
977     PAD_UNLOCK (pad);
978     if (event || query) {
979       gboolean ret;
980
981       data->processed_event = TRUE;
982       if (klass == NULL)
983         klass = GST_AGGREGATOR_GET_CLASS (self);
984
985       if (event) {
986         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
987         gst_event_ref (event);
988         ret = klass->sink_event (aggregator, pad, event);
989
990         PAD_LOCK (pad);
991         if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) {
992           pad->priv->negotiated = ret;
993         }
994         if (g_queue_peek_tail (&pad->priv->data) == event)
995           gst_event_unref (g_queue_pop_tail (&pad->priv->data));
996         gst_event_unref (event);
997       } else if (query) {
998         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query);
999         ret = klass->sink_query (aggregator, pad, query);
1000
1001         PAD_LOCK (pad);
1002         if (g_queue_peek_tail (&pad->priv->data) == query) {
1003           GstStructure *s;
1004
1005           s = gst_query_writable_structure (query);
1006           gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret,
1007               NULL);
1008           g_queue_pop_tail (&pad->priv->data);
1009         }
1010       }
1011
1012       PAD_BROADCAST_EVENT (pad);
1013       PAD_UNLOCK (pad);
1014     }
1015   } while (event || query);
1016
1017   return TRUE;
1018 }
1019
1020 static gboolean
1021 gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
1022     gpointer user_data)
1023 {
1024   GList *item;
1025   GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
1026   GstAggregator *agg = (GstAggregator *) self;
1027   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
1028
1029   if (!klass->skip_buffer)
1030     return FALSE;
1031
1032   PAD_LOCK (aggpad);
1033
1034   item = g_queue_peek_tail_link (&aggpad->priv->data);
1035   while (item) {
1036     GList *prev = item->prev;
1037
1038     if (GST_IS_BUFFER (item->data)
1039         && klass->skip_buffer (aggpad, agg, item->data)) {
1040       GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
1041       gst_aggregator_pad_buffer_consumed (aggpad, GST_BUFFER (item->data),
1042           TRUE);
1043       gst_buffer_unref (item->data);
1044       g_queue_delete_link (&aggpad->priv->data, item);
1045     } else {
1046       break;
1047     }
1048
1049     item = prev;
1050   }
1051
1052   PAD_UNLOCK (aggpad);
1053
1054   return TRUE;
1055 }
1056
1057 static gboolean
1058 gst_aggregator_pad_reset_peeked_buffer (GstElement * self, GstPad * epad,
1059     gpointer user_data)
1060 {
1061   GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
1062
1063   PAD_LOCK (aggpad);
1064
1065   gst_buffer_replace (&aggpad->priv->peeked_buffer, NULL);
1066
1067   PAD_UNLOCK (aggpad);
1068
1069   return TRUE;
1070 }
1071
1072
1073 static void
1074 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
1075     GstFlowReturn flow_return, gboolean full)
1076 {
1077   GList *item;
1078
1079   PAD_LOCK (aggpad);
1080   if (flow_return == GST_FLOW_NOT_LINKED)
1081     aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
1082   else
1083     aggpad->priv->flow_return = flow_return;
1084
1085   item = g_queue_peek_head_link (&aggpad->priv->data);
1086   while (item) {
1087     GList *next = item->next;
1088
1089     /* In partial flush, we do like the pad, we get rid of non-sticky events
1090      * and EOS/SEGMENT.
1091      */
1092     if (full || GST_IS_BUFFER (item->data) ||
1093         GST_EVENT_TYPE (item->data) == GST_EVENT_EOS ||
1094         GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT ||
1095         !GST_EVENT_IS_STICKY (item->data)) {
1096       if (!GST_IS_QUERY (item->data))
1097         gst_mini_object_unref (item->data);
1098       g_queue_delete_link (&aggpad->priv->data, item);
1099     }
1100     item = next;
1101   }
1102   aggpad->priv->num_buffers = 0;
1103   gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL);
1104
1105   PAD_BROADCAST_EVENT (aggpad);
1106   PAD_UNLOCK (aggpad);
1107 }
1108
1109 static GstFlowReturn
1110 gst_aggregator_default_update_src_caps (GstAggregator * agg, GstCaps * caps,
1111     GstCaps ** ret)
1112 {
1113   *ret = gst_caps_ref (caps);
1114
1115   return GST_FLOW_OK;
1116 }
1117
1118 static GstCaps *
1119 gst_aggregator_default_fixate_src_caps (GstAggregator * agg, GstCaps * caps)
1120 {
1121   caps = gst_caps_fixate (caps);
1122
1123   return caps;
1124 }
1125
1126 static gboolean
1127 gst_aggregator_default_negotiated_src_caps (GstAggregator * agg, GstCaps * caps)
1128 {
1129   return TRUE;
1130 }
1131
1132
1133 /* takes ownership of the pool, allocator and query */
1134 static gboolean
1135 gst_aggregator_set_allocation (GstAggregator * self,
1136     GstBufferPool * pool, GstAllocator * allocator,
1137     const GstAllocationParams * params, GstQuery * query)
1138 {
1139   GstAllocator *oldalloc;
1140   GstBufferPool *oldpool;
1141   GstQuery *oldquery;
1142
1143   GST_DEBUG ("storing allocation query");
1144
1145   GST_OBJECT_LOCK (self);
1146   oldpool = self->priv->pool;
1147   self->priv->pool = pool;
1148
1149   oldalloc = self->priv->allocator;
1150   self->priv->allocator = allocator;
1151
1152   oldquery = self->priv->allocation_query;
1153   self->priv->allocation_query = query;
1154
1155   if (params)
1156     self->priv->allocation_params = *params;
1157   else
1158     gst_allocation_params_init (&self->priv->allocation_params);
1159   GST_OBJECT_UNLOCK (self);
1160
1161   if (oldpool) {
1162     GST_DEBUG_OBJECT (self, "deactivating old pool %p", oldpool);
1163     gst_buffer_pool_set_active (oldpool, FALSE);
1164     gst_object_unref (oldpool);
1165   }
1166   if (oldalloc) {
1167     gst_object_unref (oldalloc);
1168   }
1169   if (oldquery) {
1170     gst_query_unref (oldquery);
1171   }
1172   return TRUE;
1173 }
1174
1175
1176 static gboolean
1177 gst_aggregator_decide_allocation (GstAggregator * self, GstQuery * query)
1178 {
1179   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
1180
1181   if (aggclass->decide_allocation)
1182     if (!aggclass->decide_allocation (self, query))
1183       return FALSE;
1184
1185   return TRUE;
1186 }
1187
1188 static gboolean
1189 gst_aggregator_do_allocation (GstAggregator * self, GstCaps * caps)
1190 {
1191   GstQuery *query;
1192   gboolean result = TRUE;
1193   GstBufferPool *pool = NULL;
1194   GstAllocator *allocator;
1195   GstAllocationParams params;
1196
1197   /* find a pool for the negotiated caps now */
1198   GST_DEBUG_OBJECT (self, "doing allocation query");
1199   query = gst_query_new_allocation (caps, TRUE);
1200   if (!gst_pad_peer_query (self->srcpad, query)) {
1201     /* not a problem, just debug a little */
1202     GST_DEBUG_OBJECT (self, "peer ALLOCATION query failed");
1203   }
1204
1205   GST_DEBUG_OBJECT (self, "calling decide_allocation");
1206   result = gst_aggregator_decide_allocation (self, query);
1207
1208   GST_DEBUG_OBJECT (self, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
1209       query);
1210
1211   if (!result)
1212     goto no_decide_allocation;
1213
1214   /* we got configuration from our peer or the decide_allocation method,
1215    * parse them */
1216   if (gst_query_get_n_allocation_params (query) > 0) {
1217     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
1218   } else {
1219     allocator = NULL;
1220     gst_allocation_params_init (&params);
1221   }
1222
1223   if (gst_query_get_n_allocation_pools (query) > 0)
1224     gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
1225
1226   /* now store */
1227   result =
1228       gst_aggregator_set_allocation (self, pool, allocator, &params, query);
1229
1230   return result;
1231
1232   /* Errors */
1233 no_decide_allocation:
1234   {
1235     GST_WARNING_OBJECT (self, "Failed to decide allocation");
1236     gst_query_unref (query);
1237
1238     return result;
1239   }
1240
1241 }
1242
1243 static gboolean
1244 gst_aggregator_default_negotiate (GstAggregator * self)
1245 {
1246   GstAggregatorClass *agg_klass = GST_AGGREGATOR_GET_CLASS (self);
1247   GstCaps *downstream_caps, *template_caps, *caps = NULL;
1248   GstFlowReturn ret = GST_FLOW_OK;
1249
1250   template_caps = gst_pad_get_pad_template_caps (self->srcpad);
1251   downstream_caps = gst_pad_peer_query_caps (self->srcpad, template_caps);
1252
1253   if (gst_caps_is_empty (downstream_caps)) {
1254     GST_INFO_OBJECT (self, "Downstream caps (%"
1255         GST_PTR_FORMAT ") not compatible with pad template caps (%"
1256         GST_PTR_FORMAT ")", downstream_caps, template_caps);
1257     ret = GST_FLOW_NOT_NEGOTIATED;
1258     goto done;
1259   }
1260
1261   g_assert (agg_klass->update_src_caps);
1262   GST_DEBUG_OBJECT (self, "updating caps from %" GST_PTR_FORMAT,
1263       downstream_caps);
1264   ret = agg_klass->update_src_caps (self, downstream_caps, &caps);
1265   if (ret < GST_FLOW_OK) {
1266     GST_WARNING_OBJECT (self, "Subclass failed to update provided caps");
1267     goto done;
1268   } else if (ret == GST_AGGREGATOR_FLOW_NEED_DATA) {
1269     GST_DEBUG_OBJECT (self, "Subclass needs more data to decide on caps");
1270     goto done;
1271   }
1272   if ((caps == NULL || gst_caps_is_empty (caps)) && ret >= GST_FLOW_OK) {
1273     ret = GST_FLOW_NOT_NEGOTIATED;
1274     goto done;
1275   }
1276   GST_DEBUG_OBJECT (self, "               to %" GST_PTR_FORMAT, caps);
1277
1278 #ifdef GST_ENABLE_EXTRA_CHECKS
1279   if (!gst_caps_is_subset (caps, template_caps)) {
1280     GstCaps *intersection;
1281
1282     GST_ERROR_OBJECT (self,
1283         "update_src_caps returned caps %" GST_PTR_FORMAT
1284         " which are not a real subset of the template caps %"
1285         GST_PTR_FORMAT, caps, template_caps);
1286     g_warning ("%s: update_src_caps returned caps which are not a real "
1287         "subset of the filter caps", GST_ELEMENT_NAME (self));
1288
1289     intersection =
1290         gst_caps_intersect_full (template_caps, caps, GST_CAPS_INTERSECT_FIRST);
1291     gst_caps_unref (caps);
1292     caps = intersection;
1293   }
1294 #endif
1295
1296   if (gst_caps_is_any (caps)) {
1297     goto done;
1298   }
1299
1300   if (!gst_caps_is_fixed (caps)) {
1301     g_assert (agg_klass->fixate_src_caps);
1302
1303     GST_DEBUG_OBJECT (self, "fixate caps from %" GST_PTR_FORMAT, caps);
1304     if (!(caps = agg_klass->fixate_src_caps (self, caps))) {
1305       GST_WARNING_OBJECT (self, "Subclass failed to fixate provided caps");
1306       ret = GST_FLOW_NOT_NEGOTIATED;
1307       goto done;
1308     }
1309     GST_DEBUG_OBJECT (self, "             to %" GST_PTR_FORMAT, caps);
1310   }
1311
1312   if (agg_klass->negotiated_src_caps) {
1313     if (!agg_klass->negotiated_src_caps (self, caps)) {
1314       GST_WARNING_OBJECT (self, "Subclass failed to accept negotiated caps");
1315       ret = GST_FLOW_NOT_NEGOTIATED;
1316       goto done;
1317     }
1318   }
1319
1320   gst_aggregator_set_src_caps (self, caps);
1321
1322   if (!gst_aggregator_do_allocation (self, caps)) {
1323     GST_WARNING_OBJECT (self, "Allocation negotiation failed");
1324     ret = GST_FLOW_NOT_NEGOTIATED;
1325   }
1326
1327 done:
1328   gst_caps_unref (downstream_caps);
1329   gst_caps_unref (template_caps);
1330
1331   if (caps)
1332     gst_caps_unref (caps);
1333
1334   return ret >= GST_FLOW_OK || ret == GST_AGGREGATOR_FLOW_NEED_DATA;
1335 }
1336
1337 /* WITH SRC_LOCK held */
1338 static gboolean
1339 gst_aggregator_negotiate_unlocked (GstAggregator * self)
1340 {
1341   GstAggregatorClass *agg_klass = GST_AGGREGATOR_GET_CLASS (self);
1342
1343   if (agg_klass->negotiate)
1344     return agg_klass->negotiate (self);
1345
1346   return TRUE;
1347 }
1348
1349 /**
1350  * gst_aggregator_negotiate:
1351  * @self: a #GstAggregator
1352  *
1353  * Negotiates src pad caps with downstream elements.
1354  * Unmarks GST_PAD_FLAG_NEED_RECONFIGURE in any case. But marks it again
1355  * if #GstAggregatorClass::negotiate fails.
1356  *
1357  * Returns: %TRUE if the negotiation succeeded, else %FALSE.
1358  *
1359  * Since: 1.18
1360  */
1361 gboolean
1362 gst_aggregator_negotiate (GstAggregator * self)
1363 {
1364   gboolean ret = TRUE;
1365
1366   g_return_val_if_fail (GST_IS_AGGREGATOR (self), FALSE);
1367
1368   GST_PAD_STREAM_LOCK (GST_AGGREGATOR_SRC_PAD (self));
1369   gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self));
1370   ret = gst_aggregator_negotiate_unlocked (self);
1371   if (!ret)
1372     gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self));
1373   GST_PAD_STREAM_UNLOCK (GST_AGGREGATOR_SRC_PAD (self));
1374
1375   return ret;
1376 }
1377
1378 static void
1379 gst_aggregator_aggregate_func (GstAggregator * self)
1380 {
1381   GstAggregatorPrivate *priv = self->priv;
1382   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1383   gboolean timeout = FALSE;
1384
1385   if (self->priv->running == FALSE) {
1386     GST_DEBUG_OBJECT (self, "Not running anymore");
1387     return;
1388   }
1389
1390   GST_LOG_OBJECT (self, "Checking aggregate");
1391   while (priv->send_eos && priv->running) {
1392     GstFlowReturn flow_return = GST_FLOW_OK;
1393     DoHandleEventsAndQueriesData events_query_data = { FALSE, GST_FLOW_OK };
1394
1395     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1396         gst_aggregator_do_events_and_queries, &events_query_data);
1397
1398     if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
1399       goto handle_error;
1400
1401     if (is_live_unlocked (self))
1402       gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1403           gst_aggregator_pad_skip_buffers, NULL);
1404
1405     if (self->priv->got_eos_event) {
1406       gst_aggregator_push_eos (self);
1407       continue;
1408     }
1409
1410     /* Ensure we have buffers ready (either in clipped_buffer or at the head of
1411      * the queue */
1412     if (!gst_aggregator_wait_and_check (self, &timeout)) {
1413       gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1414           gst_aggregator_pad_reset_peeked_buffer, NULL);
1415       continue;
1416     }
1417
1418     if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
1419       if (!gst_aggregator_negotiate_unlocked (self)) {
1420         gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self));
1421         if (GST_PAD_IS_FLUSHING (GST_AGGREGATOR_SRC_PAD (self))) {
1422           flow_return = GST_FLOW_FLUSHING;
1423         } else {
1424           flow_return = GST_FLOW_NOT_NEGOTIATED;
1425         }
1426       }
1427     }
1428
1429     if (timeout || flow_return >= GST_FLOW_OK) {
1430       GST_LOG_OBJECT (self, "Actually aggregating, timeout: %d", timeout);
1431       flow_return = klass->aggregate (self, timeout);
1432     }
1433
1434     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1435         gst_aggregator_pad_reset_peeked_buffer, NULL);
1436
1437     if (!priv->selected_samples_called_or_warned) {
1438       GST_FIXME_OBJECT (self,
1439           "Subclass should call gst_aggregator_selected_samples() from its "
1440           "aggregate implementation.");
1441       priv->selected_samples_called_or_warned = TRUE;
1442     }
1443
1444     if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA)
1445       continue;
1446
1447     GST_OBJECT_LOCK (self);
1448     if (flow_return == GST_FLOW_FLUSHING && priv->flushing) {
1449       /* We don't want to set the pads to flushing, but we want to
1450        * stop the thread, so just break here */
1451       GST_OBJECT_UNLOCK (self);
1452       break;
1453     }
1454     GST_OBJECT_UNLOCK (self);
1455
1456     if (flow_return == GST_FLOW_EOS || flow_return == GST_FLOW_ERROR) {
1457       gst_aggregator_push_eos (self);
1458     }
1459
1460   handle_error:
1461     GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
1462
1463     if (flow_return != GST_FLOW_OK) {
1464       GList *item;
1465
1466       GST_OBJECT_LOCK (self);
1467       for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
1468         GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
1469
1470         gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE);
1471       }
1472       GST_OBJECT_UNLOCK (self);
1473       break;
1474     }
1475   }
1476
1477   /* Pause the task here, the only ways to get here are:
1478    * 1) We're stopping, in which case the task is stopped anyway
1479    * 2) We got a flow error above, in which case it might take
1480    *    some time to forward the flow return upstream and we
1481    *    would otherwise call the task function over and over
1482    *    again without doing anything
1483    */
1484   gst_pad_pause_task (self->srcpad);
1485 }
1486
1487 static gboolean
1488 gst_aggregator_start (GstAggregator * self)
1489 {
1490   GstAggregatorClass *klass;
1491   gboolean result;
1492
1493   self->priv->send_stream_start = TRUE;
1494   self->priv->send_segment = TRUE;
1495   self->priv->send_eos = TRUE;
1496   self->priv->got_eos_event = FALSE;
1497   self->priv->srccaps = NULL;
1498
1499   self->priv->has_peer_latency = FALSE;
1500   self->priv->peer_latency_live = FALSE;
1501   self->priv->peer_latency_min = self->priv->peer_latency_max = 0;
1502
1503   gst_aggregator_set_allocation (self, NULL, NULL, NULL, NULL);
1504
1505   klass = GST_AGGREGATOR_GET_CLASS (self);
1506
1507   if (klass->start)
1508     result = klass->start (self);
1509   else
1510     result = TRUE;
1511
1512   return result;
1513 }
1514
1515 static gboolean
1516 gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
1517 {
1518   gboolean res = TRUE;
1519
1520   GST_INFO_OBJECT (self, "%s srcpad task",
1521       flush_start ? "Pausing" : "Stopping");
1522
1523   SRC_LOCK (self);
1524   self->priv->running = FALSE;
1525   SRC_BROADCAST (self);
1526   SRC_UNLOCK (self);
1527
1528   if (flush_start) {
1529     res = gst_pad_push_event (self->srcpad, flush_start);
1530   }
1531
1532   gst_pad_stop_task (self->srcpad);
1533
1534   return res;
1535 }
1536
1537 static void
1538 gst_aggregator_start_srcpad_task (GstAggregator * self)
1539 {
1540   GST_INFO_OBJECT (self, "Starting srcpad task");
1541
1542   self->priv->running = TRUE;
1543   gst_pad_start_task (GST_PAD (self->srcpad),
1544       (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL);
1545 }
1546
1547 static GstFlowReturn
1548 gst_aggregator_flush (GstAggregator * self)
1549 {
1550   GstFlowReturn ret = GST_FLOW_OK;
1551   GstAggregatorPrivate *priv = self->priv;
1552   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1553
1554   GST_DEBUG_OBJECT (self, "Flushing everything");
1555   GST_OBJECT_LOCK (self);
1556   priv->send_segment = TRUE;
1557   priv->flushing = FALSE;
1558   priv->tags_changed = FALSE;
1559   GST_OBJECT_UNLOCK (self);
1560   if (klass->flush)
1561     ret = klass->flush (self);
1562
1563   return ret;
1564 }
1565
1566
1567 /* Called with GstAggregator's object lock held */
1568
1569 static gboolean
1570 gst_aggregator_all_flush_stop_received (GstAggregator * self, guint32 seqnum)
1571 {
1572   GList *tmp;
1573   GstAggregatorPad *tmppad;
1574
1575   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
1576     tmppad = (GstAggregatorPad *) tmp->data;
1577
1578     if (tmppad->priv->last_flush_stop_seqnum != seqnum)
1579       return FALSE;
1580   }
1581
1582   return TRUE;
1583 }
1584
1585 /* Called with GstAggregator's object lock held */
1586
1587 static gboolean
1588 gst_aggregator_all_flush_start_received (GstAggregator * self, guint32 seqnum)
1589 {
1590   GList *tmp;
1591   GstAggregatorPad *tmppad;
1592
1593   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
1594     tmppad = (GstAggregatorPad *) tmp->data;
1595
1596     if (tmppad->priv->last_flush_start_seqnum != seqnum) {
1597       return FALSE;
1598     }
1599   }
1600
1601   return TRUE;
1602 }
1603
1604 static void
1605 gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
1606     GstEvent * event)
1607 {
1608   GstAggregatorPrivate *priv = self->priv;
1609   GstAggregatorPadPrivate *padpriv = aggpad->priv;
1610   guint32 seqnum = gst_event_get_seqnum (event);
1611
1612   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE);
1613
1614   PAD_FLUSH_LOCK (aggpad);
1615   PAD_LOCK (aggpad);
1616   padpriv->last_flush_start_seqnum = seqnum;
1617   PAD_UNLOCK (aggpad);
1618
1619   GST_OBJECT_LOCK (self);
1620
1621   if (!priv->flushing && gst_aggregator_all_flush_start_received (self, seqnum)) {
1622     /* Make sure we don't forward more than one FLUSH_START */
1623     priv->flushing = TRUE;
1624     priv->next_seqnum = seqnum;
1625     GST_OBJECT_UNLOCK (self);
1626
1627     GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
1628     gst_aggregator_stop_srcpad_task (self, event);
1629
1630     event = NULL;
1631   } else {
1632     gst_event_unref (event);
1633     GST_OBJECT_UNLOCK (self);
1634   }
1635
1636   PAD_FLUSH_UNLOCK (aggpad);
1637 }
1638
1639 /* Must be called with the PAD_LOCK and OBJECT_LOCK held */
1640 static void
1641 update_time_level (GstAggregatorPad * aggpad, gboolean head)
1642 {
1643   GstAggregatorPadPrivate *priv = aggpad->priv;
1644
1645   if (head) {
1646     if (GST_CLOCK_TIME_IS_VALID (priv->head_position) &&
1647         priv->head_segment.format == GST_FORMAT_TIME)
1648       priv->head_time = gst_segment_to_running_time (&priv->head_segment,
1649           GST_FORMAT_TIME, priv->head_position);
1650     else
1651       priv->head_time = GST_CLOCK_TIME_NONE;
1652
1653     if (!GST_CLOCK_TIME_IS_VALID (priv->tail_time))
1654       priv->tail_time = priv->head_time;
1655   } else {
1656     if (GST_CLOCK_TIME_IS_VALID (priv->tail_position) &&
1657         aggpad->segment.format == GST_FORMAT_TIME)
1658       priv->tail_time = gst_segment_to_running_time (&aggpad->segment,
1659           GST_FORMAT_TIME, priv->tail_position);
1660     else
1661       priv->tail_time = priv->head_time;
1662   }
1663
1664   if (priv->head_time == GST_CLOCK_TIME_NONE ||
1665       priv->tail_time == GST_CLOCK_TIME_NONE) {
1666     priv->time_level = 0;
1667     return;
1668   }
1669
1670   if (priv->tail_time > priv->head_time)
1671     priv->time_level = 0;
1672   else
1673     priv->time_level = priv->head_time - priv->tail_time;
1674 }
1675
1676
1677 /* GstAggregator vmethods default implementations */
1678 static gboolean
1679 gst_aggregator_default_sink_event (GstAggregator * self,
1680     GstAggregatorPad * aggpad, GstEvent * event)
1681 {
1682   gboolean res = TRUE;
1683   GstPad *pad = GST_PAD (aggpad);
1684   GstAggregatorPrivate *priv = self->priv;
1685
1686   GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event);
1687
1688   switch (GST_EVENT_TYPE (event)) {
1689     case GST_EVENT_FLUSH_START:
1690     {
1691       gst_aggregator_flush_start (self, aggpad, event);
1692       /* We forward only in one case: right after flushing */
1693       event = NULL;
1694       goto eat;
1695     }
1696     case GST_EVENT_FLUSH_STOP:
1697     {
1698       guint32 seqnum = gst_event_get_seqnum (event);
1699
1700       PAD_FLUSH_LOCK (aggpad);
1701       PAD_LOCK (aggpad);
1702       aggpad->priv->last_flush_stop_seqnum = seqnum;
1703       PAD_UNLOCK (aggpad);
1704
1705       gst_aggregator_pad_flush (aggpad, self);
1706
1707       GST_OBJECT_LOCK (self);
1708       if (priv->flushing
1709           && gst_aggregator_all_flush_stop_received (self, seqnum)) {
1710         GST_OBJECT_UNLOCK (self);
1711         /* That means we received FLUSH_STOP/FLUSH_STOP on
1712          * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
1713         gst_aggregator_flush (self);
1714         gst_pad_push_event (self->srcpad, event);
1715         event = NULL;
1716         SRC_LOCK (self);
1717         priv->send_eos = TRUE;
1718         priv->got_eos_event = FALSE;
1719         SRC_BROADCAST (self);
1720         SRC_UNLOCK (self);
1721
1722         GST_INFO_OBJECT (self, "Flush stopped");
1723
1724         gst_aggregator_start_srcpad_task (self);
1725       } else {
1726         GST_OBJECT_UNLOCK (self);
1727       }
1728
1729       PAD_FLUSH_UNLOCK (aggpad);
1730
1731       /* We never forward the event */
1732       goto eat;
1733     }
1734     case GST_EVENT_EOS:
1735     {
1736       SRC_LOCK (self);
1737       PAD_LOCK (aggpad);
1738       aggpad->priv->eos = TRUE;
1739       PAD_UNLOCK (aggpad);
1740       SRC_BROADCAST (self);
1741       SRC_UNLOCK (self);
1742       goto eat;
1743     }
1744     case GST_EVENT_SEGMENT:
1745     {
1746       PAD_LOCK (aggpad);
1747       GST_OBJECT_LOCK (aggpad);
1748       gst_event_copy_segment (event, &aggpad->segment);
1749       /* We've got a new segment, tail_position is now meaningless
1750        * and may interfere with the time_level calculation
1751        */
1752       aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
1753       update_time_level (aggpad, FALSE);
1754       GST_OBJECT_UNLOCK (aggpad);
1755       PAD_UNLOCK (aggpad);
1756
1757       GST_OBJECT_LOCK (self);
1758       self->priv->seqnum = gst_event_get_seqnum (event);
1759       GST_OBJECT_UNLOCK (self);
1760       goto eat;
1761     }
1762     case GST_EVENT_STREAM_START:
1763     {
1764       PAD_LOCK (aggpad);
1765       aggpad->priv->eos = FALSE;
1766       PAD_UNLOCK (aggpad);
1767       goto eat;
1768     }
1769     case GST_EVENT_GAP:
1770     {
1771       GstClockTime pts, endpts;
1772       GstClockTime duration;
1773       GstBuffer *gapbuf;
1774       GstGapFlags flags = 0;
1775
1776       gst_event_parse_gap (event, &pts, &duration);
1777
1778       if (GST_CLOCK_TIME_IS_VALID (duration))
1779         endpts = pts + duration;
1780       else
1781         endpts = GST_CLOCK_TIME_NONE;
1782
1783       GST_OBJECT_LOCK (aggpad);
1784       res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts,
1785           &pts, &endpts);
1786       GST_OBJECT_UNLOCK (aggpad);
1787
1788       if (!res) {
1789         GST_WARNING_OBJECT (self, "GAP event outside segment, dropping");
1790         goto eat;
1791       }
1792
1793       if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts))
1794         duration = endpts - pts;
1795       else
1796         duration = GST_CLOCK_TIME_NONE;
1797
1798       gapbuf = gst_buffer_new ();
1799       GST_BUFFER_PTS (gapbuf) = pts;
1800       GST_BUFFER_DURATION (gapbuf) = duration;
1801       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
1802       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
1803
1804       gst_event_parse_gap_flags (event, &flags);
1805       if (flags & GST_GAP_FLAG_MISSING_DATA) {
1806         gst_buffer_add_custom_meta (gapbuf, "GstAggregatorMissingDataMeta");
1807       }
1808
1809       /* Remove GAP event so we can replace it with the buffer */
1810       PAD_LOCK (aggpad);
1811       if (g_queue_peek_tail (&aggpad->priv->data) == event)
1812         gst_event_unref (g_queue_pop_tail (&aggpad->priv->data));
1813       PAD_UNLOCK (aggpad);
1814
1815       if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
1816           GST_FLOW_OK) {
1817         GST_WARNING_OBJECT (self, "Failed to chain gap buffer");
1818         res = FALSE;
1819       }
1820
1821       goto eat;
1822     }
1823     case GST_EVENT_TAG:
1824       goto eat;
1825     default:
1826     {
1827       break;
1828     }
1829   }
1830
1831   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
1832   return gst_pad_event_default (pad, GST_OBJECT (self), event);
1833
1834 eat:
1835   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
1836   if (event)
1837     gst_event_unref (event);
1838
1839   return res;
1840 }
1841
1842 /* Queue serialized events and let the others go through directly.
1843  * The queued events with be handled from the src-pad task in
1844  * gst_aggregator_do_events_and_queries().
1845  */
1846 static GstFlowReturn
1847 gst_aggregator_default_sink_event_pre_queue (GstAggregator * self,
1848     GstAggregatorPad * aggpad, GstEvent * event)
1849 {
1850   GstFlowReturn ret = GST_FLOW_OK;
1851
1852   if (GST_EVENT_IS_SERIALIZED (event)
1853       && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
1854     SRC_LOCK (self);
1855     PAD_LOCK (aggpad);
1856
1857     if (aggpad->priv->flow_return != GST_FLOW_OK)
1858       goto flushing;
1859
1860     if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
1861       GST_OBJECT_LOCK (aggpad);
1862       gst_event_copy_segment (event, &aggpad->priv->head_segment);
1863       aggpad->priv->head_position = aggpad->priv->head_segment.position;
1864       update_time_level (aggpad, TRUE);
1865       GST_OBJECT_UNLOCK (aggpad);
1866     }
1867
1868     GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event);
1869     g_queue_push_head (&aggpad->priv->data, event);
1870     SRC_BROADCAST (self);
1871     PAD_UNLOCK (aggpad);
1872     SRC_UNLOCK (self);
1873   } else {
1874     GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1875
1876     if (!klass->sink_event (self, aggpad, event)) {
1877       /* Copied from GstPad to convert boolean to a GstFlowReturn in
1878        * the event handling func */
1879       ret = GST_FLOW_ERROR;
1880     }
1881   }
1882
1883   return ret;
1884
1885 flushing:
1886   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
1887       gst_flow_get_name (aggpad->priv->flow_return));
1888   PAD_UNLOCK (aggpad);
1889   SRC_UNLOCK (self);
1890   if (GST_EVENT_IS_STICKY (event))
1891     gst_pad_store_sticky_event (GST_PAD (aggpad), event);
1892   gst_event_unref (event);
1893
1894   return aggpad->priv->flow_return;
1895 }
1896
1897 static gboolean
1898 gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data)
1899 {
1900   GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
1901   GstAggregator *agg = GST_AGGREGATOR_CAST (self);
1902
1903   gst_aggregator_pad_flush (pad, agg);
1904
1905   PAD_LOCK (pad);
1906   pad->priv->flow_return = GST_FLOW_FLUSHING;
1907   pad->priv->negotiated = FALSE;
1908   PAD_BROADCAST_EVENT (pad);
1909   PAD_UNLOCK (pad);
1910
1911   return TRUE;
1912 }
1913
1914 static gboolean
1915 gst_aggregator_stop (GstAggregator * agg)
1916 {
1917   GstAggregatorClass *klass;
1918   gboolean result;
1919
1920   gst_aggregator_reset_flow_values (agg);
1921
1922   /* Application needs to make sure no pads are added while it shuts us down */
1923   gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg),
1924       gst_aggregator_stop_pad, NULL);
1925
1926   klass = GST_AGGREGATOR_GET_CLASS (agg);
1927
1928   if (klass->stop)
1929     result = klass->stop (agg);
1930   else
1931     result = TRUE;
1932
1933   agg->priv->has_peer_latency = FALSE;
1934   agg->priv->peer_latency_live = FALSE;
1935   agg->priv->peer_latency_min = agg->priv->peer_latency_max = 0;
1936   agg->priv->posted_latency_msg = FALSE;
1937
1938   if (agg->priv->tags)
1939     gst_tag_list_unref (agg->priv->tags);
1940   agg->priv->tags = NULL;
1941
1942   gst_aggregator_set_allocation (agg, NULL, NULL, NULL, NULL);
1943
1944   if (agg->priv->running) {
1945     /* As sinkpads get deactivated after the src pad, we
1946      * may have restarted the source pad task after receiving
1947      * flush events on one of our sinkpads. Stop our src pad
1948      * task again if that is the case */
1949     gst_aggregator_stop_srcpad_task (agg, NULL);
1950   }
1951
1952   return result;
1953 }
1954
1955 /* GstElement vmethods implementations */
1956 static GstStateChangeReturn
1957 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
1958 {
1959   GstStateChangeReturn ret;
1960   GstAggregator *self = GST_AGGREGATOR (element);
1961
1962   switch (transition) {
1963     case GST_STATE_CHANGE_READY_TO_PAUSED:
1964       if (!gst_aggregator_start (self))
1965         goto error_start;
1966       break;
1967     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1968       /* Wake up any waiting as now we have a clock and can do
1969        * proper waiting on the clock if necessary */
1970       SRC_LOCK (self);
1971       SRC_BROADCAST (self);
1972       SRC_UNLOCK (self);
1973       break;
1974     default:
1975       break;
1976   }
1977
1978   if ((ret =
1979           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
1980               transition)) == GST_STATE_CHANGE_FAILURE)
1981     goto failure;
1982
1983
1984   switch (transition) {
1985     case GST_STATE_CHANGE_PAUSED_TO_READY:
1986       if (!gst_aggregator_stop (self)) {
1987         /* What to do in this case? Error out? */
1988         GST_ERROR_OBJECT (self, "Subclass failed to stop.");
1989       }
1990       break;
1991     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1992       /* Wake up any waiting as now clock might be gone and we might
1993        * need to wait on the condition variable again */
1994       SRC_LOCK (self);
1995       SRC_BROADCAST (self);
1996       SRC_UNLOCK (self);
1997       if (self->priv->force_live) {
1998         ret = GST_STATE_CHANGE_NO_PREROLL;
1999       }
2000       break;
2001     case GST_STATE_CHANGE_READY_TO_PAUSED:
2002       if (self->priv->force_live) {
2003         ret = GST_STATE_CHANGE_NO_PREROLL;
2004       }
2005       break;
2006     default:
2007       break;
2008   }
2009
2010   return ret;
2011
2012 /* ERRORS */
2013 failure:
2014   {
2015     GST_ERROR_OBJECT (element, "parent failed state change");
2016     return ret;
2017   }
2018 error_start:
2019   {
2020     GST_ERROR_OBJECT (element, "Subclass failed to start");
2021     return GST_STATE_CHANGE_FAILURE;
2022   }
2023 }
2024
2025 static void
2026 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
2027 {
2028   GstAggregator *self = GST_AGGREGATOR (element);
2029   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2030
2031   GST_INFO_OBJECT (pad, "Removing pad");
2032
2033   SRC_LOCK (self);
2034   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
2035   PAD_LOCK (aggpad);
2036   gst_buffer_replace (&aggpad->priv->peeked_buffer, NULL);
2037   gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL);
2038   PAD_UNLOCK (aggpad);
2039   gst_element_remove_pad (element, pad);
2040
2041   self->priv->has_peer_latency = FALSE;
2042   SRC_BROADCAST (self);
2043   SRC_UNLOCK (self);
2044 }
2045
2046 static GstAggregatorPad *
2047 gst_aggregator_default_create_new_pad (GstAggregator * self,
2048     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
2049 {
2050   GstAggregatorPad *agg_pad;
2051   GstAggregatorPrivate *priv = self->priv;
2052   gint serial = 0;
2053   gchar *name = NULL;
2054   GType pad_type =
2055       GST_PAD_TEMPLATE_GTYPE (templ) ==
2056       G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : GST_PAD_TEMPLATE_GTYPE (templ);
2057
2058   if (templ->direction != GST_PAD_SINK)
2059     goto not_sink;
2060
2061   if (templ->presence != GST_PAD_REQUEST)
2062     goto not_request;
2063
2064   GST_OBJECT_LOCK (self);
2065   if (req_name == NULL || strlen (req_name) < 6
2066       || !g_str_has_prefix (req_name, "sink_")
2067       || strrchr (req_name, '%') != NULL) {
2068     /* no name given when requesting the pad, use next available int */
2069     serial = ++priv->max_padserial;
2070   } else {
2071     gchar *endptr = NULL;
2072
2073     /* parse serial number from requested padname */
2074     serial = g_ascii_strtoull (&req_name[5], &endptr, 10);
2075     if (endptr != NULL && *endptr == '\0') {
2076       if (serial > priv->max_padserial) {
2077         priv->max_padserial = serial;
2078       }
2079     } else {
2080       serial = ++priv->max_padserial;
2081     }
2082   }
2083
2084   name = g_strdup_printf ("sink_%u", serial);
2085   g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD));
2086   agg_pad = g_object_new (pad_type,
2087       "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
2088   g_free (name);
2089
2090   GST_OBJECT_UNLOCK (self);
2091
2092   return agg_pad;
2093
2094   /* errors */
2095 not_sink:
2096   {
2097     GST_WARNING_OBJECT (self, "request new pad that is not a SINK pad");
2098     return NULL;
2099   }
2100 not_request:
2101   {
2102     GST_WARNING_OBJECT (self, "request new pad that is not a REQUEST pad");
2103     return NULL;
2104   }
2105 }
2106
2107 static GstPad *
2108 gst_aggregator_request_new_pad (GstElement * element,
2109     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
2110 {
2111   GstAggregator *self;
2112   GstAggregatorPad *agg_pad;
2113   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (element);
2114   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
2115
2116   self = GST_AGGREGATOR (element);
2117
2118   agg_pad = klass->create_new_pad (self, templ, req_name, caps);
2119   if (!agg_pad) {
2120     GST_ERROR_OBJECT (element, "Couldn't create new pad");
2121     return NULL;
2122   }
2123
2124   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
2125
2126   if (priv->running)
2127     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
2128
2129   /* add the pad to the element */
2130   gst_element_add_pad (element, GST_PAD (agg_pad));
2131
2132   return GST_PAD (agg_pad);
2133 }
2134
2135 /* Must be called with SRC_LOCK held, temporarily releases it! */
2136
2137 static gboolean
2138 gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
2139 {
2140   gboolean query_ret, live;
2141   GstClockTime our_latency, min, max;
2142
2143   /* Temporarily release the lock to do the query. */
2144   SRC_UNLOCK (self);
2145   query_ret = gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
2146   SRC_LOCK (self);
2147
2148   if (!query_ret) {
2149     GST_WARNING_OBJECT (self, "Latency query failed");
2150     return FALSE;
2151   }
2152
2153   gst_query_parse_latency (query, &live, &min, &max);
2154
2155   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
2156     GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
2157         ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
2158     return FALSE;
2159   }
2160
2161   if (self->priv->upstream_latency_min > min) {
2162     GstClockTimeDiff diff =
2163         GST_CLOCK_DIFF (min, self->priv->upstream_latency_min);
2164
2165     min += diff;
2166     if (GST_CLOCK_TIME_IS_VALID (max)) {
2167       max += diff;
2168     }
2169   }
2170
2171   if (min > max && GST_CLOCK_TIME_IS_VALID (max)) {
2172     SRC_UNLOCK (self);
2173     GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL),
2174         ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
2175             GST_TIME_FORMAT ". Add queues or other buffering elements.",
2176             GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
2177     SRC_LOCK (self);
2178     return FALSE;
2179   }
2180
2181   our_latency = self->priv->latency;
2182
2183   self->priv->peer_latency_live = live;
2184   self->priv->peer_latency_min = min;
2185   self->priv->peer_latency_max = max;
2186   self->priv->has_peer_latency = TRUE;
2187
2188   /* add our own */
2189   min += our_latency;
2190   min += self->priv->sub_latency_min;
2191   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
2192       && GST_CLOCK_TIME_IS_VALID (max))
2193     max += self->priv->sub_latency_max + our_latency;
2194   else
2195     max = GST_CLOCK_TIME_NONE;
2196
2197   SRC_BROADCAST (self);
2198
2199   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
2200       " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
2201
2202   gst_query_set_latency (query, live, min, max);
2203
2204   return query_ret;
2205 }
2206
2207 /*
2208  * MUST be called with the src_lock held. Temporarily releases the lock inside
2209  * gst_aggregator_query_latency_unlocked() to do the actual query!
2210  *
2211  * See  gst_aggregator_get_latency() for doc
2212  */
2213 static GstClockTime
2214 gst_aggregator_get_latency_unlocked (GstAggregator * self)
2215 {
2216   GstClockTime latency;
2217
2218   g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0);
2219
2220   if (!self->priv->has_peer_latency) {
2221     GstQuery *query = gst_query_new_latency ();
2222     gboolean ret;
2223
2224     ret = gst_aggregator_query_latency_unlocked (self, query);
2225     gst_query_unref (query);
2226     /* If we've been set to live, we don't wait for a peer latency, we will
2227      * simply query it again next time around */
2228     if (!ret && !self->priv->force_live)
2229       return GST_CLOCK_TIME_NONE;
2230   }
2231
2232   /* If we've been set to live, we don't wait for a peer latency, we will
2233    * simply query it again next time around */
2234   if (!self->priv->force_live) {
2235     if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
2236       return GST_CLOCK_TIME_NONE;
2237   }
2238
2239   /* latency_min is never GST_CLOCK_TIME_NONE by construction */
2240   latency = self->priv->peer_latency_min;
2241
2242   /* add our own */
2243   latency += self->priv->latency;
2244   latency += self->priv->sub_latency_min;
2245
2246   return latency;
2247 }
2248
2249 /**
2250  * gst_aggregator_get_latency:
2251  * @self: a #GstAggregator
2252  *
2253  * Retrieves the latency values reported by @self in response to the latency
2254  * query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element
2255  * will not wait for the clock.
2256  *
2257  * Typically only called by subclasses.
2258  *
2259  * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync
2260  */
2261 GstClockTime
2262 gst_aggregator_get_latency (GstAggregator * self)
2263 {
2264   GstClockTime ret;
2265
2266   SRC_LOCK (self);
2267   ret = gst_aggregator_get_latency_unlocked (self);
2268   SRC_UNLOCK (self);
2269
2270   return ret;
2271 }
2272
2273 static gboolean
2274 gst_aggregator_send_event (GstElement * element, GstEvent * event)
2275 {
2276   GstAggregator *self = GST_AGGREGATOR (element);
2277
2278   GST_STATE_LOCK (element);
2279   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
2280       GST_STATE (element) < GST_STATE_PAUSED) {
2281     gdouble rate;
2282     GstFormat fmt;
2283     GstSeekFlags flags;
2284     GstSeekType start_type, stop_type;
2285     gint64 start, stop;
2286
2287     gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
2288         &start, &stop_type, &stop);
2289
2290     GST_OBJECT_LOCK (self);
2291     gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
2292         flags, start_type, start, stop_type, stop, NULL);
2293     self->priv->next_seqnum = gst_event_get_seqnum (event);
2294     self->priv->first_buffer = FALSE;
2295     GST_OBJECT_UNLOCK (self);
2296
2297     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
2298   }
2299
2300   GST_STATE_UNLOCK (element);
2301
2302
2303   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
2304     SRC_LOCK (self);
2305     self->priv->got_eos_event = TRUE;
2306     SRC_BROADCAST (self);
2307     SRC_UNLOCK (self);
2308   }
2309
2310   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
2311       event);
2312 }
2313
2314 static gboolean
2315 gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
2316 {
2317   gboolean res = TRUE;
2318
2319   switch (GST_QUERY_TYPE (query)) {
2320     case GST_QUERY_SEEKING:
2321     {
2322       GstFormat format;
2323
2324       /* don't pass it along as some (file)sink might claim it does
2325        * whereas with a collectpads in between that will not likely work */
2326       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
2327       gst_query_set_seeking (query, format, FALSE, 0, -1);
2328       res = TRUE;
2329
2330       break;
2331     }
2332     case GST_QUERY_LATENCY:
2333       SRC_LOCK (self);
2334       res = gst_aggregator_query_latency_unlocked (self, query);
2335       SRC_UNLOCK (self);
2336       break;
2337     default:
2338       return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
2339   }
2340
2341   return res;
2342 }
2343
2344 static gboolean
2345 gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
2346 {
2347   EventData *evdata = user_data;
2348   gboolean ret = TRUE;
2349   GstPad *peer = gst_pad_get_peer (pad);
2350   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2351
2352   if (peer) {
2353     if (evdata->only_to_active_pads && aggpad->priv->first_buffer) {
2354       GST_DEBUG_OBJECT (pad, "not sending event to inactive pad");
2355       ret = TRUE;
2356     } else {
2357       ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
2358       GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
2359     }
2360   }
2361
2362   if (ret == FALSE) {
2363     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
2364       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
2365
2366       GST_DEBUG_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
2367
2368       if (gst_pad_query (peer, seeking)) {
2369         gboolean seekable;
2370
2371         gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
2372
2373         if (seekable == FALSE) {
2374           GST_INFO_OBJECT (pad,
2375               "Source not seekable, We failed but it does not matter!");
2376
2377           ret = TRUE;
2378         }
2379       } else {
2380         GST_ERROR_OBJECT (pad, "Query seeking FAILED");
2381       }
2382
2383       gst_query_unref (seeking);
2384     }
2385   } else {
2386     evdata->one_actually_seeked = TRUE;
2387   }
2388
2389   evdata->result &= ret;
2390
2391   if (peer)
2392     gst_object_unref (peer);
2393
2394   /* Always send to all pads */
2395   return FALSE;
2396 }
2397
2398 static void
2399 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
2400     EventData * evdata)
2401 {
2402   evdata->result = TRUE;
2403   evdata->one_actually_seeked = FALSE;
2404
2405   gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata);
2406
2407   gst_event_unref (evdata->event);
2408 }
2409
2410 static gboolean
2411 gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
2412 {
2413   gdouble rate;
2414   GstFormat fmt;
2415   GstSeekFlags flags;
2416   GstSeekType start_type, stop_type;
2417   gint64 start, stop;
2418   gboolean flush;
2419   EventData evdata = { 0, };
2420   GstAggregatorPrivate *priv = self->priv;
2421
2422   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
2423       &start, &stop_type, &stop);
2424
2425   GST_INFO_OBJECT (self, "starting SEEK");
2426
2427   flush = flags & GST_SEEK_FLAG_FLUSH;
2428
2429   GST_OBJECT_LOCK (self);
2430
2431   if (gst_event_get_seqnum (event) == self->priv->next_seqnum) {
2432     evdata.result = TRUE;
2433     GST_DEBUG_OBJECT (self, "Dropping duplicated seek event with seqnum %d",
2434         self->priv->next_seqnum);
2435     GST_OBJECT_UNLOCK (self);
2436     goto done;
2437   }
2438
2439   self->priv->next_seqnum = gst_event_get_seqnum (event);
2440
2441   gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
2442       flags, start_type, start, stop_type, stop, NULL);
2443
2444   /* Seeking sets a position */
2445   self->priv->first_buffer = FALSE;
2446
2447   if (flush)
2448     priv->flushing = TRUE;
2449
2450   GST_OBJECT_UNLOCK (self);
2451
2452   if (flush) {
2453     GstEvent *event = gst_event_new_flush_start ();
2454
2455     gst_event_set_seqnum (event, self->priv->next_seqnum);
2456     gst_aggregator_stop_srcpad_task (self, event);
2457   }
2458
2459   /* forward the seek upstream */
2460   evdata.event = event;
2461   evdata.flush = flush;
2462   evdata.only_to_active_pads = FALSE;
2463   gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2464   event = NULL;
2465
2466   if (!evdata.result || !evdata.one_actually_seeked) {
2467     GST_OBJECT_LOCK (self);
2468     priv->flushing = FALSE;
2469     GST_OBJECT_UNLOCK (self);
2470
2471     /* No flush stop is inbound for us to forward */
2472     if (flush) {
2473       GstEvent *event = gst_event_new_flush_stop (TRUE);
2474
2475       gst_event_set_seqnum (event, self->priv->next_seqnum);
2476       gst_pad_push_event (self->srcpad, event);
2477     }
2478   }
2479
2480 done:
2481   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
2482
2483   return evdata.result;
2484 }
2485
2486 static gboolean
2487 gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
2488 {
2489   EventData evdata = { 0, };
2490
2491   switch (GST_EVENT_TYPE (event)) {
2492     case GST_EVENT_SEEK:
2493       /* _do_seek() unrefs the event. */
2494       return gst_aggregator_do_seek (self, event);
2495     case GST_EVENT_NAVIGATION:
2496       /* specific handling has to be implemented in subclasses */
2497       gst_event_unref (event);
2498       return FALSE;
2499     case GST_EVENT_RECONFIGURE:
2500       /* We will renegotiate with downstream, we don't
2501        * need to forward this further */
2502       gst_event_unref (event);
2503       return TRUE;
2504     default:
2505       break;
2506   }
2507
2508   /* Don't forward QOS events to pads that had no active buffer yet. Otherwise
2509    * they will receive a QOS event that has earliest_time=0 (because we can't
2510    * have negative timestamps), and consider their buffer as too late */
2511   evdata.event = event;
2512   evdata.flush = FALSE;
2513   evdata.only_to_active_pads = GST_EVENT_TYPE (event) == GST_EVENT_QOS;
2514   gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2515   return evdata.result;
2516 }
2517
2518 static gboolean
2519 gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent,
2520     GstEvent * event)
2521 {
2522   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2523
2524   return klass->src_event (GST_AGGREGATOR (parent), event);
2525 }
2526
2527 static gboolean
2528 gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent,
2529     GstQuery * query)
2530 {
2531   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2532
2533   return klass->src_query (GST_AGGREGATOR (parent), query);
2534 }
2535
2536 static gboolean
2537 gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
2538     GstObject * parent, GstPadMode mode, gboolean active)
2539 {
2540   GstAggregator *self = GST_AGGREGATOR (parent);
2541   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2542
2543   if (klass->src_activate) {
2544     if (klass->src_activate (self, mode, active) == FALSE) {
2545       return FALSE;
2546     }
2547   }
2548
2549   if (active == TRUE) {
2550     switch (mode) {
2551       case GST_PAD_MODE_PUSH:
2552       {
2553         GST_INFO_OBJECT (pad, "Activating pad!");
2554         gst_aggregator_start_srcpad_task (self);
2555         return TRUE;
2556       }
2557       default:
2558       {
2559         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
2560         return FALSE;
2561       }
2562     }
2563   }
2564
2565   /* deactivating */
2566   GST_INFO_OBJECT (self, "Deactivating srcpad");
2567
2568   gst_aggregator_stop_srcpad_task (self, FALSE);
2569
2570   return TRUE;
2571 }
2572
2573 static gboolean
2574 gst_aggregator_default_sink_query (GstAggregator * self,
2575     GstAggregatorPad * aggpad, GstQuery * query)
2576 {
2577   GstPad *pad = GST_PAD (aggpad);
2578
2579   if (GST_QUERY_TYPE (query) == GST_QUERY_ALLOCATION) {
2580     GstQuery *decide_query = NULL;
2581     GstAggregatorClass *agg_class;
2582     gboolean ret;
2583
2584     GST_OBJECT_LOCK (self);
2585     PAD_LOCK (aggpad);
2586     if (G_UNLIKELY (!aggpad->priv->negotiated)) {
2587       GST_DEBUG_OBJECT (self,
2588           "not negotiated yet, can't answer ALLOCATION query");
2589       PAD_UNLOCK (aggpad);
2590       GST_OBJECT_UNLOCK (self);
2591
2592       return FALSE;
2593     }
2594
2595     if ((decide_query = self->priv->allocation_query))
2596       gst_query_ref (decide_query);
2597     PAD_UNLOCK (aggpad);
2598     GST_OBJECT_UNLOCK (self);
2599
2600     GST_DEBUG_OBJECT (self,
2601         "calling propose allocation with query %" GST_PTR_FORMAT, decide_query);
2602
2603     agg_class = GST_AGGREGATOR_GET_CLASS (self);
2604
2605     /* pass the query to the propose_allocation vmethod if any */
2606     if (agg_class->propose_allocation)
2607       ret = agg_class->propose_allocation (self, aggpad, decide_query, query);
2608     else
2609       ret = FALSE;
2610
2611     if (decide_query)
2612       gst_query_unref (decide_query);
2613
2614     GST_DEBUG_OBJECT (self, "ALLOCATION ret %d, %" GST_PTR_FORMAT, ret, query);
2615     return ret;
2616   }
2617
2618   return gst_pad_query_default (pad, GST_OBJECT (self), query);
2619 }
2620
2621 static gboolean
2622 gst_aggregator_default_sink_query_pre_queue (GstAggregator * self,
2623     GstAggregatorPad * aggpad, GstQuery * query)
2624 {
2625   if (GST_QUERY_IS_SERIALIZED (query)) {
2626     GstStructure *s;
2627     gboolean ret = FALSE;
2628
2629     SRC_LOCK (self);
2630     PAD_LOCK (aggpad);
2631
2632     if (aggpad->priv->flow_return != GST_FLOW_OK) {
2633       SRC_UNLOCK (self);
2634       goto flushing;
2635     }
2636
2637     g_queue_push_head (&aggpad->priv->data, query);
2638     SRC_BROADCAST (self);
2639     SRC_UNLOCK (self);
2640
2641     while (!gst_aggregator_pad_queue_is_empty (aggpad)
2642         && aggpad->priv->flow_return == GST_FLOW_OK) {
2643       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2644       PAD_WAIT_EVENT (aggpad);
2645     }
2646
2647     s = gst_query_writable_structure (query);
2648     if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret))
2649       gst_structure_remove_field (s, "gst-aggregator-retval");
2650     else
2651       g_queue_remove (&aggpad->priv->data, query);
2652
2653     if (aggpad->priv->flow_return != GST_FLOW_OK)
2654       goto flushing;
2655
2656     PAD_UNLOCK (aggpad);
2657
2658     return ret;
2659   } else {
2660     GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
2661
2662     return klass->sink_query (self, aggpad, query);
2663   }
2664
2665 flushing:
2666   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
2667       gst_flow_get_name (aggpad->priv->flow_return));
2668   PAD_UNLOCK (aggpad);
2669
2670   return FALSE;
2671 }
2672
2673 static void
2674 gst_aggregator_constructed (GObject * object)
2675 {
2676   GstAggregator *agg = GST_AGGREGATOR (object);
2677
2678   if (agg->priv->force_live) {
2679     GST_OBJECT_FLAG_SET (agg, GST_ELEMENT_FLAG_SOURCE);
2680   }
2681 }
2682
2683 static void
2684 gst_aggregator_finalize (GObject * object)
2685 {
2686   GstAggregator *self = (GstAggregator *) object;
2687
2688   g_mutex_clear (&self->priv->src_lock);
2689   g_cond_clear (&self->priv->src_cond);
2690
2691   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
2692 }
2693
2694 /*
2695  * gst_aggregator_set_latency_property:
2696  * @agg: a #GstAggregator
2697  * @latency: the new latency value (in nanoseconds).
2698  *
2699  * Sets the new latency value to @latency. This value is used to limit the
2700  * amount of time a pad waits for data to appear before considering the pad
2701  * as unresponsive.
2702  */
2703 static void
2704 gst_aggregator_set_latency_property (GstAggregator * self, GstClockTime latency)
2705 {
2706   gboolean changed;
2707
2708   g_return_if_fail (GST_IS_AGGREGATOR (self));
2709   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
2710
2711   SRC_LOCK (self);
2712   changed = (self->priv->latency != latency);
2713
2714   if (changed) {
2715     GList *item;
2716
2717     GST_OBJECT_LOCK (self);
2718     /* First lock all the pads */
2719     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2720       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2721       PAD_LOCK (aggpad);
2722     }
2723
2724     self->priv->latency = latency;
2725
2726     SRC_BROADCAST (self);
2727
2728     /* Now wake up the pads */
2729     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2730       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2731       PAD_BROADCAST_EVENT (aggpad);
2732       PAD_UNLOCK (aggpad);
2733     }
2734     GST_OBJECT_UNLOCK (self);
2735   }
2736
2737   SRC_UNLOCK (self);
2738
2739   if (changed)
2740     gst_element_post_message (GST_ELEMENT_CAST (self),
2741         gst_message_new_latency (GST_OBJECT_CAST (self)));
2742 }
2743
2744 /*
2745  * gst_aggregator_get_latency_property:
2746  * @agg: a #GstAggregator
2747  *
2748  * Gets the latency value. See gst_aggregator_set_latency for
2749  * more details.
2750  *
2751  * Returns: The time in nanoseconds to wait for data to arrive on a sink pad
2752  * before a pad is deemed unresponsive. A value of -1 means an
2753  * unlimited time.
2754  */
2755 static GstClockTime
2756 gst_aggregator_get_latency_property (GstAggregator * agg)
2757 {
2758   GstClockTime res;
2759
2760   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), GST_CLOCK_TIME_NONE);
2761
2762   GST_OBJECT_LOCK (agg);
2763   res = agg->priv->latency;
2764   GST_OBJECT_UNLOCK (agg);
2765
2766   return res;
2767 }
2768
2769 static void
2770 gst_aggregator_set_property (GObject * object, guint prop_id,
2771     const GValue * value, GParamSpec * pspec)
2772 {
2773   GstAggregator *agg = GST_AGGREGATOR (object);
2774
2775   switch (prop_id) {
2776     case PROP_LATENCY:
2777       gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value));
2778       break;
2779     case PROP_MIN_UPSTREAM_LATENCY:
2780       SRC_LOCK (agg);
2781       agg->priv->upstream_latency_min = g_value_get_uint64 (value);
2782       SRC_UNLOCK (agg);
2783       break;
2784     case PROP_START_TIME_SELECTION:
2785       agg->priv->start_time_selection = g_value_get_enum (value);
2786       break;
2787     case PROP_START_TIME:
2788       agg->priv->start_time = g_value_get_uint64 (value);
2789       break;
2790     case PROP_EMIT_SIGNALS:
2791       agg->priv->emit_signals = g_value_get_boolean (value);
2792       break;
2793     default:
2794       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2795       break;
2796   }
2797 }
2798
2799 static void
2800 gst_aggregator_get_property (GObject * object, guint prop_id,
2801     GValue * value, GParamSpec * pspec)
2802 {
2803   GstAggregator *agg = GST_AGGREGATOR (object);
2804
2805   switch (prop_id) {
2806     case PROP_LATENCY:
2807       g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg));
2808       break;
2809     case PROP_MIN_UPSTREAM_LATENCY:
2810       SRC_LOCK (agg);
2811       g_value_set_uint64 (value, agg->priv->upstream_latency_min);
2812       SRC_UNLOCK (agg);
2813       break;
2814     case PROP_START_TIME_SELECTION:
2815       g_value_set_enum (value, agg->priv->start_time_selection);
2816       break;
2817     case PROP_START_TIME:
2818       g_value_set_uint64 (value, agg->priv->start_time);
2819       break;
2820     case PROP_EMIT_SIGNALS:
2821       g_value_set_boolean (value, agg->priv->emit_signals);
2822       break;
2823     default:
2824       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2825       break;
2826   }
2827 }
2828
2829 /* GObject vmethods implementations */
2830 static void
2831 gst_aggregator_class_init (GstAggregatorClass * klass)
2832 {
2833   GObjectClass *gobject_class = (GObjectClass *) klass;
2834   GstElementClass *gstelement_class = (GstElementClass *) klass;
2835   static const gchar *meta_tags[] = { NULL };
2836
2837   aggregator_parent_class = g_type_class_peek_parent (klass);
2838
2839   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
2840       GST_DEBUG_FG_MAGENTA, "GstAggregator");
2841
2842   if (aggregator_private_offset != 0)
2843     g_type_class_adjust_private_offset (klass, &aggregator_private_offset);
2844
2845   klass->finish_buffer = gst_aggregator_default_finish_buffer;
2846   klass->finish_buffer_list = gst_aggregator_default_finish_buffer_list;
2847
2848   klass->sink_event = gst_aggregator_default_sink_event;
2849   klass->sink_query = gst_aggregator_default_sink_query;
2850
2851   klass->src_event = gst_aggregator_default_src_event;
2852   klass->src_query = gst_aggregator_default_src_query;
2853
2854   klass->create_new_pad = gst_aggregator_default_create_new_pad;
2855   klass->update_src_caps = gst_aggregator_default_update_src_caps;
2856   klass->fixate_src_caps = gst_aggregator_default_fixate_src_caps;
2857   klass->negotiated_src_caps = gst_aggregator_default_negotiated_src_caps;
2858
2859   klass->negotiate = gst_aggregator_default_negotiate;
2860
2861   klass->sink_event_pre_queue = gst_aggregator_default_sink_event_pre_queue;
2862   klass->sink_query_pre_queue = gst_aggregator_default_sink_query_pre_queue;
2863
2864   gstelement_class->request_new_pad =
2865       GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
2866   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
2867   gstelement_class->release_pad =
2868       GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
2869   gstelement_class->change_state =
2870       GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
2871
2872   gobject_class->set_property = gst_aggregator_set_property;
2873   gobject_class->get_property = gst_aggregator_get_property;
2874   gobject_class->constructed = gst_aggregator_constructed;
2875   gobject_class->finalize = gst_aggregator_finalize;
2876
2877   g_object_class_install_property (gobject_class, PROP_LATENCY,
2878       g_param_spec_uint64 ("latency", "Buffer latency",
2879           "Additional latency in live mode to allow upstream "
2880           "to take longer to produce buffers for the current "
2881           "position (in nanoseconds)", 0, G_MAXUINT64,
2882           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2883
2884   /**
2885    * GstAggregator:min-upstream-latency:
2886    *
2887    * Force minimum upstream latency (in nanoseconds). When sources with a
2888    * higher latency are expected to be plugged in dynamically after the
2889    * aggregator has started playing, this allows overriding the minimum
2890    * latency reported by the initial source(s). This is only taken into
2891    * account when larger than the actually reported minimum latency.
2892    *
2893    * Since: 1.16
2894    */
2895   g_object_class_install_property (gobject_class, PROP_MIN_UPSTREAM_LATENCY,
2896       g_param_spec_uint64 ("min-upstream-latency", "Buffer latency",
2897           "When sources with a higher latency are expected to be plugged "
2898           "in dynamically after the aggregator has started playing, "
2899           "this allows overriding the minimum latency reported by the "
2900           "initial source(s). This is only taken into account when larger "
2901           "than the actually reported minimum latency. (nanoseconds)",
2902           0, G_MAXUINT64,
2903           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2904
2905   g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION,
2906       g_param_spec_enum ("start-time-selection", "Start Time Selection",
2907           "Decides which start time is output",
2908           gst_aggregator_start_time_selection_get_type (),
2909           DEFAULT_START_TIME_SELECTION,
2910           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2911
2912   g_object_class_install_property (gobject_class, PROP_START_TIME,
2913       g_param_spec_uint64 ("start-time", "Start Time",
2914           "Start time to use if start-time-selection=set", 0,
2915           G_MAXUINT64,
2916           DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2917
2918   /**
2919    * GstAggregator:emit-signals:
2920    *
2921    * Enables the emission of signals such as #GstAggregator::samples-selected
2922    *
2923    * Since: 1.18
2924    */
2925   g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
2926       g_param_spec_boolean ("emit-signals", "Emit signals",
2927           "Send signals", DEFAULT_EMIT_SIGNALS,
2928           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2929
2930   /**
2931    * GstAggregator::samples-selected:
2932    * @aggregator: The #GstAggregator that emitted the signal
2933    * @segment: The #GstSegment the next output buffer is part of
2934    * @pts: The presentation timestamp of the next output buffer
2935    * @dts: The decoding timestamp of the next output buffer
2936    * @duration: The duration of the next output buffer
2937    * @info: (nullable): a #GstStructure containing additional information
2938    *
2939    * Signals that the #GstAggregator subclass has selected the next set
2940    * of input samples it will aggregate. Handlers may call
2941    * gst_aggregator_peek_next_sample() at that point.
2942    *
2943    * Since: 1.18
2944    */
2945   gst_aggregator_signals[SIGNAL_SAMPLES_SELECTED] =
2946       g_signal_new ("samples-selected", G_TYPE_FROM_CLASS (klass),
2947       G_SIGNAL_RUN_FIRST, 0, NULL, NULL, NULL, G_TYPE_NONE, 5,
2948       GST_TYPE_SEGMENT | G_SIGNAL_TYPE_STATIC_SCOPE, GST_TYPE_CLOCK_TIME,
2949       GST_TYPE_CLOCK_TIME, GST_TYPE_CLOCK_TIME,
2950       GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
2951
2952   gst_meta_register_custom ("GstAggregatorMissingDataMeta", meta_tags, NULL,
2953       NULL, NULL);
2954 }
2955
2956 static inline gpointer
2957 gst_aggregator_get_instance_private (GstAggregator * self)
2958 {
2959   return (G_STRUCT_MEMBER_P (self, aggregator_private_offset));
2960 }
2961
2962 static void
2963 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
2964 {
2965   GstPadTemplate *pad_template;
2966   GstAggregatorPrivate *priv;
2967   GType pad_type;
2968
2969   g_return_if_fail (klass->aggregate != NULL);
2970
2971   self->priv = gst_aggregator_get_instance_private (self);
2972
2973   priv = self->priv;
2974
2975   pad_template =
2976       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
2977   g_return_if_fail (pad_template != NULL);
2978
2979   priv->max_padserial = -1;
2980   priv->tags_changed = FALSE;
2981   priv->ignore_inactive_pads = FALSE;
2982
2983   self->priv->peer_latency_live = FALSE;
2984   self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
2985   self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
2986   self->priv->has_peer_latency = FALSE;
2987
2988   pad_type =
2989       GST_PAD_TEMPLATE_GTYPE (pad_template) ==
2990       G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD :
2991       GST_PAD_TEMPLATE_GTYPE (pad_template);
2992   g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD));
2993   self->srcpad =
2994       g_object_new (pad_type, "name", "src", "direction", GST_PAD_SRC,
2995       "template", pad_template, NULL);
2996
2997   gst_aggregator_reset_flow_values (self);
2998
2999   gst_pad_set_event_function (self->srcpad,
3000       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
3001   gst_pad_set_query_function (self->srcpad,
3002       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func));
3003   gst_pad_set_activatemode_function (self->srcpad,
3004       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func));
3005
3006   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
3007
3008   self->priv->upstream_latency_min = DEFAULT_MIN_UPSTREAM_LATENCY;
3009   self->priv->latency = DEFAULT_LATENCY;
3010   self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION;
3011   self->priv->start_time = DEFAULT_START_TIME;
3012   self->priv->force_live = DEFAULT_FORCE_LIVE;
3013
3014   g_mutex_init (&self->priv->src_lock);
3015   g_cond_init (&self->priv->src_cond);
3016 }
3017
3018 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
3019  * method to get to the padtemplates */
3020 GType
3021 gst_aggregator_get_type (void)
3022 {
3023   static gsize type = 0;
3024
3025   if (g_once_init_enter (&type)) {
3026     GType _type;
3027     static const GTypeInfo info = {
3028       sizeof (GstAggregatorClass),
3029       NULL,
3030       NULL,
3031       (GClassInitFunc) gst_aggregator_class_init,
3032       NULL,
3033       NULL,
3034       sizeof (GstAggregator),
3035       0,
3036       (GInstanceInitFunc) gst_aggregator_init,
3037     };
3038
3039     _type = g_type_register_static (GST_TYPE_ELEMENT,
3040         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
3041
3042     aggregator_private_offset =
3043         g_type_add_instance_private (_type, sizeof (GstAggregatorPrivate));
3044
3045     g_once_init_leave (&type, _type);
3046   }
3047   return type;
3048 }
3049
3050 /* Must be called with SRC lock and PAD lock held */
3051 static gboolean
3052 gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
3053 {
3054   guint64 max_time_level;
3055
3056   GST_TRACE_OBJECT (aggpad, "Have %" GST_TIME_FORMAT " queued in %u buffers",
3057       GST_TIME_ARGS (aggpad->priv->time_level), aggpad->priv->num_buffers);
3058
3059   /* Empty queue always has space */
3060   if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL)
3061     return TRUE;
3062
3063   /* We also want at least two buffers, one is being processed and one is ready
3064    * for the next iteration when we operate in live mode. */
3065   if (is_live_unlocked (self) && aggpad->priv->num_buffers < 2)
3066     return TRUE;
3067
3068   /* On top of our latency, we also want to allow buffering up to the
3069    * minimum upstream latency to allow queue free sources with lower then
3070    * upstream latency. */
3071   max_time_level = self->priv->latency + self->priv->peer_latency_min;
3072
3073   /* zero latency, if there is a buffer, it's full */
3074   if (max_time_level == 0)
3075     return FALSE;
3076
3077   GST_TRACE_OBJECT (aggpad, "Maximum queue level %" GST_TIME_FORMAT,
3078       GST_TIME_ARGS (max_time_level));
3079
3080   /* Allow no more buffers than the latency */
3081   return (aggpad->priv->time_level <= max_time_level);
3082 }
3083
3084 /* Must be called with the PAD_LOCK held */
3085 static void
3086 apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
3087 {
3088   GstClockTime timestamp;
3089
3090   if (GST_BUFFER_DTS_IS_VALID (buffer))
3091     timestamp = GST_BUFFER_DTS (buffer);
3092   else
3093     timestamp = GST_BUFFER_PTS (buffer);
3094
3095   if (timestamp == GST_CLOCK_TIME_NONE) {
3096     if (head)
3097       timestamp = aggpad->priv->head_position;
3098     else
3099       timestamp = aggpad->priv->tail_position;
3100   }
3101
3102   /* add duration */
3103   if (GST_BUFFER_DURATION_IS_VALID (buffer))
3104     timestamp += GST_BUFFER_DURATION (buffer);
3105
3106   if (head)
3107     aggpad->priv->head_position = timestamp;
3108   else
3109     aggpad->priv->tail_position = timestamp;
3110
3111   GST_OBJECT_LOCK (aggpad);
3112   update_time_level (aggpad, head);
3113   GST_OBJECT_UNLOCK (aggpad);
3114 }
3115
3116 /*
3117  * Can be called either from the sinkpad's chain function or from the srcpad's
3118  * thread in the case of a buffer synthetized from a GAP event.
3119  * Because of this second case, FLUSH_LOCK can't be used here.
3120  */
3121
3122 static GstFlowReturn
3123 gst_aggregator_pad_chain_internal (GstAggregator * self,
3124     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
3125 {
3126   GstFlowReturn flow_return;
3127   GstClockTime buf_pts;
3128
3129   GST_TRACE_OBJECT (aggpad,
3130       "entering chain internal with %" GST_PTR_FORMAT, buffer);
3131
3132   PAD_LOCK (aggpad);
3133   flow_return = aggpad->priv->flow_return;
3134   if (flow_return != GST_FLOW_OK)
3135     goto flushing;
3136
3137   PAD_UNLOCK (aggpad);
3138
3139   buf_pts = GST_BUFFER_PTS (buffer);
3140
3141   for (;;) {
3142     SRC_LOCK (self);
3143     GST_OBJECT_LOCK (self);
3144     PAD_LOCK (aggpad);
3145
3146     if (aggpad->priv->first_buffer) {
3147       self->priv->has_peer_latency = FALSE;
3148       aggpad->priv->first_buffer = FALSE;
3149     }
3150
3151     if ((gst_aggregator_pad_has_space (self, aggpad) || !head)
3152         && aggpad->priv->flow_return == GST_FLOW_OK) {
3153       if (head) {
3154         GST_DEBUG_OBJECT (aggpad, "Enqueuing %" GST_PTR_FORMAT, buffer);
3155         g_queue_push_head (&aggpad->priv->data, buffer);
3156       } else {
3157         g_queue_push_tail (&aggpad->priv->data, buffer);
3158       }
3159       apply_buffer (aggpad, buffer, head);
3160       aggpad->priv->num_buffers++;
3161       buffer = NULL;
3162       SRC_BROADCAST (self);
3163       break;
3164     }
3165
3166     flow_return = aggpad->priv->flow_return;
3167     if (flow_return != GST_FLOW_OK) {
3168       GST_OBJECT_UNLOCK (self);
3169       SRC_UNLOCK (self);
3170       goto flushing;
3171     }
3172     GST_DEBUG_OBJECT (aggpad,
3173         "Waiting for buffer to be consumed (chain) before enqueueing %"
3174         GST_PTR_FORMAT, buffer);
3175     GST_OBJECT_UNLOCK (self);
3176     SRC_UNLOCK (self);
3177     PAD_WAIT_EVENT (aggpad);
3178
3179     PAD_UNLOCK (aggpad);
3180   }
3181
3182   if (self->priv->first_buffer) {
3183     GstClockTime start_time;
3184     GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
3185
3186     switch (self->priv->start_time_selection) {
3187       case GST_AGGREGATOR_START_TIME_SELECTION_ZERO:
3188       default:
3189         start_time = 0;
3190         break;
3191       case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
3192         GST_OBJECT_LOCK (aggpad);
3193         if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) {
3194           start_time = buf_pts;
3195           if (start_time != -1) {
3196             start_time = MAX (start_time, aggpad->priv->head_segment.start);
3197             start_time =
3198                 gst_segment_to_running_time (&aggpad->priv->head_segment,
3199                 GST_FORMAT_TIME, start_time);
3200           }
3201         } else {
3202           start_time = 0;
3203           GST_WARNING_OBJECT (aggpad,
3204               "Ignoring request of selecting the first start time "
3205               "as the segment is a %s segment instead of a time segment",
3206               gst_format_get_name (aggpad->priv->head_segment.format));
3207         }
3208         GST_OBJECT_UNLOCK (aggpad);
3209         break;
3210       case GST_AGGREGATOR_START_TIME_SELECTION_SET:
3211         start_time = self->priv->start_time;
3212         if (start_time == -1)
3213           start_time = 0;
3214         break;
3215     }
3216
3217     if (start_time != -1) {
3218       if (srcpad->segment.position == -1)
3219         srcpad->segment.position = start_time;
3220       else
3221         srcpad->segment.position = MIN (start_time, srcpad->segment.position);
3222
3223       GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT,
3224           GST_TIME_ARGS (start_time));
3225     }
3226   }
3227
3228   PAD_UNLOCK (aggpad);
3229   GST_OBJECT_UNLOCK (self);
3230   SRC_UNLOCK (self);
3231
3232   GST_TRACE_OBJECT (aggpad, "Done chaining");
3233
3234   return flow_return;
3235
3236 flushing:
3237   PAD_UNLOCK (aggpad);
3238
3239   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
3240       gst_flow_get_name (flow_return));
3241   if (buffer)
3242     gst_buffer_unref (buffer);
3243
3244   return flow_return;
3245 }
3246
3247 static GstFlowReturn
3248 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
3249 {
3250   GstFlowReturn ret;
3251   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
3252
3253   GST_TRACE_OBJECT (aggpad, "entering chain");
3254
3255   PAD_FLUSH_LOCK (aggpad);
3256
3257   ret = gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
3258       aggpad, buffer, TRUE);
3259
3260   PAD_FLUSH_UNLOCK (aggpad);
3261
3262   return ret;
3263 }
3264
3265 static gboolean
3266 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
3267     GstQuery * query)
3268 {
3269   GstAggregator *self = GST_AGGREGATOR (parent);
3270   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
3271   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
3272
3273   g_assert (klass->sink_query_pre_queue);
3274   return klass->sink_query_pre_queue (self, aggpad, query);
3275 }
3276
3277 static GstFlowReturn
3278 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
3279     GstEvent * event)
3280 {
3281   GstAggregator *self = GST_AGGREGATOR (parent);
3282   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
3283   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
3284
3285   g_assert (klass->sink_event_pre_queue);
3286   return klass->sink_event_pre_queue (self, aggpad, event);
3287 }
3288
3289 static gboolean
3290 gst_aggregator_pad_activate_mode_func (GstPad * pad,
3291     GstObject * parent, GstPadMode mode, gboolean active)
3292 {
3293   GstAggregator *self = GST_AGGREGATOR (parent);
3294   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
3295
3296   if (active == FALSE) {
3297     SRC_LOCK (self);
3298     gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
3299     SRC_BROADCAST (self);
3300     SRC_UNLOCK (self);
3301   } else {
3302     PAD_LOCK (aggpad);
3303     aggpad->priv->flow_return = GST_FLOW_OK;
3304     PAD_BROADCAST_EVENT (aggpad);
3305     PAD_UNLOCK (aggpad);
3306   }
3307
3308   return TRUE;
3309 }
3310
3311 /***********************************
3312  * GstAggregatorPad implementation  *
3313  ************************************/
3314 G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
3315
3316 #define DEFAULT_PAD_EMIT_SIGNALS FALSE
3317
3318 enum
3319 {
3320   PAD_PROP_0,
3321   PAD_PROP_EMIT_SIGNALS,
3322 };
3323
3324 enum
3325 {
3326   PAD_SIGNAL_BUFFER_CONSUMED,
3327   PAD_LAST_SIGNAL,
3328 };
3329
3330 static guint gst_aggregator_pad_signals[PAD_LAST_SIGNAL] = { 0 };
3331
3332 static void
3333 gst_aggregator_pad_constructed (GObject * object)
3334 {
3335   GstPad *pad = GST_PAD (object);
3336
3337   if (GST_PAD_IS_SINK (pad)) {
3338     gst_pad_set_chain_function (pad,
3339         GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
3340     gst_pad_set_event_full_function_full (pad,
3341         GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
3342     gst_pad_set_query_function (pad,
3343         GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
3344     gst_pad_set_activatemode_function (pad,
3345         GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
3346   }
3347 }
3348
3349 static void
3350 gst_aggregator_pad_finalize (GObject * object)
3351 {
3352   GstAggregatorPad *pad = (GstAggregatorPad *) object;
3353
3354   gst_buffer_replace (&pad->priv->peeked_buffer, NULL);
3355   g_cond_clear (&pad->priv->event_cond);
3356   g_mutex_clear (&pad->priv->flush_lock);
3357   g_mutex_clear (&pad->priv->lock);
3358
3359   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object);
3360 }
3361
3362 static void
3363 gst_aggregator_pad_dispose (GObject * object)
3364 {
3365   GstAggregatorPad *pad = (GstAggregatorPad *) object;
3366
3367   gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE);
3368
3369   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
3370 }
3371
3372 static void
3373 gst_aggregator_pad_set_property (GObject * object, guint prop_id,
3374     const GValue * value, GParamSpec * pspec)
3375 {
3376   GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object);
3377
3378   switch (prop_id) {
3379     case PAD_PROP_EMIT_SIGNALS:
3380       pad->priv->emit_signals = g_value_get_boolean (value);
3381       break;
3382     default:
3383       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3384       break;
3385   }
3386 }
3387
3388 static void
3389 gst_aggregator_pad_get_property (GObject * object, guint prop_id,
3390     GValue * value, GParamSpec * pspec)
3391 {
3392   GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object);
3393
3394   switch (prop_id) {
3395     case PAD_PROP_EMIT_SIGNALS:
3396       g_value_set_boolean (value, pad->priv->emit_signals);
3397       break;
3398     default:
3399       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3400       break;
3401   }
3402 }
3403
3404 static void
3405 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
3406 {
3407   GObjectClass *gobject_class = (GObjectClass *) klass;
3408
3409   gobject_class->constructed = gst_aggregator_pad_constructed;
3410   gobject_class->finalize = gst_aggregator_pad_finalize;
3411   gobject_class->dispose = gst_aggregator_pad_dispose;
3412   gobject_class->set_property = gst_aggregator_pad_set_property;
3413   gobject_class->get_property = gst_aggregator_pad_get_property;
3414
3415   /**
3416    * GstAggregatorPad:buffer-consumed:
3417    * @aggregator: The #GstAggregator that emitted the signal
3418    * @buffer: The buffer that was consumed
3419    *
3420    * Signals that a buffer was consumed. As aggregator pads store buffers
3421    * in an internal queue, there is no direct match between input and output
3422    * buffers at any given time. This signal can be useful to forward metas
3423    * such as #GstVideoTimeCodeMeta or #GstVideoCaptionMeta at the right time.
3424    *
3425    * Since: 1.16
3426    */
3427   gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED] =
3428       g_signal_new ("buffer-consumed", G_TYPE_FROM_CLASS (klass),
3429       G_SIGNAL_RUN_FIRST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_BUFFER);
3430
3431   /**
3432    * GstAggregatorPad:emit-signals:
3433    *
3434    * Enables the emission of signals such as #GstAggregatorPad::buffer-consumed
3435    *
3436    * Since: 1.16
3437    */
3438   g_object_class_install_property (gobject_class, PAD_PROP_EMIT_SIGNALS,
3439       g_param_spec_boolean ("emit-signals", "Emit signals",
3440           "Send signals to signal data consumption", DEFAULT_PAD_EMIT_SIGNALS,
3441           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
3442 }
3443
3444 static void
3445 gst_aggregator_pad_init (GstAggregatorPad * pad)
3446 {
3447   pad->priv = gst_aggregator_pad_get_instance_private (pad);
3448
3449   g_queue_init (&pad->priv->data);
3450   g_cond_init (&pad->priv->event_cond);
3451
3452   g_mutex_init (&pad->priv->flush_lock);
3453   g_mutex_init (&pad->priv->lock);
3454
3455   gst_aggregator_pad_reset_unlocked (pad);
3456   pad->priv->negotiated = FALSE;
3457   pad->priv->emit_signals = DEFAULT_PAD_EMIT_SIGNALS;
3458 }
3459
3460 /* Must be called with the PAD_LOCK held */
3461 static void
3462 gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer,
3463     gboolean dequeued)
3464 {
3465   if (dequeued)
3466     pad->priv->num_buffers--;
3467
3468   if (buffer && pad->priv->emit_signals) {
3469     g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED],
3470         0, buffer);
3471   }
3472   PAD_BROADCAST_EVENT (pad);
3473 }
3474
3475 /* Must be called with the PAD_LOCK held */
3476 static void
3477 gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
3478 {
3479   GstAggregator *self = NULL;
3480   GstAggregatorClass *aggclass = NULL;
3481   GstBuffer *buffer = NULL;
3482
3483   while (pad->priv->clipped_buffer == NULL &&
3484       GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
3485     buffer = g_queue_pop_tail (&pad->priv->data);
3486
3487     apply_buffer (pad, buffer, FALSE);
3488
3489     /* We only take the parent here so that it's not taken if the buffer is
3490      * already clipped or if the queue is empty.
3491      */
3492     if (self == NULL) {
3493       self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
3494       if (self == NULL) {
3495         gst_buffer_unref (buffer);
3496         return;
3497       }
3498
3499       aggclass = GST_AGGREGATOR_GET_CLASS (self);
3500     }
3501
3502     if (aggclass->clip) {
3503       GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer);
3504
3505       buffer = aggclass->clip (self, pad, buffer);
3506
3507       if (buffer == NULL) {
3508         gst_aggregator_pad_buffer_consumed (pad, buffer, TRUE);
3509         GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
3510       }
3511     }
3512
3513     pad->priv->clipped_buffer = buffer;
3514   }
3515
3516   if (self)
3517     gst_object_unref (self);
3518 }
3519
3520 /**
3521  * gst_aggregator_pad_pop_buffer:
3522  * @pad: the pad to get buffer from
3523  *
3524  * Steal the ref to the buffer currently queued in @pad.
3525  *
3526  * Returns: (nullable) (transfer full): The buffer in @pad or NULL if no buffer was
3527  *   queued. You should unref the buffer after usage.
3528  */
3529 GstBuffer *
3530 gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad)
3531 {
3532   GstBuffer *buffer = NULL;
3533
3534   PAD_LOCK (pad);
3535
3536   /* If the subclass has already peeked a buffer, we guarantee
3537    * that it receives the same buffer, no matter if the pad has
3538    * errored out / been flushed in the meantime.
3539    */
3540   if (pad->priv->peeked_buffer) {
3541     buffer = pad->priv->peeked_buffer;
3542     goto done;
3543   }
3544
3545   if (pad->priv->flow_return != GST_FLOW_OK)
3546     goto done;
3547
3548   gst_aggregator_pad_clip_buffer_unlocked (pad);
3549   buffer = pad->priv->clipped_buffer;
3550
3551 done:
3552   if (buffer) {
3553     if (pad->priv->clipped_buffer != NULL) {
3554       /* Here we still hold a reference to both the clipped buffer
3555        * and possibly the peeked buffer, we transfer the first and
3556        * potentially release the second
3557        */
3558       gst_aggregator_pad_buffer_consumed (pad, buffer, TRUE);
3559       pad->priv->clipped_buffer = NULL;
3560       gst_buffer_replace (&pad->priv->peeked_buffer, NULL);
3561     } else {
3562       /* Here our clipped buffer has already been released, for
3563        * example because of a flush. We thus transfer the reference
3564        * to the peeked buffer to the caller, and we don't decrement
3565        * pad.num_buffers as it has already been done elsewhere
3566        */
3567       gst_aggregator_pad_buffer_consumed (pad, buffer, FALSE);
3568       pad->priv->peeked_buffer = NULL;
3569     }
3570     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
3571   }
3572
3573   PAD_UNLOCK (pad);
3574
3575   return buffer;
3576 }
3577
3578 /**
3579  * gst_aggregator_pad_drop_buffer:
3580  * @pad: the pad where to drop any pending buffer
3581  *
3582  * Drop the buffer currently queued in @pad.
3583  *
3584  * Returns: TRUE if there was a buffer queued in @pad, or FALSE if not.
3585  */
3586 gboolean
3587 gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
3588 {
3589   GstBuffer *buf;
3590
3591   buf = gst_aggregator_pad_pop_buffer (pad);
3592
3593   if (buf == NULL)
3594     return FALSE;
3595
3596   gst_buffer_unref (buf);
3597   return TRUE;
3598 }
3599
3600 /**
3601  * gst_aggregator_pad_peek_buffer:
3602  * @pad: the pad to get buffer from
3603  *
3604  * Returns: (nullable) (transfer full): A reference to the buffer in @pad or
3605  * NULL if no buffer was queued. You should unref the buffer after
3606  * usage.
3607  */
3608 GstBuffer *
3609 gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
3610 {
3611   GstBuffer *buffer = NULL;
3612
3613   PAD_LOCK (pad);
3614
3615   if (pad->priv->peeked_buffer) {
3616     buffer = gst_buffer_ref (pad->priv->peeked_buffer);
3617     goto done;
3618   }
3619
3620   if (pad->priv->flow_return != GST_FLOW_OK)
3621     goto done;
3622
3623   gst_aggregator_pad_clip_buffer_unlocked (pad);
3624
3625   if (pad->priv->clipped_buffer) {
3626     buffer = gst_buffer_ref (pad->priv->clipped_buffer);
3627     pad->priv->peeked_buffer = gst_buffer_ref (buffer);
3628   } else {
3629     buffer = NULL;
3630   }
3631
3632 done:
3633   PAD_UNLOCK (pad);
3634   return buffer;
3635 }
3636
3637 /**
3638  * gst_aggregator_pad_has_buffer:
3639  * @pad: the pad to check the buffer on
3640  *
3641  * This checks if a pad has a buffer available that will be returned by
3642  * a call to gst_aggregator_pad_peek_buffer() or
3643  * gst_aggregator_pad_pop_buffer().
3644  *
3645  * Returns: %TRUE if the pad has a buffer available as the next thing.
3646  *
3647  * Since: 1.14.1
3648  */
3649 gboolean
3650 gst_aggregator_pad_has_buffer (GstAggregatorPad * pad)
3651 {
3652   gboolean has_buffer;
3653
3654   PAD_LOCK (pad);
3655
3656   if (pad->priv->peeked_buffer) {
3657     has_buffer = TRUE;
3658   } else {
3659     gst_aggregator_pad_clip_buffer_unlocked (pad);
3660     has_buffer = (pad->priv->clipped_buffer != NULL);
3661     if (has_buffer)
3662       pad->priv->peeked_buffer = gst_buffer_ref (pad->priv->clipped_buffer);
3663   }
3664   PAD_UNLOCK (pad);
3665
3666   return has_buffer;
3667 }
3668
3669 /**
3670  * gst_aggregator_pad_is_eos:
3671  * @pad: an aggregator pad
3672  *
3673  * Returns: %TRUE if the pad is EOS, otherwise %FALSE.
3674  */
3675 gboolean
3676 gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
3677 {
3678   gboolean is_eos;
3679
3680   PAD_LOCK (pad);
3681   is_eos = pad->priv->eos;
3682   PAD_UNLOCK (pad);
3683
3684   return is_eos;
3685 }
3686
3687 /**
3688  * gst_aggregator_pad_is_inactive:
3689  * @pad: an aggregator pad
3690  *
3691  * It is only valid to call this method from #GstAggregatorClass::aggregate()
3692  *
3693  * Returns: %TRUE if the pad is inactive, %FALSE otherwise.
3694  *   See gst_aggregator_ignore_inactive_pads() for more info.
3695  * Since: 1.20
3696  */
3697 gboolean
3698 gst_aggregator_pad_is_inactive (GstAggregatorPad * pad)
3699 {
3700   GstAggregator *self;
3701   gboolean inactive;
3702
3703   self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
3704
3705   g_assert_nonnull (self);
3706
3707   PAD_LOCK (pad);
3708   inactive = self->priv->ignore_inactive_pads && is_live_unlocked (self)
3709       && pad->priv->first_buffer;
3710   PAD_UNLOCK (pad);
3711
3712   gst_object_unref (self);
3713
3714   return inactive;
3715 }
3716
3717 #if 0
3718 /*
3719  * gst_aggregator_merge_tags:
3720  * @self: a #GstAggregator
3721  * @tags: a #GstTagList to merge
3722  * @mode: the #GstTagMergeMode to use
3723  *
3724  * Adds tags to so-called pending tags, which will be processed
3725  * before pushing out data downstream.
3726  *
3727  * Note that this is provided for convenience, and the subclass is
3728  * not required to use this and can still do tag handling on its own.
3729  *
3730  * MT safe.
3731  */
3732 void
3733 gst_aggregator_merge_tags (GstAggregator * self,
3734     const GstTagList * tags, GstTagMergeMode mode)
3735 {
3736   GstTagList *otags;
3737
3738   g_return_if_fail (GST_IS_AGGREGATOR (self));
3739   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
3740
3741   /* FIXME Check if we can use OBJECT lock here! */
3742   GST_OBJECT_LOCK (self);
3743   if (tags)
3744     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
3745   otags = self->priv->tags;
3746   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
3747   if (otags)
3748     gst_tag_list_unref (otags);
3749   self->priv->tags_changed = TRUE;
3750   GST_OBJECT_UNLOCK (self);
3751 }
3752 #endif
3753
3754 /**
3755  * gst_aggregator_set_latency:
3756  * @self: a #GstAggregator
3757  * @min_latency: minimum latency
3758  * @max_latency: maximum latency
3759  *
3760  * Lets #GstAggregator sub-classes tell the baseclass what their internal
3761  * latency is. Will also post a LATENCY message on the bus so the pipeline
3762  * can reconfigure its global latency if the values changed.
3763  */
3764 void
3765 gst_aggregator_set_latency (GstAggregator * self,
3766     GstClockTime min_latency, GstClockTime max_latency)
3767 {
3768   gboolean changed = FALSE;
3769
3770   g_return_if_fail (GST_IS_AGGREGATOR (self));
3771   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
3772   g_return_if_fail (max_latency >= min_latency);
3773
3774   SRC_LOCK (self);
3775   if (self->priv->sub_latency_min != min_latency) {
3776     self->priv->sub_latency_min = min_latency;
3777     changed = TRUE;
3778   }
3779   if (self->priv->sub_latency_max != max_latency) {
3780     self->priv->sub_latency_max = max_latency;
3781     changed = TRUE;
3782   }
3783   if (!self->priv->posted_latency_msg) {
3784     self->priv->posted_latency_msg = TRUE;
3785     changed = TRUE;
3786   }
3787
3788   if (changed)
3789     SRC_BROADCAST (self);
3790   SRC_UNLOCK (self);
3791
3792   if (changed) {
3793     gst_element_post_message (GST_ELEMENT_CAST (self),
3794         gst_message_new_latency (GST_OBJECT_CAST (self)));
3795   }
3796 }
3797
3798 /**
3799  * gst_aggregator_get_buffer_pool:
3800  * @self: a #GstAggregator
3801  *
3802  * Returns: (transfer full) (nullable): the instance of the #GstBufferPool used
3803  * by @trans; free it after use it
3804  */
3805 GstBufferPool *
3806 gst_aggregator_get_buffer_pool (GstAggregator * self)
3807 {
3808   GstBufferPool *pool;
3809
3810   g_return_val_if_fail (GST_IS_AGGREGATOR (self), NULL);
3811
3812   GST_OBJECT_LOCK (self);
3813   pool = self->priv->pool;
3814   if (pool)
3815     gst_object_ref (pool);
3816   GST_OBJECT_UNLOCK (self);
3817
3818   return pool;
3819 }
3820
3821 /**
3822  * gst_aggregator_get_allocator:
3823  * @self: a #GstAggregator
3824  * @allocator: (out) (optional) (nullable) (transfer full): the #GstAllocator
3825  * used
3826  * @params: (out caller-allocates) (optional): the
3827  * #GstAllocationParams of @allocator
3828  *
3829  * Lets #GstAggregator sub-classes get the memory @allocator
3830  * acquired by the base class and its @params.
3831  *
3832  * Unref the @allocator after use it.
3833  */
3834 void
3835 gst_aggregator_get_allocator (GstAggregator * self,
3836     GstAllocator ** allocator, GstAllocationParams * params)
3837 {
3838   g_return_if_fail (GST_IS_AGGREGATOR (self));
3839
3840   if (allocator)
3841     *allocator = self->priv->allocator ?
3842         gst_object_ref (self->priv->allocator) : NULL;
3843
3844   if (params)
3845     *params = self->priv->allocation_params;
3846 }
3847
3848 /**
3849  * gst_aggregator_simple_get_next_time:
3850  * @self: A #GstAggregator
3851  *
3852  * This is a simple #GstAggregatorClass::get_next_time implementation that
3853  * just looks at the #GstSegment on the srcpad of the aggregator and bases
3854  * the next time on the running time there.
3855  *
3856  * This is the desired behaviour in most cases where you have a live source
3857  * and you have a dead line based aggregator subclass.
3858  *
3859  * Returns: The running time based on the position
3860  *
3861  * Since: 1.16
3862  */
3863 GstClockTime
3864 gst_aggregator_simple_get_next_time (GstAggregator * self)
3865 {
3866   GstClockTime next_time;
3867   GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
3868   GstSegment *segment = &srcpad->segment;
3869
3870   GST_OBJECT_LOCK (self);
3871   if (segment->position == -1 || segment->position < segment->start)
3872     next_time = segment->start;
3873   else
3874     next_time = segment->position;
3875
3876   if (segment->stop != -1 && next_time > segment->stop)
3877     next_time = segment->stop;
3878
3879   next_time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, next_time);
3880   GST_OBJECT_UNLOCK (self);
3881
3882   return next_time;
3883 }
3884
3885 /**
3886  * gst_aggregator_update_segment:
3887  *
3888  * Subclasses should use this to update the segment on their
3889  * source pad, instead of directly pushing new segment events
3890  * downstream.
3891  *
3892  * Subclasses MUST call this before gst_aggregator_selected_samples(),
3893  * if it is used at all.
3894  *
3895  * Since: 1.18
3896  */
3897 void
3898 gst_aggregator_update_segment (GstAggregator * self, const GstSegment * segment)
3899 {
3900   g_return_if_fail (GST_IS_AGGREGATOR (self));
3901   g_return_if_fail (segment != NULL);
3902
3903   GST_INFO_OBJECT (self, "Updating srcpad segment: %" GST_SEGMENT_FORMAT,
3904       segment);
3905
3906   GST_OBJECT_LOCK (self);
3907   GST_AGGREGATOR_PAD (self->srcpad)->segment = *segment;
3908   self->priv->send_segment = TRUE;
3909   /* we have a segment from the subclass now and really shouldn't override
3910    * anything in that segment anymore, like the segment.position */
3911   self->priv->first_buffer = FALSE;
3912   GST_OBJECT_UNLOCK (self);
3913 }
3914
3915 /**
3916  * gst_aggregator_selected_samples:
3917  * @pts: The presentation timestamp of the next output buffer
3918  * @dts: The decoding timestamp of the next output buffer
3919  * @duration: The duration of the next output buffer
3920  * @info: (nullable): a #GstStructure containing additional information
3921  *
3922  * Subclasses should call this when they have prepared the
3923  * buffers they will aggregate for each of their sink pads, but
3924  * before using any of the properties of the pads that govern
3925  * *how* aggregation should be performed, for example z-index
3926  * for video aggregators.
3927  *
3928  * If gst_aggregator_update_segment() is used by the subclass,
3929  * it MUST be called before gst_aggregator_selected_samples().
3930  *
3931  * This function MUST only be called from the #GstAggregatorClass::aggregate()
3932  * function.
3933  *
3934  * Since: 1.18
3935  */
3936 void
3937 gst_aggregator_selected_samples (GstAggregator * self,
3938     GstClockTime pts, GstClockTime dts, GstClockTime duration,
3939     GstStructure * info)
3940 {
3941   g_return_if_fail (GST_IS_AGGREGATOR (self));
3942
3943   if (self->priv->emit_signals) {
3944     g_signal_emit (self, gst_aggregator_signals[SIGNAL_SAMPLES_SELECTED], 0,
3945         &GST_AGGREGATOR_PAD (self->srcpad)->segment, pts, dts, duration, info);
3946   }
3947
3948   self->priv->selected_samples_called_or_warned = TRUE;
3949 }
3950
3951 /**
3952  * gst_aggregator_set_ignore_inactive_pads:
3953  * @ignore: whether inactive pads should not be waited on
3954  *
3955  * Subclasses should call this when they don't want to time out
3956  * waiting for a pad that hasn't yet received any buffers in live
3957  * mode.
3958  *
3959  * #GstAggregator will still wait once on each newly-added pad, making
3960  * sure upstream has had a fair chance to start up.
3961  *
3962  * Since: 1.20
3963  */
3964 void
3965 gst_aggregator_set_ignore_inactive_pads (GstAggregator * self, gboolean ignore)
3966 {
3967   g_return_if_fail (GST_IS_AGGREGATOR (self));
3968
3969   GST_OBJECT_LOCK (self);
3970   self->priv->ignore_inactive_pads = ignore;
3971   GST_OBJECT_UNLOCK (self);
3972 }
3973
3974 /**
3975  * gst_aggregator_get_ignore_inactive_pads:
3976  *
3977  * Returns: whether inactive pads will not be waited on
3978  * Since: 1.20
3979  */
3980 gboolean
3981 gst_aggregator_get_ignore_inactive_pads (GstAggregator * self)
3982 {
3983   gboolean ret;
3984
3985   g_return_val_if_fail (GST_IS_AGGREGATOR (self), FALSE);
3986
3987   GST_OBJECT_LOCK (self);
3988   ret = self->priv->ignore_inactive_pads;
3989   GST_OBJECT_UNLOCK (self);
3990
3991   return ret;
3992 }
3993
3994 /**
3995  * gst_aggregator_get_force_live:
3996  *
3997  * Subclasses may use the return value to inform whether they should return
3998  * %GST_FLOW_EOS from their aggregate implementation.
3999  *
4000  * Returns: whether live status was forced on @self.
4001  *
4002  * Since: 1.22
4003  */
4004 gboolean
4005 gst_aggregator_get_force_live (GstAggregator * self)
4006 {
4007   return self->priv->force_live;
4008 }
4009
4010 /**
4011  * gst_aggregator_set_force_live:
4012  *
4013  * Subclasses should call this at construction time in order for @self to
4014  * aggregate on a timeout even when no live source is connected.
4015  *
4016  * Since: 1.22
4017  */
4018 void
4019 gst_aggregator_set_force_live (GstAggregator * self, gboolean force_live)
4020 {
4021   self->priv->force_live = force_live;
4022 }