aggregator: cleanup event forwarding
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
1 /* GStreamer aggregator base class
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @title: GstAggregator
25  * @short_description: manages a set of pads with the purpose of
26  * aggregating their buffers.
27  * @see_also: gstcollectpads for historical reasons.
28  *
29  * Manages a set of pads with the purpose of aggregating their buffers.
30  * Control is given to the subclass when all pads have data.
31  *
32  *  * Base class for mixers and muxers. Subclasses should at least implement
33  *    the #GstAggregatorClass.aggregate() virtual method.
34  *
35  *  * When data is queued on all pads, the aggregate vmethod is called.
36  *
37  *  * One can peek at the data on any given GstAggregatorPad with the
38  *    gst_aggregator_pad_get_buffer () method, and take ownership of it
39  *    with the gst_aggregator_pad_steal_buffer () method. When a buffer
40  *    has been taken with steal_buffer (), a new buffer can be queued
41  *    on that pad.
42  *
43  *  * If the subclass wishes to push a buffer downstream in its aggregate
44  *    implementation, it should do so through the
45  *    gst_aggregator_finish_buffer () method. This method will take care
46  *    of sending and ordering mandatory events such as stream start, caps
47  *    and segment.
48  *
49  *  * Same goes for EOS events, which should not be pushed directly by the
50  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
51  *    implementation.
52  *
53  *  * Note that the aggregator logic regarding gap event handling is to turn
54  *    these into gap buffers with matching PTS and duration. It will also
55  *    flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE
56  *    to ease their identification and subsequent processing.
57  *
58  */
59
60 #ifdef HAVE_CONFIG_H
61 #  include "config.h"
62 #endif
63
64 #include <string.h>             /* strlen */
65
66 #include "gstaggregator.h"
67
68 typedef enum
69 {
70   GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
71   GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
72   GST_AGGREGATOR_START_TIME_SELECTION_SET
73 } GstAggregatorStartTimeSelection;
74
75 static GType
76 gst_aggregator_start_time_selection_get_type (void)
77 {
78   static GType gtype = 0;
79
80   if (gtype == 0) {
81     static const GEnumValue values[] = {
82       {GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
83           "Start at 0 running time (default)", "zero"},
84       {GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
85           "Start at first observed input running time", "first"},
86       {GST_AGGREGATOR_START_TIME_SELECTION_SET,
87           "Set start time with start-time property", "set"},
88       {0, NULL, NULL}
89     };
90
91     gtype = g_enum_register_static ("GstAggregatorStartTimeSelection", values);
92   }
93   return gtype;
94 }
95
96 /*  Might become API */
97 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
98     const GstTagList * tags, GstTagMergeMode mode);
99 static void gst_aggregator_set_latency_property (GstAggregator * agg,
100     gint64 latency);
101 static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
102
103
104 /* Locking order, locks in this element must always be taken in this order
105  *
106  * standard sink pad stream lock -> GST_PAD_STREAM_LOCK (aggpad)
107  * Aggregator pad flush lock -> PAD_FLUSH_LOCK(aggpad)
108  * standard src pad stream lock -> GST_PAD_STREAM_LOCK (srcpad)
109  * Aggregator src lock -> SRC_LOCK(agg) w/ SRC_WAIT/BROADCAST
110  * standard element object lock -> GST_OBJECT_LOCK(agg)
111  * Aggregator pad lock -> PAD_LOCK (aggpad) w/ PAD_WAIT/BROADCAST_EVENT(aggpad)
112  * standard src pad object lock -> GST_OBJECT_LOCK(srcpad)
113  * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad)
114  */
115
116
117 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
118
119 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
120 #define GST_CAT_DEFAULT aggregator_debug
121
122 /* GstAggregatorPad definitions */
123 #define PAD_LOCK(pad)   G_STMT_START {                                  \
124   GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p",              \
125         g_thread_self());                                               \
126   g_mutex_lock(&pad->priv->lock);                                       \
127   GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p",                \
128         g_thread_self());                                               \
129   } G_STMT_END
130
131 #define PAD_UNLOCK(pad)  G_STMT_START {                                 \
132   GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p",           \
133       g_thread_self());                                                 \
134   g_mutex_unlock(&pad->priv->lock);                                     \
135   GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p",             \
136         g_thread_self());                                               \
137   } G_STMT_END
138
139
140 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
141   GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p",   \
142         g_thread_self());                                               \
143   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),           \
144       (&((GstAggregatorPad*)pad)->priv->lock));                         \
145   GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \
146         g_thread_self());                                               \
147   } G_STMT_END
148
149 #define PAD_BROADCAST_EVENT(pad) G_STMT_START {                        \
150   GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p",     \
151         g_thread_self());                                              \
152   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond));    \
153   } G_STMT_END
154
155
156 #define PAD_FLUSH_LOCK(pad)     G_STMT_START {                          \
157   GST_TRACE_OBJECT (pad, "Taking lock from thread %p",                  \
158         g_thread_self());                                               \
159   g_mutex_lock(&pad->priv->flush_lock);                                 \
160   GST_TRACE_OBJECT (pad, "Took lock from thread %p",                    \
161         g_thread_self());                                               \
162   } G_STMT_END
163
164 #define PAD_FLUSH_UNLOCK(pad)   G_STMT_START {                          \
165   GST_TRACE_OBJECT (pad, "Releasing lock from thread %p",               \
166         g_thread_self());                                               \
167   g_mutex_unlock(&pad->priv->flush_lock);                               \
168   GST_TRACE_OBJECT (pad, "Release lock from thread %p",                 \
169         g_thread_self());                                               \
170   } G_STMT_END
171
172 #define SRC_LOCK(self)   G_STMT_START {                             \
173   GST_TRACE_OBJECT (self, "Taking src lock from thread %p",         \
174       g_thread_self());                                             \
175   g_mutex_lock(&self->priv->src_lock);                              \
176   GST_TRACE_OBJECT (self, "Took src lock from thread %p",           \
177         g_thread_self());                                           \
178   } G_STMT_END
179
180 #define SRC_UNLOCK(self)  G_STMT_START {                            \
181   GST_TRACE_OBJECT (self, "Releasing src lock from thread %p",      \
182         g_thread_self());                                           \
183   g_mutex_unlock(&self->priv->src_lock);                            \
184   GST_TRACE_OBJECT (self, "Released src lock from thread %p",       \
185         g_thread_self());                                           \
186   } G_STMT_END
187
188 #define SRC_WAIT(self) G_STMT_START {                               \
189   GST_LOG_OBJECT (self, "Waiting for src on thread %p",             \
190         g_thread_self());                                           \
191   g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock));    \
192   GST_LOG_OBJECT (self, "DONE Waiting for src on thread %p",        \
193         g_thread_self());                                           \
194   } G_STMT_END
195
196 #define SRC_BROADCAST(self) G_STMT_START {                          \
197     GST_LOG_OBJECT (self, "Signaling src from thread %p",           \
198         g_thread_self());                                           \
199     if (self->priv->aggregate_id)                                   \
200       gst_clock_id_unschedule (self->priv->aggregate_id);           \
201     g_cond_broadcast(&(self->priv->src_cond));                      \
202   } G_STMT_END
203
204 struct _GstAggregatorPadPrivate
205 {
206   /* Following fields are protected by the PAD_LOCK */
207   GstFlowReturn flow_return;
208   gboolean pending_flush_start;
209   gboolean pending_flush_stop;
210   gboolean pending_eos;
211
212   gboolean first_buffer;
213
214   GQueue data;                  /* buffers, events and queries */
215   GstBuffer *clipped_buffer;
216   guint num_buffers;
217   GstClockTime head_position;
218   GstClockTime tail_position;
219   GstClockTime head_time;       /* running time */
220   GstClockTime tail_time;
221   GstClockTime time_level;      /* how much head is ahead of tail */
222   GstSegment head_segment;      /* segment before the queue */
223
224   gboolean negotiated;
225
226   gboolean eos;
227
228   GMutex lock;
229   GCond event_cond;
230   /* This lock prevents a flush start processing happening while
231    * the chain function is also happening.
232    */
233   GMutex flush_lock;
234 };
235
236 /* Must be called with PAD_LOCK held */
237 static void
238 gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
239 {
240   aggpad->priv->pending_eos = FALSE;
241   aggpad->priv->eos = FALSE;
242   aggpad->priv->flow_return = GST_FLOW_OK;
243   GST_OBJECT_LOCK (aggpad);
244   gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED);
245   gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED);
246   GST_OBJECT_UNLOCK (aggpad);
247   aggpad->priv->head_position = GST_CLOCK_TIME_NONE;
248   aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
249   aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
250   aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
251   aggpad->priv->time_level = 0;
252   aggpad->priv->first_buffer = TRUE;
253 }
254
255 static gboolean
256 gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
257 {
258   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
259
260   PAD_LOCK (aggpad);
261   gst_aggregator_pad_reset_unlocked (aggpad);
262   PAD_UNLOCK (aggpad);
263
264   if (klass->flush)
265     return klass->flush (aggpad, agg);
266
267   return TRUE;
268 }
269
270 /*************************************
271  * GstAggregator implementation  *
272  *************************************/
273 static GstElementClass *aggregator_parent_class = NULL;
274
275 /* All members are protected by the object lock unless otherwise noted */
276
277 struct _GstAggregatorPrivate
278 {
279   gint max_padserial;
280
281   /* Our state is >= PAUSED */
282   gboolean running;             /* protected by src_lock */
283
284   gint seqnum;
285   gboolean send_stream_start;   /* protected by srcpad stream lock */
286   gboolean send_segment;
287   gboolean flush_seeking;
288   gboolean pending_flush_start;
289   gboolean send_eos;            /* protected by srcpad stream lock */
290
291   GstCaps *srccaps;             /* protected by the srcpad stream lock */
292
293   GstTagList *tags;
294   gboolean tags_changed;
295
296   gboolean peer_latency_live;   /* protected by src_lock */
297   GstClockTime peer_latency_min;        /* protected by src_lock */
298   GstClockTime peer_latency_max;        /* protected by src_lock */
299   gboolean has_peer_latency;    /* protected by src_lock */
300
301   GstClockTime sub_latency_min; /* protected by src_lock */
302   GstClockTime sub_latency_max; /* protected by src_lock */
303
304   /* aggregate */
305   GstClockID aggregate_id;      /* protected by src_lock */
306   GMutex src_lock;
307   GCond src_cond;
308
309   gboolean first_buffer;        /* protected by object lock */
310   GstAggregatorStartTimeSelection start_time_selection;
311   GstClockTime start_time;
312
313   /* protected by the object lock */
314   GstQuery *allocation_query;
315   GstAllocator *allocator;
316   GstBufferPool *pool;
317   GstAllocationParams allocation_params;
318
319   /* properties */
320   gint64 latency;               /* protected by both src_lock and all pad locks */
321 };
322
323 /* Seek event forwarding helper */
324 typedef struct
325 {
326   /* parameters */
327   GstEvent *event;
328   gboolean flush;
329   gboolean only_to_active_pads;
330
331   /* results */
332   gboolean result;
333   gboolean one_actually_seeked;
334 } EventData;
335
336 #define DEFAULT_LATENCY              0
337 #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO
338 #define DEFAULT_START_TIME           (-1)
339
340 enum
341 {
342   PROP_0,
343   PROP_LATENCY,
344   PROP_START_TIME_SELECTION,
345   PROP_START_TIME,
346   PROP_LAST
347 };
348
349 static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
350     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
351
352 /**
353  * gst_aggregator_iterate_sinkpads:
354  * @self: The #GstAggregator
355  * @func: (scope call): The function to call.
356  * @user_data: (closure): The data to pass to @func.
357  *
358  * Iterate the sinkpads of aggregator to call a function on them.
359  *
360  * This method guarantees that @func will be called only once for each
361  * sink pad.
362  *
363  * Returns: %FALSE if there are no sinkpads or if @func returned %FALSE
364  */
365 gboolean
366 gst_aggregator_iterate_sinkpads (GstAggregator * self,
367     GstAggregatorPadForeachFunc func, gpointer user_data)
368 {
369   gboolean result = FALSE;
370   GstIterator *iter;
371   gboolean done = FALSE;
372   GValue item = { 0, };
373   GList *seen_pads = NULL;
374
375   iter = gst_element_iterate_sink_pads (GST_ELEMENT (self));
376
377   if (!iter)
378     goto no_iter;
379
380   while (!done) {
381     switch (gst_iterator_next (iter, &item)) {
382       case GST_ITERATOR_OK:
383       {
384         GstAggregatorPad *pad;
385
386         pad = g_value_get_object (&item);
387
388         /* if already pushed, skip. FIXME, find something faster to tag pads */
389         if (pad == NULL || g_list_find (seen_pads, pad)) {
390           g_value_reset (&item);
391           break;
392         }
393
394         GST_LOG_OBJECT (pad, "calling function %s on pad",
395             GST_DEBUG_FUNCPTR_NAME (func));
396
397         result = func (self, pad, user_data);
398
399         done = !result;
400
401         seen_pads = g_list_prepend (seen_pads, pad);
402
403         g_value_reset (&item);
404         break;
405       }
406       case GST_ITERATOR_RESYNC:
407         gst_iterator_resync (iter);
408         break;
409       case GST_ITERATOR_ERROR:
410         GST_ERROR_OBJECT (self,
411             "Could not iterate over internally linked pads");
412         done = TRUE;
413         break;
414       case GST_ITERATOR_DONE:
415         done = TRUE;
416         break;
417     }
418   }
419   g_value_unset (&item);
420   gst_iterator_free (iter);
421
422   if (seen_pads == NULL) {
423     GST_DEBUG_OBJECT (self, "No pad seen");
424     return FALSE;
425   }
426
427   g_list_free (seen_pads);
428
429 no_iter:
430   return result;
431 }
432
433 static gboolean
434 gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
435 {
436   return (g_queue_peek_tail (&pad->priv->data) == NULL &&
437       pad->priv->clipped_buffer == NULL);
438 }
439
440 static gboolean
441 gst_aggregator_check_pads_ready (GstAggregator * self)
442 {
443   GstAggregatorPad *pad;
444   GList *l, *sinkpads;
445   gboolean have_buffer = TRUE;
446   gboolean have_event = FALSE;
447
448   GST_LOG_OBJECT (self, "checking pads");
449
450   GST_OBJECT_LOCK (self);
451
452   sinkpads = GST_ELEMENT_CAST (self)->sinkpads;
453   if (sinkpads == NULL)
454     goto no_sinkpads;
455
456   for (l = sinkpads; l != NULL; l = l->next) {
457     pad = l->data;
458
459     PAD_LOCK (pad);
460
461     if (pad->priv->num_buffers == 0) {
462       if (!gst_aggregator_pad_queue_is_empty (pad))
463         have_event = TRUE;
464       if (!pad->priv->eos) {
465         have_buffer = FALSE;
466
467         /* If not live we need data on all pads, so leave the loop */
468         if (!self->priv->peer_latency_live) {
469           PAD_UNLOCK (pad);
470           goto pad_not_ready;
471         }
472       }
473     } else if (self->priv->peer_latency_live) {
474       /* In live mode, having a single pad with buffers is enough to
475        * generate a start time from it. In non-live mode all pads need
476        * to have a buffer
477        */
478       self->priv->first_buffer = FALSE;
479     }
480
481     PAD_UNLOCK (pad);
482   }
483
484   if (!have_buffer && !have_event)
485     goto pad_not_ready;
486
487   if (have_buffer)
488     self->priv->first_buffer = FALSE;
489
490   GST_OBJECT_UNLOCK (self);
491   GST_LOG_OBJECT (self, "pads are ready");
492   return TRUE;
493
494 no_sinkpads:
495   {
496     GST_LOG_OBJECT (self, "pads not ready: no sink pads");
497     GST_OBJECT_UNLOCK (self);
498     return FALSE;
499   }
500 pad_not_ready:
501   {
502     if (have_event)
503       GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet,"
504           " but waking up for serialized event");
505     else
506       GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
507     GST_OBJECT_UNLOCK (self);
508     return have_event;
509   }
510 }
511
512 static void
513 gst_aggregator_reset_flow_values (GstAggregator * self)
514 {
515   GST_OBJECT_LOCK (self);
516   self->priv->send_stream_start = TRUE;
517   self->priv->send_segment = TRUE;
518   gst_segment_init (&self->segment, GST_FORMAT_TIME);
519   self->priv->first_buffer = TRUE;
520   GST_OBJECT_UNLOCK (self);
521 }
522
523 static inline void
524 gst_aggregator_push_mandatory_events (GstAggregator * self)
525 {
526   GstAggregatorPrivate *priv = self->priv;
527   GstEvent *segment = NULL;
528   GstEvent *tags = NULL;
529
530   if (self->priv->send_stream_start) {
531     gchar s_id[32];
532
533     GST_INFO_OBJECT (self, "pushing stream start");
534     /* stream-start (FIXME: create id based on input ids) */
535     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
536     if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) {
537       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
538     }
539     self->priv->send_stream_start = FALSE;
540   }
541
542   if (self->priv->srccaps) {
543
544     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
545         self->priv->srccaps);
546     if (!gst_pad_push_event (self->srcpad,
547             gst_event_new_caps (self->priv->srccaps))) {
548       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
549     }
550     gst_caps_unref (self->priv->srccaps);
551     self->priv->srccaps = NULL;
552   }
553
554   GST_OBJECT_LOCK (self);
555   if (self->priv->send_segment && !self->priv->flush_seeking) {
556     segment = gst_event_new_segment (&self->segment);
557
558     if (!self->priv->seqnum)
559       self->priv->seqnum = gst_event_get_seqnum (segment);
560     else
561       gst_event_set_seqnum (segment, self->priv->seqnum);
562     self->priv->send_segment = FALSE;
563
564     GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment);
565   }
566
567   if (priv->tags && priv->tags_changed && !self->priv->flush_seeking) {
568     tags = gst_event_new_tag (gst_tag_list_ref (priv->tags));
569     priv->tags_changed = FALSE;
570   }
571   GST_OBJECT_UNLOCK (self);
572
573   if (segment)
574     gst_pad_push_event (self->srcpad, segment);
575   if (tags)
576     gst_pad_push_event (self->srcpad, tags);
577
578 }
579
580 /**
581  * gst_aggregator_set_src_caps:
582  * @self: The #GstAggregator
583  * @caps: The #GstCaps to set on the src pad.
584  *
585  * Sets the caps to be used on the src pad.
586  */
587 void
588 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
589 {
590   GST_PAD_STREAM_LOCK (self->srcpad);
591   gst_caps_replace (&self->priv->srccaps, caps);
592   gst_aggregator_push_mandatory_events (self);
593   GST_PAD_STREAM_UNLOCK (self->srcpad);
594 }
595
596 /**
597  * gst_aggregator_finish_buffer:
598  * @self: The #GstAggregator
599  * @buffer: (transfer full): the #GstBuffer to push.
600  *
601  * This method will push the provided output buffer downstream. If needed,
602  * mandatory events such as stream-start, caps, and segment events will be
603  * sent before pushing the buffer.
604  */
605 GstFlowReturn
606 gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
607 {
608   gst_aggregator_push_mandatory_events (self);
609
610   GST_OBJECT_LOCK (self);
611   if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) {
612     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
613     GST_OBJECT_UNLOCK (self);
614     return gst_pad_push (self->srcpad, buffer);
615   } else {
616     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
617         self->priv->flush_seeking, gst_pad_is_active (self->srcpad));
618     GST_OBJECT_UNLOCK (self);
619     gst_buffer_unref (buffer);
620     return GST_FLOW_OK;
621   }
622 }
623
624 static void
625 gst_aggregator_push_eos (GstAggregator * self)
626 {
627   GstEvent *event;
628   gst_aggregator_push_mandatory_events (self);
629
630   event = gst_event_new_eos ();
631
632   GST_OBJECT_LOCK (self);
633   self->priv->send_eos = FALSE;
634   gst_event_set_seqnum (event, self->priv->seqnum);
635   GST_OBJECT_UNLOCK (self);
636
637   gst_pad_push_event (self->srcpad, event);
638 }
639
640 static GstClockTime
641 gst_aggregator_get_next_time (GstAggregator * self)
642 {
643   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
644
645   if (klass->get_next_time)
646     return klass->get_next_time (self);
647
648   return GST_CLOCK_TIME_NONE;
649 }
650
651 static gboolean
652 gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
653 {
654   GstClockTime latency;
655   GstClockTime start;
656   gboolean res;
657
658   *timeout = FALSE;
659
660   SRC_LOCK (self);
661
662   latency = gst_aggregator_get_latency_unlocked (self);
663
664   if (gst_aggregator_check_pads_ready (self)) {
665     GST_DEBUG_OBJECT (self, "all pads have data");
666     SRC_UNLOCK (self);
667
668     return TRUE;
669   }
670
671   /* Before waiting, check if we're actually still running */
672   if (!self->priv->running || !self->priv->send_eos) {
673     SRC_UNLOCK (self);
674
675     return FALSE;
676   }
677
678   start = gst_aggregator_get_next_time (self);
679
680   /* If we're not live, or if we use the running time
681    * of the first buffer as start time, we wait until
682    * all pads have buffers.
683    * Otherwise (i.e. if we are live!), we wait on the clock
684    * and if a pad does not have a buffer in time we ignore
685    * that pad.
686    */
687   GST_OBJECT_LOCK (self);
688   if (!GST_CLOCK_TIME_IS_VALID (latency) ||
689       !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) ||
690       !GST_CLOCK_TIME_IS_VALID (start) ||
691       (self->priv->first_buffer
692           && self->priv->start_time_selection ==
693           GST_AGGREGATOR_START_TIME_SELECTION_FIRST)) {
694     /* We wake up here when something happened, and below
695      * then check if we're ready now. If we return FALSE,
696      * we will be directly called again.
697      */
698     GST_OBJECT_UNLOCK (self);
699     SRC_WAIT (self);
700   } else {
701     GstClockTime base_time, time;
702     GstClock *clock;
703     GstClockReturn status;
704     GstClockTimeDiff jitter;
705
706     GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
707         GST_TIME_ARGS (start));
708
709     base_time = GST_ELEMENT_CAST (self)->base_time;
710     clock = gst_object_ref (GST_ELEMENT_CLOCK (self));
711     GST_OBJECT_UNLOCK (self);
712
713     time = base_time + start;
714     time += latency;
715
716     GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
717         GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
718         " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")",
719         GST_TIME_ARGS (time),
720         GST_TIME_ARGS (base_time),
721         GST_TIME_ARGS (start), GST_TIME_ARGS (latency),
722         GST_TIME_ARGS (gst_clock_get_time (clock)));
723
724     self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
725     gst_object_unref (clock);
726     SRC_UNLOCK (self);
727
728     jitter = 0;
729     status = gst_clock_id_wait (self->priv->aggregate_id, &jitter);
730
731     SRC_LOCK (self);
732     if (self->priv->aggregate_id) {
733       gst_clock_id_unref (self->priv->aggregate_id);
734       self->priv->aggregate_id = NULL;
735     }
736
737     GST_DEBUG_OBJECT (self,
738         "clock returned %d (jitter: %" GST_STIME_FORMAT ")",
739         status, GST_STIME_ARGS (jitter));
740
741     /* we timed out */
742     if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
743       SRC_UNLOCK (self);
744       *timeout = TRUE;
745       return TRUE;
746     }
747   }
748
749   res = gst_aggregator_check_pads_ready (self);
750   SRC_UNLOCK (self);
751
752   return res;
753 }
754
755 static gboolean
756 gst_aggregator_do_events_and_queries (GstAggregator * self,
757     GstAggregatorPad * pad, gpointer user_data)
758 {
759   GstEvent *event = NULL;
760   GstQuery *query = NULL;
761   GstAggregatorClass *klass = NULL;
762   gboolean *processed_event = user_data;
763
764   do {
765     event = NULL;
766     query = NULL;
767
768     PAD_LOCK (pad);
769     if (pad->priv->num_buffers == 0 && pad->priv->pending_eos) {
770       pad->priv->pending_eos = FALSE;
771       pad->priv->eos = TRUE;
772     }
773     if (pad->priv->clipped_buffer == NULL &&
774         !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
775       if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data)))
776         event = gst_event_ref (g_queue_peek_tail (&pad->priv->data));
777       if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))
778         query = g_queue_peek_tail (&pad->priv->data);
779     }
780     PAD_UNLOCK (pad);
781     if (event || query) {
782       gboolean ret;
783
784       if (processed_event)
785         *processed_event = TRUE;
786       if (klass == NULL)
787         klass = GST_AGGREGATOR_GET_CLASS (self);
788
789       if (event) {
790         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
791         gst_event_ref (event);
792         ret = klass->sink_event (self, pad, event);
793
794         PAD_LOCK (pad);
795         if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS)
796           pad->priv->negotiated = ret;
797         if (g_queue_peek_tail (&pad->priv->data) == event)
798           gst_event_unref (g_queue_pop_tail (&pad->priv->data));
799         gst_event_unref (event);
800       } else if (query) {
801         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query);
802         ret = klass->sink_query (self, pad, query);
803
804         PAD_LOCK (pad);
805         if (g_queue_peek_tail (&pad->priv->data) == query) {
806           GstStructure *s;
807
808           s = gst_query_writable_structure (query);
809           gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret,
810               NULL);
811           g_queue_pop_tail (&pad->priv->data);
812         }
813       }
814
815       PAD_BROADCAST_EVENT (pad);
816       PAD_UNLOCK (pad);
817     }
818   } while (event || query);
819
820   return TRUE;
821 }
822
823 static void
824 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
825     GstFlowReturn flow_return, gboolean full)
826 {
827   GList *item;
828
829   PAD_LOCK (aggpad);
830   if (flow_return == GST_FLOW_NOT_LINKED)
831     aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
832   else
833     aggpad->priv->flow_return = flow_return;
834
835   item = g_queue_peek_head_link (&aggpad->priv->data);
836   while (item) {
837     GList *next = item->next;
838
839     /* In partial flush, we do like the pad, we get rid of non-sticky events
840      * and EOS/SEGMENT.
841      */
842     if (full || GST_IS_BUFFER (item->data) ||
843         GST_EVENT_TYPE (item->data) == GST_EVENT_EOS ||
844         GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT ||
845         !GST_EVENT_IS_STICKY (item->data)) {
846       if (!GST_IS_QUERY (item->data))
847         gst_mini_object_unref (item->data);
848       g_queue_delete_link (&aggpad->priv->data, item);
849     }
850     item = next;
851   }
852   aggpad->priv->num_buffers = 0;
853   gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL);
854
855   PAD_BROADCAST_EVENT (aggpad);
856   PAD_UNLOCK (aggpad);
857 }
858
859 static GstFlowReturn
860 gst_aggregator_default_update_src_caps (GstAggregator * agg, GstCaps * caps,
861     GstCaps ** ret)
862 {
863   *ret = gst_caps_ref (caps);
864
865   return GST_FLOW_OK;
866 }
867
868 static GstCaps *
869 gst_aggregator_default_fixate_src_caps (GstAggregator * agg, GstCaps * caps)
870 {
871   caps = gst_caps_fixate (caps);
872
873   return caps;
874 }
875
876 static gboolean
877 gst_aggregator_default_negotiated_src_caps (GstAggregator * agg, GstCaps * caps)
878 {
879   return TRUE;
880 }
881
882
883 /* takes ownership of the pool, allocator and query */
884 static gboolean
885 gst_aggregator_set_allocation (GstAggregator * self,
886     GstBufferPool * pool, GstAllocator * allocator,
887     GstAllocationParams * params, GstQuery * query)
888 {
889   GstAllocator *oldalloc;
890   GstBufferPool *oldpool;
891   GstQuery *oldquery;
892
893   GST_DEBUG ("storing allocation query");
894
895   GST_OBJECT_LOCK (self);
896   oldpool = self->priv->pool;
897   self->priv->pool = pool;
898
899   oldalloc = self->priv->allocator;
900   self->priv->allocator = allocator;
901
902   oldquery = self->priv->allocation_query;
903   self->priv->allocation_query = query;
904
905   if (params)
906     self->priv->allocation_params = *params;
907   else
908     gst_allocation_params_init (&self->priv->allocation_params);
909   GST_OBJECT_UNLOCK (self);
910
911   if (oldpool) {
912     GST_DEBUG_OBJECT (self, "deactivating old pool %p", oldpool);
913     gst_buffer_pool_set_active (oldpool, FALSE);
914     gst_object_unref (oldpool);
915   }
916   if (oldalloc) {
917     gst_object_unref (oldalloc);
918   }
919   if (oldquery) {
920     gst_query_unref (oldquery);
921   }
922   return TRUE;
923 }
924
925
926 static gboolean
927 gst_aggregator_decide_allocation (GstAggregator * self, GstQuery * query)
928 {
929   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
930
931   if (aggclass->decide_allocation)
932     if (!aggclass->decide_allocation (self, query))
933       return FALSE;
934
935   return TRUE;
936 }
937
938 static gboolean
939 gst_aggregator_do_allocation (GstAggregator * self, GstCaps * caps)
940 {
941   GstQuery *query;
942   gboolean result = TRUE;
943   GstBufferPool *pool = NULL;
944   GstAllocator *allocator;
945   GstAllocationParams params;
946
947   /* find a pool for the negotiated caps now */
948   GST_DEBUG_OBJECT (self, "doing allocation query");
949   query = gst_query_new_allocation (caps, TRUE);
950   if (!gst_pad_peer_query (self->srcpad, query)) {
951     /* not a problem, just debug a little */
952     GST_DEBUG_OBJECT (self, "peer ALLOCATION query failed");
953   }
954
955   GST_DEBUG_OBJECT (self, "calling decide_allocation");
956   result = gst_aggregator_decide_allocation (self, query);
957
958   GST_DEBUG_OBJECT (self, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
959       query);
960
961   if (!result)
962     goto no_decide_allocation;
963
964   /* we got configuration from our peer or the decide_allocation method,
965    * parse them */
966   if (gst_query_get_n_allocation_params (query) > 0) {
967     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
968   } else {
969     allocator = NULL;
970     gst_allocation_params_init (&params);
971   }
972
973   if (gst_query_get_n_allocation_pools (query) > 0)
974     gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
975
976   /* now store */
977   result =
978       gst_aggregator_set_allocation (self, pool, allocator, &params, query);
979
980   return result;
981
982   /* Errors */
983 no_decide_allocation:
984   {
985     GST_WARNING_OBJECT (self, "Failed to decide allocation");
986     gst_query_unref (query);
987
988     return result;
989   }
990
991 }
992
993 /* WITH SRC_LOCK held */
994 static GstFlowReturn
995 gst_aggregator_update_src_caps (GstAggregator * self)
996 {
997   GstAggregatorClass *agg_klass = GST_AGGREGATOR_GET_CLASS (self);
998   GstCaps *downstream_caps, *template_caps, *caps = NULL;
999   GstFlowReturn ret = GST_FLOW_OK;
1000
1001   template_caps = gst_pad_get_pad_template_caps (self->srcpad);
1002   downstream_caps = gst_pad_peer_query_caps (self->srcpad, template_caps);
1003
1004   if (gst_caps_is_empty (downstream_caps)) {
1005     GST_INFO_OBJECT (self, "Downstream caps (%"
1006         GST_PTR_FORMAT ") not compatible with pad template caps (%"
1007         GST_PTR_FORMAT ")", downstream_caps, template_caps);
1008     ret = GST_FLOW_NOT_NEGOTIATED;
1009     goto done;
1010   }
1011
1012   g_assert (agg_klass->update_src_caps);
1013   GST_DEBUG_OBJECT (self, "updating caps from %" GST_PTR_FORMAT,
1014       downstream_caps);
1015   ret = agg_klass->update_src_caps (self, downstream_caps, &caps);
1016   if (ret < GST_FLOW_OK) {
1017     GST_WARNING_OBJECT (self, "Subclass failed to update provided caps");
1018     goto done;
1019   }
1020   if ((caps == NULL || gst_caps_is_empty (caps)) && ret >= GST_FLOW_OK) {
1021     ret = GST_FLOW_NOT_NEGOTIATED;
1022     goto done;
1023   }
1024   GST_DEBUG_OBJECT (self, "               to %" GST_PTR_FORMAT, caps);
1025
1026 #ifdef GST_ENABLE_EXTRA_CHECKS
1027   if (!gst_caps_is_subset (caps, template_caps)) {
1028     GstCaps *intersection;
1029
1030     GST_ERROR_OBJECT (self,
1031         "update_src_caps returned caps %" GST_PTR_FORMAT
1032         " which are not a real subset of the template caps %"
1033         GST_PTR_FORMAT, caps, template_caps);
1034     g_warning ("%s: update_src_caps returned caps which are not a real "
1035         "subset of the filter caps", GST_ELEMENT_NAME (self));
1036
1037     intersection =
1038         gst_caps_intersect_full (template_caps, caps, GST_CAPS_INTERSECT_FIRST);
1039     gst_caps_unref (caps);
1040     caps = intersection;
1041   }
1042 #endif
1043
1044   if (gst_caps_is_any (caps)) {
1045     goto done;
1046   }
1047
1048   if (!gst_caps_is_fixed (caps)) {
1049     g_assert (agg_klass->fixate_src_caps);
1050
1051     GST_DEBUG_OBJECT (self, "fixate caps from %" GST_PTR_FORMAT, caps);
1052     if (!(caps = agg_klass->fixate_src_caps (self, caps))) {
1053       GST_WARNING_OBJECT (self, "Subclass failed to fixate provided caps");
1054       ret = GST_FLOW_NOT_NEGOTIATED;
1055       goto done;
1056     }
1057     GST_DEBUG_OBJECT (self, "             to %" GST_PTR_FORMAT, caps);
1058   }
1059
1060   if (agg_klass->negotiated_src_caps) {
1061     if (!agg_klass->negotiated_src_caps (self, caps)) {
1062       GST_WARNING_OBJECT (self, "Subclass failed to accept negotiated caps");
1063       ret = GST_FLOW_NOT_NEGOTIATED;
1064       goto done;
1065     }
1066   }
1067
1068   gst_aggregator_set_src_caps (self, caps);
1069
1070   if (!gst_aggregator_do_allocation (self, caps)) {
1071     GST_WARNING_OBJECT (self, "Allocation negotiation failed");
1072     ret = GST_FLOW_NOT_NEGOTIATED;
1073   }
1074
1075 done:
1076   gst_caps_unref (downstream_caps);
1077   gst_caps_unref (template_caps);
1078
1079   if (caps)
1080     gst_caps_unref (caps);
1081
1082   return ret;
1083 }
1084
1085 static void
1086 gst_aggregator_aggregate_func (GstAggregator * self)
1087 {
1088   GstAggregatorPrivate *priv = self->priv;
1089   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1090   gboolean timeout = FALSE;
1091
1092   if (self->priv->running == FALSE) {
1093     GST_DEBUG_OBJECT (self, "Not running anymore");
1094     return;
1095   }
1096
1097   GST_LOG_OBJECT (self, "Checking aggregate");
1098   while (priv->send_eos && priv->running) {
1099     GstFlowReturn flow_return = GST_FLOW_OK;
1100     gboolean processed_event = FALSE;
1101
1102     gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries,
1103         NULL);
1104
1105     if (!gst_aggregator_wait_and_check (self, &timeout))
1106       continue;
1107
1108     gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries,
1109         &processed_event);
1110     if (processed_event)
1111       continue;
1112
1113     if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
1114       flow_return = gst_aggregator_update_src_caps (self);
1115       if (flow_return != GST_FLOW_OK)
1116         gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self));
1117     }
1118
1119     if (timeout || flow_return >= GST_FLOW_OK) {
1120       GST_TRACE_OBJECT (self, "Actually aggregating!");
1121       flow_return = klass->aggregate (self, timeout);
1122     }
1123
1124     if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA)
1125       continue;
1126
1127     GST_OBJECT_LOCK (self);
1128     if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) {
1129       /* We don't want to set the pads to flushing, but we want to
1130        * stop the thread, so just break here */
1131       GST_OBJECT_UNLOCK (self);
1132       break;
1133     }
1134     GST_OBJECT_UNLOCK (self);
1135
1136     if (flow_return == GST_FLOW_EOS || flow_return == GST_FLOW_ERROR) {
1137       gst_aggregator_push_eos (self);
1138     }
1139
1140     GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
1141
1142     if (flow_return != GST_FLOW_OK) {
1143       GList *item;
1144
1145       GST_OBJECT_LOCK (self);
1146       for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
1147         GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
1148
1149         gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE);
1150       }
1151       GST_OBJECT_UNLOCK (self);
1152       break;
1153     }
1154   }
1155
1156   /* Pause the task here, the only ways to get here are:
1157    * 1) We're stopping, in which case the task is stopped anyway
1158    * 2) We got a flow error above, in which case it might take
1159    *    some time to forward the flow return upstream and we
1160    *    would otherwise call the task function over and over
1161    *    again without doing anything
1162    */
1163   gst_pad_pause_task (self->srcpad);
1164 }
1165
1166 static gboolean
1167 gst_aggregator_start (GstAggregator * self)
1168 {
1169   GstAggregatorClass *klass;
1170   gboolean result;
1171
1172   self->priv->send_stream_start = TRUE;
1173   self->priv->send_segment = TRUE;
1174   self->priv->send_eos = TRUE;
1175   self->priv->srccaps = NULL;
1176
1177   gst_aggregator_set_allocation (self, NULL, NULL, NULL, NULL);
1178
1179   klass = GST_AGGREGATOR_GET_CLASS (self);
1180
1181   if (klass->start)
1182     result = klass->start (self);
1183   else
1184     result = TRUE;
1185
1186   return result;
1187 }
1188
1189 static gboolean
1190 _check_pending_flush_stop (GstAggregatorPad * pad)
1191 {
1192   gboolean res;
1193
1194   PAD_LOCK (pad);
1195   res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
1196   PAD_UNLOCK (pad);
1197
1198   return res;
1199 }
1200
1201 static gboolean
1202 gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
1203 {
1204   gboolean res = TRUE;
1205
1206   GST_INFO_OBJECT (self, "%s srcpad task",
1207       flush_start ? "Pausing" : "Stopping");
1208
1209   SRC_LOCK (self);
1210   self->priv->running = FALSE;
1211   SRC_BROADCAST (self);
1212   SRC_UNLOCK (self);
1213
1214   if (flush_start) {
1215     res = gst_pad_push_event (self->srcpad, flush_start);
1216   }
1217
1218   gst_pad_stop_task (self->srcpad);
1219
1220   return res;
1221 }
1222
1223 static void
1224 gst_aggregator_start_srcpad_task (GstAggregator * self)
1225 {
1226   GST_INFO_OBJECT (self, "Starting srcpad task");
1227
1228   self->priv->running = TRUE;
1229   gst_pad_start_task (GST_PAD (self->srcpad),
1230       (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL);
1231 }
1232
1233 static GstFlowReturn
1234 gst_aggregator_flush (GstAggregator * self)
1235 {
1236   GstFlowReturn ret = GST_FLOW_OK;
1237   GstAggregatorPrivate *priv = self->priv;
1238   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1239
1240   GST_DEBUG_OBJECT (self, "Flushing everything");
1241   GST_OBJECT_LOCK (self);
1242   priv->send_segment = TRUE;
1243   priv->flush_seeking = FALSE;
1244   priv->tags_changed = FALSE;
1245   GST_OBJECT_UNLOCK (self);
1246   if (klass->flush)
1247     ret = klass->flush (self);
1248
1249   return ret;
1250 }
1251
1252
1253 /* Called with GstAggregator's object lock held */
1254
1255 static gboolean
1256 gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
1257 {
1258   GList *tmp;
1259   GstAggregatorPad *tmppad;
1260
1261   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
1262     tmppad = (GstAggregatorPad *) tmp->data;
1263
1264     if (_check_pending_flush_stop (tmppad) == FALSE) {
1265       GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
1266           tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
1267       return FALSE;
1268     }
1269   }
1270
1271   return TRUE;
1272 }
1273
1274 static void
1275 gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
1276     GstEvent * event)
1277 {
1278   GstAggregatorPrivate *priv = self->priv;
1279   GstAggregatorPadPrivate *padpriv = aggpad->priv;
1280
1281   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE);
1282
1283   PAD_FLUSH_LOCK (aggpad);
1284   PAD_LOCK (aggpad);
1285   if (padpriv->pending_flush_start) {
1286     GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
1287
1288     padpriv->pending_flush_start = FALSE;
1289     padpriv->pending_flush_stop = TRUE;
1290   }
1291   PAD_UNLOCK (aggpad);
1292
1293   GST_OBJECT_LOCK (self);
1294   if (priv->flush_seeking) {
1295     /* If flush_seeking we forward the first FLUSH_START */
1296     if (priv->pending_flush_start) {
1297       priv->pending_flush_start = FALSE;
1298       GST_OBJECT_UNLOCK (self);
1299
1300       GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
1301       gst_aggregator_stop_srcpad_task (self, event);
1302
1303       GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
1304       GST_PAD_STREAM_LOCK (self->srcpad);
1305       GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
1306       event = NULL;
1307     } else {
1308       GST_OBJECT_UNLOCK (self);
1309       gst_event_unref (event);
1310     }
1311   } else {
1312     GST_OBJECT_UNLOCK (self);
1313     gst_event_unref (event);
1314   }
1315   PAD_FLUSH_UNLOCK (aggpad);
1316 }
1317
1318 /* Must be called with the the PAD_LOCK held */
1319 static void
1320 update_time_level (GstAggregatorPad * aggpad, gboolean head)
1321 {
1322   if (head) {
1323     if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) &&
1324         aggpad->priv->head_segment.format == GST_FORMAT_TIME)
1325       aggpad->priv->head_time =
1326           gst_segment_to_running_time (&aggpad->priv->head_segment,
1327           GST_FORMAT_TIME, aggpad->priv->head_position);
1328     else
1329       aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
1330
1331     if (!GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_time))
1332       aggpad->priv->tail_time = aggpad->priv->head_time;
1333   } else {
1334     if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_position) &&
1335         aggpad->segment.format == GST_FORMAT_TIME)
1336       aggpad->priv->tail_time =
1337           gst_segment_to_running_time (&aggpad->segment,
1338           GST_FORMAT_TIME, aggpad->priv->tail_position);
1339     else
1340       aggpad->priv->tail_time = aggpad->priv->head_time;
1341   }
1342
1343   if (aggpad->priv->head_time == GST_CLOCK_TIME_NONE ||
1344       aggpad->priv->tail_time == GST_CLOCK_TIME_NONE) {
1345     aggpad->priv->time_level = 0;
1346     return;
1347   }
1348
1349   if (aggpad->priv->tail_time > aggpad->priv->head_time)
1350     aggpad->priv->time_level = 0;
1351   else
1352     aggpad->priv->time_level = aggpad->priv->head_time -
1353         aggpad->priv->tail_time;
1354 }
1355
1356
1357 /* GstAggregator vmethods default implementations */
1358 static gboolean
1359 gst_aggregator_default_sink_event (GstAggregator * self,
1360     GstAggregatorPad * aggpad, GstEvent * event)
1361 {
1362   gboolean res = TRUE;
1363   GstPad *pad = GST_PAD (aggpad);
1364   GstAggregatorPrivate *priv = self->priv;
1365
1366   GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event);
1367
1368   switch (GST_EVENT_TYPE (event)) {
1369     case GST_EVENT_FLUSH_START:
1370     {
1371       gst_aggregator_flush_start (self, aggpad, event);
1372       /* We forward only in one case: right after flush_seeking */
1373       event = NULL;
1374       goto eat;
1375     }
1376     case GST_EVENT_FLUSH_STOP:
1377     {
1378       gst_aggregator_pad_flush (aggpad, self);
1379       GST_OBJECT_LOCK (self);
1380       if (priv->flush_seeking) {
1381         g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
1382         if (gst_aggregator_all_flush_stop_received_locked (self)) {
1383           GST_OBJECT_UNLOCK (self);
1384           /* That means we received FLUSH_STOP/FLUSH_STOP on
1385            * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
1386           gst_aggregator_flush (self);
1387           gst_pad_push_event (self->srcpad, event);
1388           event = NULL;
1389           SRC_LOCK (self);
1390           priv->send_eos = TRUE;
1391           SRC_BROADCAST (self);
1392           SRC_UNLOCK (self);
1393
1394           GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
1395           GST_PAD_STREAM_UNLOCK (self->srcpad);
1396           gst_aggregator_start_srcpad_task (self);
1397         } else {
1398           GST_OBJECT_UNLOCK (self);
1399         }
1400       } else {
1401         GST_OBJECT_UNLOCK (self);
1402       }
1403
1404       /* We never forward the event */
1405       goto eat;
1406     }
1407     case GST_EVENT_EOS:
1408     {
1409       /* We still have a buffer, and we don't want the subclass to have to
1410        * check for it. Mark pending_eos, eos will be set when steal_buffer is
1411        * called
1412        */
1413       SRC_LOCK (self);
1414       PAD_LOCK (aggpad);
1415       if (aggpad->priv->num_buffers == 0) {
1416         aggpad->priv->eos = TRUE;
1417       } else {
1418         aggpad->priv->pending_eos = TRUE;
1419       }
1420       PAD_UNLOCK (aggpad);
1421
1422       SRC_BROADCAST (self);
1423       SRC_UNLOCK (self);
1424       goto eat;
1425     }
1426     case GST_EVENT_SEGMENT:
1427     {
1428       PAD_LOCK (aggpad);
1429       GST_OBJECT_LOCK (aggpad);
1430       gst_event_copy_segment (event, &aggpad->segment);
1431       /* We've got a new segment, tail_position is now meaningless
1432        * and may interfere with the time_level calculation
1433        */
1434       aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
1435       update_time_level (aggpad, FALSE);
1436       GST_OBJECT_UNLOCK (aggpad);
1437       PAD_UNLOCK (aggpad);
1438
1439       GST_OBJECT_LOCK (self);
1440       self->priv->seqnum = gst_event_get_seqnum (event);
1441       GST_OBJECT_UNLOCK (self);
1442       goto eat;
1443     }
1444     case GST_EVENT_STREAM_START:
1445     {
1446       goto eat;
1447     }
1448     case GST_EVENT_GAP:
1449     {
1450       GstClockTime pts, endpts;
1451       GstClockTime duration;
1452       GstBuffer *gapbuf;
1453
1454       gst_event_parse_gap (event, &pts, &duration);
1455       gapbuf = gst_buffer_new ();
1456
1457       if (GST_CLOCK_TIME_IS_VALID (duration))
1458         endpts = pts + duration;
1459       else
1460         endpts = GST_CLOCK_TIME_NONE;
1461
1462       GST_OBJECT_LOCK (aggpad);
1463       res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts,
1464           &pts, &endpts);
1465       GST_OBJECT_UNLOCK (aggpad);
1466
1467       if (!res) {
1468         GST_WARNING_OBJECT (self, "GAP event outside segment, dropping");
1469         goto eat;
1470       }
1471
1472       if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts))
1473         duration = endpts - pts;
1474       else
1475         duration = GST_CLOCK_TIME_NONE;
1476
1477       GST_BUFFER_PTS (gapbuf) = pts;
1478       GST_BUFFER_DURATION (gapbuf) = duration;
1479       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
1480       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
1481
1482       /* Remove GAP event so we can replace it with the buffer */
1483       if (g_queue_peek_tail (&aggpad->priv->data) == event)
1484         gst_event_unref (g_queue_pop_tail (&aggpad->priv->data));
1485
1486       if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
1487           GST_FLOW_OK) {
1488         GST_WARNING_OBJECT (self, "Failed to chain gap buffer");
1489         res = FALSE;
1490       }
1491
1492       goto eat;
1493     }
1494     case GST_EVENT_TAG:
1495     {
1496       GstTagList *tags;
1497
1498       gst_event_parse_tag (event, &tags);
1499
1500       if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) {
1501         gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE);
1502         gst_event_unref (event);
1503         event = NULL;
1504         goto eat;
1505       }
1506       break;
1507     }
1508     default:
1509     {
1510       break;
1511     }
1512   }
1513
1514   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
1515   return gst_pad_event_default (pad, GST_OBJECT (self), event);
1516
1517 eat:
1518   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
1519   if (event)
1520     gst_event_unref (event);
1521
1522   return res;
1523 }
1524
1525 static inline gboolean
1526 gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad,
1527     gpointer unused_udata)
1528 {
1529   gst_aggregator_pad_flush (pad, self);
1530
1531   PAD_LOCK (pad);
1532   pad->priv->flow_return = GST_FLOW_FLUSHING;
1533   pad->priv->negotiated = FALSE;
1534   PAD_BROADCAST_EVENT (pad);
1535   PAD_UNLOCK (pad);
1536
1537   return TRUE;
1538 }
1539
1540 static gboolean
1541 gst_aggregator_stop (GstAggregator * agg)
1542 {
1543   GstAggregatorClass *klass;
1544   gboolean result;
1545
1546   gst_aggregator_reset_flow_values (agg);
1547
1548   gst_aggregator_iterate_sinkpads (agg, gst_aggregator_stop_pad, NULL);
1549
1550   klass = GST_AGGREGATOR_GET_CLASS (agg);
1551
1552   if (klass->stop)
1553     result = klass->stop (agg);
1554   else
1555     result = TRUE;
1556
1557   agg->priv->has_peer_latency = FALSE;
1558   agg->priv->peer_latency_live = FALSE;
1559   agg->priv->peer_latency_min = agg->priv->peer_latency_max = FALSE;
1560
1561   if (agg->priv->tags)
1562     gst_tag_list_unref (agg->priv->tags);
1563   agg->priv->tags = NULL;
1564
1565   gst_aggregator_set_allocation (agg, NULL, NULL, NULL, NULL);
1566
1567   return result;
1568 }
1569
1570 /* GstElement vmethods implementations */
1571 static GstStateChangeReturn
1572 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
1573 {
1574   GstStateChangeReturn ret;
1575   GstAggregator *self = GST_AGGREGATOR (element);
1576
1577   switch (transition) {
1578     case GST_STATE_CHANGE_READY_TO_PAUSED:
1579       if (!gst_aggregator_start (self))
1580         goto error_start;
1581       break;
1582     default:
1583       break;
1584   }
1585
1586   if ((ret =
1587           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
1588               transition)) == GST_STATE_CHANGE_FAILURE)
1589     goto failure;
1590
1591
1592   switch (transition) {
1593     case GST_STATE_CHANGE_PAUSED_TO_READY:
1594       if (!gst_aggregator_stop (self)) {
1595         /* What to do in this case? Error out? */
1596         GST_ERROR_OBJECT (self, "Subclass failed to stop.");
1597       }
1598       break;
1599     default:
1600       break;
1601   }
1602
1603   return ret;
1604
1605 /* ERRORS */
1606 failure:
1607   {
1608     GST_ERROR_OBJECT (element, "parent failed state change");
1609     return ret;
1610   }
1611 error_start:
1612   {
1613     GST_ERROR_OBJECT (element, "Subclass failed to start");
1614     return GST_STATE_CHANGE_FAILURE;
1615   }
1616 }
1617
1618 static void
1619 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
1620 {
1621   GstAggregator *self = GST_AGGREGATOR (element);
1622   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1623
1624   GST_INFO_OBJECT (pad, "Removing pad");
1625
1626   SRC_LOCK (self);
1627   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
1628   gst_element_remove_pad (element, pad);
1629
1630   self->priv->has_peer_latency = FALSE;
1631   SRC_BROADCAST (self);
1632   SRC_UNLOCK (self);
1633 }
1634
1635 static GstAggregatorPad *
1636 gst_aggregator_default_create_new_pad (GstAggregator * self,
1637     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1638 {
1639   GstAggregatorPad *agg_pad;
1640   GstAggregatorPrivate *priv = self->priv;
1641   gint serial = 0;
1642   gchar *name = NULL;
1643
1644   if (templ->direction != GST_PAD_SINK)
1645     goto not_sink;
1646
1647   if (templ->presence != GST_PAD_REQUEST)
1648     goto not_request;
1649
1650   GST_OBJECT_LOCK (self);
1651   if (req_name == NULL || strlen (req_name) < 6
1652       || !g_str_has_prefix (req_name, "sink_")) {
1653     /* no name given when requesting the pad, use next available int */
1654     serial = ++priv->max_padserial;
1655   } else {
1656     /* parse serial number from requested padname */
1657     serial = g_ascii_strtoull (&req_name[5], NULL, 10);
1658     if (serial > priv->max_padserial)
1659       priv->max_padserial = serial;
1660   }
1661
1662   name = g_strdup_printf ("sink_%u", serial);
1663   agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
1664       "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
1665   g_free (name);
1666
1667   GST_OBJECT_UNLOCK (self);
1668
1669   return agg_pad;
1670
1671   /* errors */
1672 not_sink:
1673   {
1674     GST_WARNING_OBJECT (self, "request new pad that is not a SINK pad");
1675     return NULL;
1676   }
1677 not_request:
1678   {
1679     GST_WARNING_OBJECT (self, "request new pad that is not a REQUEST pad");
1680     return NULL;
1681   }
1682 }
1683
1684 static GstPad *
1685 gst_aggregator_request_new_pad (GstElement * element,
1686     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1687 {
1688   GstAggregator *self;
1689   GstAggregatorPad *agg_pad;
1690   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (element);
1691   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
1692
1693   self = GST_AGGREGATOR (element);
1694
1695   agg_pad = klass->create_new_pad (self, templ, req_name, caps);
1696   if (!agg_pad) {
1697     GST_ERROR_OBJECT (element, "Couldn't create new pad");
1698     return NULL;
1699   }
1700
1701   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
1702
1703   if (priv->running)
1704     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
1705
1706   /* add the pad to the element */
1707   gst_element_add_pad (element, GST_PAD (agg_pad));
1708
1709   return GST_PAD (agg_pad);
1710 }
1711
1712 /* Must be called with SRC_LOCK held */
1713
1714 static gboolean
1715 gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
1716 {
1717   gboolean query_ret, live;
1718   GstClockTime our_latency, min, max;
1719
1720   query_ret = gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1721
1722   if (!query_ret) {
1723     GST_WARNING_OBJECT (self, "Latency query failed");
1724     return FALSE;
1725   }
1726
1727   gst_query_parse_latency (query, &live, &min, &max);
1728
1729   our_latency = self->priv->latency;
1730
1731   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
1732     GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
1733         ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
1734     return FALSE;
1735   }
1736
1737   if (min > max && GST_CLOCK_TIME_IS_VALID (max)) {
1738     GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL),
1739         ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
1740             GST_TIME_FORMAT ". Add queues or other buffering elements.",
1741             GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
1742     return FALSE;
1743   }
1744
1745   self->priv->peer_latency_live = live;
1746   self->priv->peer_latency_min = min;
1747   self->priv->peer_latency_max = max;
1748   self->priv->has_peer_latency = TRUE;
1749
1750   /* add our own */
1751   min += our_latency;
1752   min += self->priv->sub_latency_min;
1753   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1754       && GST_CLOCK_TIME_IS_VALID (max))
1755     max += self->priv->sub_latency_max + our_latency;
1756   else
1757     max = GST_CLOCK_TIME_NONE;
1758
1759   SRC_BROADCAST (self);
1760
1761   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
1762       " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
1763
1764   gst_query_set_latency (query, live, min, max);
1765
1766   return query_ret;
1767 }
1768
1769 /*
1770  * MUST be called with the src_lock held.
1771  *
1772  * See  gst_aggregator_get_latency() for doc
1773  */
1774 static GstClockTime
1775 gst_aggregator_get_latency_unlocked (GstAggregator * self)
1776 {
1777   GstClockTime latency;
1778
1779   g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0);
1780
1781   if (!self->priv->has_peer_latency) {
1782     GstQuery *query = gst_query_new_latency ();
1783     gboolean ret;
1784
1785     ret = gst_aggregator_query_latency_unlocked (self, query);
1786     gst_query_unref (query);
1787     if (!ret)
1788       return GST_CLOCK_TIME_NONE;
1789   }
1790
1791   if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
1792     return GST_CLOCK_TIME_NONE;
1793
1794   /* latency_min is never GST_CLOCK_TIME_NONE by construction */
1795   latency = self->priv->peer_latency_min;
1796
1797   /* add our own */
1798   latency += self->priv->latency;
1799   latency += self->priv->sub_latency_min;
1800
1801   return latency;
1802 }
1803
1804 /**
1805  * gst_aggregator_get_latency:
1806  * @self: a #GstAggregator
1807  *
1808  * Retrieves the latency values reported by @self in response to the latency
1809  * query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element
1810  * will not wait for the clock.
1811  *
1812  * Typically only called by subclasses.
1813  *
1814  * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync
1815  */
1816 GstClockTime
1817 gst_aggregator_get_latency (GstAggregator * self)
1818 {
1819   GstClockTime ret;
1820
1821   SRC_LOCK (self);
1822   ret = gst_aggregator_get_latency_unlocked (self);
1823   SRC_UNLOCK (self);
1824
1825   return ret;
1826 }
1827
1828 static gboolean
1829 gst_aggregator_send_event (GstElement * element, GstEvent * event)
1830 {
1831   GstAggregator *self = GST_AGGREGATOR (element);
1832
1833   GST_STATE_LOCK (element);
1834   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
1835       GST_STATE (element) < GST_STATE_PAUSED) {
1836     gdouble rate;
1837     GstFormat fmt;
1838     GstSeekFlags flags;
1839     GstSeekType start_type, stop_type;
1840     gint64 start, stop;
1841
1842     gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1843         &start, &stop_type, &stop);
1844
1845     GST_OBJECT_LOCK (self);
1846     gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1847         stop_type, stop, NULL);
1848     self->priv->seqnum = gst_event_get_seqnum (event);
1849     self->priv->first_buffer = FALSE;
1850     GST_OBJECT_UNLOCK (self);
1851
1852     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
1853   }
1854   GST_STATE_UNLOCK (element);
1855
1856
1857   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
1858       event);
1859 }
1860
1861 static gboolean
1862 gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
1863 {
1864   gboolean res = TRUE;
1865
1866   switch (GST_QUERY_TYPE (query)) {
1867     case GST_QUERY_SEEKING:
1868     {
1869       GstFormat format;
1870
1871       /* don't pass it along as some (file)sink might claim it does
1872        * whereas with a collectpads in between that will not likely work */
1873       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1874       gst_query_set_seeking (query, format, FALSE, 0, -1);
1875       res = TRUE;
1876
1877       break;
1878     }
1879     case GST_QUERY_LATENCY:
1880       SRC_LOCK (self);
1881       res = gst_aggregator_query_latency_unlocked (self, query);
1882       SRC_UNLOCK (self);
1883       break;
1884     default:
1885       return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1886   }
1887
1888   return res;
1889 }
1890
1891 static gboolean
1892 gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
1893 {
1894   EventData *evdata = user_data;
1895   gboolean ret = TRUE;
1896   GstPad *peer = gst_pad_get_peer (pad);
1897   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1898
1899   if (peer) {
1900     if (evdata->only_to_active_pads && aggpad->priv->first_buffer) {
1901       GST_DEBUG_OBJECT (pad, "not sending event to inactive pad");
1902       ret = TRUE;
1903     } else {
1904       ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
1905       GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
1906       gst_object_unref (peer);
1907     }
1908   }
1909
1910   if (ret == FALSE) {
1911     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
1912       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
1913
1914       GST_DEBUG_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
1915
1916       if (gst_pad_query (peer, seeking)) {
1917         gboolean seekable;
1918
1919         gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
1920
1921         if (seekable == FALSE) {
1922           GST_INFO_OBJECT (pad,
1923               "Source not seekable, We failed but it does not matter!");
1924
1925           ret = TRUE;
1926         }
1927       } else {
1928         GST_ERROR_OBJECT (pad, "Query seeking FAILED");
1929       }
1930
1931       gst_query_unref (seeking);
1932     }
1933
1934     if (evdata->flush) {
1935       PAD_LOCK (aggpad);
1936       aggpad->priv->pending_flush_start = FALSE;
1937       aggpad->priv->pending_flush_stop = FALSE;
1938       PAD_UNLOCK (aggpad);
1939     }
1940   } else {
1941     evdata->one_actually_seeked = TRUE;
1942   }
1943
1944   evdata->result &= ret;
1945
1946   /* Always send to all pads */
1947   return FALSE;
1948 }
1949
1950 static void
1951 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
1952     EventData * evdata)
1953 {
1954   evdata->result = TRUE;
1955   evdata->one_actually_seeked = FALSE;
1956
1957   /* We first need to set all pads as flushing in a first pass
1958    * as flush_start flush_stop is sometimes sent synchronously
1959    * while we send the seek event */
1960   if (evdata->flush) {
1961     GList *l;
1962
1963     GST_OBJECT_LOCK (self);
1964     for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
1965       GstAggregatorPad *pad = l->data;
1966
1967       PAD_LOCK (pad);
1968       pad->priv->pending_flush_start = TRUE;
1969       pad->priv->pending_flush_stop = FALSE;
1970       PAD_UNLOCK (pad);
1971     }
1972     GST_OBJECT_UNLOCK (self);
1973   }
1974
1975   gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata);
1976
1977   gst_event_unref (evdata->event);
1978 }
1979
1980 static gboolean
1981 gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
1982 {
1983   gdouble rate;
1984   GstFormat fmt;
1985   GstSeekFlags flags;
1986   GstSeekType start_type, stop_type;
1987   gint64 start, stop;
1988   gboolean flush;
1989   EventData evdata = { 0, };
1990   GstAggregatorPrivate *priv = self->priv;
1991
1992   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1993       &start, &stop_type, &stop);
1994
1995   GST_INFO_OBJECT (self, "starting SEEK");
1996
1997   flush = flags & GST_SEEK_FLAG_FLUSH;
1998
1999   GST_OBJECT_LOCK (self);
2000   if (flush) {
2001     priv->pending_flush_start = TRUE;
2002     priv->flush_seeking = TRUE;
2003   }
2004
2005   gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
2006       stop_type, stop, NULL);
2007
2008   /* Seeking sets a position */
2009   self->priv->first_buffer = FALSE;
2010   GST_OBJECT_UNLOCK (self);
2011
2012   /* forward the seek upstream */
2013   evdata.event = event;
2014   evdata.flush = flush;
2015   evdata.only_to_active_pads = FALSE;
2016   gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2017   event = NULL;
2018
2019   if (!evdata.result || !evdata.one_actually_seeked) {
2020     GST_OBJECT_LOCK (self);
2021     priv->flush_seeking = FALSE;
2022     priv->pending_flush_start = FALSE;
2023     GST_OBJECT_UNLOCK (self);
2024   }
2025
2026   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
2027
2028   return evdata.result;
2029 }
2030
2031 static gboolean
2032 gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
2033 {
2034   EventData evdata = { 0, };
2035
2036   switch (GST_EVENT_TYPE (event)) {
2037     case GST_EVENT_SEEK:
2038       /* _do_seek() unrefs the event. */
2039       return gst_aggregator_do_seek (self, event);
2040     case GST_EVENT_NAVIGATION:
2041       /* navigation is rather pointless. */
2042       gst_event_unref (event);
2043       return FALSE;
2044     default:
2045       break;
2046   }
2047
2048   /* Don't forward QOS events to pads that had no active buffer yet. Otherwise
2049    * they will receive a QOS event that has earliest_time=0 (because we can't
2050    * have negative timestamps), and consider their buffer as too late */
2051   evdata.event = event;
2052   evdata.flush = FALSE;
2053   evdata.only_to_active_pads = GST_EVENT_TYPE (event) == GST_EVENT_QOS;
2054   gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2055   return evdata.result;
2056 }
2057
2058 static gboolean
2059 gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent,
2060     GstEvent * event)
2061 {
2062   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2063
2064   return klass->src_event (GST_AGGREGATOR (parent), event);
2065 }
2066
2067 static gboolean
2068 gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent,
2069     GstQuery * query)
2070 {
2071   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2072
2073   return klass->src_query (GST_AGGREGATOR (parent), query);
2074 }
2075
2076 static gboolean
2077 gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
2078     GstObject * parent, GstPadMode mode, gboolean active)
2079 {
2080   GstAggregator *self = GST_AGGREGATOR (parent);
2081   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2082
2083   if (klass->src_activate) {
2084     if (klass->src_activate (self, mode, active) == FALSE) {
2085       return FALSE;
2086     }
2087   }
2088
2089   if (active == TRUE) {
2090     switch (mode) {
2091       case GST_PAD_MODE_PUSH:
2092       {
2093         GST_INFO_OBJECT (pad, "Activating pad!");
2094         gst_aggregator_start_srcpad_task (self);
2095         return TRUE;
2096       }
2097       default:
2098       {
2099         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
2100         return FALSE;
2101       }
2102     }
2103   }
2104
2105   /* deactivating */
2106   GST_INFO_OBJECT (self, "Deactivating srcpad");
2107   gst_aggregator_stop_srcpad_task (self, FALSE);
2108
2109   return TRUE;
2110 }
2111
2112 static gboolean
2113 gst_aggregator_default_sink_query (GstAggregator * self,
2114     GstAggregatorPad * aggpad, GstQuery * query)
2115 {
2116   GstPad *pad = GST_PAD (aggpad);
2117
2118   if (GST_QUERY_TYPE (query) == GST_QUERY_ALLOCATION) {
2119     GstQuery *decide_query = NULL;
2120     GstAggregatorClass *agg_class;
2121     gboolean ret;
2122
2123     GST_OBJECT_LOCK (self);
2124     PAD_LOCK (aggpad);
2125     if (G_UNLIKELY (!aggpad->priv->negotiated)) {
2126       GST_DEBUG_OBJECT (self,
2127           "not negotiated yet, can't answer ALLOCATION query");
2128       PAD_UNLOCK (aggpad);
2129       GST_OBJECT_UNLOCK (self);
2130
2131       return FALSE;
2132     }
2133
2134     if ((decide_query = self->priv->allocation_query))
2135       gst_query_ref (decide_query);
2136     PAD_UNLOCK (aggpad);
2137     GST_OBJECT_UNLOCK (self);
2138
2139     GST_DEBUG_OBJECT (self,
2140         "calling propose allocation with query %" GST_PTR_FORMAT, decide_query);
2141
2142     agg_class = GST_AGGREGATOR_GET_CLASS (self);
2143
2144     /* pass the query to the propose_allocation vmethod if any */
2145     if (agg_class->propose_allocation)
2146       ret = agg_class->propose_allocation (self, aggpad, decide_query, query);
2147     else
2148       ret = FALSE;
2149
2150     if (decide_query)
2151       gst_query_unref (decide_query);
2152
2153     GST_DEBUG_OBJECT (self, "ALLOCATION ret %d, %" GST_PTR_FORMAT, ret, query);
2154     return ret;
2155   }
2156
2157   return gst_pad_query_default (pad, GST_OBJECT (self), query);
2158 }
2159
2160 static void
2161 gst_aggregator_finalize (GObject * object)
2162 {
2163   GstAggregator *self = (GstAggregator *) object;
2164
2165   g_mutex_clear (&self->priv->src_lock);
2166   g_cond_clear (&self->priv->src_cond);
2167
2168   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
2169 }
2170
2171 /*
2172  * gst_aggregator_set_latency_property:
2173  * @agg: a #GstAggregator
2174  * @latency: the new latency value (in nanoseconds).
2175  *
2176  * Sets the new latency value to @latency. This value is used to limit the
2177  * amount of time a pad waits for data to appear before considering the pad
2178  * as unresponsive.
2179  */
2180 static void
2181 gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
2182 {
2183   gboolean changed;
2184
2185   g_return_if_fail (GST_IS_AGGREGATOR (self));
2186   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
2187
2188   SRC_LOCK (self);
2189   changed = (self->priv->latency != latency);
2190
2191   if (changed) {
2192     GList *item;
2193
2194     GST_OBJECT_LOCK (self);
2195     /* First lock all the pads */
2196     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2197       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2198       PAD_LOCK (aggpad);
2199     }
2200
2201     self->priv->latency = latency;
2202
2203     SRC_BROADCAST (self);
2204
2205     /* Now wake up the pads */
2206     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2207       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2208       PAD_BROADCAST_EVENT (aggpad);
2209       PAD_UNLOCK (aggpad);
2210     }
2211     GST_OBJECT_UNLOCK (self);
2212   }
2213
2214   SRC_UNLOCK (self);
2215
2216   if (changed)
2217     gst_element_post_message (GST_ELEMENT_CAST (self),
2218         gst_message_new_latency (GST_OBJECT_CAST (self)));
2219 }
2220
2221 /*
2222  * gst_aggregator_get_latency_property:
2223  * @agg: a #GstAggregator
2224  *
2225  * Gets the latency value. See gst_aggregator_set_latency for
2226  * more details.
2227  *
2228  * Returns: The time in nanoseconds to wait for data to arrive on a sink pad
2229  * before a pad is deemed unresponsive. A value of -1 means an
2230  * unlimited time.
2231  */
2232 static gint64
2233 gst_aggregator_get_latency_property (GstAggregator * agg)
2234 {
2235   gint64 res;
2236
2237   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
2238
2239   GST_OBJECT_LOCK (agg);
2240   res = agg->priv->latency;
2241   GST_OBJECT_UNLOCK (agg);
2242
2243   return res;
2244 }
2245
2246 static void
2247 gst_aggregator_set_property (GObject * object, guint prop_id,
2248     const GValue * value, GParamSpec * pspec)
2249 {
2250   GstAggregator *agg = GST_AGGREGATOR (object);
2251
2252   switch (prop_id) {
2253     case PROP_LATENCY:
2254       gst_aggregator_set_latency_property (agg, g_value_get_int64 (value));
2255       break;
2256     case PROP_START_TIME_SELECTION:
2257       agg->priv->start_time_selection = g_value_get_enum (value);
2258       break;
2259     case PROP_START_TIME:
2260       agg->priv->start_time = g_value_get_uint64 (value);
2261       break;
2262     default:
2263       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2264       break;
2265   }
2266 }
2267
2268 static void
2269 gst_aggregator_get_property (GObject * object, guint prop_id,
2270     GValue * value, GParamSpec * pspec)
2271 {
2272   GstAggregator *agg = GST_AGGREGATOR (object);
2273
2274   switch (prop_id) {
2275     case PROP_LATENCY:
2276       g_value_set_int64 (value, gst_aggregator_get_latency_property (agg));
2277       break;
2278     case PROP_START_TIME_SELECTION:
2279       g_value_set_enum (value, agg->priv->start_time_selection);
2280       break;
2281     case PROP_START_TIME:
2282       g_value_set_uint64 (value, agg->priv->start_time);
2283       break;
2284     default:
2285       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2286       break;
2287   }
2288 }
2289
2290 /* GObject vmethods implementations */
2291 static void
2292 gst_aggregator_class_init (GstAggregatorClass * klass)
2293 {
2294   GObjectClass *gobject_class = (GObjectClass *) klass;
2295   GstElementClass *gstelement_class = (GstElementClass *) klass;
2296
2297   aggregator_parent_class = g_type_class_peek_parent (klass);
2298   g_type_class_add_private (klass, sizeof (GstAggregatorPrivate));
2299
2300   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
2301       GST_DEBUG_FG_MAGENTA, "GstAggregator");
2302
2303   klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD;
2304
2305   klass->sink_event = gst_aggregator_default_sink_event;
2306   klass->sink_query = gst_aggregator_default_sink_query;
2307
2308   klass->src_event = gst_aggregator_default_src_event;
2309   klass->src_query = gst_aggregator_default_src_query;
2310
2311   klass->create_new_pad = gst_aggregator_default_create_new_pad;
2312   klass->update_src_caps = gst_aggregator_default_update_src_caps;
2313   klass->fixate_src_caps = gst_aggregator_default_fixate_src_caps;
2314   klass->negotiated_src_caps = gst_aggregator_default_negotiated_src_caps;
2315
2316   gstelement_class->request_new_pad =
2317       GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
2318   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
2319   gstelement_class->release_pad =
2320       GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
2321   gstelement_class->change_state =
2322       GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
2323
2324   gobject_class->set_property = gst_aggregator_set_property;
2325   gobject_class->get_property = gst_aggregator_get_property;
2326   gobject_class->finalize = gst_aggregator_finalize;
2327
2328   g_object_class_install_property (gobject_class, PROP_LATENCY,
2329       g_param_spec_int64 ("latency", "Buffer latency",
2330           "Additional latency in live mode to allow upstream "
2331           "to take longer to produce buffers for the current "
2332           "position (in nanoseconds)", 0,
2333           (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1),
2334           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2335
2336   g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION,
2337       g_param_spec_enum ("start-time-selection", "Start Time Selection",
2338           "Decides which start time is output",
2339           gst_aggregator_start_time_selection_get_type (),
2340           DEFAULT_START_TIME_SELECTION,
2341           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2342
2343   g_object_class_install_property (gobject_class, PROP_START_TIME,
2344       g_param_spec_uint64 ("start-time", "Start Time",
2345           "Start time to use if start-time-selection=set", 0,
2346           G_MAXUINT64,
2347           DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2348
2349   GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad);
2350   GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_do_events_and_queries);
2351 }
2352
2353 static void
2354 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
2355 {
2356   GstPadTemplate *pad_template;
2357   GstAggregatorPrivate *priv;
2358
2359   g_return_if_fail (klass->aggregate != NULL);
2360
2361   self->priv =
2362       G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR,
2363       GstAggregatorPrivate);
2364
2365   priv = self->priv;
2366
2367   pad_template =
2368       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
2369   g_return_if_fail (pad_template != NULL);
2370
2371   priv->max_padserial = -1;
2372   priv->tags_changed = FALSE;
2373
2374   self->priv->peer_latency_live = FALSE;
2375   self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
2376   self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
2377   self->priv->has_peer_latency = FALSE;
2378   gst_aggregator_reset_flow_values (self);
2379
2380   self->srcpad = gst_pad_new_from_template (pad_template, "src");
2381
2382   gst_pad_set_event_function (self->srcpad,
2383       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
2384   gst_pad_set_query_function (self->srcpad,
2385       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func));
2386   gst_pad_set_activatemode_function (self->srcpad,
2387       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func));
2388
2389   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
2390
2391   self->priv->latency = DEFAULT_LATENCY;
2392   self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION;
2393   self->priv->start_time = DEFAULT_START_TIME;
2394
2395   g_mutex_init (&self->priv->src_lock);
2396   g_cond_init (&self->priv->src_cond);
2397 }
2398
2399 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
2400  * method to get to the padtemplates */
2401 GType
2402 gst_aggregator_get_type (void)
2403 {
2404   static volatile gsize type = 0;
2405
2406   if (g_once_init_enter (&type)) {
2407     GType _type;
2408     static const GTypeInfo info = {
2409       sizeof (GstAggregatorClass),
2410       NULL,
2411       NULL,
2412       (GClassInitFunc) gst_aggregator_class_init,
2413       NULL,
2414       NULL,
2415       sizeof (GstAggregator),
2416       0,
2417       (GInstanceInitFunc) gst_aggregator_init,
2418     };
2419
2420     _type = g_type_register_static (GST_TYPE_ELEMENT,
2421         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
2422     g_once_init_leave (&type, _type);
2423   }
2424   return type;
2425 }
2426
2427 /* Must be called with SRC lock and PAD lock held */
2428 static gboolean
2429 gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
2430 {
2431   /* Empty queue always has space */
2432   if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL)
2433     return TRUE;
2434
2435   /* We also want at least two buffers, one is being processed and one is ready
2436    * for the next iteration when we operate in live mode. */
2437   if (self->priv->peer_latency_live && aggpad->priv->num_buffers < 2)
2438     return TRUE;
2439
2440   /* zero latency, if there is a buffer, it's full */
2441   if (self->priv->latency == 0)
2442     return FALSE;
2443
2444   /* Allow no more buffers than the latency */
2445   return (aggpad->priv->time_level <= self->priv->latency);
2446 }
2447
2448 /* Must be called with the PAD_LOCK held */
2449 static void
2450 apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
2451 {
2452   GstClockTime timestamp;
2453
2454   if (GST_BUFFER_DTS_IS_VALID (buffer))
2455     timestamp = GST_BUFFER_DTS (buffer);
2456   else
2457     timestamp = GST_BUFFER_PTS (buffer);
2458
2459   if (timestamp == GST_CLOCK_TIME_NONE) {
2460     if (head)
2461       timestamp = aggpad->priv->head_position;
2462     else
2463       timestamp = aggpad->priv->tail_position;
2464   }
2465
2466   /* add duration */
2467   if (GST_BUFFER_DURATION_IS_VALID (buffer))
2468     timestamp += GST_BUFFER_DURATION (buffer);
2469
2470   if (head)
2471     aggpad->priv->head_position = timestamp;
2472   else
2473     aggpad->priv->tail_position = timestamp;
2474
2475   update_time_level (aggpad, head);
2476 }
2477
2478 static GstFlowReturn
2479 gst_aggregator_pad_chain_internal (GstAggregator * self,
2480     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
2481 {
2482   GstFlowReturn flow_return;
2483   GstClockTime buf_pts;
2484
2485   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
2486
2487   PAD_FLUSH_LOCK (aggpad);
2488
2489   PAD_LOCK (aggpad);
2490   flow_return = aggpad->priv->flow_return;
2491   if (flow_return != GST_FLOW_OK)
2492     goto flushing;
2493
2494   if (aggpad->priv->pending_eos == TRUE)
2495     goto eos;
2496
2497   PAD_UNLOCK (aggpad);
2498
2499   buf_pts = GST_BUFFER_PTS (buffer);
2500
2501   for (;;) {
2502     SRC_LOCK (self);
2503     GST_OBJECT_LOCK (self);
2504     PAD_LOCK (aggpad);
2505
2506     if (aggpad->priv->first_buffer) {
2507       self->priv->has_peer_latency = FALSE;
2508       aggpad->priv->first_buffer = FALSE;
2509     }
2510
2511     if (gst_aggregator_pad_has_space (self, aggpad)
2512         && aggpad->priv->flow_return == GST_FLOW_OK) {
2513       if (head)
2514         g_queue_push_head (&aggpad->priv->data, buffer);
2515       else
2516         g_queue_push_tail (&aggpad->priv->data, buffer);
2517       apply_buffer (aggpad, buffer, head);
2518       aggpad->priv->num_buffers++;
2519       buffer = NULL;
2520       SRC_BROADCAST (self);
2521       break;
2522     }
2523
2524     flow_return = aggpad->priv->flow_return;
2525     if (flow_return != GST_FLOW_OK) {
2526       GST_OBJECT_UNLOCK (self);
2527       SRC_UNLOCK (self);
2528       goto flushing;
2529     }
2530     GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2531     GST_OBJECT_UNLOCK (self);
2532     SRC_UNLOCK (self);
2533     PAD_WAIT_EVENT (aggpad);
2534
2535     PAD_UNLOCK (aggpad);
2536   }
2537
2538   if (self->priv->first_buffer) {
2539     GstClockTime start_time;
2540
2541     switch (self->priv->start_time_selection) {
2542       case GST_AGGREGATOR_START_TIME_SELECTION_ZERO:
2543       default:
2544         start_time = 0;
2545         break;
2546       case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
2547         GST_OBJECT_LOCK (aggpad);
2548         if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) {
2549           start_time = buf_pts;
2550           if (start_time != -1) {
2551             start_time = MAX (start_time, aggpad->priv->head_segment.start);
2552             start_time =
2553                 gst_segment_to_running_time (&aggpad->priv->head_segment,
2554                 GST_FORMAT_TIME, start_time);
2555           }
2556         } else {
2557           start_time = 0;
2558           GST_WARNING_OBJECT (aggpad,
2559               "Ignoring request of selecting the first start time "
2560               "as the segment is a %s segment instead of a time segment",
2561               gst_format_get_name (aggpad->segment.format));
2562         }
2563         GST_OBJECT_UNLOCK (aggpad);
2564         break;
2565       case GST_AGGREGATOR_START_TIME_SELECTION_SET:
2566         start_time = self->priv->start_time;
2567         if (start_time == -1)
2568           start_time = 0;
2569         break;
2570     }
2571
2572     if (start_time != -1) {
2573       if (self->segment.position == -1)
2574         self->segment.position = start_time;
2575       else
2576         self->segment.position = MIN (start_time, self->segment.position);
2577
2578       GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT,
2579           GST_TIME_ARGS (start_time));
2580     }
2581   }
2582
2583   PAD_UNLOCK (aggpad);
2584   GST_OBJECT_UNLOCK (self);
2585   SRC_UNLOCK (self);
2586
2587   PAD_FLUSH_UNLOCK (aggpad);
2588
2589   GST_DEBUG_OBJECT (aggpad, "Done chaining");
2590
2591   return flow_return;
2592
2593 flushing:
2594   PAD_UNLOCK (aggpad);
2595   PAD_FLUSH_UNLOCK (aggpad);
2596
2597   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
2598       gst_flow_get_name (flow_return));
2599   if (buffer)
2600     gst_buffer_unref (buffer);
2601
2602   return flow_return;
2603
2604 eos:
2605   PAD_UNLOCK (aggpad);
2606   PAD_FLUSH_UNLOCK (aggpad);
2607
2608   gst_buffer_unref (buffer);
2609   GST_DEBUG_OBJECT (aggpad, "We are EOS already...");
2610
2611   return GST_FLOW_EOS;
2612 }
2613
2614 static GstFlowReturn
2615 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
2616 {
2617   return gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
2618       GST_AGGREGATOR_PAD_CAST (pad), buffer, TRUE);
2619 }
2620
2621 static gboolean
2622 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
2623     GstQuery * query)
2624 {
2625   GstAggregator *self = GST_AGGREGATOR (parent);
2626   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2627   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2628
2629   if (GST_QUERY_IS_SERIALIZED (query)) {
2630     GstStructure *s;
2631     gboolean ret = FALSE;
2632
2633     SRC_LOCK (self);
2634     PAD_LOCK (aggpad);
2635
2636     if (aggpad->priv->flow_return != GST_FLOW_OK) {
2637       SRC_UNLOCK (self);
2638       goto flushing;
2639     }
2640
2641     g_queue_push_head (&aggpad->priv->data, query);
2642     SRC_BROADCAST (self);
2643     SRC_UNLOCK (self);
2644
2645     while (!gst_aggregator_pad_queue_is_empty (aggpad)
2646         && aggpad->priv->flow_return == GST_FLOW_OK) {
2647       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2648       PAD_WAIT_EVENT (aggpad);
2649     }
2650
2651     s = gst_query_writable_structure (query);
2652     if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret))
2653       gst_structure_remove_field (s, "gst-aggregator-retval");
2654     else
2655       g_queue_remove (&aggpad->priv->data, query);
2656
2657     if (aggpad->priv->flow_return != GST_FLOW_OK)
2658       goto flushing;
2659
2660     PAD_UNLOCK (aggpad);
2661
2662     return ret;
2663   }
2664
2665   return klass->sink_query (self, aggpad, query);
2666
2667 flushing:
2668   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
2669       gst_flow_get_name (aggpad->priv->flow_return));
2670   PAD_UNLOCK (aggpad);
2671
2672   return FALSE;
2673 }
2674
2675 static GstFlowReturn
2676 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
2677     GstEvent * event)
2678 {
2679   GstFlowReturn ret = GST_FLOW_OK;
2680   GstAggregator *self = GST_AGGREGATOR (parent);
2681   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2682   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2683
2684   if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS
2685       /* && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE */ ) {
2686     SRC_LOCK (self);
2687     PAD_LOCK (aggpad);
2688
2689     if (aggpad->priv->flow_return != GST_FLOW_OK
2690         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
2691       ret = aggpad->priv->flow_return;
2692       goto flushing;
2693     }
2694
2695     if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
2696       GST_OBJECT_LOCK (aggpad);
2697       gst_event_copy_segment (event, &aggpad->priv->head_segment);
2698       aggpad->priv->head_position = aggpad->priv->head_segment.position;
2699       update_time_level (aggpad, TRUE);
2700       GST_OBJECT_UNLOCK (aggpad);
2701     }
2702
2703     if (GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
2704       GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT,
2705           event);
2706       g_queue_push_head (&aggpad->priv->data, event);
2707       event = NULL;
2708       SRC_BROADCAST (self);
2709     }
2710     PAD_UNLOCK (aggpad);
2711     SRC_UNLOCK (self);
2712   }
2713
2714   if (event) {
2715     if (!klass->sink_event (self, aggpad, event)) {
2716       /* Copied from GstPad to convert boolean to a GstFlowReturn in
2717        * the event handling func */
2718       ret = GST_FLOW_ERROR;
2719     }
2720   }
2721
2722   return ret;
2723
2724 flushing:
2725   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
2726       gst_flow_get_name (aggpad->priv->flow_return));
2727   PAD_UNLOCK (aggpad);
2728   SRC_UNLOCK (self);
2729   if (GST_EVENT_IS_STICKY (event))
2730     gst_pad_store_sticky_event (pad, event);
2731   gst_event_unref (event);
2732
2733   return ret;
2734 }
2735
2736 static gboolean
2737 gst_aggregator_pad_activate_mode_func (GstPad * pad,
2738     GstObject * parent, GstPadMode mode, gboolean active)
2739 {
2740   GstAggregator *self = GST_AGGREGATOR (parent);
2741   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2742
2743   if (active == FALSE) {
2744     SRC_LOCK (self);
2745     gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
2746     SRC_BROADCAST (self);
2747     SRC_UNLOCK (self);
2748   } else {
2749     PAD_LOCK (aggpad);
2750     aggpad->priv->flow_return = GST_FLOW_OK;
2751     PAD_BROADCAST_EVENT (aggpad);
2752     PAD_UNLOCK (aggpad);
2753   }
2754
2755   return TRUE;
2756 }
2757
2758 /***********************************
2759  * GstAggregatorPad implementation  *
2760  ************************************/
2761 G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
2762
2763 static void
2764 gst_aggregator_pad_constructed (GObject * object)
2765 {
2766   GstPad *pad = GST_PAD (object);
2767
2768   gst_pad_set_chain_function (pad,
2769       GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
2770   gst_pad_set_event_full_function_full (pad,
2771       GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
2772   gst_pad_set_query_function (pad,
2773       GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
2774   gst_pad_set_activatemode_function (pad,
2775       GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
2776 }
2777
2778 static void
2779 gst_aggregator_pad_finalize (GObject * object)
2780 {
2781   GstAggregatorPad *pad = (GstAggregatorPad *) object;
2782
2783   g_cond_clear (&pad->priv->event_cond);
2784   g_mutex_clear (&pad->priv->flush_lock);
2785   g_mutex_clear (&pad->priv->lock);
2786
2787   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object);
2788 }
2789
2790 static void
2791 gst_aggregator_pad_dispose (GObject * object)
2792 {
2793   GstAggregatorPad *pad = (GstAggregatorPad *) object;
2794
2795   gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE);
2796
2797   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
2798 }
2799
2800 static void
2801 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
2802 {
2803   GObjectClass *gobject_class = (GObjectClass *) klass;
2804
2805   g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate));
2806
2807   gobject_class->constructed = gst_aggregator_pad_constructed;
2808   gobject_class->finalize = gst_aggregator_pad_finalize;
2809   gobject_class->dispose = gst_aggregator_pad_dispose;
2810 }
2811
2812 static void
2813 gst_aggregator_pad_init (GstAggregatorPad * pad)
2814 {
2815   pad->priv =
2816       G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
2817       GstAggregatorPadPrivate);
2818
2819   g_queue_init (&pad->priv->data);
2820   g_cond_init (&pad->priv->event_cond);
2821
2822   g_mutex_init (&pad->priv->flush_lock);
2823   g_mutex_init (&pad->priv->lock);
2824
2825   gst_aggregator_pad_reset_unlocked (pad);
2826   pad->priv->negotiated = FALSE;
2827 }
2828
2829 /* Must be called with the PAD_LOCK held */
2830 static void
2831 gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad)
2832 {
2833   pad->priv->num_buffers--;
2834   GST_TRACE_OBJECT (pad, "Consuming buffer");
2835   if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) {
2836     pad->priv->pending_eos = FALSE;
2837     pad->priv->eos = TRUE;
2838   }
2839   PAD_BROADCAST_EVENT (pad);
2840 }
2841
2842 /* Must be called with the PAD_LOCK held */
2843 static void
2844 gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
2845 {
2846   GstAggregator *self = NULL;
2847   GstAggregatorClass *aggclass = NULL;
2848   GstBuffer *buffer = NULL;
2849
2850   while (pad->priv->clipped_buffer == NULL &&
2851       GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
2852     buffer = g_queue_pop_tail (&pad->priv->data);
2853
2854     apply_buffer (pad, buffer, FALSE);
2855
2856     /* We only take the parent here so that it's not taken if the buffer is
2857      * already clipped or if the queue is empty.
2858      */
2859     if (self == NULL) {
2860       self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
2861       if (self == NULL) {
2862         gst_buffer_unref (buffer);
2863         return;
2864       }
2865
2866       aggclass = GST_AGGREGATOR_GET_CLASS (self);
2867     }
2868
2869     if (aggclass->clip) {
2870       GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer);
2871
2872       buffer = aggclass->clip (self, pad, buffer);
2873
2874       if (buffer == NULL) {
2875         gst_aggregator_pad_buffer_consumed (pad);
2876         GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
2877       }
2878     }
2879
2880     pad->priv->clipped_buffer = buffer;
2881   }
2882
2883   if (self)
2884     gst_object_unref (self);
2885 }
2886
2887 /**
2888  * gst_aggregator_pad_steal_buffer:
2889  * @pad: the pad to get buffer from
2890  *
2891  * Steal the ref to the buffer currently queued in @pad.
2892  *
2893  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
2894  *   queued. You should unref the buffer after usage.
2895  */
2896 GstBuffer *
2897 gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
2898 {
2899   GstBuffer *buffer;
2900
2901   PAD_LOCK (pad);
2902
2903   gst_aggregator_pad_clip_buffer_unlocked (pad);
2904
2905   buffer = pad->priv->clipped_buffer;
2906
2907   if (buffer) {
2908     pad->priv->clipped_buffer = NULL;
2909     gst_aggregator_pad_buffer_consumed (pad);
2910     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
2911   }
2912
2913   PAD_UNLOCK (pad);
2914
2915   return buffer;
2916 }
2917
2918 /**
2919  * gst_aggregator_pad_drop_buffer:
2920  * @pad: the pad where to drop any pending buffer
2921  *
2922  * Drop the buffer currently queued in @pad.
2923  *
2924  * Returns: TRUE if there was a buffer queued in @pad, or FALSE if not.
2925  */
2926 gboolean
2927 gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
2928 {
2929   GstBuffer *buf;
2930
2931   buf = gst_aggregator_pad_steal_buffer (pad);
2932
2933   if (buf == NULL)
2934     return FALSE;
2935
2936   gst_buffer_unref (buf);
2937   return TRUE;
2938 }
2939
2940 /**
2941  * gst_aggregator_pad_get_buffer:
2942  * @pad: the pad to get buffer from
2943  *
2944  * Returns: (transfer full): A reference to the buffer in @pad or
2945  * NULL if no buffer was queued. You should unref the buffer after
2946  * usage.
2947  */
2948 GstBuffer *
2949 gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
2950 {
2951   GstBuffer *buffer;
2952
2953   PAD_LOCK (pad);
2954
2955   gst_aggregator_pad_clip_buffer_unlocked (pad);
2956
2957   if (pad->priv->clipped_buffer) {
2958     buffer = gst_buffer_ref (pad->priv->clipped_buffer);
2959   } else {
2960     buffer = NULL;
2961   }
2962   PAD_UNLOCK (pad);
2963
2964   return buffer;
2965 }
2966
2967 gboolean
2968 gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
2969 {
2970   gboolean is_eos;
2971
2972   PAD_LOCK (pad);
2973   is_eos = pad->priv->eos;
2974   PAD_UNLOCK (pad);
2975
2976   return is_eos;
2977 }
2978
2979 /**
2980  * gst_aggregator_merge_tags:
2981  * @self: a #GstAggregator
2982  * @tags: a #GstTagList to merge
2983  * @mode: the #GstTagMergeMode to use
2984  *
2985  * Adds tags to so-called pending tags, which will be processed
2986  * before pushing out data downstream.
2987  *
2988  * Note that this is provided for convenience, and the subclass is
2989  * not required to use this and can still do tag handling on its own.
2990  *
2991  * MT safe.
2992  */
2993 void
2994 gst_aggregator_merge_tags (GstAggregator * self,
2995     const GstTagList * tags, GstTagMergeMode mode)
2996 {
2997   GstTagList *otags;
2998
2999   g_return_if_fail (GST_IS_AGGREGATOR (self));
3000   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
3001
3002   /* FIXME Check if we can use OBJECT lock here! */
3003   GST_OBJECT_LOCK (self);
3004   if (tags)
3005     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
3006   otags = self->priv->tags;
3007   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
3008   if (otags)
3009     gst_tag_list_unref (otags);
3010   self->priv->tags_changed = TRUE;
3011   GST_OBJECT_UNLOCK (self);
3012 }
3013
3014 /**
3015  * gst_aggregator_set_latency:
3016  * @self: a #GstAggregator
3017  * @min_latency: minimum latency
3018  * @max_latency: maximum latency
3019  *
3020  * Lets #GstAggregator sub-classes tell the baseclass what their internal
3021  * latency is. Will also post a LATENCY message on the bus so the pipeline
3022  * can reconfigure its global latency.
3023  */
3024 void
3025 gst_aggregator_set_latency (GstAggregator * self,
3026     GstClockTime min_latency, GstClockTime max_latency)
3027 {
3028   gboolean changed = FALSE;
3029
3030   g_return_if_fail (GST_IS_AGGREGATOR (self));
3031   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
3032   g_return_if_fail (max_latency >= min_latency);
3033
3034   SRC_LOCK (self);
3035   if (self->priv->sub_latency_min != min_latency) {
3036     self->priv->sub_latency_min = min_latency;
3037     changed = TRUE;
3038   }
3039   if (self->priv->sub_latency_max != max_latency) {
3040     self->priv->sub_latency_max = max_latency;
3041     changed = TRUE;
3042   }
3043
3044   if (changed)
3045     SRC_BROADCAST (self);
3046   SRC_UNLOCK (self);
3047
3048   if (changed) {
3049     gst_element_post_message (GST_ELEMENT_CAST (self),
3050         gst_message_new_latency (GST_OBJECT_CAST (self)));
3051   }
3052 }
3053
3054 /**
3055  * gst_aggregator_get_buffer_pool:
3056  * @self: a #GstAggregator
3057  *
3058  * Returns: (transfer full): the instance of the #GstBufferPool used
3059  * by @trans; free it after use it
3060  */
3061 GstBufferPool *
3062 gst_aggregator_get_buffer_pool (GstAggregator * self)
3063 {
3064   GstBufferPool *pool;
3065
3066   g_return_val_if_fail (GST_IS_AGGREGATOR (self), NULL);
3067
3068   GST_OBJECT_LOCK (self);
3069   pool = self->priv->pool;
3070   if (pool)
3071     gst_object_ref (pool);
3072   GST_OBJECT_UNLOCK (self);
3073
3074   return pool;
3075 }
3076
3077 /**
3078  * gst_aggregator_get_allocator:
3079  * @self: a #GstAggregator
3080  * @allocator: (out) (allow-none) (transfer full): the #GstAllocator
3081  * used
3082  * @params: (out) (allow-none) (transfer full): the
3083  * #GstAllocationParams of @allocator
3084  *
3085  * Lets #GstAggregator sub-classes get the memory @allocator
3086  * acquired by the base class and its @params.
3087  *
3088  * Unref the @allocator after use it.
3089  */
3090 void
3091 gst_aggregator_get_allocator (GstAggregator * self,
3092     GstAllocator ** allocator, GstAllocationParams * params)
3093 {
3094   g_return_if_fail (GST_IS_AGGREGATOR (self));
3095
3096   if (allocator)
3097     *allocator = self->priv->allocator ?
3098         gst_object_ref (self->priv->allocator) : NULL;
3099
3100   if (params)
3101     *params = self->priv->allocation_params;
3102 }