aggregator: drop special casing for eos
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
1 /* GStreamer aggregator base class
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @title: GstAggregator
25  * @short_description: manages a set of pads with the purpose of
26  * aggregating their buffers.
27  * @see_also: gstcollectpads for historical reasons.
28  *
29  * Manages a set of pads with the purpose of aggregating their buffers.
30  * Control is given to the subclass when all pads have data.
31  *
32  *  * Base class for mixers and muxers. Subclasses should at least implement
33  *    the #GstAggregatorClass.aggregate() virtual method.
34  *
35  *  * Installs a #GstPadChainFunction, a #GstPadEventFullFunction and a
36  *    #GstPadQueryFunction to queue all serialized data packets per sink pad.
37  *    Subclasses should not overwrite those, but instead implement
38  *    #GstAggregatorClass.sink_event() and #GstAggregatorClass.sink_query() as
39  *    needed.
40  *
41  *  * When data is queued on all pads, the aggregate vmethod is called.
42  *
43  *  * One can peek at the data on any given GstAggregatorPad with the
44  *    gst_aggregator_pad_get_buffer () method, and take ownership of it
45  *    with the gst_aggregator_pad_steal_buffer () method. When a buffer
46  *    has been taken with steal_buffer (), a new buffer can be queued
47  *    on that pad.
48  *
49  *  * If the subclass wishes to push a buffer downstream in its aggregate
50  *    implementation, it should do so through the
51  *    gst_aggregator_finish_buffer () method. This method will take care
52  *    of sending and ordering mandatory events such as stream start, caps
53  *    and segment.
54  *
55  *  * Same goes for EOS events, which should not be pushed directly by the
56  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
57  *    implementation.
58  *
59  *  * Note that the aggregator logic regarding gap event handling is to turn
60  *    these into gap buffers with matching PTS and duration. It will also
61  *    flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE
62  *    to ease their identification and subsequent processing.
63  *
64  */
65
66 #ifdef HAVE_CONFIG_H
67 #  include "config.h"
68 #endif
69
70 #include <string.h>             /* strlen */
71
72 #include "gstaggregator.h"
73
74 typedef enum
75 {
76   GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
77   GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
78   GST_AGGREGATOR_START_TIME_SELECTION_SET
79 } GstAggregatorStartTimeSelection;
80
81 static GType
82 gst_aggregator_start_time_selection_get_type (void)
83 {
84   static GType gtype = 0;
85
86   if (gtype == 0) {
87     static const GEnumValue values[] = {
88       {GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
89           "Start at 0 running time (default)", "zero"},
90       {GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
91           "Start at first observed input running time", "first"},
92       {GST_AGGREGATOR_START_TIME_SELECTION_SET,
93           "Set start time with start-time property", "set"},
94       {0, NULL, NULL}
95     };
96
97     gtype = g_enum_register_static ("GstAggregatorStartTimeSelection", values);
98   }
99   return gtype;
100 }
101
102 /*  Might become API */
103 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
104     const GstTagList * tags, GstTagMergeMode mode);
105 static void gst_aggregator_set_latency_property (GstAggregator * agg,
106     gint64 latency);
107 static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
108
109 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
110
111 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
112 #define GST_CAT_DEFAULT aggregator_debug
113
114 /* Locking order, locks in this element must always be taken in this order
115  *
116  * standard sink pad stream lock -> GST_PAD_STREAM_LOCK (aggpad)
117  * Aggregator pad flush lock -> PAD_FLUSH_LOCK(aggpad)
118  * standard src pad stream lock -> GST_PAD_STREAM_LOCK (srcpad)
119  * Aggregator src lock -> SRC_LOCK(agg) w/ SRC_WAIT/BROADCAST
120  * standard element object lock -> GST_OBJECT_LOCK(agg)
121  * Aggregator pad lock -> PAD_LOCK (aggpad) w/ PAD_WAIT/BROADCAST_EVENT(aggpad)
122  * standard src pad object lock -> GST_OBJECT_LOCK(srcpad)
123  * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad)
124  */
125
126 /* GstAggregatorPad definitions */
127 #define PAD_LOCK(pad)   G_STMT_START {                                  \
128   GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p",              \
129         g_thread_self());                                               \
130   g_mutex_lock(&pad->priv->lock);                                       \
131   GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p",                \
132         g_thread_self());                                               \
133   } G_STMT_END
134
135 #define PAD_UNLOCK(pad)  G_STMT_START {                                 \
136   GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p",           \
137       g_thread_self());                                                 \
138   g_mutex_unlock(&pad->priv->lock);                                     \
139   GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p",             \
140         g_thread_self());                                               \
141   } G_STMT_END
142
143
144 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
145   GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p",   \
146         g_thread_self());                                               \
147   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),           \
148       (&((GstAggregatorPad*)pad)->priv->lock));                         \
149   GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \
150         g_thread_self());                                               \
151   } G_STMT_END
152
153 #define PAD_BROADCAST_EVENT(pad) G_STMT_START {                        \
154   GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p",     \
155         g_thread_self());                                              \
156   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond));    \
157   } G_STMT_END
158
159
160 #define PAD_FLUSH_LOCK(pad)     G_STMT_START {                          \
161   GST_TRACE_OBJECT (pad, "Taking lock from thread %p",                  \
162         g_thread_self());                                               \
163   g_mutex_lock(&pad->priv->flush_lock);                                 \
164   GST_TRACE_OBJECT (pad, "Took lock from thread %p",                    \
165         g_thread_self());                                               \
166   } G_STMT_END
167
168 #define PAD_FLUSH_UNLOCK(pad)   G_STMT_START {                          \
169   GST_TRACE_OBJECT (pad, "Releasing lock from thread %p",               \
170         g_thread_self());                                               \
171   g_mutex_unlock(&pad->priv->flush_lock);                               \
172   GST_TRACE_OBJECT (pad, "Release lock from thread %p",                 \
173         g_thread_self());                                               \
174   } G_STMT_END
175
176 #define SRC_LOCK(self)   G_STMT_START {                             \
177   GST_TRACE_OBJECT (self, "Taking src lock from thread %p",         \
178       g_thread_self());                                             \
179   g_mutex_lock(&self->priv->src_lock);                              \
180   GST_TRACE_OBJECT (self, "Took src lock from thread %p",           \
181         g_thread_self());                                           \
182   } G_STMT_END
183
184 #define SRC_UNLOCK(self)  G_STMT_START {                            \
185   GST_TRACE_OBJECT (self, "Releasing src lock from thread %p",      \
186         g_thread_self());                                           \
187   g_mutex_unlock(&self->priv->src_lock);                            \
188   GST_TRACE_OBJECT (self, "Released src lock from thread %p",       \
189         g_thread_self());                                           \
190   } G_STMT_END
191
192 #define SRC_WAIT(self) G_STMT_START {                               \
193   GST_LOG_OBJECT (self, "Waiting for src on thread %p",             \
194         g_thread_self());                                           \
195   g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock));    \
196   GST_LOG_OBJECT (self, "DONE Waiting for src on thread %p",        \
197         g_thread_self());                                           \
198   } G_STMT_END
199
200 #define SRC_BROADCAST(self) G_STMT_START {                          \
201     GST_LOG_OBJECT (self, "Signaling src from thread %p",           \
202         g_thread_self());                                           \
203     if (self->priv->aggregate_id)                                   \
204       gst_clock_id_unschedule (self->priv->aggregate_id);           \
205     g_cond_broadcast(&(self->priv->src_cond));                      \
206   } G_STMT_END
207
208 struct _GstAggregatorPadPrivate
209 {
210   /* Following fields are protected by the PAD_LOCK */
211   GstFlowReturn flow_return;
212   gboolean pending_flush_start;
213   gboolean pending_flush_stop;
214
215   gboolean first_buffer;
216
217   GQueue data;                  /* buffers, events and queries */
218   GstBuffer *clipped_buffer;
219   guint num_buffers;
220   GstClockTime head_position;
221   GstClockTime tail_position;
222   GstClockTime head_time;       /* running time */
223   GstClockTime tail_time;
224   GstClockTime time_level;      /* how much head is ahead of tail */
225   GstSegment head_segment;      /* segment before the queue */
226
227   gboolean negotiated;
228
229   gboolean eos;
230
231   GMutex lock;
232   GCond event_cond;
233   /* This lock prevents a flush start processing happening while
234    * the chain function is also happening.
235    */
236   GMutex flush_lock;
237 };
238
239 /* Must be called with PAD_LOCK held */
240 static void
241 gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
242 {
243   aggpad->priv->eos = FALSE;
244   aggpad->priv->flow_return = GST_FLOW_OK;
245   GST_OBJECT_LOCK (aggpad);
246   gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED);
247   gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED);
248   GST_OBJECT_UNLOCK (aggpad);
249   aggpad->priv->head_position = GST_CLOCK_TIME_NONE;
250   aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
251   aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
252   aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
253   aggpad->priv->time_level = 0;
254   aggpad->priv->first_buffer = TRUE;
255 }
256
257 static gboolean
258 gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
259 {
260   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
261
262   PAD_LOCK (aggpad);
263   gst_aggregator_pad_reset_unlocked (aggpad);
264   PAD_UNLOCK (aggpad);
265
266   if (klass->flush)
267     return klass->flush (aggpad, agg);
268
269   return TRUE;
270 }
271
272 /*************************************
273  * GstAggregator implementation  *
274  *************************************/
275 static GstElementClass *aggregator_parent_class = NULL;
276
277 /* All members are protected by the object lock unless otherwise noted */
278
279 struct _GstAggregatorPrivate
280 {
281   gint max_padserial;
282
283   /* Our state is >= PAUSED */
284   gboolean running;             /* protected by src_lock */
285
286   gint seqnum;
287   gboolean send_stream_start;   /* protected by srcpad stream lock */
288   gboolean send_segment;
289   gboolean flush_seeking;
290   gboolean pending_flush_start;
291   gboolean send_eos;            /* protected by srcpad stream lock */
292
293   GstCaps *srccaps;             /* protected by the srcpad stream lock */
294
295   GstTagList *tags;
296   gboolean tags_changed;
297
298   gboolean peer_latency_live;   /* protected by src_lock */
299   GstClockTime peer_latency_min;        /* protected by src_lock */
300   GstClockTime peer_latency_max;        /* protected by src_lock */
301   gboolean has_peer_latency;    /* protected by src_lock */
302
303   GstClockTime sub_latency_min; /* protected by src_lock */
304   GstClockTime sub_latency_max; /* protected by src_lock */
305
306   /* aggregate */
307   GstClockID aggregate_id;      /* protected by src_lock */
308   GMutex src_lock;
309   GCond src_cond;
310
311   gboolean first_buffer;        /* protected by object lock */
312   GstAggregatorStartTimeSelection start_time_selection;
313   GstClockTime start_time;
314
315   /* protected by the object lock */
316   GstQuery *allocation_query;
317   GstAllocator *allocator;
318   GstBufferPool *pool;
319   GstAllocationParams allocation_params;
320
321   /* properties */
322   gint64 latency;               /* protected by both src_lock and all pad locks */
323 };
324
325 /* Seek event forwarding helper */
326 typedef struct
327 {
328   /* parameters */
329   GstEvent *event;
330   gboolean flush;
331   gboolean only_to_active_pads;
332
333   /* results */
334   gboolean result;
335   gboolean one_actually_seeked;
336 } EventData;
337
338 #define DEFAULT_LATENCY              0
339 #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO
340 #define DEFAULT_START_TIME           (-1)
341
342 enum
343 {
344   PROP_0,
345   PROP_LATENCY,
346   PROP_START_TIME_SELECTION,
347   PROP_START_TIME,
348   PROP_LAST
349 };
350
351 static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
352     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
353
354 /**
355  * gst_aggregator_iterate_sinkpads:
356  * @self: The #GstAggregator
357  * @func: (scope call): The function to call.
358  * @user_data: (closure): The data to pass to @func.
359  *
360  * Iterate the sinkpads of aggregator to call a function on them.
361  *
362  * This method guarantees that @func will be called only once for each
363  * sink pad.
364  *
365  * Returns: %FALSE if there are no sinkpads or if @func returned %FALSE
366  */
367 gboolean
368 gst_aggregator_iterate_sinkpads (GstAggregator * self,
369     GstAggregatorPadForeachFunc func, gpointer user_data)
370 {
371   gboolean result = FALSE;
372   GstIterator *iter;
373   gboolean done = FALSE;
374   GValue item = { 0, };
375   GList *seen_pads = NULL;
376
377   iter = gst_element_iterate_sink_pads (GST_ELEMENT (self));
378
379   if (!iter)
380     goto no_iter;
381
382   while (!done) {
383     switch (gst_iterator_next (iter, &item)) {
384       case GST_ITERATOR_OK:
385       {
386         GstAggregatorPad *pad;
387
388         pad = g_value_get_object (&item);
389
390         /* if already pushed, skip. FIXME, find something faster to tag pads */
391         if (pad == NULL || g_list_find (seen_pads, pad)) {
392           g_value_reset (&item);
393           break;
394         }
395
396         GST_LOG_OBJECT (pad, "calling function %s on pad",
397             GST_DEBUG_FUNCPTR_NAME (func));
398
399         result = func (self, pad, user_data);
400
401         done = !result;
402
403         seen_pads = g_list_prepend (seen_pads, pad);
404
405         g_value_reset (&item);
406         break;
407       }
408       case GST_ITERATOR_RESYNC:
409         gst_iterator_resync (iter);
410         break;
411       case GST_ITERATOR_ERROR:
412         GST_ERROR_OBJECT (self,
413             "Could not iterate over internally linked pads");
414         done = TRUE;
415         break;
416       case GST_ITERATOR_DONE:
417         done = TRUE;
418         break;
419     }
420   }
421   g_value_unset (&item);
422   gst_iterator_free (iter);
423
424   if (seen_pads == NULL) {
425     GST_DEBUG_OBJECT (self, "No pad seen");
426     return FALSE;
427   }
428
429   g_list_free (seen_pads);
430
431 no_iter:
432   return result;
433 }
434
435 static gboolean
436 gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
437 {
438   return (g_queue_peek_tail (&pad->priv->data) == NULL &&
439       pad->priv->clipped_buffer == NULL);
440 }
441
442 static gboolean
443 gst_aggregator_check_pads_ready (GstAggregator * self)
444 {
445   GstAggregatorPad *pad;
446   GList *l, *sinkpads;
447   gboolean have_buffer = TRUE;
448   gboolean have_event_or_query = FALSE;
449
450   GST_LOG_OBJECT (self, "checking pads");
451
452   GST_OBJECT_LOCK (self);
453
454   sinkpads = GST_ELEMENT_CAST (self)->sinkpads;
455   if (sinkpads == NULL)
456     goto no_sinkpads;
457
458   for (l = sinkpads; l != NULL; l = l->next) {
459     pad = l->data;
460
461     PAD_LOCK (pad);
462
463     if (pad->priv->num_buffers == 0) {
464       if (!gst_aggregator_pad_queue_is_empty (pad))
465         have_event_or_query = TRUE;
466       if (!pad->priv->eos) {
467         have_buffer = FALSE;
468
469         /* If not live we need data on all pads, so leave the loop */
470         if (!self->priv->peer_latency_live) {
471           PAD_UNLOCK (pad);
472           goto pad_not_ready;
473         }
474       }
475     } else if (self->priv->peer_latency_live) {
476       /* In live mode, having a single pad with buffers is enough to
477        * generate a start time from it. In non-live mode all pads need
478        * to have a buffer
479        */
480       self->priv->first_buffer = FALSE;
481     }
482
483     PAD_UNLOCK (pad);
484   }
485
486   if (!have_buffer && !have_event_or_query)
487     goto pad_not_ready;
488
489   if (have_buffer)
490     self->priv->first_buffer = FALSE;
491
492   GST_OBJECT_UNLOCK (self);
493   GST_LOG_OBJECT (self, "pads are ready");
494   return TRUE;
495
496 no_sinkpads:
497   {
498     GST_LOG_OBJECT (self, "pads not ready: no sink pads");
499     GST_OBJECT_UNLOCK (self);
500     return FALSE;
501   }
502 pad_not_ready:
503   {
504     if (have_event_or_query)
505       GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet,"
506           " but waking up for serialized event");
507     else
508       GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
509     GST_OBJECT_UNLOCK (self);
510     return have_event_or_query;
511   }
512 }
513
514 static void
515 gst_aggregator_reset_flow_values (GstAggregator * self)
516 {
517   GST_OBJECT_LOCK (self);
518   self->priv->send_stream_start = TRUE;
519   self->priv->send_segment = TRUE;
520   gst_segment_init (&self->segment, GST_FORMAT_TIME);
521   self->priv->first_buffer = TRUE;
522   GST_OBJECT_UNLOCK (self);
523 }
524
525 static inline void
526 gst_aggregator_push_mandatory_events (GstAggregator * self)
527 {
528   GstAggregatorPrivate *priv = self->priv;
529   GstEvent *segment = NULL;
530   GstEvent *tags = NULL;
531
532   if (self->priv->send_stream_start) {
533     gchar s_id[32];
534
535     GST_INFO_OBJECT (self, "pushing stream start");
536     /* stream-start (FIXME: create id based on input ids) */
537     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
538     if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) {
539       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
540     }
541     self->priv->send_stream_start = FALSE;
542   }
543
544   if (self->priv->srccaps) {
545
546     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
547         self->priv->srccaps);
548     if (!gst_pad_push_event (self->srcpad,
549             gst_event_new_caps (self->priv->srccaps))) {
550       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
551     }
552     gst_caps_unref (self->priv->srccaps);
553     self->priv->srccaps = NULL;
554   }
555
556   GST_OBJECT_LOCK (self);
557   if (self->priv->send_segment && !self->priv->flush_seeking) {
558     segment = gst_event_new_segment (&self->segment);
559
560     if (!self->priv->seqnum)
561       self->priv->seqnum = gst_event_get_seqnum (segment);
562     else
563       gst_event_set_seqnum (segment, self->priv->seqnum);
564     self->priv->send_segment = FALSE;
565
566     GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment);
567   }
568
569   if (priv->tags && priv->tags_changed && !self->priv->flush_seeking) {
570     tags = gst_event_new_tag (gst_tag_list_ref (priv->tags));
571     priv->tags_changed = FALSE;
572   }
573   GST_OBJECT_UNLOCK (self);
574
575   if (segment)
576     gst_pad_push_event (self->srcpad, segment);
577   if (tags)
578     gst_pad_push_event (self->srcpad, tags);
579
580 }
581
582 /**
583  * gst_aggregator_set_src_caps:
584  * @self: The #GstAggregator
585  * @caps: The #GstCaps to set on the src pad.
586  *
587  * Sets the caps to be used on the src pad.
588  */
589 void
590 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
591 {
592   GST_PAD_STREAM_LOCK (self->srcpad);
593   gst_caps_replace (&self->priv->srccaps, caps);
594   gst_aggregator_push_mandatory_events (self);
595   GST_PAD_STREAM_UNLOCK (self->srcpad);
596 }
597
598 /**
599  * gst_aggregator_finish_buffer:
600  * @self: The #GstAggregator
601  * @buffer: (transfer full): the #GstBuffer to push.
602  *
603  * This method will push the provided output buffer downstream. If needed,
604  * mandatory events such as stream-start, caps, and segment events will be
605  * sent before pushing the buffer.
606  */
607 GstFlowReturn
608 gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
609 {
610   gst_aggregator_push_mandatory_events (self);
611
612   GST_OBJECT_LOCK (self);
613   if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) {
614     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
615     GST_OBJECT_UNLOCK (self);
616     return gst_pad_push (self->srcpad, buffer);
617   } else {
618     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
619         self->priv->flush_seeking, gst_pad_is_active (self->srcpad));
620     GST_OBJECT_UNLOCK (self);
621     gst_buffer_unref (buffer);
622     return GST_FLOW_OK;
623   }
624 }
625
626 static void
627 gst_aggregator_push_eos (GstAggregator * self)
628 {
629   GstEvent *event;
630   gst_aggregator_push_mandatory_events (self);
631
632   event = gst_event_new_eos ();
633
634   GST_OBJECT_LOCK (self);
635   self->priv->send_eos = FALSE;
636   gst_event_set_seqnum (event, self->priv->seqnum);
637   GST_OBJECT_UNLOCK (self);
638
639   gst_pad_push_event (self->srcpad, event);
640 }
641
642 static GstClockTime
643 gst_aggregator_get_next_time (GstAggregator * self)
644 {
645   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
646
647   if (klass->get_next_time)
648     return klass->get_next_time (self);
649
650   return GST_CLOCK_TIME_NONE;
651 }
652
653 static gboolean
654 gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
655 {
656   GstClockTime latency;
657   GstClockTime start;
658   gboolean res;
659
660   *timeout = FALSE;
661
662   SRC_LOCK (self);
663
664   latency = gst_aggregator_get_latency_unlocked (self);
665
666   if (gst_aggregator_check_pads_ready (self)) {
667     GST_DEBUG_OBJECT (self, "all pads have data");
668     SRC_UNLOCK (self);
669
670     return TRUE;
671   }
672
673   /* Before waiting, check if we're actually still running */
674   if (!self->priv->running || !self->priv->send_eos) {
675     SRC_UNLOCK (self);
676
677     return FALSE;
678   }
679
680   start = gst_aggregator_get_next_time (self);
681
682   /* If we're not live, or if we use the running time
683    * of the first buffer as start time, we wait until
684    * all pads have buffers.
685    * Otherwise (i.e. if we are live!), we wait on the clock
686    * and if a pad does not have a buffer in time we ignore
687    * that pad.
688    */
689   GST_OBJECT_LOCK (self);
690   if (!GST_CLOCK_TIME_IS_VALID (latency) ||
691       !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) ||
692       !GST_CLOCK_TIME_IS_VALID (start) ||
693       (self->priv->first_buffer
694           && self->priv->start_time_selection ==
695           GST_AGGREGATOR_START_TIME_SELECTION_FIRST)) {
696     /* We wake up here when something happened, and below
697      * then check if we're ready now. If we return FALSE,
698      * we will be directly called again.
699      */
700     GST_OBJECT_UNLOCK (self);
701     SRC_WAIT (self);
702   } else {
703     GstClockTime base_time, time;
704     GstClock *clock;
705     GstClockReturn status;
706     GstClockTimeDiff jitter;
707
708     GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
709         GST_TIME_ARGS (start));
710
711     base_time = GST_ELEMENT_CAST (self)->base_time;
712     clock = gst_object_ref (GST_ELEMENT_CLOCK (self));
713     GST_OBJECT_UNLOCK (self);
714
715     time = base_time + start;
716     time += latency;
717
718     GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
719         GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
720         " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")",
721         GST_TIME_ARGS (time),
722         GST_TIME_ARGS (base_time),
723         GST_TIME_ARGS (start), GST_TIME_ARGS (latency),
724         GST_TIME_ARGS (gst_clock_get_time (clock)));
725
726     self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
727     gst_object_unref (clock);
728     SRC_UNLOCK (self);
729
730     jitter = 0;
731     status = gst_clock_id_wait (self->priv->aggregate_id, &jitter);
732
733     SRC_LOCK (self);
734     if (self->priv->aggregate_id) {
735       gst_clock_id_unref (self->priv->aggregate_id);
736       self->priv->aggregate_id = NULL;
737     }
738
739     GST_DEBUG_OBJECT (self,
740         "clock returned %d (jitter: %" GST_STIME_FORMAT ")",
741         status, GST_STIME_ARGS (jitter));
742
743     /* we timed out */
744     if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
745       SRC_UNLOCK (self);
746       *timeout = TRUE;
747       return TRUE;
748     }
749   }
750
751   res = gst_aggregator_check_pads_ready (self);
752   SRC_UNLOCK (self);
753
754   return res;
755 }
756
757 static gboolean
758 gst_aggregator_do_events_and_queries (GstAggregator * self,
759     GstAggregatorPad * pad, gpointer user_data)
760 {
761   GstEvent *event = NULL;
762   GstQuery *query = NULL;
763   GstAggregatorClass *klass = NULL;
764   gboolean *processed_event = user_data;
765
766   do {
767     event = NULL;
768     query = NULL;
769
770     PAD_LOCK (pad);
771     if (pad->priv->clipped_buffer == NULL &&
772         !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
773       if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data)))
774         event = gst_event_ref (g_queue_peek_tail (&pad->priv->data));
775       if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))
776         query = g_queue_peek_tail (&pad->priv->data);
777     }
778     PAD_UNLOCK (pad);
779     if (event || query) {
780       gboolean ret;
781
782       if (processed_event)
783         *processed_event = TRUE;
784       if (klass == NULL)
785         klass = GST_AGGREGATOR_GET_CLASS (self);
786
787       if (event) {
788         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
789         gst_event_ref (event);
790         ret = klass->sink_event (self, pad, event);
791
792         PAD_LOCK (pad);
793         if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS)
794           pad->priv->negotiated = ret;
795         if (g_queue_peek_tail (&pad->priv->data) == event)
796           gst_event_unref (g_queue_pop_tail (&pad->priv->data));
797         gst_event_unref (event);
798       } else if (query) {
799         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query);
800         ret = klass->sink_query (self, pad, query);
801
802         PAD_LOCK (pad);
803         if (g_queue_peek_tail (&pad->priv->data) == query) {
804           GstStructure *s;
805
806           s = gst_query_writable_structure (query);
807           gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret,
808               NULL);
809           g_queue_pop_tail (&pad->priv->data);
810         }
811       }
812
813       PAD_BROADCAST_EVENT (pad);
814       PAD_UNLOCK (pad);
815     }
816   } while (event || query);
817
818   return TRUE;
819 }
820
821 static void
822 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
823     GstFlowReturn flow_return, gboolean full)
824 {
825   GList *item;
826
827   PAD_LOCK (aggpad);
828   if (flow_return == GST_FLOW_NOT_LINKED)
829     aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
830   else
831     aggpad->priv->flow_return = flow_return;
832
833   item = g_queue_peek_head_link (&aggpad->priv->data);
834   while (item) {
835     GList *next = item->next;
836
837     /* In partial flush, we do like the pad, we get rid of non-sticky events
838      * and EOS/SEGMENT.
839      */
840     if (full || GST_IS_BUFFER (item->data) ||
841         GST_EVENT_TYPE (item->data) == GST_EVENT_EOS ||
842         GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT ||
843         !GST_EVENT_IS_STICKY (item->data)) {
844       if (!GST_IS_QUERY (item->data))
845         gst_mini_object_unref (item->data);
846       g_queue_delete_link (&aggpad->priv->data, item);
847     }
848     item = next;
849   }
850   aggpad->priv->num_buffers = 0;
851   gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL);
852
853   PAD_BROADCAST_EVENT (aggpad);
854   PAD_UNLOCK (aggpad);
855 }
856
857 static GstFlowReturn
858 gst_aggregator_default_update_src_caps (GstAggregator * agg, GstCaps * caps,
859     GstCaps ** ret)
860 {
861   *ret = gst_caps_ref (caps);
862
863   return GST_FLOW_OK;
864 }
865
866 static GstCaps *
867 gst_aggregator_default_fixate_src_caps (GstAggregator * agg, GstCaps * caps)
868 {
869   caps = gst_caps_fixate (caps);
870
871   return caps;
872 }
873
874 static gboolean
875 gst_aggregator_default_negotiated_src_caps (GstAggregator * agg, GstCaps * caps)
876 {
877   return TRUE;
878 }
879
880
881 /* takes ownership of the pool, allocator and query */
882 static gboolean
883 gst_aggregator_set_allocation (GstAggregator * self,
884     GstBufferPool * pool, GstAllocator * allocator,
885     GstAllocationParams * params, GstQuery * query)
886 {
887   GstAllocator *oldalloc;
888   GstBufferPool *oldpool;
889   GstQuery *oldquery;
890
891   GST_DEBUG ("storing allocation query");
892
893   GST_OBJECT_LOCK (self);
894   oldpool = self->priv->pool;
895   self->priv->pool = pool;
896
897   oldalloc = self->priv->allocator;
898   self->priv->allocator = allocator;
899
900   oldquery = self->priv->allocation_query;
901   self->priv->allocation_query = query;
902
903   if (params)
904     self->priv->allocation_params = *params;
905   else
906     gst_allocation_params_init (&self->priv->allocation_params);
907   GST_OBJECT_UNLOCK (self);
908
909   if (oldpool) {
910     GST_DEBUG_OBJECT (self, "deactivating old pool %p", oldpool);
911     gst_buffer_pool_set_active (oldpool, FALSE);
912     gst_object_unref (oldpool);
913   }
914   if (oldalloc) {
915     gst_object_unref (oldalloc);
916   }
917   if (oldquery) {
918     gst_query_unref (oldquery);
919   }
920   return TRUE;
921 }
922
923
924 static gboolean
925 gst_aggregator_decide_allocation (GstAggregator * self, GstQuery * query)
926 {
927   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
928
929   if (aggclass->decide_allocation)
930     if (!aggclass->decide_allocation (self, query))
931       return FALSE;
932
933   return TRUE;
934 }
935
936 static gboolean
937 gst_aggregator_do_allocation (GstAggregator * self, GstCaps * caps)
938 {
939   GstQuery *query;
940   gboolean result = TRUE;
941   GstBufferPool *pool = NULL;
942   GstAllocator *allocator;
943   GstAllocationParams params;
944
945   /* find a pool for the negotiated caps now */
946   GST_DEBUG_OBJECT (self, "doing allocation query");
947   query = gst_query_new_allocation (caps, TRUE);
948   if (!gst_pad_peer_query (self->srcpad, query)) {
949     /* not a problem, just debug a little */
950     GST_DEBUG_OBJECT (self, "peer ALLOCATION query failed");
951   }
952
953   GST_DEBUG_OBJECT (self, "calling decide_allocation");
954   result = gst_aggregator_decide_allocation (self, query);
955
956   GST_DEBUG_OBJECT (self, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
957       query);
958
959   if (!result)
960     goto no_decide_allocation;
961
962   /* we got configuration from our peer or the decide_allocation method,
963    * parse them */
964   if (gst_query_get_n_allocation_params (query) > 0) {
965     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
966   } else {
967     allocator = NULL;
968     gst_allocation_params_init (&params);
969   }
970
971   if (gst_query_get_n_allocation_pools (query) > 0)
972     gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
973
974   /* now store */
975   result =
976       gst_aggregator_set_allocation (self, pool, allocator, &params, query);
977
978   return result;
979
980   /* Errors */
981 no_decide_allocation:
982   {
983     GST_WARNING_OBJECT (self, "Failed to decide allocation");
984     gst_query_unref (query);
985
986     return result;
987   }
988
989 }
990
991 /* WITH SRC_LOCK held */
992 static GstFlowReturn
993 gst_aggregator_update_src_caps (GstAggregator * self)
994 {
995   GstAggregatorClass *agg_klass = GST_AGGREGATOR_GET_CLASS (self);
996   GstCaps *downstream_caps, *template_caps, *caps = NULL;
997   GstFlowReturn ret = GST_FLOW_OK;
998
999   template_caps = gst_pad_get_pad_template_caps (self->srcpad);
1000   downstream_caps = gst_pad_peer_query_caps (self->srcpad, template_caps);
1001
1002   if (gst_caps_is_empty (downstream_caps)) {
1003     GST_INFO_OBJECT (self, "Downstream caps (%"
1004         GST_PTR_FORMAT ") not compatible with pad template caps (%"
1005         GST_PTR_FORMAT ")", downstream_caps, template_caps);
1006     ret = GST_FLOW_NOT_NEGOTIATED;
1007     goto done;
1008   }
1009
1010   g_assert (agg_klass->update_src_caps);
1011   GST_DEBUG_OBJECT (self, "updating caps from %" GST_PTR_FORMAT,
1012       downstream_caps);
1013   ret = agg_klass->update_src_caps (self, downstream_caps, &caps);
1014   if (ret < GST_FLOW_OK) {
1015     GST_WARNING_OBJECT (self, "Subclass failed to update provided caps");
1016     goto done;
1017   }
1018   if ((caps == NULL || gst_caps_is_empty (caps)) && ret >= GST_FLOW_OK) {
1019     ret = GST_FLOW_NOT_NEGOTIATED;
1020     goto done;
1021   }
1022   GST_DEBUG_OBJECT (self, "               to %" GST_PTR_FORMAT, caps);
1023
1024 #ifdef GST_ENABLE_EXTRA_CHECKS
1025   if (!gst_caps_is_subset (caps, template_caps)) {
1026     GstCaps *intersection;
1027
1028     GST_ERROR_OBJECT (self,
1029         "update_src_caps returned caps %" GST_PTR_FORMAT
1030         " which are not a real subset of the template caps %"
1031         GST_PTR_FORMAT, caps, template_caps);
1032     g_warning ("%s: update_src_caps returned caps which are not a real "
1033         "subset of the filter caps", GST_ELEMENT_NAME (self));
1034
1035     intersection =
1036         gst_caps_intersect_full (template_caps, caps, GST_CAPS_INTERSECT_FIRST);
1037     gst_caps_unref (caps);
1038     caps = intersection;
1039   }
1040 #endif
1041
1042   if (gst_caps_is_any (caps)) {
1043     goto done;
1044   }
1045
1046   if (!gst_caps_is_fixed (caps)) {
1047     g_assert (agg_klass->fixate_src_caps);
1048
1049     GST_DEBUG_OBJECT (self, "fixate caps from %" GST_PTR_FORMAT, caps);
1050     if (!(caps = agg_klass->fixate_src_caps (self, caps))) {
1051       GST_WARNING_OBJECT (self, "Subclass failed to fixate provided caps");
1052       ret = GST_FLOW_NOT_NEGOTIATED;
1053       goto done;
1054     }
1055     GST_DEBUG_OBJECT (self, "             to %" GST_PTR_FORMAT, caps);
1056   }
1057
1058   if (agg_klass->negotiated_src_caps) {
1059     if (!agg_klass->negotiated_src_caps (self, caps)) {
1060       GST_WARNING_OBJECT (self, "Subclass failed to accept negotiated caps");
1061       ret = GST_FLOW_NOT_NEGOTIATED;
1062       goto done;
1063     }
1064   }
1065
1066   gst_aggregator_set_src_caps (self, caps);
1067
1068   if (!gst_aggregator_do_allocation (self, caps)) {
1069     GST_WARNING_OBJECT (self, "Allocation negotiation failed");
1070     ret = GST_FLOW_NOT_NEGOTIATED;
1071   }
1072
1073 done:
1074   gst_caps_unref (downstream_caps);
1075   gst_caps_unref (template_caps);
1076
1077   if (caps)
1078     gst_caps_unref (caps);
1079
1080   return ret;
1081 }
1082
1083 static void
1084 gst_aggregator_aggregate_func (GstAggregator * self)
1085 {
1086   GstAggregatorPrivate *priv = self->priv;
1087   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1088   gboolean timeout = FALSE;
1089
1090   if (self->priv->running == FALSE) {
1091     GST_DEBUG_OBJECT (self, "Not running anymore");
1092     return;
1093   }
1094
1095   GST_LOG_OBJECT (self, "Checking aggregate");
1096   while (priv->send_eos && priv->running) {
1097     GstFlowReturn flow_return = GST_FLOW_OK;
1098     gboolean processed_event = FALSE;
1099
1100     gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries,
1101         NULL);
1102
1103     if (!gst_aggregator_wait_and_check (self, &timeout))
1104       continue;
1105
1106     gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries,
1107         &processed_event);
1108     if (processed_event)
1109       continue;
1110
1111     if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
1112       flow_return = gst_aggregator_update_src_caps (self);
1113       if (flow_return != GST_FLOW_OK)
1114         gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self));
1115     }
1116
1117     if (timeout || flow_return >= GST_FLOW_OK) {
1118       GST_TRACE_OBJECT (self, "Actually aggregating!");
1119       flow_return = klass->aggregate (self, timeout);
1120     }
1121
1122     if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA)
1123       continue;
1124
1125     GST_OBJECT_LOCK (self);
1126     if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) {
1127       /* We don't want to set the pads to flushing, but we want to
1128        * stop the thread, so just break here */
1129       GST_OBJECT_UNLOCK (self);
1130       break;
1131     }
1132     GST_OBJECT_UNLOCK (self);
1133
1134     if (flow_return == GST_FLOW_EOS || flow_return == GST_FLOW_ERROR) {
1135       gst_aggregator_push_eos (self);
1136     }
1137
1138     GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
1139
1140     if (flow_return != GST_FLOW_OK) {
1141       GList *item;
1142
1143       GST_OBJECT_LOCK (self);
1144       for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
1145         GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
1146
1147         gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE);
1148       }
1149       GST_OBJECT_UNLOCK (self);
1150       break;
1151     }
1152   }
1153
1154   /* Pause the task here, the only ways to get here are:
1155    * 1) We're stopping, in which case the task is stopped anyway
1156    * 2) We got a flow error above, in which case it might take
1157    *    some time to forward the flow return upstream and we
1158    *    would otherwise call the task function over and over
1159    *    again without doing anything
1160    */
1161   gst_pad_pause_task (self->srcpad);
1162 }
1163
1164 static gboolean
1165 gst_aggregator_start (GstAggregator * self)
1166 {
1167   GstAggregatorClass *klass;
1168   gboolean result;
1169
1170   self->priv->send_stream_start = TRUE;
1171   self->priv->send_segment = TRUE;
1172   self->priv->send_eos = TRUE;
1173   self->priv->srccaps = NULL;
1174
1175   gst_aggregator_set_allocation (self, NULL, NULL, NULL, NULL);
1176
1177   klass = GST_AGGREGATOR_GET_CLASS (self);
1178
1179   if (klass->start)
1180     result = klass->start (self);
1181   else
1182     result = TRUE;
1183
1184   return result;
1185 }
1186
1187 static gboolean
1188 _check_pending_flush_stop (GstAggregatorPad * pad)
1189 {
1190   gboolean res;
1191
1192   PAD_LOCK (pad);
1193   res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
1194   PAD_UNLOCK (pad);
1195
1196   return res;
1197 }
1198
1199 static gboolean
1200 gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
1201 {
1202   gboolean res = TRUE;
1203
1204   GST_INFO_OBJECT (self, "%s srcpad task",
1205       flush_start ? "Pausing" : "Stopping");
1206
1207   SRC_LOCK (self);
1208   self->priv->running = FALSE;
1209   SRC_BROADCAST (self);
1210   SRC_UNLOCK (self);
1211
1212   if (flush_start) {
1213     res = gst_pad_push_event (self->srcpad, flush_start);
1214   }
1215
1216   gst_pad_stop_task (self->srcpad);
1217
1218   return res;
1219 }
1220
1221 static void
1222 gst_aggregator_start_srcpad_task (GstAggregator * self)
1223 {
1224   GST_INFO_OBJECT (self, "Starting srcpad task");
1225
1226   self->priv->running = TRUE;
1227   gst_pad_start_task (GST_PAD (self->srcpad),
1228       (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL);
1229 }
1230
1231 static GstFlowReturn
1232 gst_aggregator_flush (GstAggregator * self)
1233 {
1234   GstFlowReturn ret = GST_FLOW_OK;
1235   GstAggregatorPrivate *priv = self->priv;
1236   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1237
1238   GST_DEBUG_OBJECT (self, "Flushing everything");
1239   GST_OBJECT_LOCK (self);
1240   priv->send_segment = TRUE;
1241   priv->flush_seeking = FALSE;
1242   priv->tags_changed = FALSE;
1243   GST_OBJECT_UNLOCK (self);
1244   if (klass->flush)
1245     ret = klass->flush (self);
1246
1247   return ret;
1248 }
1249
1250
1251 /* Called with GstAggregator's object lock held */
1252
1253 static gboolean
1254 gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
1255 {
1256   GList *tmp;
1257   GstAggregatorPad *tmppad;
1258
1259   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
1260     tmppad = (GstAggregatorPad *) tmp->data;
1261
1262     if (_check_pending_flush_stop (tmppad) == FALSE) {
1263       GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
1264           tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
1265       return FALSE;
1266     }
1267   }
1268
1269   return TRUE;
1270 }
1271
1272 static void
1273 gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
1274     GstEvent * event)
1275 {
1276   GstAggregatorPrivate *priv = self->priv;
1277   GstAggregatorPadPrivate *padpriv = aggpad->priv;
1278
1279   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE);
1280
1281   PAD_FLUSH_LOCK (aggpad);
1282   PAD_LOCK (aggpad);
1283   if (padpriv->pending_flush_start) {
1284     GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
1285
1286     padpriv->pending_flush_start = FALSE;
1287     padpriv->pending_flush_stop = TRUE;
1288   }
1289   PAD_UNLOCK (aggpad);
1290
1291   GST_OBJECT_LOCK (self);
1292   if (priv->flush_seeking) {
1293     /* If flush_seeking we forward the first FLUSH_START */
1294     if (priv->pending_flush_start) {
1295       priv->pending_flush_start = FALSE;
1296       GST_OBJECT_UNLOCK (self);
1297
1298       GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
1299       gst_aggregator_stop_srcpad_task (self, event);
1300
1301       GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
1302       GST_PAD_STREAM_LOCK (self->srcpad);
1303       GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
1304       event = NULL;
1305     } else {
1306       GST_OBJECT_UNLOCK (self);
1307       gst_event_unref (event);
1308     }
1309   } else {
1310     GST_OBJECT_UNLOCK (self);
1311     gst_event_unref (event);
1312   }
1313   PAD_FLUSH_UNLOCK (aggpad);
1314 }
1315
1316 /* Must be called with the the PAD_LOCK held */
1317 static void
1318 update_time_level (GstAggregatorPad * aggpad, gboolean head)
1319 {
1320   if (head) {
1321     if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) &&
1322         aggpad->priv->head_segment.format == GST_FORMAT_TIME)
1323       aggpad->priv->head_time =
1324           gst_segment_to_running_time (&aggpad->priv->head_segment,
1325           GST_FORMAT_TIME, aggpad->priv->head_position);
1326     else
1327       aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
1328
1329     if (!GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_time))
1330       aggpad->priv->tail_time = aggpad->priv->head_time;
1331   } else {
1332     if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_position) &&
1333         aggpad->segment.format == GST_FORMAT_TIME)
1334       aggpad->priv->tail_time =
1335           gst_segment_to_running_time (&aggpad->segment,
1336           GST_FORMAT_TIME, aggpad->priv->tail_position);
1337     else
1338       aggpad->priv->tail_time = aggpad->priv->head_time;
1339   }
1340
1341   if (aggpad->priv->head_time == GST_CLOCK_TIME_NONE ||
1342       aggpad->priv->tail_time == GST_CLOCK_TIME_NONE) {
1343     aggpad->priv->time_level = 0;
1344     return;
1345   }
1346
1347   if (aggpad->priv->tail_time > aggpad->priv->head_time)
1348     aggpad->priv->time_level = 0;
1349   else
1350     aggpad->priv->time_level = aggpad->priv->head_time -
1351         aggpad->priv->tail_time;
1352 }
1353
1354
1355 /* GstAggregator vmethods default implementations */
1356 static gboolean
1357 gst_aggregator_default_sink_event (GstAggregator * self,
1358     GstAggregatorPad * aggpad, GstEvent * event)
1359 {
1360   gboolean res = TRUE;
1361   GstPad *pad = GST_PAD (aggpad);
1362   GstAggregatorPrivate *priv = self->priv;
1363
1364   GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event);
1365
1366   switch (GST_EVENT_TYPE (event)) {
1367     case GST_EVENT_FLUSH_START:
1368     {
1369       gst_aggregator_flush_start (self, aggpad, event);
1370       /* We forward only in one case: right after flush_seeking */
1371       event = NULL;
1372       goto eat;
1373     }
1374     case GST_EVENT_FLUSH_STOP:
1375     {
1376       gst_aggregator_pad_flush (aggpad, self);
1377       GST_OBJECT_LOCK (self);
1378       if (priv->flush_seeking) {
1379         g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
1380         if (gst_aggregator_all_flush_stop_received_locked (self)) {
1381           GST_OBJECT_UNLOCK (self);
1382           /* That means we received FLUSH_STOP/FLUSH_STOP on
1383            * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
1384           gst_aggregator_flush (self);
1385           gst_pad_push_event (self->srcpad, event);
1386           event = NULL;
1387           SRC_LOCK (self);
1388           priv->send_eos = TRUE;
1389           SRC_BROADCAST (self);
1390           SRC_UNLOCK (self);
1391
1392           GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
1393           GST_PAD_STREAM_UNLOCK (self->srcpad);
1394           gst_aggregator_start_srcpad_task (self);
1395         } else {
1396           GST_OBJECT_UNLOCK (self);
1397         }
1398       } else {
1399         GST_OBJECT_UNLOCK (self);
1400       }
1401
1402       /* We never forward the event */
1403       goto eat;
1404     }
1405     case GST_EVENT_EOS:
1406     {
1407       SRC_LOCK (self);
1408       PAD_LOCK (aggpad);
1409       g_assert (aggpad->priv->num_buffers == 0);
1410       aggpad->priv->eos = TRUE;
1411       PAD_UNLOCK (aggpad);
1412       SRC_BROADCAST (self);
1413       SRC_UNLOCK (self);
1414       goto eat;
1415     }
1416     case GST_EVENT_SEGMENT:
1417     {
1418       PAD_LOCK (aggpad);
1419       GST_OBJECT_LOCK (aggpad);
1420       gst_event_copy_segment (event, &aggpad->segment);
1421       /* We've got a new segment, tail_position is now meaningless
1422        * and may interfere with the time_level calculation
1423        */
1424       aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
1425       update_time_level (aggpad, FALSE);
1426       GST_OBJECT_UNLOCK (aggpad);
1427       PAD_UNLOCK (aggpad);
1428
1429       GST_OBJECT_LOCK (self);
1430       self->priv->seqnum = gst_event_get_seqnum (event);
1431       GST_OBJECT_UNLOCK (self);
1432       goto eat;
1433     }
1434     case GST_EVENT_STREAM_START:
1435     {
1436       goto eat;
1437     }
1438     case GST_EVENT_GAP:
1439     {
1440       GstClockTime pts, endpts;
1441       GstClockTime duration;
1442       GstBuffer *gapbuf;
1443
1444       gst_event_parse_gap (event, &pts, &duration);
1445       gapbuf = gst_buffer_new ();
1446
1447       if (GST_CLOCK_TIME_IS_VALID (duration))
1448         endpts = pts + duration;
1449       else
1450         endpts = GST_CLOCK_TIME_NONE;
1451
1452       GST_OBJECT_LOCK (aggpad);
1453       res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts,
1454           &pts, &endpts);
1455       GST_OBJECT_UNLOCK (aggpad);
1456
1457       if (!res) {
1458         GST_WARNING_OBJECT (self, "GAP event outside segment, dropping");
1459         goto eat;
1460       }
1461
1462       if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts))
1463         duration = endpts - pts;
1464       else
1465         duration = GST_CLOCK_TIME_NONE;
1466
1467       GST_BUFFER_PTS (gapbuf) = pts;
1468       GST_BUFFER_DURATION (gapbuf) = duration;
1469       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
1470       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
1471
1472       /* Remove GAP event so we can replace it with the buffer */
1473       if (g_queue_peek_tail (&aggpad->priv->data) == event)
1474         gst_event_unref (g_queue_pop_tail (&aggpad->priv->data));
1475
1476       if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
1477           GST_FLOW_OK) {
1478         GST_WARNING_OBJECT (self, "Failed to chain gap buffer");
1479         res = FALSE;
1480       }
1481
1482       goto eat;
1483     }
1484     case GST_EVENT_TAG:
1485     {
1486       GstTagList *tags;
1487
1488       gst_event_parse_tag (event, &tags);
1489
1490       if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) {
1491         gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE);
1492         gst_event_unref (event);
1493         event = NULL;
1494         goto eat;
1495       }
1496       break;
1497     }
1498     default:
1499     {
1500       break;
1501     }
1502   }
1503
1504   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
1505   return gst_pad_event_default (pad, GST_OBJECT (self), event);
1506
1507 eat:
1508   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
1509   if (event)
1510     gst_event_unref (event);
1511
1512   return res;
1513 }
1514
1515 static inline gboolean
1516 gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad,
1517     gpointer unused_udata)
1518 {
1519   gst_aggregator_pad_flush (pad, self);
1520
1521   PAD_LOCK (pad);
1522   pad->priv->flow_return = GST_FLOW_FLUSHING;
1523   pad->priv->negotiated = FALSE;
1524   PAD_BROADCAST_EVENT (pad);
1525   PAD_UNLOCK (pad);
1526
1527   return TRUE;
1528 }
1529
1530 static gboolean
1531 gst_aggregator_stop (GstAggregator * agg)
1532 {
1533   GstAggregatorClass *klass;
1534   gboolean result;
1535
1536   gst_aggregator_reset_flow_values (agg);
1537
1538   gst_aggregator_iterate_sinkpads (agg, gst_aggregator_stop_pad, NULL);
1539
1540   klass = GST_AGGREGATOR_GET_CLASS (agg);
1541
1542   if (klass->stop)
1543     result = klass->stop (agg);
1544   else
1545     result = TRUE;
1546
1547   agg->priv->has_peer_latency = FALSE;
1548   agg->priv->peer_latency_live = FALSE;
1549   agg->priv->peer_latency_min = agg->priv->peer_latency_max = FALSE;
1550
1551   if (agg->priv->tags)
1552     gst_tag_list_unref (agg->priv->tags);
1553   agg->priv->tags = NULL;
1554
1555   gst_aggregator_set_allocation (agg, NULL, NULL, NULL, NULL);
1556
1557   return result;
1558 }
1559
1560 /* GstElement vmethods implementations */
1561 static GstStateChangeReturn
1562 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
1563 {
1564   GstStateChangeReturn ret;
1565   GstAggregator *self = GST_AGGREGATOR (element);
1566
1567   switch (transition) {
1568     case GST_STATE_CHANGE_READY_TO_PAUSED:
1569       if (!gst_aggregator_start (self))
1570         goto error_start;
1571       break;
1572     default:
1573       break;
1574   }
1575
1576   if ((ret =
1577           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
1578               transition)) == GST_STATE_CHANGE_FAILURE)
1579     goto failure;
1580
1581
1582   switch (transition) {
1583     case GST_STATE_CHANGE_PAUSED_TO_READY:
1584       if (!gst_aggregator_stop (self)) {
1585         /* What to do in this case? Error out? */
1586         GST_ERROR_OBJECT (self, "Subclass failed to stop.");
1587       }
1588       break;
1589     default:
1590       break;
1591   }
1592
1593   return ret;
1594
1595 /* ERRORS */
1596 failure:
1597   {
1598     GST_ERROR_OBJECT (element, "parent failed state change");
1599     return ret;
1600   }
1601 error_start:
1602   {
1603     GST_ERROR_OBJECT (element, "Subclass failed to start");
1604     return GST_STATE_CHANGE_FAILURE;
1605   }
1606 }
1607
1608 static void
1609 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
1610 {
1611   GstAggregator *self = GST_AGGREGATOR (element);
1612   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1613
1614   GST_INFO_OBJECT (pad, "Removing pad");
1615
1616   SRC_LOCK (self);
1617   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
1618   gst_element_remove_pad (element, pad);
1619
1620   self->priv->has_peer_latency = FALSE;
1621   SRC_BROADCAST (self);
1622   SRC_UNLOCK (self);
1623 }
1624
1625 static GstAggregatorPad *
1626 gst_aggregator_default_create_new_pad (GstAggregator * self,
1627     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1628 {
1629   GstAggregatorPad *agg_pad;
1630   GstAggregatorPrivate *priv = self->priv;
1631   gint serial = 0;
1632   gchar *name = NULL;
1633
1634   if (templ->direction != GST_PAD_SINK)
1635     goto not_sink;
1636
1637   if (templ->presence != GST_PAD_REQUEST)
1638     goto not_request;
1639
1640   GST_OBJECT_LOCK (self);
1641   if (req_name == NULL || strlen (req_name) < 6
1642       || !g_str_has_prefix (req_name, "sink_")) {
1643     /* no name given when requesting the pad, use next available int */
1644     serial = ++priv->max_padserial;
1645   } else {
1646     /* parse serial number from requested padname */
1647     serial = g_ascii_strtoull (&req_name[5], NULL, 10);
1648     if (serial > priv->max_padserial)
1649       priv->max_padserial = serial;
1650   }
1651
1652   name = g_strdup_printf ("sink_%u", serial);
1653   agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
1654       "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
1655   g_free (name);
1656
1657   GST_OBJECT_UNLOCK (self);
1658
1659   return agg_pad;
1660
1661   /* errors */
1662 not_sink:
1663   {
1664     GST_WARNING_OBJECT (self, "request new pad that is not a SINK pad");
1665     return NULL;
1666   }
1667 not_request:
1668   {
1669     GST_WARNING_OBJECT (self, "request new pad that is not a REQUEST pad");
1670     return NULL;
1671   }
1672 }
1673
1674 static GstPad *
1675 gst_aggregator_request_new_pad (GstElement * element,
1676     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1677 {
1678   GstAggregator *self;
1679   GstAggregatorPad *agg_pad;
1680   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (element);
1681   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
1682
1683   self = GST_AGGREGATOR (element);
1684
1685   agg_pad = klass->create_new_pad (self, templ, req_name, caps);
1686   if (!agg_pad) {
1687     GST_ERROR_OBJECT (element, "Couldn't create new pad");
1688     return NULL;
1689   }
1690
1691   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
1692
1693   if (priv->running)
1694     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
1695
1696   /* add the pad to the element */
1697   gst_element_add_pad (element, GST_PAD (agg_pad));
1698
1699   return GST_PAD (agg_pad);
1700 }
1701
1702 /* Must be called with SRC_LOCK held */
1703
1704 static gboolean
1705 gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
1706 {
1707   gboolean query_ret, live;
1708   GstClockTime our_latency, min, max;
1709
1710   query_ret = gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1711
1712   if (!query_ret) {
1713     GST_WARNING_OBJECT (self, "Latency query failed");
1714     return FALSE;
1715   }
1716
1717   gst_query_parse_latency (query, &live, &min, &max);
1718
1719   our_latency = self->priv->latency;
1720
1721   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
1722     GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
1723         ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
1724     return FALSE;
1725   }
1726
1727   if (min > max && GST_CLOCK_TIME_IS_VALID (max)) {
1728     GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL),
1729         ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
1730             GST_TIME_FORMAT ". Add queues or other buffering elements.",
1731             GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
1732     return FALSE;
1733   }
1734
1735   self->priv->peer_latency_live = live;
1736   self->priv->peer_latency_min = min;
1737   self->priv->peer_latency_max = max;
1738   self->priv->has_peer_latency = TRUE;
1739
1740   /* add our own */
1741   min += our_latency;
1742   min += self->priv->sub_latency_min;
1743   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1744       && GST_CLOCK_TIME_IS_VALID (max))
1745     max += self->priv->sub_latency_max + our_latency;
1746   else
1747     max = GST_CLOCK_TIME_NONE;
1748
1749   SRC_BROADCAST (self);
1750
1751   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
1752       " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
1753
1754   gst_query_set_latency (query, live, min, max);
1755
1756   return query_ret;
1757 }
1758
1759 /*
1760  * MUST be called with the src_lock held.
1761  *
1762  * See  gst_aggregator_get_latency() for doc
1763  */
1764 static GstClockTime
1765 gst_aggregator_get_latency_unlocked (GstAggregator * self)
1766 {
1767   GstClockTime latency;
1768
1769   g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0);
1770
1771   if (!self->priv->has_peer_latency) {
1772     GstQuery *query = gst_query_new_latency ();
1773     gboolean ret;
1774
1775     ret = gst_aggregator_query_latency_unlocked (self, query);
1776     gst_query_unref (query);
1777     if (!ret)
1778       return GST_CLOCK_TIME_NONE;
1779   }
1780
1781   if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
1782     return GST_CLOCK_TIME_NONE;
1783
1784   /* latency_min is never GST_CLOCK_TIME_NONE by construction */
1785   latency = self->priv->peer_latency_min;
1786
1787   /* add our own */
1788   latency += self->priv->latency;
1789   latency += self->priv->sub_latency_min;
1790
1791   return latency;
1792 }
1793
1794 /**
1795  * gst_aggregator_get_latency:
1796  * @self: a #GstAggregator
1797  *
1798  * Retrieves the latency values reported by @self in response to the latency
1799  * query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element
1800  * will not wait for the clock.
1801  *
1802  * Typically only called by subclasses.
1803  *
1804  * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync
1805  */
1806 GstClockTime
1807 gst_aggregator_get_latency (GstAggregator * self)
1808 {
1809   GstClockTime ret;
1810
1811   SRC_LOCK (self);
1812   ret = gst_aggregator_get_latency_unlocked (self);
1813   SRC_UNLOCK (self);
1814
1815   return ret;
1816 }
1817
1818 static gboolean
1819 gst_aggregator_send_event (GstElement * element, GstEvent * event)
1820 {
1821   GstAggregator *self = GST_AGGREGATOR (element);
1822
1823   GST_STATE_LOCK (element);
1824   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
1825       GST_STATE (element) < GST_STATE_PAUSED) {
1826     gdouble rate;
1827     GstFormat fmt;
1828     GstSeekFlags flags;
1829     GstSeekType start_type, stop_type;
1830     gint64 start, stop;
1831
1832     gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1833         &start, &stop_type, &stop);
1834
1835     GST_OBJECT_LOCK (self);
1836     gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1837         stop_type, stop, NULL);
1838     self->priv->seqnum = gst_event_get_seqnum (event);
1839     self->priv->first_buffer = FALSE;
1840     GST_OBJECT_UNLOCK (self);
1841
1842     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
1843   }
1844   GST_STATE_UNLOCK (element);
1845
1846
1847   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
1848       event);
1849 }
1850
1851 static gboolean
1852 gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
1853 {
1854   gboolean res = TRUE;
1855
1856   switch (GST_QUERY_TYPE (query)) {
1857     case GST_QUERY_SEEKING:
1858     {
1859       GstFormat format;
1860
1861       /* don't pass it along as some (file)sink might claim it does
1862        * whereas with a collectpads in between that will not likely work */
1863       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1864       gst_query_set_seeking (query, format, FALSE, 0, -1);
1865       res = TRUE;
1866
1867       break;
1868     }
1869     case GST_QUERY_LATENCY:
1870       SRC_LOCK (self);
1871       res = gst_aggregator_query_latency_unlocked (self, query);
1872       SRC_UNLOCK (self);
1873       break;
1874     default:
1875       return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1876   }
1877
1878   return res;
1879 }
1880
1881 static gboolean
1882 gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
1883 {
1884   EventData *evdata = user_data;
1885   gboolean ret = TRUE;
1886   GstPad *peer = gst_pad_get_peer (pad);
1887   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1888
1889   if (peer) {
1890     if (evdata->only_to_active_pads && aggpad->priv->first_buffer) {
1891       GST_DEBUG_OBJECT (pad, "not sending event to inactive pad");
1892       ret = TRUE;
1893     } else {
1894       ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
1895       GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
1896       gst_object_unref (peer);
1897     }
1898   }
1899
1900   if (ret == FALSE) {
1901     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
1902       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
1903
1904       GST_DEBUG_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
1905
1906       if (gst_pad_query (peer, seeking)) {
1907         gboolean seekable;
1908
1909         gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
1910
1911         if (seekable == FALSE) {
1912           GST_INFO_OBJECT (pad,
1913               "Source not seekable, We failed but it does not matter!");
1914
1915           ret = TRUE;
1916         }
1917       } else {
1918         GST_ERROR_OBJECT (pad, "Query seeking FAILED");
1919       }
1920
1921       gst_query_unref (seeking);
1922     }
1923
1924     if (evdata->flush) {
1925       PAD_LOCK (aggpad);
1926       aggpad->priv->pending_flush_start = FALSE;
1927       aggpad->priv->pending_flush_stop = FALSE;
1928       PAD_UNLOCK (aggpad);
1929     }
1930   } else {
1931     evdata->one_actually_seeked = TRUE;
1932   }
1933
1934   evdata->result &= ret;
1935
1936   /* Always send to all pads */
1937   return FALSE;
1938 }
1939
1940 static void
1941 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
1942     EventData * evdata)
1943 {
1944   evdata->result = TRUE;
1945   evdata->one_actually_seeked = FALSE;
1946
1947   /* We first need to set all pads as flushing in a first pass
1948    * as flush_start flush_stop is sometimes sent synchronously
1949    * while we send the seek event */
1950   if (evdata->flush) {
1951     GList *l;
1952
1953     GST_OBJECT_LOCK (self);
1954     for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
1955       GstAggregatorPad *pad = l->data;
1956
1957       PAD_LOCK (pad);
1958       pad->priv->pending_flush_start = TRUE;
1959       pad->priv->pending_flush_stop = FALSE;
1960       PAD_UNLOCK (pad);
1961     }
1962     GST_OBJECT_UNLOCK (self);
1963   }
1964
1965   gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata);
1966
1967   gst_event_unref (evdata->event);
1968 }
1969
1970 static gboolean
1971 gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
1972 {
1973   gdouble rate;
1974   GstFormat fmt;
1975   GstSeekFlags flags;
1976   GstSeekType start_type, stop_type;
1977   gint64 start, stop;
1978   gboolean flush;
1979   EventData evdata = { 0, };
1980   GstAggregatorPrivate *priv = self->priv;
1981
1982   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1983       &start, &stop_type, &stop);
1984
1985   GST_INFO_OBJECT (self, "starting SEEK");
1986
1987   flush = flags & GST_SEEK_FLAG_FLUSH;
1988
1989   GST_OBJECT_LOCK (self);
1990   if (flush) {
1991     priv->pending_flush_start = TRUE;
1992     priv->flush_seeking = TRUE;
1993   }
1994
1995   gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1996       stop_type, stop, NULL);
1997
1998   /* Seeking sets a position */
1999   self->priv->first_buffer = FALSE;
2000   GST_OBJECT_UNLOCK (self);
2001
2002   /* forward the seek upstream */
2003   evdata.event = event;
2004   evdata.flush = flush;
2005   evdata.only_to_active_pads = FALSE;
2006   gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2007   event = NULL;
2008
2009   if (!evdata.result || !evdata.one_actually_seeked) {
2010     GST_OBJECT_LOCK (self);
2011     priv->flush_seeking = FALSE;
2012     priv->pending_flush_start = FALSE;
2013     GST_OBJECT_UNLOCK (self);
2014   }
2015
2016   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
2017
2018   return evdata.result;
2019 }
2020
2021 static gboolean
2022 gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
2023 {
2024   EventData evdata = { 0, };
2025
2026   switch (GST_EVENT_TYPE (event)) {
2027     case GST_EVENT_SEEK:
2028       /* _do_seek() unrefs the event. */
2029       return gst_aggregator_do_seek (self, event);
2030     case GST_EVENT_NAVIGATION:
2031       /* navigation is rather pointless. */
2032       gst_event_unref (event);
2033       return FALSE;
2034     default:
2035       break;
2036   }
2037
2038   /* Don't forward QOS events to pads that had no active buffer yet. Otherwise
2039    * they will receive a QOS event that has earliest_time=0 (because we can't
2040    * have negative timestamps), and consider their buffer as too late */
2041   evdata.event = event;
2042   evdata.flush = FALSE;
2043   evdata.only_to_active_pads = GST_EVENT_TYPE (event) == GST_EVENT_QOS;
2044   gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2045   return evdata.result;
2046 }
2047
2048 static gboolean
2049 gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent,
2050     GstEvent * event)
2051 {
2052   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2053
2054   return klass->src_event (GST_AGGREGATOR (parent), event);
2055 }
2056
2057 static gboolean
2058 gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent,
2059     GstQuery * query)
2060 {
2061   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2062
2063   return klass->src_query (GST_AGGREGATOR (parent), query);
2064 }
2065
2066 static gboolean
2067 gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
2068     GstObject * parent, GstPadMode mode, gboolean active)
2069 {
2070   GstAggregator *self = GST_AGGREGATOR (parent);
2071   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2072
2073   if (klass->src_activate) {
2074     if (klass->src_activate (self, mode, active) == FALSE) {
2075       return FALSE;
2076     }
2077   }
2078
2079   if (active == TRUE) {
2080     switch (mode) {
2081       case GST_PAD_MODE_PUSH:
2082       {
2083         GST_INFO_OBJECT (pad, "Activating pad!");
2084         gst_aggregator_start_srcpad_task (self);
2085         return TRUE;
2086       }
2087       default:
2088       {
2089         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
2090         return FALSE;
2091       }
2092     }
2093   }
2094
2095   /* deactivating */
2096   GST_INFO_OBJECT (self, "Deactivating srcpad");
2097   gst_aggregator_stop_srcpad_task (self, FALSE);
2098
2099   return TRUE;
2100 }
2101
2102 static gboolean
2103 gst_aggregator_default_sink_query (GstAggregator * self,
2104     GstAggregatorPad * aggpad, GstQuery * query)
2105 {
2106   GstPad *pad = GST_PAD (aggpad);
2107
2108   if (GST_QUERY_TYPE (query) == GST_QUERY_ALLOCATION) {
2109     GstQuery *decide_query = NULL;
2110     GstAggregatorClass *agg_class;
2111     gboolean ret;
2112
2113     GST_OBJECT_LOCK (self);
2114     PAD_LOCK (aggpad);
2115     if (G_UNLIKELY (!aggpad->priv->negotiated)) {
2116       GST_DEBUG_OBJECT (self,
2117           "not negotiated yet, can't answer ALLOCATION query");
2118       PAD_UNLOCK (aggpad);
2119       GST_OBJECT_UNLOCK (self);
2120
2121       return FALSE;
2122     }
2123
2124     if ((decide_query = self->priv->allocation_query))
2125       gst_query_ref (decide_query);
2126     PAD_UNLOCK (aggpad);
2127     GST_OBJECT_UNLOCK (self);
2128
2129     GST_DEBUG_OBJECT (self,
2130         "calling propose allocation with query %" GST_PTR_FORMAT, decide_query);
2131
2132     agg_class = GST_AGGREGATOR_GET_CLASS (self);
2133
2134     /* pass the query to the propose_allocation vmethod if any */
2135     if (agg_class->propose_allocation)
2136       ret = agg_class->propose_allocation (self, aggpad, decide_query, query);
2137     else
2138       ret = FALSE;
2139
2140     if (decide_query)
2141       gst_query_unref (decide_query);
2142
2143     GST_DEBUG_OBJECT (self, "ALLOCATION ret %d, %" GST_PTR_FORMAT, ret, query);
2144     return ret;
2145   }
2146
2147   return gst_pad_query_default (pad, GST_OBJECT (self), query);
2148 }
2149
2150 static void
2151 gst_aggregator_finalize (GObject * object)
2152 {
2153   GstAggregator *self = (GstAggregator *) object;
2154
2155   g_mutex_clear (&self->priv->src_lock);
2156   g_cond_clear (&self->priv->src_cond);
2157
2158   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
2159 }
2160
2161 /*
2162  * gst_aggregator_set_latency_property:
2163  * @agg: a #GstAggregator
2164  * @latency: the new latency value (in nanoseconds).
2165  *
2166  * Sets the new latency value to @latency. This value is used to limit the
2167  * amount of time a pad waits for data to appear before considering the pad
2168  * as unresponsive.
2169  */
2170 static void
2171 gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
2172 {
2173   gboolean changed;
2174
2175   g_return_if_fail (GST_IS_AGGREGATOR (self));
2176   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
2177
2178   SRC_LOCK (self);
2179   changed = (self->priv->latency != latency);
2180
2181   if (changed) {
2182     GList *item;
2183
2184     GST_OBJECT_LOCK (self);
2185     /* First lock all the pads */
2186     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2187       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2188       PAD_LOCK (aggpad);
2189     }
2190
2191     self->priv->latency = latency;
2192
2193     SRC_BROADCAST (self);
2194
2195     /* Now wake up the pads */
2196     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2197       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2198       PAD_BROADCAST_EVENT (aggpad);
2199       PAD_UNLOCK (aggpad);
2200     }
2201     GST_OBJECT_UNLOCK (self);
2202   }
2203
2204   SRC_UNLOCK (self);
2205
2206   if (changed)
2207     gst_element_post_message (GST_ELEMENT_CAST (self),
2208         gst_message_new_latency (GST_OBJECT_CAST (self)));
2209 }
2210
2211 /*
2212  * gst_aggregator_get_latency_property:
2213  * @agg: a #GstAggregator
2214  *
2215  * Gets the latency value. See gst_aggregator_set_latency for
2216  * more details.
2217  *
2218  * Returns: The time in nanoseconds to wait for data to arrive on a sink pad
2219  * before a pad is deemed unresponsive. A value of -1 means an
2220  * unlimited time.
2221  */
2222 static gint64
2223 gst_aggregator_get_latency_property (GstAggregator * agg)
2224 {
2225   gint64 res;
2226
2227   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
2228
2229   GST_OBJECT_LOCK (agg);
2230   res = agg->priv->latency;
2231   GST_OBJECT_UNLOCK (agg);
2232
2233   return res;
2234 }
2235
2236 static void
2237 gst_aggregator_set_property (GObject * object, guint prop_id,
2238     const GValue * value, GParamSpec * pspec)
2239 {
2240   GstAggregator *agg = GST_AGGREGATOR (object);
2241
2242   switch (prop_id) {
2243     case PROP_LATENCY:
2244       gst_aggregator_set_latency_property (agg, g_value_get_int64 (value));
2245       break;
2246     case PROP_START_TIME_SELECTION:
2247       agg->priv->start_time_selection = g_value_get_enum (value);
2248       break;
2249     case PROP_START_TIME:
2250       agg->priv->start_time = g_value_get_uint64 (value);
2251       break;
2252     default:
2253       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2254       break;
2255   }
2256 }
2257
2258 static void
2259 gst_aggregator_get_property (GObject * object, guint prop_id,
2260     GValue * value, GParamSpec * pspec)
2261 {
2262   GstAggregator *agg = GST_AGGREGATOR (object);
2263
2264   switch (prop_id) {
2265     case PROP_LATENCY:
2266       g_value_set_int64 (value, gst_aggregator_get_latency_property (agg));
2267       break;
2268     case PROP_START_TIME_SELECTION:
2269       g_value_set_enum (value, agg->priv->start_time_selection);
2270       break;
2271     case PROP_START_TIME:
2272       g_value_set_uint64 (value, agg->priv->start_time);
2273       break;
2274     default:
2275       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2276       break;
2277   }
2278 }
2279
2280 /* GObject vmethods implementations */
2281 static void
2282 gst_aggregator_class_init (GstAggregatorClass * klass)
2283 {
2284   GObjectClass *gobject_class = (GObjectClass *) klass;
2285   GstElementClass *gstelement_class = (GstElementClass *) klass;
2286
2287   aggregator_parent_class = g_type_class_peek_parent (klass);
2288   g_type_class_add_private (klass, sizeof (GstAggregatorPrivate));
2289
2290   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
2291       GST_DEBUG_FG_MAGENTA, "GstAggregator");
2292
2293   klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD;
2294
2295   klass->sink_event = gst_aggregator_default_sink_event;
2296   klass->sink_query = gst_aggregator_default_sink_query;
2297
2298   klass->src_event = gst_aggregator_default_src_event;
2299   klass->src_query = gst_aggregator_default_src_query;
2300
2301   klass->create_new_pad = gst_aggregator_default_create_new_pad;
2302   klass->update_src_caps = gst_aggregator_default_update_src_caps;
2303   klass->fixate_src_caps = gst_aggregator_default_fixate_src_caps;
2304   klass->negotiated_src_caps = gst_aggregator_default_negotiated_src_caps;
2305
2306   gstelement_class->request_new_pad =
2307       GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
2308   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
2309   gstelement_class->release_pad =
2310       GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
2311   gstelement_class->change_state =
2312       GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
2313
2314   gobject_class->set_property = gst_aggregator_set_property;
2315   gobject_class->get_property = gst_aggregator_get_property;
2316   gobject_class->finalize = gst_aggregator_finalize;
2317
2318   g_object_class_install_property (gobject_class, PROP_LATENCY,
2319       g_param_spec_int64 ("latency", "Buffer latency",
2320           "Additional latency in live mode to allow upstream "
2321           "to take longer to produce buffers for the current "
2322           "position (in nanoseconds)", 0,
2323           (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1),
2324           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2325
2326   g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION,
2327       g_param_spec_enum ("start-time-selection", "Start Time Selection",
2328           "Decides which start time is output",
2329           gst_aggregator_start_time_selection_get_type (),
2330           DEFAULT_START_TIME_SELECTION,
2331           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2332
2333   g_object_class_install_property (gobject_class, PROP_START_TIME,
2334       g_param_spec_uint64 ("start-time", "Start Time",
2335           "Start time to use if start-time-selection=set", 0,
2336           G_MAXUINT64,
2337           DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2338
2339   GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad);
2340   GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_do_events_and_queries);
2341 }
2342
2343 static void
2344 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
2345 {
2346   GstPadTemplate *pad_template;
2347   GstAggregatorPrivate *priv;
2348
2349   g_return_if_fail (klass->aggregate != NULL);
2350
2351   self->priv =
2352       G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR,
2353       GstAggregatorPrivate);
2354
2355   priv = self->priv;
2356
2357   pad_template =
2358       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
2359   g_return_if_fail (pad_template != NULL);
2360
2361   priv->max_padserial = -1;
2362   priv->tags_changed = FALSE;
2363
2364   self->priv->peer_latency_live = FALSE;
2365   self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
2366   self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
2367   self->priv->has_peer_latency = FALSE;
2368   gst_aggregator_reset_flow_values (self);
2369
2370   self->srcpad = gst_pad_new_from_template (pad_template, "src");
2371
2372   gst_pad_set_event_function (self->srcpad,
2373       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
2374   gst_pad_set_query_function (self->srcpad,
2375       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func));
2376   gst_pad_set_activatemode_function (self->srcpad,
2377       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func));
2378
2379   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
2380
2381   self->priv->latency = DEFAULT_LATENCY;
2382   self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION;
2383   self->priv->start_time = DEFAULT_START_TIME;
2384
2385   g_mutex_init (&self->priv->src_lock);
2386   g_cond_init (&self->priv->src_cond);
2387 }
2388
2389 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
2390  * method to get to the padtemplates */
2391 GType
2392 gst_aggregator_get_type (void)
2393 {
2394   static volatile gsize type = 0;
2395
2396   if (g_once_init_enter (&type)) {
2397     GType _type;
2398     static const GTypeInfo info = {
2399       sizeof (GstAggregatorClass),
2400       NULL,
2401       NULL,
2402       (GClassInitFunc) gst_aggregator_class_init,
2403       NULL,
2404       NULL,
2405       sizeof (GstAggregator),
2406       0,
2407       (GInstanceInitFunc) gst_aggregator_init,
2408     };
2409
2410     _type = g_type_register_static (GST_TYPE_ELEMENT,
2411         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
2412     g_once_init_leave (&type, _type);
2413   }
2414   return type;
2415 }
2416
2417 /* Must be called with SRC lock and PAD lock held */
2418 static gboolean
2419 gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
2420 {
2421   /* Empty queue always has space */
2422   if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL)
2423     return TRUE;
2424
2425   /* We also want at least two buffers, one is being processed and one is ready
2426    * for the next iteration when we operate in live mode. */
2427   if (self->priv->peer_latency_live && aggpad->priv->num_buffers < 2)
2428     return TRUE;
2429
2430   /* zero latency, if there is a buffer, it's full */
2431   if (self->priv->latency == 0)
2432     return FALSE;
2433
2434   /* Allow no more buffers than the latency */
2435   return (aggpad->priv->time_level <= self->priv->latency);
2436 }
2437
2438 /* Must be called with the PAD_LOCK held */
2439 static void
2440 apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
2441 {
2442   GstClockTime timestamp;
2443
2444   if (GST_BUFFER_DTS_IS_VALID (buffer))
2445     timestamp = GST_BUFFER_DTS (buffer);
2446   else
2447     timestamp = GST_BUFFER_PTS (buffer);
2448
2449   if (timestamp == GST_CLOCK_TIME_NONE) {
2450     if (head)
2451       timestamp = aggpad->priv->head_position;
2452     else
2453       timestamp = aggpad->priv->tail_position;
2454   }
2455
2456   /* add duration */
2457   if (GST_BUFFER_DURATION_IS_VALID (buffer))
2458     timestamp += GST_BUFFER_DURATION (buffer);
2459
2460   if (head)
2461     aggpad->priv->head_position = timestamp;
2462   else
2463     aggpad->priv->tail_position = timestamp;
2464
2465   update_time_level (aggpad, head);
2466 }
2467
2468 static GstFlowReturn
2469 gst_aggregator_pad_chain_internal (GstAggregator * self,
2470     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
2471 {
2472   GstFlowReturn flow_return;
2473   GstClockTime buf_pts;
2474
2475   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
2476
2477   PAD_FLUSH_LOCK (aggpad);
2478
2479   PAD_LOCK (aggpad);
2480   flow_return = aggpad->priv->flow_return;
2481   if (flow_return != GST_FLOW_OK)
2482     goto flushing;
2483
2484   PAD_UNLOCK (aggpad);
2485
2486   buf_pts = GST_BUFFER_PTS (buffer);
2487
2488   for (;;) {
2489     SRC_LOCK (self);
2490     GST_OBJECT_LOCK (self);
2491     PAD_LOCK (aggpad);
2492
2493     if (aggpad->priv->first_buffer) {
2494       self->priv->has_peer_latency = FALSE;
2495       aggpad->priv->first_buffer = FALSE;
2496     }
2497
2498     if (gst_aggregator_pad_has_space (self, aggpad)
2499         && aggpad->priv->flow_return == GST_FLOW_OK) {
2500       if (head)
2501         g_queue_push_head (&aggpad->priv->data, buffer);
2502       else
2503         g_queue_push_tail (&aggpad->priv->data, buffer);
2504       apply_buffer (aggpad, buffer, head);
2505       aggpad->priv->num_buffers++;
2506       buffer = NULL;
2507       SRC_BROADCAST (self);
2508       break;
2509     }
2510
2511     flow_return = aggpad->priv->flow_return;
2512     if (flow_return != GST_FLOW_OK) {
2513       GST_OBJECT_UNLOCK (self);
2514       SRC_UNLOCK (self);
2515       goto flushing;
2516     }
2517     GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2518     GST_OBJECT_UNLOCK (self);
2519     SRC_UNLOCK (self);
2520     PAD_WAIT_EVENT (aggpad);
2521
2522     PAD_UNLOCK (aggpad);
2523   }
2524
2525   if (self->priv->first_buffer) {
2526     GstClockTime start_time;
2527
2528     switch (self->priv->start_time_selection) {
2529       case GST_AGGREGATOR_START_TIME_SELECTION_ZERO:
2530       default:
2531         start_time = 0;
2532         break;
2533       case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
2534         GST_OBJECT_LOCK (aggpad);
2535         if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) {
2536           start_time = buf_pts;
2537           if (start_time != -1) {
2538             start_time = MAX (start_time, aggpad->priv->head_segment.start);
2539             start_time =
2540                 gst_segment_to_running_time (&aggpad->priv->head_segment,
2541                 GST_FORMAT_TIME, start_time);
2542           }
2543         } else {
2544           start_time = 0;
2545           GST_WARNING_OBJECT (aggpad,
2546               "Ignoring request of selecting the first start time "
2547               "as the segment is a %s segment instead of a time segment",
2548               gst_format_get_name (aggpad->segment.format));
2549         }
2550         GST_OBJECT_UNLOCK (aggpad);
2551         break;
2552       case GST_AGGREGATOR_START_TIME_SELECTION_SET:
2553         start_time = self->priv->start_time;
2554         if (start_time == -1)
2555           start_time = 0;
2556         break;
2557     }
2558
2559     if (start_time != -1) {
2560       if (self->segment.position == -1)
2561         self->segment.position = start_time;
2562       else
2563         self->segment.position = MIN (start_time, self->segment.position);
2564
2565       GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT,
2566           GST_TIME_ARGS (start_time));
2567     }
2568   }
2569
2570   PAD_UNLOCK (aggpad);
2571   GST_OBJECT_UNLOCK (self);
2572   SRC_UNLOCK (self);
2573
2574   PAD_FLUSH_UNLOCK (aggpad);
2575
2576   GST_DEBUG_OBJECT (aggpad, "Done chaining");
2577
2578   return flow_return;
2579
2580 flushing:
2581   PAD_UNLOCK (aggpad);
2582   PAD_FLUSH_UNLOCK (aggpad);
2583
2584   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
2585       gst_flow_get_name (flow_return));
2586   if (buffer)
2587     gst_buffer_unref (buffer);
2588
2589   return flow_return;
2590 }
2591
2592 static GstFlowReturn
2593 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
2594 {
2595   return gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
2596       GST_AGGREGATOR_PAD_CAST (pad), buffer, TRUE);
2597 }
2598
2599 static gboolean
2600 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
2601     GstQuery * query)
2602 {
2603   GstAggregator *self = GST_AGGREGATOR (parent);
2604   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2605   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2606
2607   if (GST_QUERY_IS_SERIALIZED (query)) {
2608     GstStructure *s;
2609     gboolean ret = FALSE;
2610
2611     SRC_LOCK (self);
2612     PAD_LOCK (aggpad);
2613
2614     if (aggpad->priv->flow_return != GST_FLOW_OK) {
2615       SRC_UNLOCK (self);
2616       goto flushing;
2617     }
2618
2619     g_queue_push_head (&aggpad->priv->data, query);
2620     SRC_BROADCAST (self);
2621     SRC_UNLOCK (self);
2622
2623     while (!gst_aggregator_pad_queue_is_empty (aggpad)
2624         && aggpad->priv->flow_return == GST_FLOW_OK) {
2625       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2626       PAD_WAIT_EVENT (aggpad);
2627     }
2628
2629     s = gst_query_writable_structure (query);
2630     if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret))
2631       gst_structure_remove_field (s, "gst-aggregator-retval");
2632     else
2633       g_queue_remove (&aggpad->priv->data, query);
2634
2635     if (aggpad->priv->flow_return != GST_FLOW_OK)
2636       goto flushing;
2637
2638     PAD_UNLOCK (aggpad);
2639
2640     return ret;
2641   }
2642
2643   return klass->sink_query (self, aggpad, query);
2644
2645 flushing:
2646   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
2647       gst_flow_get_name (aggpad->priv->flow_return));
2648   PAD_UNLOCK (aggpad);
2649
2650   return FALSE;
2651 }
2652
2653 /* Queue serialized events and let the others go though directly.
2654  * The queued events with be handled from the src-pad task in
2655  * gst_aggregator_do_events_and_queries().
2656  */
2657 static GstFlowReturn
2658 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
2659     GstEvent * event)
2660 {
2661   GstFlowReturn ret = GST_FLOW_OK;
2662   GstAggregator *self = GST_AGGREGATOR (parent);
2663   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2664   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2665
2666   if (GST_EVENT_IS_SERIALIZED (event)) {
2667     SRC_LOCK (self);
2668     PAD_LOCK (aggpad);
2669
2670     if (aggpad->priv->flow_return != GST_FLOW_OK
2671         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
2672       ret = aggpad->priv->flow_return;
2673       goto flushing;
2674     }
2675
2676     if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
2677       GST_OBJECT_LOCK (aggpad);
2678       gst_event_copy_segment (event, &aggpad->priv->head_segment);
2679       aggpad->priv->head_position = aggpad->priv->head_segment.position;
2680       update_time_level (aggpad, TRUE);
2681       GST_OBJECT_UNLOCK (aggpad);
2682     }
2683
2684     if (GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
2685       GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT,
2686           event);
2687       g_queue_push_head (&aggpad->priv->data, event);
2688       event = NULL;
2689       SRC_BROADCAST (self);
2690     }
2691     PAD_UNLOCK (aggpad);
2692     SRC_UNLOCK (self);
2693   }
2694
2695   if (event) {
2696     if (!klass->sink_event (self, aggpad, event)) {
2697       /* Copied from GstPad to convert boolean to a GstFlowReturn in
2698        * the event handling func */
2699       ret = GST_FLOW_ERROR;
2700     }
2701   }
2702
2703   return ret;
2704
2705 flushing:
2706   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
2707       gst_flow_get_name (aggpad->priv->flow_return));
2708   PAD_UNLOCK (aggpad);
2709   SRC_UNLOCK (self);
2710   if (GST_EVENT_IS_STICKY (event))
2711     gst_pad_store_sticky_event (pad, event);
2712   gst_event_unref (event);
2713
2714   return ret;
2715 }
2716
2717 static gboolean
2718 gst_aggregator_pad_activate_mode_func (GstPad * pad,
2719     GstObject * parent, GstPadMode mode, gboolean active)
2720 {
2721   GstAggregator *self = GST_AGGREGATOR (parent);
2722   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2723
2724   if (active == FALSE) {
2725     SRC_LOCK (self);
2726     gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
2727     SRC_BROADCAST (self);
2728     SRC_UNLOCK (self);
2729   } else {
2730     PAD_LOCK (aggpad);
2731     aggpad->priv->flow_return = GST_FLOW_OK;
2732     PAD_BROADCAST_EVENT (aggpad);
2733     PAD_UNLOCK (aggpad);
2734   }
2735
2736   return TRUE;
2737 }
2738
2739 /***********************************
2740  * GstAggregatorPad implementation  *
2741  ************************************/
2742 G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
2743
2744 static void
2745 gst_aggregator_pad_constructed (GObject * object)
2746 {
2747   GstPad *pad = GST_PAD (object);
2748
2749   gst_pad_set_chain_function (pad,
2750       GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
2751   gst_pad_set_event_full_function_full (pad,
2752       GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
2753   gst_pad_set_query_function (pad,
2754       GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
2755   gst_pad_set_activatemode_function (pad,
2756       GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
2757 }
2758
2759 static void
2760 gst_aggregator_pad_finalize (GObject * object)
2761 {
2762   GstAggregatorPad *pad = (GstAggregatorPad *) object;
2763
2764   g_cond_clear (&pad->priv->event_cond);
2765   g_mutex_clear (&pad->priv->flush_lock);
2766   g_mutex_clear (&pad->priv->lock);
2767
2768   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object);
2769 }
2770
2771 static void
2772 gst_aggregator_pad_dispose (GObject * object)
2773 {
2774   GstAggregatorPad *pad = (GstAggregatorPad *) object;
2775
2776   gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE);
2777
2778   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
2779 }
2780
2781 static void
2782 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
2783 {
2784   GObjectClass *gobject_class = (GObjectClass *) klass;
2785
2786   g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate));
2787
2788   gobject_class->constructed = gst_aggregator_pad_constructed;
2789   gobject_class->finalize = gst_aggregator_pad_finalize;
2790   gobject_class->dispose = gst_aggregator_pad_dispose;
2791 }
2792
2793 static void
2794 gst_aggregator_pad_init (GstAggregatorPad * pad)
2795 {
2796   pad->priv =
2797       G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
2798       GstAggregatorPadPrivate);
2799
2800   g_queue_init (&pad->priv->data);
2801   g_cond_init (&pad->priv->event_cond);
2802
2803   g_mutex_init (&pad->priv->flush_lock);
2804   g_mutex_init (&pad->priv->lock);
2805
2806   gst_aggregator_pad_reset_unlocked (pad);
2807   pad->priv->negotiated = FALSE;
2808 }
2809
2810 /* Must be called with the PAD_LOCK held */
2811 static void
2812 gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad)
2813 {
2814   pad->priv->num_buffers--;
2815   GST_TRACE_OBJECT (pad, "Consuming buffer");
2816   PAD_BROADCAST_EVENT (pad);
2817 }
2818
2819 /* Must be called with the PAD_LOCK held */
2820 static void
2821 gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
2822 {
2823   GstAggregator *self = NULL;
2824   GstAggregatorClass *aggclass = NULL;
2825   GstBuffer *buffer = NULL;
2826
2827   while (pad->priv->clipped_buffer == NULL &&
2828       GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
2829     buffer = g_queue_pop_tail (&pad->priv->data);
2830
2831     apply_buffer (pad, buffer, FALSE);
2832
2833     /* We only take the parent here so that it's not taken if the buffer is
2834      * already clipped or if the queue is empty.
2835      */
2836     if (self == NULL) {
2837       self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
2838       if (self == NULL) {
2839         gst_buffer_unref (buffer);
2840         return;
2841       }
2842
2843       aggclass = GST_AGGREGATOR_GET_CLASS (self);
2844     }
2845
2846     if (aggclass->clip) {
2847       GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer);
2848
2849       buffer = aggclass->clip (self, pad, buffer);
2850
2851       if (buffer == NULL) {
2852         gst_aggregator_pad_buffer_consumed (pad);
2853         GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
2854       }
2855     }
2856
2857     pad->priv->clipped_buffer = buffer;
2858   }
2859
2860   if (self)
2861     gst_object_unref (self);
2862 }
2863
2864 /**
2865  * gst_aggregator_pad_steal_buffer:
2866  * @pad: the pad to get buffer from
2867  *
2868  * Steal the ref to the buffer currently queued in @pad.
2869  *
2870  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
2871  *   queued. You should unref the buffer after usage.
2872  */
2873 GstBuffer *
2874 gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
2875 {
2876   GstBuffer *buffer;
2877
2878   PAD_LOCK (pad);
2879
2880   gst_aggregator_pad_clip_buffer_unlocked (pad);
2881
2882   buffer = pad->priv->clipped_buffer;
2883
2884   if (buffer) {
2885     pad->priv->clipped_buffer = NULL;
2886     gst_aggregator_pad_buffer_consumed (pad);
2887     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
2888   }
2889
2890   PAD_UNLOCK (pad);
2891
2892   return buffer;
2893 }
2894
2895 /**
2896  * gst_aggregator_pad_drop_buffer:
2897  * @pad: the pad where to drop any pending buffer
2898  *
2899  * Drop the buffer currently queued in @pad.
2900  *
2901  * Returns: TRUE if there was a buffer queued in @pad, or FALSE if not.
2902  */
2903 gboolean
2904 gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
2905 {
2906   GstBuffer *buf;
2907
2908   buf = gst_aggregator_pad_steal_buffer (pad);
2909
2910   if (buf == NULL)
2911     return FALSE;
2912
2913   gst_buffer_unref (buf);
2914   return TRUE;
2915 }
2916
2917 /**
2918  * gst_aggregator_pad_get_buffer:
2919  * @pad: the pad to get buffer from
2920  *
2921  * Returns: (transfer full): A reference to the buffer in @pad or
2922  * NULL if no buffer was queued. You should unref the buffer after
2923  * usage.
2924  */
2925 GstBuffer *
2926 gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
2927 {
2928   GstBuffer *buffer;
2929
2930   PAD_LOCK (pad);
2931
2932   gst_aggregator_pad_clip_buffer_unlocked (pad);
2933
2934   if (pad->priv->clipped_buffer) {
2935     buffer = gst_buffer_ref (pad->priv->clipped_buffer);
2936   } else {
2937     buffer = NULL;
2938   }
2939   PAD_UNLOCK (pad);
2940
2941   return buffer;
2942 }
2943
2944 gboolean
2945 gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
2946 {
2947   gboolean is_eos;
2948
2949   PAD_LOCK (pad);
2950   is_eos = pad->priv->eos;
2951   PAD_UNLOCK (pad);
2952
2953   return is_eos;
2954 }
2955
2956 /**
2957  * gst_aggregator_merge_tags:
2958  * @self: a #GstAggregator
2959  * @tags: a #GstTagList to merge
2960  * @mode: the #GstTagMergeMode to use
2961  *
2962  * Adds tags to so-called pending tags, which will be processed
2963  * before pushing out data downstream.
2964  *
2965  * Note that this is provided for convenience, and the subclass is
2966  * not required to use this and can still do tag handling on its own.
2967  *
2968  * MT safe.
2969  */
2970 void
2971 gst_aggregator_merge_tags (GstAggregator * self,
2972     const GstTagList * tags, GstTagMergeMode mode)
2973 {
2974   GstTagList *otags;
2975
2976   g_return_if_fail (GST_IS_AGGREGATOR (self));
2977   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
2978
2979   /* FIXME Check if we can use OBJECT lock here! */
2980   GST_OBJECT_LOCK (self);
2981   if (tags)
2982     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
2983   otags = self->priv->tags;
2984   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
2985   if (otags)
2986     gst_tag_list_unref (otags);
2987   self->priv->tags_changed = TRUE;
2988   GST_OBJECT_UNLOCK (self);
2989 }
2990
2991 /**
2992  * gst_aggregator_set_latency:
2993  * @self: a #GstAggregator
2994  * @min_latency: minimum latency
2995  * @max_latency: maximum latency
2996  *
2997  * Lets #GstAggregator sub-classes tell the baseclass what their internal
2998  * latency is. Will also post a LATENCY message on the bus so the pipeline
2999  * can reconfigure its global latency.
3000  */
3001 void
3002 gst_aggregator_set_latency (GstAggregator * self,
3003     GstClockTime min_latency, GstClockTime max_latency)
3004 {
3005   gboolean changed = FALSE;
3006
3007   g_return_if_fail (GST_IS_AGGREGATOR (self));
3008   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
3009   g_return_if_fail (max_latency >= min_latency);
3010
3011   SRC_LOCK (self);
3012   if (self->priv->sub_latency_min != min_latency) {
3013     self->priv->sub_latency_min = min_latency;
3014     changed = TRUE;
3015   }
3016   if (self->priv->sub_latency_max != max_latency) {
3017     self->priv->sub_latency_max = max_latency;
3018     changed = TRUE;
3019   }
3020
3021   if (changed)
3022     SRC_BROADCAST (self);
3023   SRC_UNLOCK (self);
3024
3025   if (changed) {
3026     gst_element_post_message (GST_ELEMENT_CAST (self),
3027         gst_message_new_latency (GST_OBJECT_CAST (self)));
3028   }
3029 }
3030
3031 /**
3032  * gst_aggregator_get_buffer_pool:
3033  * @self: a #GstAggregator
3034  *
3035  * Returns: (transfer full): the instance of the #GstBufferPool used
3036  * by @trans; free it after use it
3037  */
3038 GstBufferPool *
3039 gst_aggregator_get_buffer_pool (GstAggregator * self)
3040 {
3041   GstBufferPool *pool;
3042
3043   g_return_val_if_fail (GST_IS_AGGREGATOR (self), NULL);
3044
3045   GST_OBJECT_LOCK (self);
3046   pool = self->priv->pool;
3047   if (pool)
3048     gst_object_ref (pool);
3049   GST_OBJECT_UNLOCK (self);
3050
3051   return pool;
3052 }
3053
3054 /**
3055  * gst_aggregator_get_allocator:
3056  * @self: a #GstAggregator
3057  * @allocator: (out) (allow-none) (transfer full): the #GstAllocator
3058  * used
3059  * @params: (out) (allow-none) (transfer full): the
3060  * #GstAllocationParams of @allocator
3061  *
3062  * Lets #GstAggregator sub-classes get the memory @allocator
3063  * acquired by the base class and its @params.
3064  *
3065  * Unref the @allocator after use it.
3066  */
3067 void
3068 gst_aggregator_get_allocator (GstAggregator * self,
3069     GstAllocator ** allocator, GstAllocationParams * params)
3070 {
3071   g_return_if_fail (GST_IS_AGGREGATOR (self));
3072
3073   if (allocator)
3074     *allocator = self->priv->allocator ?
3075         gst_object_ref (self->priv->allocator) : NULL;
3076
3077   if (params)
3078     *params = self->priv->allocation_params;
3079 }