aggregator: Don't leak peer pad of inactive pads when (not) forwarding QoS events...
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
1 /* GStreamer aggregator base class
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @title: GstAggregator
25  * @short_description: Base class for mixers and muxers, manages a set of input
26  *     pads and aggregates their streams
27  * @see_also: gstcollectpads for historical reasons.
28  *
29  * Manages a set of pads with the purpose of aggregating their buffers.
30  * Control is given to the subclass when all pads have data.
31  *
32  *  * Base class for mixers and muxers. Subclasses should at least implement
33  *    the #GstAggregatorClass.aggregate() virtual method.
34  *
35  *  * Installs a #GstPadChainFunction, a #GstPadEventFullFunction and a
36  *    #GstPadQueryFunction to queue all serialized data packets per sink pad.
37  *    Subclasses should not overwrite those, but instead implement
38  *    #GstAggregatorClass.sink_event() and #GstAggregatorClass.sink_query() as
39  *    needed.
40  *
41  *  * When data is queued on all pads, the aggregate vmethod is called.
42  *
43  *  * One can peek at the data on any given GstAggregatorPad with the
44  *    gst_aggregator_pad_peek_buffer () method, and remove it from the pad
45  *    with the gst_aggregator_pad_pop_buffer () method. When a buffer
46  *    has been taken with pop_buffer (), a new buffer can be queued
47  *    on that pad.
48  *
49  *  * If the subclass wishes to push a buffer downstream in its aggregate
50  *    implementation, it should do so through the
51  *    gst_aggregator_finish_buffer () method. This method will take care
52  *    of sending and ordering mandatory events such as stream start, caps
53  *    and segment.
54  *
55  *  * Same goes for EOS events, which should not be pushed directly by the
56  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
57  *    implementation.
58  *
59  *  * Note that the aggregator logic regarding gap event handling is to turn
60  *    these into gap buffers with matching PTS and duration. It will also
61  *    flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE
62  *    to ease their identification and subsequent processing.
63  *
64  *  * Subclasses must use (a subclass of) #GstAggregatorPad for both their
65  *    sink and source pads.
66  *    See gst_element_class_add_static_pad_template_with_gtype().
67  *
68  * This class used to live in gst-plugins-bad and was moved to core.
69  *
70  * Since: 1.14
71  */
72
73 /**
74  * SECTION: gstaggregatorpad
75  * @title: GstAggregatorPad
76  * @short_description: #GstPad subclass for pads managed by #GstAggregator
77  * @see_also: gstcollectpads for historical reasons.
78  *
79  * Pads managed by a #GstAggregor subclass.
80  *
81  * This class used to live in gst-plugins-bad and was moved to core.
82  *
83  * Since: 1.14
84  */
85
86 #ifdef HAVE_CONFIG_H
87 #  include "config.h"
88 #endif
89
90 #include <string.h>             /* strlen */
91
92 #include "gstaggregator.h"
93
94 typedef enum
95 {
96   GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
97   GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
98   GST_AGGREGATOR_START_TIME_SELECTION_SET
99 } GstAggregatorStartTimeSelection;
100
101 static GType
102 gst_aggregator_start_time_selection_get_type (void)
103 {
104   static GType gtype = 0;
105
106   if (gtype == 0) {
107     static const GEnumValue values[] = {
108       {GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
109           "Start at 0 running time (default)", "zero"},
110       {GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
111           "Start at first observed input running time", "first"},
112       {GST_AGGREGATOR_START_TIME_SELECTION_SET,
113           "Set start time with start-time property", "set"},
114       {0, NULL, NULL}
115     };
116
117     gtype = g_enum_register_static ("GstAggregatorStartTimeSelection", values);
118   }
119   return gtype;
120 }
121
122 /*  Might become API */
123 #if 0
124 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
125     const GstTagList * tags, GstTagMergeMode mode);
126 #endif
127 static void gst_aggregator_set_latency_property (GstAggregator * agg,
128     GstClockTime latency);
129 static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
130
131 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
132
133 static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad);
134
135 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
136 #define GST_CAT_DEFAULT aggregator_debug
137
138 /* Locking order, locks in this element must always be taken in this order
139  *
140  * standard sink pad stream lock -> GST_PAD_STREAM_LOCK (aggpad)
141  * Aggregator pad flush lock -> PAD_FLUSH_LOCK(aggpad)
142  * standard src pad stream lock -> GST_PAD_STREAM_LOCK (srcpad)
143  * Aggregator src lock -> SRC_LOCK(agg) w/ SRC_WAIT/BROADCAST
144  * standard element object lock -> GST_OBJECT_LOCK(agg)
145  * Aggregator pad lock -> PAD_LOCK (aggpad) w/ PAD_WAIT/BROADCAST_EVENT(aggpad)
146  * standard src pad object lock -> GST_OBJECT_LOCK(srcpad)
147  * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad)
148  */
149
150 /* GstAggregatorPad definitions */
151 #define PAD_LOCK(pad)   G_STMT_START {                                  \
152   GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p",              \
153         g_thread_self());                                               \
154   g_mutex_lock(&pad->priv->lock);                                       \
155   GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p",                \
156         g_thread_self());                                               \
157   } G_STMT_END
158
159 #define PAD_UNLOCK(pad)  G_STMT_START {                                 \
160   GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p",           \
161       g_thread_self());                                                 \
162   g_mutex_unlock(&pad->priv->lock);                                     \
163   GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p",             \
164         g_thread_self());                                               \
165   } G_STMT_END
166
167
168 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
169   GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p",   \
170         g_thread_self());                                               \
171   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),           \
172       (&((GstAggregatorPad*)pad)->priv->lock));                         \
173   GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \
174         g_thread_self());                                               \
175   } G_STMT_END
176
177 #define PAD_BROADCAST_EVENT(pad) G_STMT_START {                        \
178   GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p",     \
179         g_thread_self());                                              \
180   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond));    \
181   } G_STMT_END
182
183
184 #define PAD_FLUSH_LOCK(pad)     G_STMT_START {                          \
185   GST_TRACE_OBJECT (pad, "Taking lock from thread %p",                  \
186         g_thread_self());                                               \
187   g_mutex_lock(&pad->priv->flush_lock);                                 \
188   GST_TRACE_OBJECT (pad, "Took lock from thread %p",                    \
189         g_thread_self());                                               \
190   } G_STMT_END
191
192 #define PAD_FLUSH_UNLOCK(pad)   G_STMT_START {                          \
193   GST_TRACE_OBJECT (pad, "Releasing lock from thread %p",               \
194         g_thread_self());                                               \
195   g_mutex_unlock(&pad->priv->flush_lock);                               \
196   GST_TRACE_OBJECT (pad, "Release lock from thread %p",                 \
197         g_thread_self());                                               \
198   } G_STMT_END
199
200 #define SRC_LOCK(self)   G_STMT_START {                             \
201   GST_TRACE_OBJECT (self, "Taking src lock from thread %p",         \
202       g_thread_self());                                             \
203   g_mutex_lock(&self->priv->src_lock);                              \
204   GST_TRACE_OBJECT (self, "Took src lock from thread %p",           \
205         g_thread_self());                                           \
206   } G_STMT_END
207
208 #define SRC_UNLOCK(self)  G_STMT_START {                            \
209   GST_TRACE_OBJECT (self, "Releasing src lock from thread %p",      \
210         g_thread_self());                                           \
211   g_mutex_unlock(&self->priv->src_lock);                            \
212   GST_TRACE_OBJECT (self, "Released src lock from thread %p",       \
213         g_thread_self());                                           \
214   } G_STMT_END
215
216 #define SRC_WAIT(self) G_STMT_START {                               \
217   GST_LOG_OBJECT (self, "Waiting for src on thread %p",             \
218         g_thread_self());                                           \
219   g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock));    \
220   GST_LOG_OBJECT (self, "DONE Waiting for src on thread %p",        \
221         g_thread_self());                                           \
222   } G_STMT_END
223
224 #define SRC_BROADCAST(self) G_STMT_START {                          \
225     GST_LOG_OBJECT (self, "Signaling src from thread %p",           \
226         g_thread_self());                                           \
227     if (self->priv->aggregate_id)                                   \
228       gst_clock_id_unschedule (self->priv->aggregate_id);           \
229     g_cond_broadcast(&(self->priv->src_cond));                      \
230   } G_STMT_END
231
232 struct _GstAggregatorPadPrivate
233 {
234   /* Following fields are protected by the PAD_LOCK */
235   GstFlowReturn flow_return;
236   gboolean pending_flush_start;
237   gboolean pending_flush_stop;
238
239   gboolean first_buffer;
240
241   GQueue data;                  /* buffers, events and queries */
242   GstBuffer *clipped_buffer;
243   guint num_buffers;
244
245   /* used to track fill state of queues, only used with live-src and when
246    * latency property is set to > 0 */
247   GstClockTime head_position;
248   GstClockTime tail_position;
249   GstClockTime head_time;       /* running time */
250   GstClockTime tail_time;
251   GstClockTime time_level;      /* how much head is ahead of tail */
252   GstSegment head_segment;      /* segment before the queue */
253
254   gboolean negotiated;
255
256   gboolean eos;
257
258   GMutex lock;
259   GCond event_cond;
260   /* This lock prevents a flush start processing happening while
261    * the chain function is also happening.
262    */
263   GMutex flush_lock;
264 };
265
266 /* Must be called with PAD_LOCK held */
267 static void
268 gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
269 {
270   aggpad->priv->eos = FALSE;
271   aggpad->priv->flow_return = GST_FLOW_OK;
272   GST_OBJECT_LOCK (aggpad);
273   gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED);
274   gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED);
275   GST_OBJECT_UNLOCK (aggpad);
276   aggpad->priv->head_position = GST_CLOCK_TIME_NONE;
277   aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
278   aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
279   aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
280   aggpad->priv->time_level = 0;
281   aggpad->priv->first_buffer = TRUE;
282 }
283
284 static gboolean
285 gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
286 {
287   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
288
289   PAD_LOCK (aggpad);
290   gst_aggregator_pad_reset_unlocked (aggpad);
291   PAD_UNLOCK (aggpad);
292
293   if (klass->flush)
294     return klass->flush (aggpad, agg);
295
296   return TRUE;
297 }
298
299 /*************************************
300  * GstAggregator implementation  *
301  *************************************/
302 static GstElementClass *aggregator_parent_class = NULL;
303 static gint aggregator_private_offset = 0;
304
305 /* All members are protected by the object lock unless otherwise noted */
306
307 struct _GstAggregatorPrivate
308 {
309   gint max_padserial;
310
311   /* Our state is >= PAUSED */
312   gboolean running;             /* protected by src_lock */
313
314   /* seqnum from seek or segment,
315    * to be applied to synthetic segment/eos events */
316   gint seqnum;
317   gboolean send_stream_start;   /* protected by srcpad stream lock */
318   gboolean send_segment;
319   gboolean flush_seeking;
320   gboolean pending_flush_start;
321   gboolean send_eos;            /* protected by srcpad stream lock */
322
323   GstCaps *srccaps;             /* protected by the srcpad stream lock */
324
325   GstTagList *tags;
326   gboolean tags_changed;
327
328   gboolean peer_latency_live;   /* protected by src_lock */
329   GstClockTime peer_latency_min;        /* protected by src_lock */
330   GstClockTime peer_latency_max;        /* protected by src_lock */
331   gboolean has_peer_latency;    /* protected by src_lock */
332
333   GstClockTime sub_latency_min; /* protected by src_lock */
334   GstClockTime sub_latency_max; /* protected by src_lock */
335
336   /* aggregate */
337   GstClockID aggregate_id;      /* protected by src_lock */
338   GMutex src_lock;
339   GCond src_cond;
340
341   gboolean first_buffer;        /* protected by object lock */
342   GstAggregatorStartTimeSelection start_time_selection;
343   GstClockTime start_time;
344
345   /* protected by the object lock */
346   GstQuery *allocation_query;
347   GstAllocator *allocator;
348   GstBufferPool *pool;
349   GstAllocationParams allocation_params;
350
351   /* properties */
352   gint64 latency;               /* protected by both src_lock and all pad locks */
353 };
354
355 /* Seek event forwarding helper */
356 typedef struct
357 {
358   /* parameters */
359   GstEvent *event;
360   gboolean flush;
361   gboolean only_to_active_pads;
362
363   /* results */
364   gboolean result;
365   gboolean one_actually_seeked;
366 } EventData;
367
368 #define DEFAULT_LATENCY              0
369 #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO
370 #define DEFAULT_START_TIME           (-1)
371
372 enum
373 {
374   PROP_0,
375   PROP_LATENCY,
376   PROP_START_TIME_SELECTION,
377   PROP_START_TIME,
378   PROP_LAST
379 };
380
381 static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
382     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
383
384 static gboolean
385 gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
386 {
387   return (g_queue_peek_tail (&pad->priv->data) == NULL &&
388       pad->priv->clipped_buffer == NULL);
389 }
390
391 static gboolean
392 gst_aggregator_check_pads_ready (GstAggregator * self)
393 {
394   GstAggregatorPad *pad = NULL;
395   GList *l, *sinkpads;
396   gboolean have_buffer = TRUE;
397   gboolean have_event_or_query = FALSE;
398
399   GST_LOG_OBJECT (self, "checking pads");
400
401   GST_OBJECT_LOCK (self);
402
403   sinkpads = GST_ELEMENT_CAST (self)->sinkpads;
404   if (sinkpads == NULL)
405     goto no_sinkpads;
406
407   for (l = sinkpads; l != NULL; l = l->next) {
408     pad = l->data;
409
410     PAD_LOCK (pad);
411
412     if (pad->priv->num_buffers == 0) {
413       if (!gst_aggregator_pad_queue_is_empty (pad))
414         have_event_or_query = TRUE;
415       if (!pad->priv->eos) {
416         have_buffer = FALSE;
417
418         /* If not live we need data on all pads, so leave the loop */
419         if (!self->priv->peer_latency_live) {
420           PAD_UNLOCK (pad);
421           goto pad_not_ready;
422         }
423       }
424     } else if (self->priv->peer_latency_live) {
425       /* In live mode, having a single pad with buffers is enough to
426        * generate a start time from it. In non-live mode all pads need
427        * to have a buffer
428        */
429       self->priv->first_buffer = FALSE;
430     }
431
432     PAD_UNLOCK (pad);
433   }
434
435   if (!have_buffer && !have_event_or_query)
436     goto pad_not_ready;
437
438   if (have_buffer)
439     self->priv->first_buffer = FALSE;
440
441   GST_OBJECT_UNLOCK (self);
442   GST_LOG_OBJECT (self, "pads are ready");
443   return TRUE;
444
445 no_sinkpads:
446   {
447     GST_LOG_OBJECT (self, "pads not ready: no sink pads");
448     GST_OBJECT_UNLOCK (self);
449     return FALSE;
450   }
451 pad_not_ready:
452   {
453     if (have_event_or_query)
454       GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet,"
455           " but waking up for serialized event");
456     else
457       GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
458     GST_OBJECT_UNLOCK (self);
459     return have_event_or_query;
460   }
461 }
462
463 static void
464 gst_aggregator_reset_flow_values (GstAggregator * self)
465 {
466   GST_OBJECT_LOCK (self);
467   self->priv->send_stream_start = TRUE;
468   self->priv->send_segment = TRUE;
469   gst_segment_init (&GST_AGGREGATOR_PAD (self->srcpad)->segment,
470       GST_FORMAT_TIME);
471   self->priv->first_buffer = TRUE;
472   GST_OBJECT_UNLOCK (self);
473 }
474
475 static inline void
476 gst_aggregator_push_mandatory_events (GstAggregator * self)
477 {
478   GstAggregatorPrivate *priv = self->priv;
479   GstEvent *segment = NULL;
480   GstEvent *tags = NULL;
481
482   if (self->priv->send_stream_start) {
483     gchar s_id[32];
484
485     GST_INFO_OBJECT (self, "pushing stream start");
486     /* stream-start (FIXME: create id based on input ids) */
487     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
488     if (!gst_pad_push_event (GST_PAD (self->srcpad),
489             gst_event_new_stream_start (s_id))) {
490       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
491     }
492     self->priv->send_stream_start = FALSE;
493   }
494
495   if (self->priv->srccaps) {
496
497     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
498         self->priv->srccaps);
499     if (!gst_pad_push_event (GST_PAD (self->srcpad),
500             gst_event_new_caps (self->priv->srccaps))) {
501       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
502     }
503     gst_caps_unref (self->priv->srccaps);
504     self->priv->srccaps = NULL;
505   }
506
507   GST_OBJECT_LOCK (self);
508   if (self->priv->send_segment && !self->priv->flush_seeking) {
509     segment =
510         gst_event_new_segment (&GST_AGGREGATOR_PAD (self->srcpad)->segment);
511
512     if (!self->priv->seqnum)
513       /* This code-path is in preparation to be able to run without a source
514        * connected. Then we won't have a seq-num from a segment event. */
515       self->priv->seqnum = gst_event_get_seqnum (segment);
516     else
517       gst_event_set_seqnum (segment, self->priv->seqnum);
518     self->priv->send_segment = FALSE;
519
520     GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment);
521   }
522
523   if (priv->tags && priv->tags_changed && !self->priv->flush_seeking) {
524     tags = gst_event_new_tag (gst_tag_list_ref (priv->tags));
525     priv->tags_changed = FALSE;
526   }
527   GST_OBJECT_UNLOCK (self);
528
529   if (segment)
530     gst_pad_push_event (self->srcpad, segment);
531   if (tags)
532     gst_pad_push_event (self->srcpad, tags);
533
534 }
535
536 /**
537  * gst_aggregator_set_src_caps:
538  * @self: The #GstAggregator
539  * @caps: The #GstCaps to set on the src pad.
540  *
541  * Sets the caps to be used on the src pad.
542  */
543 void
544 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
545 {
546   GST_PAD_STREAM_LOCK (self->srcpad);
547   gst_caps_replace (&self->priv->srccaps, caps);
548   gst_aggregator_push_mandatory_events (self);
549   GST_PAD_STREAM_UNLOCK (self->srcpad);
550 }
551
552 static GstFlowReturn
553 gst_aggregator_default_finish_buffer (GstAggregator * self, GstBuffer * buffer)
554 {
555   gst_aggregator_push_mandatory_events (self);
556
557   GST_OBJECT_LOCK (self);
558   if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) {
559     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
560     GST_OBJECT_UNLOCK (self);
561     return gst_pad_push (self->srcpad, buffer);
562   } else {
563     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
564         self->priv->flush_seeking, gst_pad_is_active (self->srcpad));
565     GST_OBJECT_UNLOCK (self);
566     gst_buffer_unref (buffer);
567     return GST_FLOW_OK;
568   }
569 }
570
571 /**
572  * gst_aggregator_finish_buffer:
573  * @aggregator: The #GstAggregator
574  * @buffer: (transfer full): the #GstBuffer to push.
575  *
576  * This method will push the provided output buffer downstream. If needed,
577  * mandatory events such as stream-start, caps, and segment events will be
578  * sent before pushing the buffer.
579  */
580 GstFlowReturn
581 gst_aggregator_finish_buffer (GstAggregator * aggregator, GstBuffer * buffer)
582 {
583   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator);
584
585   g_assert (klass->finish_buffer != NULL);
586
587   return klass->finish_buffer (aggregator, buffer);
588 }
589
590 static void
591 gst_aggregator_push_eos (GstAggregator * self)
592 {
593   GstEvent *event;
594   gst_aggregator_push_mandatory_events (self);
595
596   event = gst_event_new_eos ();
597
598   GST_OBJECT_LOCK (self);
599   self->priv->send_eos = FALSE;
600   gst_event_set_seqnum (event, self->priv->seqnum);
601   GST_OBJECT_UNLOCK (self);
602
603   gst_pad_push_event (self->srcpad, event);
604 }
605
606 static GstClockTime
607 gst_aggregator_get_next_time (GstAggregator * self)
608 {
609   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
610
611   if (klass->get_next_time)
612     return klass->get_next_time (self);
613
614   return GST_CLOCK_TIME_NONE;
615 }
616
617 static gboolean
618 gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
619 {
620   GstClockTime latency;
621   GstClockTime start;
622   gboolean res;
623
624   *timeout = FALSE;
625
626   SRC_LOCK (self);
627
628   latency = gst_aggregator_get_latency_unlocked (self);
629
630   if (gst_aggregator_check_pads_ready (self)) {
631     GST_DEBUG_OBJECT (self, "all pads have data");
632     SRC_UNLOCK (self);
633
634     return TRUE;
635   }
636
637   /* Before waiting, check if we're actually still running */
638   if (!self->priv->running || !self->priv->send_eos) {
639     SRC_UNLOCK (self);
640
641     return FALSE;
642   }
643
644   start = gst_aggregator_get_next_time (self);
645
646   /* If we're not live, or if we use the running time
647    * of the first buffer as start time, we wait until
648    * all pads have buffers.
649    * Otherwise (i.e. if we are live!), we wait on the clock
650    * and if a pad does not have a buffer in time we ignore
651    * that pad.
652    */
653   GST_OBJECT_LOCK (self);
654   if (!GST_CLOCK_TIME_IS_VALID (latency) ||
655       !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) ||
656       !GST_CLOCK_TIME_IS_VALID (start) ||
657       (self->priv->first_buffer
658           && self->priv->start_time_selection ==
659           GST_AGGREGATOR_START_TIME_SELECTION_FIRST)) {
660     /* We wake up here when something happened, and below
661      * then check if we're ready now. If we return FALSE,
662      * we will be directly called again.
663      */
664     GST_OBJECT_UNLOCK (self);
665     SRC_WAIT (self);
666   } else {
667     GstClockTime base_time, time;
668     GstClock *clock;
669     GstClockReturn status;
670     GstClockTimeDiff jitter;
671
672     GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
673         GST_TIME_ARGS (start));
674
675     base_time = GST_ELEMENT_CAST (self)->base_time;
676     clock = gst_object_ref (GST_ELEMENT_CLOCK (self));
677     GST_OBJECT_UNLOCK (self);
678
679     time = base_time + start;
680     time += latency;
681
682     GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
683         GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
684         " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")",
685         GST_TIME_ARGS (time),
686         GST_TIME_ARGS (base_time),
687         GST_TIME_ARGS (start), GST_TIME_ARGS (latency),
688         GST_TIME_ARGS (gst_clock_get_time (clock)));
689
690     self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
691     gst_object_unref (clock);
692     SRC_UNLOCK (self);
693
694     jitter = 0;
695     status = gst_clock_id_wait (self->priv->aggregate_id, &jitter);
696
697     SRC_LOCK (self);
698     if (self->priv->aggregate_id) {
699       gst_clock_id_unref (self->priv->aggregate_id);
700       self->priv->aggregate_id = NULL;
701     }
702
703     GST_DEBUG_OBJECT (self,
704         "clock returned %d (jitter: %" GST_STIME_FORMAT ")",
705         status, GST_STIME_ARGS (jitter));
706
707     /* we timed out */
708     if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
709       SRC_UNLOCK (self);
710       *timeout = TRUE;
711       return TRUE;
712     }
713   }
714
715   res = gst_aggregator_check_pads_ready (self);
716   SRC_UNLOCK (self);
717
718   return res;
719 }
720
721 static gboolean
722 gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
723     gpointer user_data)
724 {
725   GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
726   GstAggregator *aggregator = GST_AGGREGATOR_CAST (self);
727   GstEvent *event = NULL;
728   GstQuery *query = NULL;
729   GstAggregatorClass *klass = NULL;
730   gboolean *processed_event = user_data;
731
732   do {
733     event = NULL;
734     query = NULL;
735
736     PAD_LOCK (pad);
737     if (pad->priv->clipped_buffer == NULL &&
738         !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
739       if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data)))
740         event = gst_event_ref (g_queue_peek_tail (&pad->priv->data));
741       if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))
742         query = g_queue_peek_tail (&pad->priv->data);
743     }
744     PAD_UNLOCK (pad);
745     if (event || query) {
746       gboolean ret;
747
748       if (processed_event)
749         *processed_event = TRUE;
750       if (klass == NULL)
751         klass = GST_AGGREGATOR_GET_CLASS (self);
752
753       if (event) {
754         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
755         gst_event_ref (event);
756         ret = klass->sink_event (aggregator, pad, event);
757
758         PAD_LOCK (pad);
759         if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS)
760           pad->priv->negotiated = ret;
761         if (g_queue_peek_tail (&pad->priv->data) == event)
762           gst_event_unref (g_queue_pop_tail (&pad->priv->data));
763         gst_event_unref (event);
764       } else if (query) {
765         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query);
766         ret = klass->sink_query (aggregator, pad, query);
767
768         PAD_LOCK (pad);
769         if (g_queue_peek_tail (&pad->priv->data) == query) {
770           GstStructure *s;
771
772           s = gst_query_writable_structure (query);
773           gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret,
774               NULL);
775           g_queue_pop_tail (&pad->priv->data);
776         }
777       }
778
779       PAD_BROADCAST_EVENT (pad);
780       PAD_UNLOCK (pad);
781     }
782   } while (event || query);
783
784   return TRUE;
785 }
786
787 static gboolean
788 gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
789     gpointer user_data)
790 {
791   GList *item;
792   GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
793   GstAggregator *agg = (GstAggregator *) self;
794   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
795
796   if (!klass->skip_buffer)
797     return FALSE;
798
799   PAD_LOCK (aggpad);
800
801   item = g_queue_peek_head_link (&aggpad->priv->data);
802   while (item) {
803     GList *next = item->next;
804
805     if (GST_IS_BUFFER (item->data)
806         && klass->skip_buffer (aggpad, agg, item->data)) {
807       GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
808       gst_aggregator_pad_buffer_consumed (aggpad);
809       gst_buffer_unref (item->data);
810       g_queue_delete_link (&aggpad->priv->data, item);
811     } else {
812       break;
813     }
814
815     item = next;
816   }
817
818   PAD_UNLOCK (aggpad);
819
820   return TRUE;
821 }
822
823 static void
824 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
825     GstFlowReturn flow_return, gboolean full)
826 {
827   GList *item;
828
829   PAD_LOCK (aggpad);
830   if (flow_return == GST_FLOW_NOT_LINKED)
831     aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
832   else
833     aggpad->priv->flow_return = flow_return;
834
835   item = g_queue_peek_head_link (&aggpad->priv->data);
836   while (item) {
837     GList *next = item->next;
838
839     /* In partial flush, we do like the pad, we get rid of non-sticky events
840      * and EOS/SEGMENT.
841      */
842     if (full || GST_IS_BUFFER (item->data) ||
843         GST_EVENT_TYPE (item->data) == GST_EVENT_EOS ||
844         GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT ||
845         !GST_EVENT_IS_STICKY (item->data)) {
846       if (!GST_IS_QUERY (item->data))
847         gst_mini_object_unref (item->data);
848       g_queue_delete_link (&aggpad->priv->data, item);
849     }
850     item = next;
851   }
852   aggpad->priv->num_buffers = 0;
853   gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL);
854
855   PAD_BROADCAST_EVENT (aggpad);
856   PAD_UNLOCK (aggpad);
857 }
858
859 static GstFlowReturn
860 gst_aggregator_default_update_src_caps (GstAggregator * agg, GstCaps * caps,
861     GstCaps ** ret)
862 {
863   *ret = gst_caps_ref (caps);
864
865   return GST_FLOW_OK;
866 }
867
868 static GstCaps *
869 gst_aggregator_default_fixate_src_caps (GstAggregator * agg, GstCaps * caps)
870 {
871   caps = gst_caps_fixate (caps);
872
873   return caps;
874 }
875
876 static gboolean
877 gst_aggregator_default_negotiated_src_caps (GstAggregator * agg, GstCaps * caps)
878 {
879   return TRUE;
880 }
881
882
883 /* takes ownership of the pool, allocator and query */
884 static gboolean
885 gst_aggregator_set_allocation (GstAggregator * self,
886     GstBufferPool * pool, GstAllocator * allocator,
887     GstAllocationParams * params, GstQuery * query)
888 {
889   GstAllocator *oldalloc;
890   GstBufferPool *oldpool;
891   GstQuery *oldquery;
892
893   GST_DEBUG ("storing allocation query");
894
895   GST_OBJECT_LOCK (self);
896   oldpool = self->priv->pool;
897   self->priv->pool = pool;
898
899   oldalloc = self->priv->allocator;
900   self->priv->allocator = allocator;
901
902   oldquery = self->priv->allocation_query;
903   self->priv->allocation_query = query;
904
905   if (params)
906     self->priv->allocation_params = *params;
907   else
908     gst_allocation_params_init (&self->priv->allocation_params);
909   GST_OBJECT_UNLOCK (self);
910
911   if (oldpool) {
912     GST_DEBUG_OBJECT (self, "deactivating old pool %p", oldpool);
913     gst_buffer_pool_set_active (oldpool, FALSE);
914     gst_object_unref (oldpool);
915   }
916   if (oldalloc) {
917     gst_object_unref (oldalloc);
918   }
919   if (oldquery) {
920     gst_query_unref (oldquery);
921   }
922   return TRUE;
923 }
924
925
926 static gboolean
927 gst_aggregator_decide_allocation (GstAggregator * self, GstQuery * query)
928 {
929   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
930
931   if (aggclass->decide_allocation)
932     if (!aggclass->decide_allocation (self, query))
933       return FALSE;
934
935   return TRUE;
936 }
937
938 static gboolean
939 gst_aggregator_do_allocation (GstAggregator * self, GstCaps * caps)
940 {
941   GstQuery *query;
942   gboolean result = TRUE;
943   GstBufferPool *pool = NULL;
944   GstAllocator *allocator;
945   GstAllocationParams params;
946
947   /* find a pool for the negotiated caps now */
948   GST_DEBUG_OBJECT (self, "doing allocation query");
949   query = gst_query_new_allocation (caps, TRUE);
950   if (!gst_pad_peer_query (self->srcpad, query)) {
951     /* not a problem, just debug a little */
952     GST_DEBUG_OBJECT (self, "peer ALLOCATION query failed");
953   }
954
955   GST_DEBUG_OBJECT (self, "calling decide_allocation");
956   result = gst_aggregator_decide_allocation (self, query);
957
958   GST_DEBUG_OBJECT (self, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
959       query);
960
961   if (!result)
962     goto no_decide_allocation;
963
964   /* we got configuration from our peer or the decide_allocation method,
965    * parse them */
966   if (gst_query_get_n_allocation_params (query) > 0) {
967     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
968   } else {
969     allocator = NULL;
970     gst_allocation_params_init (&params);
971   }
972
973   if (gst_query_get_n_allocation_pools (query) > 0)
974     gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
975
976   /* now store */
977   result =
978       gst_aggregator_set_allocation (self, pool, allocator, &params, query);
979
980   return result;
981
982   /* Errors */
983 no_decide_allocation:
984   {
985     GST_WARNING_OBJECT (self, "Failed to decide allocation");
986     gst_query_unref (query);
987
988     return result;
989   }
990
991 }
992
993 /* WITH SRC_LOCK held */
994 static GstFlowReturn
995 gst_aggregator_update_src_caps (GstAggregator * self)
996 {
997   GstAggregatorClass *agg_klass = GST_AGGREGATOR_GET_CLASS (self);
998   GstCaps *downstream_caps, *template_caps, *caps = NULL;
999   GstFlowReturn ret = GST_FLOW_OK;
1000
1001   template_caps = gst_pad_get_pad_template_caps (self->srcpad);
1002   downstream_caps = gst_pad_peer_query_caps (self->srcpad, template_caps);
1003
1004   if (gst_caps_is_empty (downstream_caps)) {
1005     GST_INFO_OBJECT (self, "Downstream caps (%"
1006         GST_PTR_FORMAT ") not compatible with pad template caps (%"
1007         GST_PTR_FORMAT ")", downstream_caps, template_caps);
1008     ret = GST_FLOW_NOT_NEGOTIATED;
1009     goto done;
1010   }
1011
1012   g_assert (agg_klass->update_src_caps);
1013   GST_DEBUG_OBJECT (self, "updating caps from %" GST_PTR_FORMAT,
1014       downstream_caps);
1015   ret = agg_klass->update_src_caps (self, downstream_caps, &caps);
1016   if (ret < GST_FLOW_OK) {
1017     GST_WARNING_OBJECT (self, "Subclass failed to update provided caps");
1018     goto done;
1019   }
1020   if ((caps == NULL || gst_caps_is_empty (caps)) && ret >= GST_FLOW_OK) {
1021     ret = GST_FLOW_NOT_NEGOTIATED;
1022     goto done;
1023   }
1024   GST_DEBUG_OBJECT (self, "               to %" GST_PTR_FORMAT, caps);
1025
1026 #ifdef GST_ENABLE_EXTRA_CHECKS
1027   if (!gst_caps_is_subset (caps, template_caps)) {
1028     GstCaps *intersection;
1029
1030     GST_ERROR_OBJECT (self,
1031         "update_src_caps returned caps %" GST_PTR_FORMAT
1032         " which are not a real subset of the template caps %"
1033         GST_PTR_FORMAT, caps, template_caps);
1034     g_warning ("%s: update_src_caps returned caps which are not a real "
1035         "subset of the filter caps", GST_ELEMENT_NAME (self));
1036
1037     intersection =
1038         gst_caps_intersect_full (template_caps, caps, GST_CAPS_INTERSECT_FIRST);
1039     gst_caps_unref (caps);
1040     caps = intersection;
1041   }
1042 #endif
1043
1044   if (gst_caps_is_any (caps)) {
1045     goto done;
1046   }
1047
1048   if (!gst_caps_is_fixed (caps)) {
1049     g_assert (agg_klass->fixate_src_caps);
1050
1051     GST_DEBUG_OBJECT (self, "fixate caps from %" GST_PTR_FORMAT, caps);
1052     if (!(caps = agg_klass->fixate_src_caps (self, caps))) {
1053       GST_WARNING_OBJECT (self, "Subclass failed to fixate provided caps");
1054       ret = GST_FLOW_NOT_NEGOTIATED;
1055       goto done;
1056     }
1057     GST_DEBUG_OBJECT (self, "             to %" GST_PTR_FORMAT, caps);
1058   }
1059
1060   if (agg_klass->negotiated_src_caps) {
1061     if (!agg_klass->negotiated_src_caps (self, caps)) {
1062       GST_WARNING_OBJECT (self, "Subclass failed to accept negotiated caps");
1063       ret = GST_FLOW_NOT_NEGOTIATED;
1064       goto done;
1065     }
1066   }
1067
1068   gst_aggregator_set_src_caps (self, caps);
1069
1070   if (!gst_aggregator_do_allocation (self, caps)) {
1071     GST_WARNING_OBJECT (self, "Allocation negotiation failed");
1072     ret = GST_FLOW_NOT_NEGOTIATED;
1073   }
1074
1075 done:
1076   gst_caps_unref (downstream_caps);
1077   gst_caps_unref (template_caps);
1078
1079   if (caps)
1080     gst_caps_unref (caps);
1081
1082   return ret;
1083 }
1084
1085 static void
1086 gst_aggregator_aggregate_func (GstAggregator * self)
1087 {
1088   GstAggregatorPrivate *priv = self->priv;
1089   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1090   gboolean timeout = FALSE;
1091
1092   if (self->priv->running == FALSE) {
1093     GST_DEBUG_OBJECT (self, "Not running anymore");
1094     return;
1095   }
1096
1097   GST_LOG_OBJECT (self, "Checking aggregate");
1098   while (priv->send_eos && priv->running) {
1099     GstFlowReturn flow_return = GST_FLOW_OK;
1100     gboolean processed_event = FALSE;
1101
1102     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1103         gst_aggregator_do_events_and_queries, NULL);
1104
1105     if (self->priv->peer_latency_live)
1106       gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1107           gst_aggregator_pad_skip_buffers, NULL);
1108
1109     /* Ensure we have buffers ready (either in clipped_buffer or at the head of
1110      * the queue */
1111     if (!gst_aggregator_wait_and_check (self, &timeout))
1112       continue;
1113
1114     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1115         gst_aggregator_do_events_and_queries, &processed_event);
1116
1117     if (processed_event)
1118       continue;
1119
1120     if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
1121       flow_return = gst_aggregator_update_src_caps (self);
1122       if (flow_return != GST_FLOW_OK)
1123         gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self));
1124     }
1125
1126     if (timeout || flow_return >= GST_FLOW_OK) {
1127       GST_TRACE_OBJECT (self, "Actually aggregating!");
1128       flow_return = klass->aggregate (self, timeout);
1129     }
1130
1131     if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA)
1132       continue;
1133
1134     GST_OBJECT_LOCK (self);
1135     if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) {
1136       /* We don't want to set the pads to flushing, but we want to
1137        * stop the thread, so just break here */
1138       GST_OBJECT_UNLOCK (self);
1139       break;
1140     }
1141     GST_OBJECT_UNLOCK (self);
1142
1143     if (flow_return == GST_FLOW_EOS || flow_return == GST_FLOW_ERROR) {
1144       gst_aggregator_push_eos (self);
1145     }
1146
1147     GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
1148
1149     if (flow_return != GST_FLOW_OK) {
1150       GList *item;
1151
1152       GST_OBJECT_LOCK (self);
1153       for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
1154         GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
1155
1156         gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE);
1157       }
1158       GST_OBJECT_UNLOCK (self);
1159       break;
1160     }
1161   }
1162
1163   /* Pause the task here, the only ways to get here are:
1164    * 1) We're stopping, in which case the task is stopped anyway
1165    * 2) We got a flow error above, in which case it might take
1166    *    some time to forward the flow return upstream and we
1167    *    would otherwise call the task function over and over
1168    *    again without doing anything
1169    */
1170   gst_pad_pause_task (self->srcpad);
1171 }
1172
1173 static gboolean
1174 gst_aggregator_start (GstAggregator * self)
1175 {
1176   GstAggregatorClass *klass;
1177   gboolean result;
1178
1179   self->priv->send_stream_start = TRUE;
1180   self->priv->send_segment = TRUE;
1181   self->priv->send_eos = TRUE;
1182   self->priv->srccaps = NULL;
1183
1184   gst_aggregator_set_allocation (self, NULL, NULL, NULL, NULL);
1185
1186   klass = GST_AGGREGATOR_GET_CLASS (self);
1187
1188   if (klass->start)
1189     result = klass->start (self);
1190   else
1191     result = TRUE;
1192
1193   return result;
1194 }
1195
1196 static gboolean
1197 _check_pending_flush_stop (GstAggregatorPad * pad)
1198 {
1199   gboolean res;
1200
1201   PAD_LOCK (pad);
1202   res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
1203   PAD_UNLOCK (pad);
1204
1205   return res;
1206 }
1207
1208 static gboolean
1209 gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
1210 {
1211   gboolean res = TRUE;
1212
1213   GST_INFO_OBJECT (self, "%s srcpad task",
1214       flush_start ? "Pausing" : "Stopping");
1215
1216   SRC_LOCK (self);
1217   self->priv->running = FALSE;
1218   SRC_BROADCAST (self);
1219   SRC_UNLOCK (self);
1220
1221   if (flush_start) {
1222     res = gst_pad_push_event (self->srcpad, flush_start);
1223   }
1224
1225   gst_pad_stop_task (self->srcpad);
1226
1227   return res;
1228 }
1229
1230 static void
1231 gst_aggregator_start_srcpad_task (GstAggregator * self)
1232 {
1233   GST_INFO_OBJECT (self, "Starting srcpad task");
1234
1235   self->priv->running = TRUE;
1236   gst_pad_start_task (GST_PAD (self->srcpad),
1237       (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL);
1238 }
1239
1240 static GstFlowReturn
1241 gst_aggregator_flush (GstAggregator * self)
1242 {
1243   GstFlowReturn ret = GST_FLOW_OK;
1244   GstAggregatorPrivate *priv = self->priv;
1245   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1246
1247   GST_DEBUG_OBJECT (self, "Flushing everything");
1248   GST_OBJECT_LOCK (self);
1249   priv->send_segment = TRUE;
1250   priv->flush_seeking = FALSE;
1251   priv->tags_changed = FALSE;
1252   GST_OBJECT_UNLOCK (self);
1253   if (klass->flush)
1254     ret = klass->flush (self);
1255
1256   return ret;
1257 }
1258
1259
1260 /* Called with GstAggregator's object lock held */
1261
1262 static gboolean
1263 gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
1264 {
1265   GList *tmp;
1266   GstAggregatorPad *tmppad;
1267
1268   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
1269     tmppad = (GstAggregatorPad *) tmp->data;
1270
1271     if (_check_pending_flush_stop (tmppad) == FALSE) {
1272       GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
1273           tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
1274       return FALSE;
1275     }
1276   }
1277
1278   return TRUE;
1279 }
1280
1281 static void
1282 gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
1283     GstEvent * event)
1284 {
1285   GstAggregatorPrivate *priv = self->priv;
1286   GstAggregatorPadPrivate *padpriv = aggpad->priv;
1287
1288   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE);
1289
1290   PAD_FLUSH_LOCK (aggpad);
1291   PAD_LOCK (aggpad);
1292   if (padpriv->pending_flush_start) {
1293     GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
1294
1295     padpriv->pending_flush_start = FALSE;
1296     padpriv->pending_flush_stop = TRUE;
1297   }
1298   PAD_UNLOCK (aggpad);
1299
1300   GST_OBJECT_LOCK (self);
1301   if (priv->flush_seeking) {
1302     /* If flush_seeking we forward the first FLUSH_START */
1303     if (priv->pending_flush_start) {
1304       priv->pending_flush_start = FALSE;
1305       GST_OBJECT_UNLOCK (self);
1306
1307       GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
1308       gst_aggregator_stop_srcpad_task (self, event);
1309
1310       GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
1311       GST_PAD_STREAM_LOCK (self->srcpad);
1312       GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
1313       event = NULL;
1314     } else {
1315       GST_OBJECT_UNLOCK (self);
1316       gst_event_unref (event);
1317     }
1318   } else {
1319     GST_OBJECT_UNLOCK (self);
1320     gst_event_unref (event);
1321   }
1322   PAD_FLUSH_UNLOCK (aggpad);
1323 }
1324
1325 /* Must be called with the the PAD_LOCK held */
1326 static void
1327 update_time_level (GstAggregatorPad * aggpad, gboolean head)
1328 {
1329   GstAggregatorPadPrivate *priv = aggpad->priv;
1330
1331   if (head) {
1332     if (GST_CLOCK_TIME_IS_VALID (priv->head_position) &&
1333         priv->head_segment.format == GST_FORMAT_TIME)
1334       priv->head_time = gst_segment_to_running_time (&priv->head_segment,
1335           GST_FORMAT_TIME, priv->head_position);
1336     else
1337       priv->head_time = GST_CLOCK_TIME_NONE;
1338
1339     if (!GST_CLOCK_TIME_IS_VALID (priv->tail_time))
1340       priv->tail_time = priv->head_time;
1341   } else {
1342     if (GST_CLOCK_TIME_IS_VALID (priv->tail_position) &&
1343         aggpad->segment.format == GST_FORMAT_TIME)
1344       priv->tail_time = gst_segment_to_running_time (&aggpad->segment,
1345           GST_FORMAT_TIME, priv->tail_position);
1346     else
1347       priv->tail_time = priv->head_time;
1348   }
1349
1350   if (priv->head_time == GST_CLOCK_TIME_NONE ||
1351       priv->tail_time == GST_CLOCK_TIME_NONE) {
1352     priv->time_level = 0;
1353     return;
1354   }
1355
1356   if (priv->tail_time > priv->head_time)
1357     priv->time_level = 0;
1358   else
1359     priv->time_level = priv->head_time - priv->tail_time;
1360 }
1361
1362
1363 /* GstAggregator vmethods default implementations */
1364 static gboolean
1365 gst_aggregator_default_sink_event (GstAggregator * self,
1366     GstAggregatorPad * aggpad, GstEvent * event)
1367 {
1368   gboolean res = TRUE;
1369   GstPad *pad = GST_PAD (aggpad);
1370   GstAggregatorPrivate *priv = self->priv;
1371
1372   GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event);
1373
1374   switch (GST_EVENT_TYPE (event)) {
1375     case GST_EVENT_FLUSH_START:
1376     {
1377       gst_aggregator_flush_start (self, aggpad, event);
1378       /* We forward only in one case: right after flush_seeking */
1379       event = NULL;
1380       goto eat;
1381     }
1382     case GST_EVENT_FLUSH_STOP:
1383     {
1384       gst_aggregator_pad_flush (aggpad, self);
1385       GST_OBJECT_LOCK (self);
1386       if (priv->flush_seeking) {
1387         g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
1388         if (gst_aggregator_all_flush_stop_received_locked (self)) {
1389           GST_OBJECT_UNLOCK (self);
1390           /* That means we received FLUSH_STOP/FLUSH_STOP on
1391            * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
1392           gst_aggregator_flush (self);
1393           gst_pad_push_event (self->srcpad, event);
1394           event = NULL;
1395           SRC_LOCK (self);
1396           priv->send_eos = TRUE;
1397           SRC_BROADCAST (self);
1398           SRC_UNLOCK (self);
1399
1400           GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
1401           GST_PAD_STREAM_UNLOCK (self->srcpad);
1402           gst_aggregator_start_srcpad_task (self);
1403         } else {
1404           GST_OBJECT_UNLOCK (self);
1405         }
1406       } else {
1407         GST_OBJECT_UNLOCK (self);
1408       }
1409
1410       /* We never forward the event */
1411       goto eat;
1412     }
1413     case GST_EVENT_EOS:
1414     {
1415       SRC_LOCK (self);
1416       PAD_LOCK (aggpad);
1417       g_assert (aggpad->priv->num_buffers == 0);
1418       aggpad->priv->eos = TRUE;
1419       PAD_UNLOCK (aggpad);
1420       SRC_BROADCAST (self);
1421       SRC_UNLOCK (self);
1422       goto eat;
1423     }
1424     case GST_EVENT_SEGMENT:
1425     {
1426       PAD_LOCK (aggpad);
1427       GST_OBJECT_LOCK (aggpad);
1428       gst_event_copy_segment (event, &aggpad->segment);
1429       /* We've got a new segment, tail_position is now meaningless
1430        * and may interfere with the time_level calculation
1431        */
1432       aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
1433       update_time_level (aggpad, FALSE);
1434       GST_OBJECT_UNLOCK (aggpad);
1435       PAD_UNLOCK (aggpad);
1436
1437       GST_OBJECT_LOCK (self);
1438       self->priv->seqnum = gst_event_get_seqnum (event);
1439       GST_OBJECT_UNLOCK (self);
1440       goto eat;
1441     }
1442     case GST_EVENT_STREAM_START:
1443     {
1444       goto eat;
1445     }
1446     case GST_EVENT_GAP:
1447     {
1448       GstClockTime pts, endpts;
1449       GstClockTime duration;
1450       GstBuffer *gapbuf;
1451
1452       gst_event_parse_gap (event, &pts, &duration);
1453       gapbuf = gst_buffer_new ();
1454
1455       if (GST_CLOCK_TIME_IS_VALID (duration))
1456         endpts = pts + duration;
1457       else
1458         endpts = GST_CLOCK_TIME_NONE;
1459
1460       GST_OBJECT_LOCK (aggpad);
1461       res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts,
1462           &pts, &endpts);
1463       GST_OBJECT_UNLOCK (aggpad);
1464
1465       if (!res) {
1466         GST_WARNING_OBJECT (self, "GAP event outside segment, dropping");
1467         goto eat;
1468       }
1469
1470       if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts))
1471         duration = endpts - pts;
1472       else
1473         duration = GST_CLOCK_TIME_NONE;
1474
1475       GST_BUFFER_PTS (gapbuf) = pts;
1476       GST_BUFFER_DURATION (gapbuf) = duration;
1477       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
1478       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
1479
1480       /* Remove GAP event so we can replace it with the buffer */
1481       if (g_queue_peek_tail (&aggpad->priv->data) == event)
1482         gst_event_unref (g_queue_pop_tail (&aggpad->priv->data));
1483
1484       if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
1485           GST_FLOW_OK) {
1486         GST_WARNING_OBJECT (self, "Failed to chain gap buffer");
1487         res = FALSE;
1488       }
1489
1490       goto eat;
1491     }
1492     case GST_EVENT_TAG:
1493       goto eat;
1494     default:
1495     {
1496       break;
1497     }
1498   }
1499
1500   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
1501   return gst_pad_event_default (pad, GST_OBJECT (self), event);
1502
1503 eat:
1504   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
1505   if (event)
1506     gst_event_unref (event);
1507
1508   return res;
1509 }
1510
1511 static gboolean
1512 gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data)
1513 {
1514   GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
1515   GstAggregator *agg = GST_AGGREGATOR_CAST (self);
1516
1517   gst_aggregator_pad_flush (pad, agg);
1518
1519   PAD_LOCK (pad);
1520   pad->priv->flow_return = GST_FLOW_FLUSHING;
1521   pad->priv->negotiated = FALSE;
1522   PAD_BROADCAST_EVENT (pad);
1523   PAD_UNLOCK (pad);
1524
1525   return TRUE;
1526 }
1527
1528 static gboolean
1529 gst_aggregator_stop (GstAggregator * agg)
1530 {
1531   GstAggregatorClass *klass;
1532   gboolean result;
1533
1534   gst_aggregator_reset_flow_values (agg);
1535
1536   /* Application needs to make sure no pads are added while it shuts us down */
1537   gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg),
1538       gst_aggregator_stop_pad, NULL);
1539
1540   klass = GST_AGGREGATOR_GET_CLASS (agg);
1541
1542   if (klass->stop)
1543     result = klass->stop (agg);
1544   else
1545     result = TRUE;
1546
1547   agg->priv->has_peer_latency = FALSE;
1548   agg->priv->peer_latency_live = FALSE;
1549   agg->priv->peer_latency_min = agg->priv->peer_latency_max = 0;
1550
1551   if (agg->priv->tags)
1552     gst_tag_list_unref (agg->priv->tags);
1553   agg->priv->tags = NULL;
1554
1555   gst_aggregator_set_allocation (agg, NULL, NULL, NULL, NULL);
1556
1557   return result;
1558 }
1559
1560 /* GstElement vmethods implementations */
1561 static GstStateChangeReturn
1562 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
1563 {
1564   GstStateChangeReturn ret;
1565   GstAggregator *self = GST_AGGREGATOR (element);
1566
1567   switch (transition) {
1568     case GST_STATE_CHANGE_READY_TO_PAUSED:
1569       if (!gst_aggregator_start (self))
1570         goto error_start;
1571       break;
1572     default:
1573       break;
1574   }
1575
1576   if ((ret =
1577           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
1578               transition)) == GST_STATE_CHANGE_FAILURE)
1579     goto failure;
1580
1581
1582   switch (transition) {
1583     case GST_STATE_CHANGE_PAUSED_TO_READY:
1584       if (!gst_aggregator_stop (self)) {
1585         /* What to do in this case? Error out? */
1586         GST_ERROR_OBJECT (self, "Subclass failed to stop.");
1587       }
1588       break;
1589     default:
1590       break;
1591   }
1592
1593   return ret;
1594
1595 /* ERRORS */
1596 failure:
1597   {
1598     GST_ERROR_OBJECT (element, "parent failed state change");
1599     return ret;
1600   }
1601 error_start:
1602   {
1603     GST_ERROR_OBJECT (element, "Subclass failed to start");
1604     return GST_STATE_CHANGE_FAILURE;
1605   }
1606 }
1607
1608 static void
1609 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
1610 {
1611   GstAggregator *self = GST_AGGREGATOR (element);
1612   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1613
1614   GST_INFO_OBJECT (pad, "Removing pad");
1615
1616   SRC_LOCK (self);
1617   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
1618   gst_element_remove_pad (element, pad);
1619
1620   self->priv->has_peer_latency = FALSE;
1621   SRC_BROADCAST (self);
1622   SRC_UNLOCK (self);
1623 }
1624
1625 static GstAggregatorPad *
1626 gst_aggregator_default_create_new_pad (GstAggregator * self,
1627     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1628 {
1629   GstAggregatorPad *agg_pad;
1630   GstAggregatorPrivate *priv = self->priv;
1631   gint serial = 0;
1632   gchar *name = NULL;
1633   GType pad_type =
1634       GST_PAD_TEMPLATE_GTYPE (templ) ==
1635       G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : GST_PAD_TEMPLATE_GTYPE (templ);
1636
1637   if (templ->direction != GST_PAD_SINK)
1638     goto not_sink;
1639
1640   if (templ->presence != GST_PAD_REQUEST)
1641     goto not_request;
1642
1643   GST_OBJECT_LOCK (self);
1644   if (req_name == NULL || strlen (req_name) < 6
1645       || !g_str_has_prefix (req_name, "sink_")) {
1646     /* no name given when requesting the pad, use next available int */
1647     serial = ++priv->max_padserial;
1648   } else {
1649     /* parse serial number from requested padname */
1650     serial = g_ascii_strtoull (&req_name[5], NULL, 10);
1651     if (serial > priv->max_padserial)
1652       priv->max_padserial = serial;
1653   }
1654
1655   name = g_strdup_printf ("sink_%u", serial);
1656   agg_pad = g_object_new (pad_type,
1657       "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
1658   g_free (name);
1659
1660   GST_OBJECT_UNLOCK (self);
1661
1662   return agg_pad;
1663
1664   /* errors */
1665 not_sink:
1666   {
1667     GST_WARNING_OBJECT (self, "request new pad that is not a SINK pad");
1668     return NULL;
1669   }
1670 not_request:
1671   {
1672     GST_WARNING_OBJECT (self, "request new pad that is not a REQUEST pad");
1673     return NULL;
1674   }
1675 }
1676
1677 static GstPad *
1678 gst_aggregator_request_new_pad (GstElement * element,
1679     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1680 {
1681   GstAggregator *self;
1682   GstAggregatorPad *agg_pad;
1683   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (element);
1684   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
1685
1686   self = GST_AGGREGATOR (element);
1687
1688   agg_pad = klass->create_new_pad (self, templ, req_name, caps);
1689   if (!agg_pad) {
1690     GST_ERROR_OBJECT (element, "Couldn't create new pad");
1691     return NULL;
1692   }
1693
1694   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
1695
1696   if (priv->running)
1697     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
1698
1699   /* add the pad to the element */
1700   gst_element_add_pad (element, GST_PAD (agg_pad));
1701
1702   return GST_PAD (agg_pad);
1703 }
1704
1705 /* Must be called with SRC_LOCK held */
1706
1707 static gboolean
1708 gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
1709 {
1710   gboolean query_ret, live;
1711   GstClockTime our_latency, min, max;
1712
1713   query_ret = gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1714
1715   if (!query_ret) {
1716     GST_WARNING_OBJECT (self, "Latency query failed");
1717     return FALSE;
1718   }
1719
1720   gst_query_parse_latency (query, &live, &min, &max);
1721
1722   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
1723     GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
1724         ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
1725     return FALSE;
1726   }
1727
1728   if (min > max && GST_CLOCK_TIME_IS_VALID (max)) {
1729     GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL),
1730         ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
1731             GST_TIME_FORMAT ". Add queues or other buffering elements.",
1732             GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
1733     return FALSE;
1734   }
1735
1736   our_latency = self->priv->latency;
1737
1738   self->priv->peer_latency_live = live;
1739   self->priv->peer_latency_min = min;
1740   self->priv->peer_latency_max = max;
1741   self->priv->has_peer_latency = TRUE;
1742
1743   /* add our own */
1744   min += our_latency;
1745   min += self->priv->sub_latency_min;
1746   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1747       && GST_CLOCK_TIME_IS_VALID (max))
1748     max += self->priv->sub_latency_max + our_latency;
1749   else
1750     max = GST_CLOCK_TIME_NONE;
1751
1752   SRC_BROADCAST (self);
1753
1754   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
1755       " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
1756
1757   gst_query_set_latency (query, live, min, max);
1758
1759   return query_ret;
1760 }
1761
1762 /*
1763  * MUST be called with the src_lock held.
1764  *
1765  * See  gst_aggregator_get_latency() for doc
1766  */
1767 static GstClockTime
1768 gst_aggregator_get_latency_unlocked (GstAggregator * self)
1769 {
1770   GstClockTime latency;
1771
1772   g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0);
1773
1774   if (!self->priv->has_peer_latency) {
1775     GstQuery *query = gst_query_new_latency ();
1776     gboolean ret;
1777
1778     ret = gst_aggregator_query_latency_unlocked (self, query);
1779     gst_query_unref (query);
1780     if (!ret)
1781       return GST_CLOCK_TIME_NONE;
1782   }
1783
1784   if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
1785     return GST_CLOCK_TIME_NONE;
1786
1787   /* latency_min is never GST_CLOCK_TIME_NONE by construction */
1788   latency = self->priv->peer_latency_min;
1789
1790   /* add our own */
1791   latency += self->priv->latency;
1792   latency += self->priv->sub_latency_min;
1793
1794   return latency;
1795 }
1796
1797 /**
1798  * gst_aggregator_get_latency:
1799  * @self: a #GstAggregator
1800  *
1801  * Retrieves the latency values reported by @self in response to the latency
1802  * query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element
1803  * will not wait for the clock.
1804  *
1805  * Typically only called by subclasses.
1806  *
1807  * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync
1808  */
1809 GstClockTime
1810 gst_aggregator_get_latency (GstAggregator * self)
1811 {
1812   GstClockTime ret;
1813
1814   SRC_LOCK (self);
1815   ret = gst_aggregator_get_latency_unlocked (self);
1816   SRC_UNLOCK (self);
1817
1818   return ret;
1819 }
1820
1821 static gboolean
1822 gst_aggregator_send_event (GstElement * element, GstEvent * event)
1823 {
1824   GstAggregator *self = GST_AGGREGATOR (element);
1825
1826   GST_STATE_LOCK (element);
1827   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
1828       GST_STATE (element) < GST_STATE_PAUSED) {
1829     gdouble rate;
1830     GstFormat fmt;
1831     GstSeekFlags flags;
1832     GstSeekType start_type, stop_type;
1833     gint64 start, stop;
1834
1835     gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1836         &start, &stop_type, &stop);
1837
1838     GST_OBJECT_LOCK (self);
1839     gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
1840         flags, start_type, start, stop_type, stop, NULL);
1841     self->priv->seqnum = gst_event_get_seqnum (event);
1842     self->priv->first_buffer = FALSE;
1843     GST_OBJECT_UNLOCK (self);
1844
1845     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
1846   }
1847   GST_STATE_UNLOCK (element);
1848
1849
1850   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
1851       event);
1852 }
1853
1854 static gboolean
1855 gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
1856 {
1857   gboolean res = TRUE;
1858
1859   switch (GST_QUERY_TYPE (query)) {
1860     case GST_QUERY_SEEKING:
1861     {
1862       GstFormat format;
1863
1864       /* don't pass it along as some (file)sink might claim it does
1865        * whereas with a collectpads in between that will not likely work */
1866       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1867       gst_query_set_seeking (query, format, FALSE, 0, -1);
1868       res = TRUE;
1869
1870       break;
1871     }
1872     case GST_QUERY_LATENCY:
1873       SRC_LOCK (self);
1874       res = gst_aggregator_query_latency_unlocked (self, query);
1875       SRC_UNLOCK (self);
1876       break;
1877     default:
1878       return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1879   }
1880
1881   return res;
1882 }
1883
1884 static gboolean
1885 gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
1886 {
1887   EventData *evdata = user_data;
1888   gboolean ret = TRUE;
1889   GstPad *peer = gst_pad_get_peer (pad);
1890   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1891
1892   if (peer) {
1893     if (evdata->only_to_active_pads && aggpad->priv->first_buffer) {
1894       GST_DEBUG_OBJECT (pad, "not sending event to inactive pad");
1895       ret = TRUE;
1896     } else {
1897       ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
1898       GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
1899     }
1900   }
1901
1902   if (ret == FALSE) {
1903     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
1904       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
1905
1906       GST_DEBUG_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
1907
1908       if (gst_pad_query (peer, seeking)) {
1909         gboolean seekable;
1910
1911         gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
1912
1913         if (seekable == FALSE) {
1914           GST_INFO_OBJECT (pad,
1915               "Source not seekable, We failed but it does not matter!");
1916
1917           ret = TRUE;
1918         }
1919       } else {
1920         GST_ERROR_OBJECT (pad, "Query seeking FAILED");
1921       }
1922
1923       gst_query_unref (seeking);
1924     }
1925
1926     if (evdata->flush) {
1927       PAD_LOCK (aggpad);
1928       aggpad->priv->pending_flush_start = FALSE;
1929       aggpad->priv->pending_flush_stop = FALSE;
1930       PAD_UNLOCK (aggpad);
1931     }
1932   } else {
1933     evdata->one_actually_seeked = TRUE;
1934   }
1935
1936   evdata->result &= ret;
1937
1938   if (peer)
1939     gst_object_unref (peer);
1940
1941   /* Always send to all pads */
1942   return FALSE;
1943 }
1944
1945 static void
1946 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
1947     EventData * evdata)
1948 {
1949   evdata->result = TRUE;
1950   evdata->one_actually_seeked = FALSE;
1951
1952   /* We first need to set all pads as flushing in a first pass
1953    * as flush_start flush_stop is sometimes sent synchronously
1954    * while we send the seek event */
1955   if (evdata->flush) {
1956     GList *l;
1957
1958     GST_OBJECT_LOCK (self);
1959     for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
1960       GstAggregatorPad *pad = l->data;
1961
1962       PAD_LOCK (pad);
1963       pad->priv->pending_flush_start = TRUE;
1964       pad->priv->pending_flush_stop = FALSE;
1965       PAD_UNLOCK (pad);
1966     }
1967     GST_OBJECT_UNLOCK (self);
1968   }
1969
1970   gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata);
1971
1972   gst_event_unref (evdata->event);
1973 }
1974
1975 static gboolean
1976 gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
1977 {
1978   gdouble rate;
1979   GstFormat fmt;
1980   GstSeekFlags flags;
1981   GstSeekType start_type, stop_type;
1982   gint64 start, stop;
1983   gboolean flush;
1984   EventData evdata = { 0, };
1985   GstAggregatorPrivate *priv = self->priv;
1986
1987   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1988       &start, &stop_type, &stop);
1989
1990   GST_INFO_OBJECT (self, "starting SEEK");
1991
1992   flush = flags & GST_SEEK_FLAG_FLUSH;
1993
1994   GST_OBJECT_LOCK (self);
1995   if (flush) {
1996     priv->pending_flush_start = TRUE;
1997     priv->flush_seeking = TRUE;
1998   }
1999
2000   gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
2001       flags, start_type, start, stop_type, stop, NULL);
2002
2003   /* Seeking sets a position */
2004   self->priv->first_buffer = FALSE;
2005   GST_OBJECT_UNLOCK (self);
2006
2007   /* forward the seek upstream */
2008   evdata.event = event;
2009   evdata.flush = flush;
2010   evdata.only_to_active_pads = FALSE;
2011   gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2012   event = NULL;
2013
2014   if (!evdata.result || !evdata.one_actually_seeked) {
2015     GST_OBJECT_LOCK (self);
2016     priv->flush_seeking = FALSE;
2017     priv->pending_flush_start = FALSE;
2018     GST_OBJECT_UNLOCK (self);
2019   }
2020
2021   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
2022
2023   return evdata.result;
2024 }
2025
2026 static gboolean
2027 gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
2028 {
2029   EventData evdata = { 0, };
2030
2031   switch (GST_EVENT_TYPE (event)) {
2032     case GST_EVENT_SEEK:
2033       /* _do_seek() unrefs the event. */
2034       return gst_aggregator_do_seek (self, event);
2035     case GST_EVENT_NAVIGATION:
2036       /* navigation is rather pointless. */
2037       gst_event_unref (event);
2038       return FALSE;
2039     default:
2040       break;
2041   }
2042
2043   /* Don't forward QOS events to pads that had no active buffer yet. Otherwise
2044    * they will receive a QOS event that has earliest_time=0 (because we can't
2045    * have negative timestamps), and consider their buffer as too late */
2046   evdata.event = event;
2047   evdata.flush = FALSE;
2048   evdata.only_to_active_pads = GST_EVENT_TYPE (event) == GST_EVENT_QOS;
2049   gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2050   return evdata.result;
2051 }
2052
2053 static gboolean
2054 gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent,
2055     GstEvent * event)
2056 {
2057   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2058
2059   return klass->src_event (GST_AGGREGATOR (parent), event);
2060 }
2061
2062 static gboolean
2063 gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent,
2064     GstQuery * query)
2065 {
2066   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2067
2068   return klass->src_query (GST_AGGREGATOR (parent), query);
2069 }
2070
2071 static gboolean
2072 gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
2073     GstObject * parent, GstPadMode mode, gboolean active)
2074 {
2075   GstAggregator *self = GST_AGGREGATOR (parent);
2076   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2077
2078   if (klass->src_activate) {
2079     if (klass->src_activate (self, mode, active) == FALSE) {
2080       return FALSE;
2081     }
2082   }
2083
2084   if (active == TRUE) {
2085     switch (mode) {
2086       case GST_PAD_MODE_PUSH:
2087       {
2088         GST_INFO_OBJECT (pad, "Activating pad!");
2089         gst_aggregator_start_srcpad_task (self);
2090         return TRUE;
2091       }
2092       default:
2093       {
2094         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
2095         return FALSE;
2096       }
2097     }
2098   }
2099
2100   /* deactivating */
2101   GST_INFO_OBJECT (self, "Deactivating srcpad");
2102   gst_aggregator_stop_srcpad_task (self, FALSE);
2103
2104   return TRUE;
2105 }
2106
2107 static gboolean
2108 gst_aggregator_default_sink_query (GstAggregator * self,
2109     GstAggregatorPad * aggpad, GstQuery * query)
2110 {
2111   GstPad *pad = GST_PAD (aggpad);
2112
2113   if (GST_QUERY_TYPE (query) == GST_QUERY_ALLOCATION) {
2114     GstQuery *decide_query = NULL;
2115     GstAggregatorClass *agg_class;
2116     gboolean ret;
2117
2118     GST_OBJECT_LOCK (self);
2119     PAD_LOCK (aggpad);
2120     if (G_UNLIKELY (!aggpad->priv->negotiated)) {
2121       GST_DEBUG_OBJECT (self,
2122           "not negotiated yet, can't answer ALLOCATION query");
2123       PAD_UNLOCK (aggpad);
2124       GST_OBJECT_UNLOCK (self);
2125
2126       return FALSE;
2127     }
2128
2129     if ((decide_query = self->priv->allocation_query))
2130       gst_query_ref (decide_query);
2131     PAD_UNLOCK (aggpad);
2132     GST_OBJECT_UNLOCK (self);
2133
2134     GST_DEBUG_OBJECT (self,
2135         "calling propose allocation with query %" GST_PTR_FORMAT, decide_query);
2136
2137     agg_class = GST_AGGREGATOR_GET_CLASS (self);
2138
2139     /* pass the query to the propose_allocation vmethod if any */
2140     if (agg_class->propose_allocation)
2141       ret = agg_class->propose_allocation (self, aggpad, decide_query, query);
2142     else
2143       ret = FALSE;
2144
2145     if (decide_query)
2146       gst_query_unref (decide_query);
2147
2148     GST_DEBUG_OBJECT (self, "ALLOCATION ret %d, %" GST_PTR_FORMAT, ret, query);
2149     return ret;
2150   }
2151
2152   return gst_pad_query_default (pad, GST_OBJECT (self), query);
2153 }
2154
2155 static void
2156 gst_aggregator_finalize (GObject * object)
2157 {
2158   GstAggregator *self = (GstAggregator *) object;
2159
2160   g_mutex_clear (&self->priv->src_lock);
2161   g_cond_clear (&self->priv->src_cond);
2162
2163   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
2164 }
2165
2166 /*
2167  * gst_aggregator_set_latency_property:
2168  * @agg: a #GstAggregator
2169  * @latency: the new latency value (in nanoseconds).
2170  *
2171  * Sets the new latency value to @latency. This value is used to limit the
2172  * amount of time a pad waits for data to appear before considering the pad
2173  * as unresponsive.
2174  */
2175 static void
2176 gst_aggregator_set_latency_property (GstAggregator * self, GstClockTime latency)
2177 {
2178   gboolean changed;
2179
2180   g_return_if_fail (GST_IS_AGGREGATOR (self));
2181   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
2182
2183   SRC_LOCK (self);
2184   changed = (self->priv->latency != latency);
2185
2186   if (changed) {
2187     GList *item;
2188
2189     GST_OBJECT_LOCK (self);
2190     /* First lock all the pads */
2191     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2192       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2193       PAD_LOCK (aggpad);
2194     }
2195
2196     self->priv->latency = latency;
2197
2198     SRC_BROADCAST (self);
2199
2200     /* Now wake up the pads */
2201     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2202       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2203       PAD_BROADCAST_EVENT (aggpad);
2204       PAD_UNLOCK (aggpad);
2205     }
2206     GST_OBJECT_UNLOCK (self);
2207   }
2208
2209   SRC_UNLOCK (self);
2210
2211   if (changed)
2212     gst_element_post_message (GST_ELEMENT_CAST (self),
2213         gst_message_new_latency (GST_OBJECT_CAST (self)));
2214 }
2215
2216 /*
2217  * gst_aggregator_get_latency_property:
2218  * @agg: a #GstAggregator
2219  *
2220  * Gets the latency value. See gst_aggregator_set_latency for
2221  * more details.
2222  *
2223  * Returns: The time in nanoseconds to wait for data to arrive on a sink pad
2224  * before a pad is deemed unresponsive. A value of -1 means an
2225  * unlimited time.
2226  */
2227 static GstClockTime
2228 gst_aggregator_get_latency_property (GstAggregator * agg)
2229 {
2230   GstClockTime res;
2231
2232   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), GST_CLOCK_TIME_NONE);
2233
2234   GST_OBJECT_LOCK (agg);
2235   res = agg->priv->latency;
2236   GST_OBJECT_UNLOCK (agg);
2237
2238   return res;
2239 }
2240
2241 static void
2242 gst_aggregator_set_property (GObject * object, guint prop_id,
2243     const GValue * value, GParamSpec * pspec)
2244 {
2245   GstAggregator *agg = GST_AGGREGATOR (object);
2246
2247   switch (prop_id) {
2248     case PROP_LATENCY:
2249       gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value));
2250       break;
2251     case PROP_START_TIME_SELECTION:
2252       agg->priv->start_time_selection = g_value_get_enum (value);
2253       break;
2254     case PROP_START_TIME:
2255       agg->priv->start_time = g_value_get_uint64 (value);
2256       break;
2257     default:
2258       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2259       break;
2260   }
2261 }
2262
2263 static void
2264 gst_aggregator_get_property (GObject * object, guint prop_id,
2265     GValue * value, GParamSpec * pspec)
2266 {
2267   GstAggregator *agg = GST_AGGREGATOR (object);
2268
2269   switch (prop_id) {
2270     case PROP_LATENCY:
2271       g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg));
2272       break;
2273     case PROP_START_TIME_SELECTION:
2274       g_value_set_enum (value, agg->priv->start_time_selection);
2275       break;
2276     case PROP_START_TIME:
2277       g_value_set_uint64 (value, agg->priv->start_time);
2278       break;
2279     default:
2280       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2281       break;
2282   }
2283 }
2284
2285 /* GObject vmethods implementations */
2286 static void
2287 gst_aggregator_class_init (GstAggregatorClass * klass)
2288 {
2289   GObjectClass *gobject_class = (GObjectClass *) klass;
2290   GstElementClass *gstelement_class = (GstElementClass *) klass;
2291
2292   aggregator_parent_class = g_type_class_peek_parent (klass);
2293
2294   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
2295       GST_DEBUG_FG_MAGENTA, "GstAggregator");
2296
2297   if (aggregator_private_offset != 0)
2298     g_type_class_adjust_private_offset (klass, &aggregator_private_offset);
2299
2300   klass->finish_buffer = gst_aggregator_default_finish_buffer;
2301
2302   klass->sink_event = gst_aggregator_default_sink_event;
2303   klass->sink_query = gst_aggregator_default_sink_query;
2304
2305   klass->src_event = gst_aggregator_default_src_event;
2306   klass->src_query = gst_aggregator_default_src_query;
2307
2308   klass->create_new_pad = gst_aggregator_default_create_new_pad;
2309   klass->update_src_caps = gst_aggregator_default_update_src_caps;
2310   klass->fixate_src_caps = gst_aggregator_default_fixate_src_caps;
2311   klass->negotiated_src_caps = gst_aggregator_default_negotiated_src_caps;
2312
2313   gstelement_class->request_new_pad =
2314       GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
2315   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
2316   gstelement_class->release_pad =
2317       GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
2318   gstelement_class->change_state =
2319       GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
2320
2321   gobject_class->set_property = gst_aggregator_set_property;
2322   gobject_class->get_property = gst_aggregator_get_property;
2323   gobject_class->finalize = gst_aggregator_finalize;
2324
2325   g_object_class_install_property (gobject_class, PROP_LATENCY,
2326       g_param_spec_uint64 ("latency", "Buffer latency",
2327           "Additional latency in live mode to allow upstream "
2328           "to take longer to produce buffers for the current "
2329           "position (in nanoseconds)", 0, G_MAXUINT64,
2330           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2331
2332   g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION,
2333       g_param_spec_enum ("start-time-selection", "Start Time Selection",
2334           "Decides which start time is output",
2335           gst_aggregator_start_time_selection_get_type (),
2336           DEFAULT_START_TIME_SELECTION,
2337           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2338
2339   g_object_class_install_property (gobject_class, PROP_START_TIME,
2340       g_param_spec_uint64 ("start-time", "Start Time",
2341           "Start time to use if start-time-selection=set", 0,
2342           G_MAXUINT64,
2343           DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2344 }
2345
2346 static inline gpointer
2347 gst_aggregator_get_instance_private (GstAggregator * self)
2348 {
2349   return (G_STRUCT_MEMBER_P (self, aggregator_private_offset));
2350 }
2351
2352 static void
2353 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
2354 {
2355   GstPadTemplate *pad_template;
2356   GstAggregatorPrivate *priv;
2357
2358   g_return_if_fail (klass->aggregate != NULL);
2359
2360   self->priv = gst_aggregator_get_instance_private (self);
2361
2362   priv = self->priv;
2363
2364   pad_template =
2365       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
2366   g_return_if_fail (pad_template != NULL);
2367
2368   priv->max_padserial = -1;
2369   priv->tags_changed = FALSE;
2370
2371   self->priv->peer_latency_live = FALSE;
2372   self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
2373   self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
2374   self->priv->has_peer_latency = FALSE;
2375
2376   self->srcpad = gst_pad_new_from_template (pad_template, "src");
2377
2378   gst_aggregator_reset_flow_values (self);
2379
2380   gst_pad_set_event_function (self->srcpad,
2381       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
2382   gst_pad_set_query_function (self->srcpad,
2383       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func));
2384   gst_pad_set_activatemode_function (self->srcpad,
2385       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func));
2386
2387   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
2388
2389   self->priv->latency = DEFAULT_LATENCY;
2390   self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION;
2391   self->priv->start_time = DEFAULT_START_TIME;
2392
2393   g_mutex_init (&self->priv->src_lock);
2394   g_cond_init (&self->priv->src_cond);
2395 }
2396
2397 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
2398  * method to get to the padtemplates */
2399 GType
2400 gst_aggregator_get_type (void)
2401 {
2402   static volatile gsize type = 0;
2403
2404   if (g_once_init_enter (&type)) {
2405     GType _type;
2406     static const GTypeInfo info = {
2407       sizeof (GstAggregatorClass),
2408       NULL,
2409       NULL,
2410       (GClassInitFunc) gst_aggregator_class_init,
2411       NULL,
2412       NULL,
2413       sizeof (GstAggregator),
2414       0,
2415       (GInstanceInitFunc) gst_aggregator_init,
2416     };
2417
2418     _type = g_type_register_static (GST_TYPE_ELEMENT,
2419         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
2420
2421     aggregator_private_offset =
2422         g_type_add_instance_private (_type, sizeof (GstAggregatorPrivate));
2423
2424     g_once_init_leave (&type, _type);
2425   }
2426   return type;
2427 }
2428
2429 /* Must be called with SRC lock and PAD lock held */
2430 static gboolean
2431 gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
2432 {
2433   /* Empty queue always has space */
2434   if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL)
2435     return TRUE;
2436
2437   /* We also want at least two buffers, one is being processed and one is ready
2438    * for the next iteration when we operate in live mode. */
2439   if (self->priv->peer_latency_live && aggpad->priv->num_buffers < 2)
2440     return TRUE;
2441
2442   /* zero latency, if there is a buffer, it's full */
2443   if (self->priv->latency == 0)
2444     return FALSE;
2445
2446   /* Allow no more buffers than the latency */
2447   return (aggpad->priv->time_level <= self->priv->latency);
2448 }
2449
2450 /* Must be called with the PAD_LOCK held */
2451 static void
2452 apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
2453 {
2454   GstClockTime timestamp;
2455
2456   if (GST_BUFFER_DTS_IS_VALID (buffer))
2457     timestamp = GST_BUFFER_DTS (buffer);
2458   else
2459     timestamp = GST_BUFFER_PTS (buffer);
2460
2461   if (timestamp == GST_CLOCK_TIME_NONE) {
2462     if (head)
2463       timestamp = aggpad->priv->head_position;
2464     else
2465       timestamp = aggpad->priv->tail_position;
2466   }
2467
2468   /* add duration */
2469   if (GST_BUFFER_DURATION_IS_VALID (buffer))
2470     timestamp += GST_BUFFER_DURATION (buffer);
2471
2472   if (head)
2473     aggpad->priv->head_position = timestamp;
2474   else
2475     aggpad->priv->tail_position = timestamp;
2476
2477   update_time_level (aggpad, head);
2478 }
2479
2480 /*
2481  * Can be called either from the sinkpad's chain function or from the srcpad's
2482  * thread in the case of a buffer synthetized from a GAP event.
2483  * Because of this second case, FLUSH_LOCK can't be used here.
2484  */
2485
2486 static GstFlowReturn
2487 gst_aggregator_pad_chain_internal (GstAggregator * self,
2488     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
2489 {
2490   GstFlowReturn flow_return;
2491   GstClockTime buf_pts;
2492
2493   PAD_LOCK (aggpad);
2494   flow_return = aggpad->priv->flow_return;
2495   if (flow_return != GST_FLOW_OK)
2496     goto flushing;
2497
2498   PAD_UNLOCK (aggpad);
2499
2500   buf_pts = GST_BUFFER_PTS (buffer);
2501
2502   for (;;) {
2503     SRC_LOCK (self);
2504     GST_OBJECT_LOCK (self);
2505     PAD_LOCK (aggpad);
2506
2507     if (aggpad->priv->first_buffer) {
2508       self->priv->has_peer_latency = FALSE;
2509       aggpad->priv->first_buffer = FALSE;
2510     }
2511
2512     if ((gst_aggregator_pad_has_space (self, aggpad) || !head)
2513         && aggpad->priv->flow_return == GST_FLOW_OK) {
2514       if (head)
2515         g_queue_push_head (&aggpad->priv->data, buffer);
2516       else
2517         g_queue_push_tail (&aggpad->priv->data, buffer);
2518       apply_buffer (aggpad, buffer, head);
2519       aggpad->priv->num_buffers++;
2520       buffer = NULL;
2521       SRC_BROADCAST (self);
2522       break;
2523     }
2524
2525     flow_return = aggpad->priv->flow_return;
2526     if (flow_return != GST_FLOW_OK) {
2527       GST_OBJECT_UNLOCK (self);
2528       SRC_UNLOCK (self);
2529       goto flushing;
2530     }
2531     GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2532     GST_OBJECT_UNLOCK (self);
2533     SRC_UNLOCK (self);
2534     PAD_WAIT_EVENT (aggpad);
2535
2536     PAD_UNLOCK (aggpad);
2537   }
2538
2539   if (self->priv->first_buffer) {
2540     GstClockTime start_time;
2541     GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
2542
2543     switch (self->priv->start_time_selection) {
2544       case GST_AGGREGATOR_START_TIME_SELECTION_ZERO:
2545       default:
2546         start_time = 0;
2547         break;
2548       case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
2549         GST_OBJECT_LOCK (aggpad);
2550         if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) {
2551           start_time = buf_pts;
2552           if (start_time != -1) {
2553             start_time = MAX (start_time, aggpad->priv->head_segment.start);
2554             start_time =
2555                 gst_segment_to_running_time (&aggpad->priv->head_segment,
2556                 GST_FORMAT_TIME, start_time);
2557           }
2558         } else {
2559           start_time = 0;
2560           GST_WARNING_OBJECT (aggpad,
2561               "Ignoring request of selecting the first start time "
2562               "as the segment is a %s segment instead of a time segment",
2563               gst_format_get_name (aggpad->segment.format));
2564         }
2565         GST_OBJECT_UNLOCK (aggpad);
2566         break;
2567       case GST_AGGREGATOR_START_TIME_SELECTION_SET:
2568         start_time = self->priv->start_time;
2569         if (start_time == -1)
2570           start_time = 0;
2571         break;
2572     }
2573
2574     if (start_time != -1) {
2575       if (srcpad->segment.position == -1)
2576         srcpad->segment.position = start_time;
2577       else
2578         srcpad->segment.position = MIN (start_time, srcpad->segment.position);
2579
2580       GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT,
2581           GST_TIME_ARGS (start_time));
2582     }
2583   }
2584
2585   PAD_UNLOCK (aggpad);
2586   GST_OBJECT_UNLOCK (self);
2587   SRC_UNLOCK (self);
2588
2589   GST_DEBUG_OBJECT (aggpad, "Done chaining");
2590
2591   return flow_return;
2592
2593 flushing:
2594   PAD_UNLOCK (aggpad);
2595
2596   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
2597       gst_flow_get_name (flow_return));
2598   if (buffer)
2599     gst_buffer_unref (buffer);
2600
2601   return flow_return;
2602 }
2603
2604 static GstFlowReturn
2605 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
2606 {
2607   GstFlowReturn ret;
2608   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2609
2610   PAD_FLUSH_LOCK (aggpad);
2611
2612   ret = gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
2613       aggpad, buffer, TRUE);
2614
2615   PAD_FLUSH_UNLOCK (aggpad);
2616
2617   return ret;
2618 }
2619
2620 static gboolean
2621 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
2622     GstQuery * query)
2623 {
2624   GstAggregator *self = GST_AGGREGATOR (parent);
2625   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2626
2627   if (GST_QUERY_IS_SERIALIZED (query)) {
2628     GstStructure *s;
2629     gboolean ret = FALSE;
2630
2631     SRC_LOCK (self);
2632     PAD_LOCK (aggpad);
2633
2634     if (aggpad->priv->flow_return != GST_FLOW_OK) {
2635       SRC_UNLOCK (self);
2636       goto flushing;
2637     }
2638
2639     g_queue_push_head (&aggpad->priv->data, query);
2640     SRC_BROADCAST (self);
2641     SRC_UNLOCK (self);
2642
2643     while (!gst_aggregator_pad_queue_is_empty (aggpad)
2644         && aggpad->priv->flow_return == GST_FLOW_OK) {
2645       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2646       PAD_WAIT_EVENT (aggpad);
2647     }
2648
2649     s = gst_query_writable_structure (query);
2650     if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret))
2651       gst_structure_remove_field (s, "gst-aggregator-retval");
2652     else
2653       g_queue_remove (&aggpad->priv->data, query);
2654
2655     if (aggpad->priv->flow_return != GST_FLOW_OK)
2656       goto flushing;
2657
2658     PAD_UNLOCK (aggpad);
2659
2660     return ret;
2661   } else {
2662     GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2663
2664     return klass->sink_query (self, aggpad, query);
2665   }
2666
2667 flushing:
2668   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
2669       gst_flow_get_name (aggpad->priv->flow_return));
2670   PAD_UNLOCK (aggpad);
2671
2672   return FALSE;
2673 }
2674
2675 /* Queue serialized events and let the others go through directly.
2676  * The queued events with be handled from the src-pad task in
2677  * gst_aggregator_do_events_and_queries().
2678  */
2679 static GstFlowReturn
2680 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
2681     GstEvent * event)
2682 {
2683   GstFlowReturn ret = GST_FLOW_OK;
2684   GstAggregator *self = GST_AGGREGATOR (parent);
2685   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2686
2687   if (GST_EVENT_IS_SERIALIZED (event)
2688       && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
2689     SRC_LOCK (self);
2690     PAD_LOCK (aggpad);
2691
2692     if (aggpad->priv->flow_return != GST_FLOW_OK)
2693       goto flushing;
2694
2695     if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
2696       GST_OBJECT_LOCK (aggpad);
2697       gst_event_copy_segment (event, &aggpad->priv->head_segment);
2698       aggpad->priv->head_position = aggpad->priv->head_segment.position;
2699       update_time_level (aggpad, TRUE);
2700       GST_OBJECT_UNLOCK (aggpad);
2701     }
2702
2703     GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event);
2704     g_queue_push_head (&aggpad->priv->data, event);
2705     SRC_BROADCAST (self);
2706     PAD_UNLOCK (aggpad);
2707     SRC_UNLOCK (self);
2708   } else {
2709     GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2710
2711     if (!klass->sink_event (self, aggpad, event)) {
2712       /* Copied from GstPad to convert boolean to a GstFlowReturn in
2713        * the event handling func */
2714       ret = GST_FLOW_ERROR;
2715     }
2716   }
2717
2718   return ret;
2719
2720 flushing:
2721   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
2722       gst_flow_get_name (aggpad->priv->flow_return));
2723   PAD_UNLOCK (aggpad);
2724   SRC_UNLOCK (self);
2725   if (GST_EVENT_IS_STICKY (event))
2726     gst_pad_store_sticky_event (pad, event);
2727   gst_event_unref (event);
2728
2729   return aggpad->priv->flow_return;
2730 }
2731
2732 static gboolean
2733 gst_aggregator_pad_activate_mode_func (GstPad * pad,
2734     GstObject * parent, GstPadMode mode, gboolean active)
2735 {
2736   GstAggregator *self = GST_AGGREGATOR (parent);
2737   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2738
2739   if (active == FALSE) {
2740     SRC_LOCK (self);
2741     gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
2742     SRC_BROADCAST (self);
2743     SRC_UNLOCK (self);
2744   } else {
2745     PAD_LOCK (aggpad);
2746     aggpad->priv->flow_return = GST_FLOW_OK;
2747     PAD_BROADCAST_EVENT (aggpad);
2748     PAD_UNLOCK (aggpad);
2749   }
2750
2751   return TRUE;
2752 }
2753
2754 /***********************************
2755  * GstAggregatorPad implementation  *
2756  ************************************/
2757 G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
2758
2759 static void
2760 gst_aggregator_pad_constructed (GObject * object)
2761 {
2762   GstPad *pad = GST_PAD (object);
2763
2764   if (GST_PAD_IS_SINK (pad)) {
2765     gst_pad_set_chain_function (pad,
2766         GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
2767     gst_pad_set_event_full_function_full (pad,
2768         GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
2769     gst_pad_set_query_function (pad,
2770         GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
2771     gst_pad_set_activatemode_function (pad,
2772         GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
2773   }
2774 }
2775
2776 static void
2777 gst_aggregator_pad_finalize (GObject * object)
2778 {
2779   GstAggregatorPad *pad = (GstAggregatorPad *) object;
2780
2781   g_cond_clear (&pad->priv->event_cond);
2782   g_mutex_clear (&pad->priv->flush_lock);
2783   g_mutex_clear (&pad->priv->lock);
2784
2785   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object);
2786 }
2787
2788 static void
2789 gst_aggregator_pad_dispose (GObject * object)
2790 {
2791   GstAggregatorPad *pad = (GstAggregatorPad *) object;
2792
2793   gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE);
2794
2795   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
2796 }
2797
2798 static void
2799 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
2800 {
2801   GObjectClass *gobject_class = (GObjectClass *) klass;
2802
2803   gobject_class->constructed = gst_aggregator_pad_constructed;
2804   gobject_class->finalize = gst_aggregator_pad_finalize;
2805   gobject_class->dispose = gst_aggregator_pad_dispose;
2806 }
2807
2808 static void
2809 gst_aggregator_pad_init (GstAggregatorPad * pad)
2810 {
2811   pad->priv = gst_aggregator_pad_get_instance_private (pad);
2812
2813   g_queue_init (&pad->priv->data);
2814   g_cond_init (&pad->priv->event_cond);
2815
2816   g_mutex_init (&pad->priv->flush_lock);
2817   g_mutex_init (&pad->priv->lock);
2818
2819   gst_aggregator_pad_reset_unlocked (pad);
2820   pad->priv->negotiated = FALSE;
2821 }
2822
2823 /* Must be called with the PAD_LOCK held */
2824 static void
2825 gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad)
2826 {
2827   pad->priv->num_buffers--;
2828   GST_TRACE_OBJECT (pad, "Consuming buffer");
2829   PAD_BROADCAST_EVENT (pad);
2830 }
2831
2832 /* Must be called with the PAD_LOCK held */
2833 static void
2834 gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
2835 {
2836   GstAggregator *self = NULL;
2837   GstAggregatorClass *aggclass = NULL;
2838   GstBuffer *buffer = NULL;
2839
2840   while (pad->priv->clipped_buffer == NULL &&
2841       GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
2842     buffer = g_queue_pop_tail (&pad->priv->data);
2843
2844     apply_buffer (pad, buffer, FALSE);
2845
2846     /* We only take the parent here so that it's not taken if the buffer is
2847      * already clipped or if the queue is empty.
2848      */
2849     if (self == NULL) {
2850       self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
2851       if (self == NULL) {
2852         gst_buffer_unref (buffer);
2853         return;
2854       }
2855
2856       aggclass = GST_AGGREGATOR_GET_CLASS (self);
2857     }
2858
2859     if (aggclass->clip) {
2860       GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer);
2861
2862       buffer = aggclass->clip (self, pad, buffer);
2863
2864       if (buffer == NULL) {
2865         gst_aggregator_pad_buffer_consumed (pad);
2866         GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
2867       }
2868     }
2869
2870     pad->priv->clipped_buffer = buffer;
2871   }
2872
2873   if (self)
2874     gst_object_unref (self);
2875 }
2876
2877 /**
2878  * gst_aggregator_pad_pop_buffer:
2879  * @pad: the pad to get buffer from
2880  *
2881  * Steal the ref to the buffer currently queued in @pad.
2882  *
2883  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
2884  *   queued. You should unref the buffer after usage.
2885  */
2886 GstBuffer *
2887 gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad)
2888 {
2889   GstBuffer *buffer;
2890
2891   PAD_LOCK (pad);
2892
2893   gst_aggregator_pad_clip_buffer_unlocked (pad);
2894
2895   buffer = pad->priv->clipped_buffer;
2896
2897   if (buffer) {
2898     pad->priv->clipped_buffer = NULL;
2899     gst_aggregator_pad_buffer_consumed (pad);
2900     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
2901   }
2902
2903   PAD_UNLOCK (pad);
2904
2905   return buffer;
2906 }
2907
2908 /**
2909  * gst_aggregator_pad_drop_buffer:
2910  * @pad: the pad where to drop any pending buffer
2911  *
2912  * Drop the buffer currently queued in @pad.
2913  *
2914  * Returns: TRUE if there was a buffer queued in @pad, or FALSE if not.
2915  */
2916 gboolean
2917 gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
2918 {
2919   GstBuffer *buf;
2920
2921   buf = gst_aggregator_pad_pop_buffer (pad);
2922
2923   if (buf == NULL)
2924     return FALSE;
2925
2926   gst_buffer_unref (buf);
2927   return TRUE;
2928 }
2929
2930 /**
2931  * gst_aggregator_pad_peek_buffer:
2932  * @pad: the pad to get buffer from
2933  *
2934  * Returns: (transfer full): A reference to the buffer in @pad or
2935  * NULL if no buffer was queued. You should unref the buffer after
2936  * usage.
2937  */
2938 GstBuffer *
2939 gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
2940 {
2941   GstBuffer *buffer;
2942
2943   PAD_LOCK (pad);
2944
2945   gst_aggregator_pad_clip_buffer_unlocked (pad);
2946
2947   if (pad->priv->clipped_buffer) {
2948     buffer = gst_buffer_ref (pad->priv->clipped_buffer);
2949   } else {
2950     buffer = NULL;
2951   }
2952   PAD_UNLOCK (pad);
2953
2954   return buffer;
2955 }
2956
2957 /**
2958  * gst_aggregator_pad_has_buffer:
2959  * @pad: the pad to check the buffer on
2960  *
2961  * This checks if a pad has a buffer available that will be returned by
2962  * a call to gst_aggregator_pad_peek_buffer() or
2963  * gst_aggregator_pad_pop_buffer().
2964  *
2965  * Returns: %TRUE if the pad has a buffer available as the next thing.
2966  *
2967  * Since: 1.14.1
2968  */
2969 gboolean
2970 gst_aggregator_pad_has_buffer (GstAggregatorPad * pad)
2971 {
2972   gboolean has_buffer;
2973
2974   PAD_LOCK (pad);
2975   gst_aggregator_pad_clip_buffer_unlocked (pad);
2976   has_buffer = (pad->priv->clipped_buffer != NULL);
2977   PAD_UNLOCK (pad);
2978
2979   return has_buffer;
2980 }
2981
2982 /**
2983  * gst_aggregator_pad_is_eos:
2984  * @pad: an aggregator pad
2985  *
2986  * Returns: %TRUE if the pad is EOS, otherwise %FALSE.
2987  */
2988 gboolean
2989 gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
2990 {
2991   gboolean is_eos;
2992
2993   PAD_LOCK (pad);
2994   is_eos = pad->priv->eos;
2995   PAD_UNLOCK (pad);
2996
2997   return is_eos;
2998 }
2999
3000 #if 0
3001 /*
3002  * gst_aggregator_merge_tags:
3003  * @self: a #GstAggregator
3004  * @tags: a #GstTagList to merge
3005  * @mode: the #GstTagMergeMode to use
3006  *
3007  * Adds tags to so-called pending tags, which will be processed
3008  * before pushing out data downstream.
3009  *
3010  * Note that this is provided for convenience, and the subclass is
3011  * not required to use this and can still do tag handling on its own.
3012  *
3013  * MT safe.
3014  */
3015 void
3016 gst_aggregator_merge_tags (GstAggregator * self,
3017     const GstTagList * tags, GstTagMergeMode mode)
3018 {
3019   GstTagList *otags;
3020
3021   g_return_if_fail (GST_IS_AGGREGATOR (self));
3022   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
3023
3024   /* FIXME Check if we can use OBJECT lock here! */
3025   GST_OBJECT_LOCK (self);
3026   if (tags)
3027     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
3028   otags = self->priv->tags;
3029   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
3030   if (otags)
3031     gst_tag_list_unref (otags);
3032   self->priv->tags_changed = TRUE;
3033   GST_OBJECT_UNLOCK (self);
3034 }
3035 #endif
3036
3037 /**
3038  * gst_aggregator_set_latency:
3039  * @self: a #GstAggregator
3040  * @min_latency: minimum latency
3041  * @max_latency: maximum latency
3042  *
3043  * Lets #GstAggregator sub-classes tell the baseclass what their internal
3044  * latency is. Will also post a LATENCY message on the bus so the pipeline
3045  * can reconfigure its global latency.
3046  */
3047 void
3048 gst_aggregator_set_latency (GstAggregator * self,
3049     GstClockTime min_latency, GstClockTime max_latency)
3050 {
3051   gboolean changed = FALSE;
3052
3053   g_return_if_fail (GST_IS_AGGREGATOR (self));
3054   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
3055   g_return_if_fail (max_latency >= min_latency);
3056
3057   SRC_LOCK (self);
3058   if (self->priv->sub_latency_min != min_latency) {
3059     self->priv->sub_latency_min = min_latency;
3060     changed = TRUE;
3061   }
3062   if (self->priv->sub_latency_max != max_latency) {
3063     self->priv->sub_latency_max = max_latency;
3064     changed = TRUE;
3065   }
3066
3067   if (changed)
3068     SRC_BROADCAST (self);
3069   SRC_UNLOCK (self);
3070
3071   if (changed) {
3072     gst_element_post_message (GST_ELEMENT_CAST (self),
3073         gst_message_new_latency (GST_OBJECT_CAST (self)));
3074   }
3075 }
3076
3077 /**
3078  * gst_aggregator_get_buffer_pool:
3079  * @self: a #GstAggregator
3080  *
3081  * Returns: (transfer full): the instance of the #GstBufferPool used
3082  * by @trans; free it after use it
3083  */
3084 GstBufferPool *
3085 gst_aggregator_get_buffer_pool (GstAggregator * self)
3086 {
3087   GstBufferPool *pool;
3088
3089   g_return_val_if_fail (GST_IS_AGGREGATOR (self), NULL);
3090
3091   GST_OBJECT_LOCK (self);
3092   pool = self->priv->pool;
3093   if (pool)
3094     gst_object_ref (pool);
3095   GST_OBJECT_UNLOCK (self);
3096
3097   return pool;
3098 }
3099
3100 /**
3101  * gst_aggregator_get_allocator:
3102  * @self: a #GstAggregator
3103  * @allocator: (out) (allow-none) (transfer full): the #GstAllocator
3104  * used
3105  * @params: (out) (allow-none) (transfer full): the
3106  * #GstAllocationParams of @allocator
3107  *
3108  * Lets #GstAggregator sub-classes get the memory @allocator
3109  * acquired by the base class and its @params.
3110  *
3111  * Unref the @allocator after use it.
3112  */
3113 void
3114 gst_aggregator_get_allocator (GstAggregator * self,
3115     GstAllocator ** allocator, GstAllocationParams * params)
3116 {
3117   g_return_if_fail (GST_IS_AGGREGATOR (self));
3118
3119   if (allocator)
3120     *allocator = self->priv->allocator ?
3121         gst_object_ref (self->priv->allocator) : NULL;
3122
3123   if (params)
3124     *params = self->priv->allocation_params;
3125 }
3126
3127 /**
3128  * gst_aggregator_simple_get_next_time:
3129  * @self: A #GstAggregator
3130  *
3131  * This is a simple #GstAggregator::get_next_time implementation that
3132  * just looks at the #GstSegment on the srcpad of the aggregator and bases
3133  * the next time on the running there there.
3134  *
3135  * This is the desired behaviour in most cases where you have a live source
3136  * and you have a dead line based aggregator subclass.
3137  *
3138  * Returns: The running time based on the position
3139  *
3140  * Since: 1.16
3141  */
3142 GstClockTime
3143 gst_aggregator_simple_get_next_time (GstAggregator * self)
3144 {
3145   GstClockTime next_time;
3146   GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
3147   GstSegment *segment = &srcpad->segment;
3148
3149   GST_OBJECT_LOCK (self);
3150   if (segment->position == -1 || segment->position < segment->start)
3151     next_time = segment->start;
3152   else
3153     next_time = segment->position;
3154
3155   if (segment->stop != -1 && next_time > segment->stop)
3156     next_time = segment->stop;
3157
3158   next_time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, next_time);
3159   GST_OBJECT_UNLOCK (self);
3160
3161   return next_time;
3162 }