aggregator: Unify downstream flow return and flushing
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
1 /* GStreamer aggregator base class
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @short_description: manages a set of pads with the purpose of
25  * aggregating their buffers.
26  * @see_also: gstcollectpads for historical reasons.
27  *
28  * Manages a set of pads with the purpose of aggregating their buffers.
29  * Control is given to the subclass when all pads have data.
30  * <itemizedlist>
31  *  <listitem><para>
32  *    Base class for mixers and muxers. Subclasses should at least implement
33  *    the #GstAggregatorClass.aggregate() virtual method.
34  *  </para></listitem>
35  *  <listitem><para>
36  *    When data is queued on all pads, tha aggregate vmethod is called.
37  *  </para></listitem>
38  *  <listitem><para>
39  *    One can peek at the data on any given GstAggregatorPad with the
40  *    gst_aggregator_pad_get_buffer () method, and take ownership of it
41  *    with the gst_aggregator_pad_steal_buffer () method. When a buffer
42  *    has been taken with steal_buffer (), a new buffer can be queued
43  *    on that pad.
44  *  </para></listitem>
45  *  <listitem><para>
46  *    If the subclass wishes to push a buffer downstream in its aggregate
47  *    implementation, it should do so through the
48  *    gst_aggregator_finish_buffer () method. This method will take care
49  *    of sending and ordering mandatory events such as stream start, caps
50  *    and segment.
51  *  </para></listitem>
52  *  <listitem><para>
53  *    Same goes for EOS events, which should not be pushed directly by the
54  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
55  *    implementation.
56  *  </para></listitem>
57  * </itemizedlist>
58  */
59
60 #ifdef HAVE_CONFIG_H
61 #  include "config.h"
62 #endif
63
64 #include <string.h>             /* strlen */
65
66 #include "gstaggregator.h"
67
68
69 /*  Might become API */
70 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
71     const GstTagList * tags, GstTagMergeMode mode);
72 static void gst_aggregator_set_latency_property (GstAggregator * agg,
73     gint64 latency);
74 static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
75
76
77 /* Locking order, locks in this element must always be taken in this order
78  *
79  * standard sink pad stream lock -> GST_PAD_STREAM_LOCK (aggpad)
80  * Aggregator pad flush lock -> PAD_FLUSH_LOCK(aggpad)
81  * standard src pad stream lock -> GST_PAD_STREAM_LOCK (srcpad)
82  * Aggregator src lock -> SRC_LOCK(agg) w/ SRC_WAIT/BROADCAST
83  * standard element object lock -> GST_OBJECT_LOCK(agg)
84  * Aggregator pad lock -> PAD_LOCK (aggpad) w/ PAD_WAIT/BROADCAST_EVENT(aggpad)
85  * standard src pad object lock -> GST_OBJECT_LOCK(srcpad)
86  * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad)
87  */
88
89
90 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
91
92 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
93 #define GST_CAT_DEFAULT aggregator_debug
94
95 /* GstAggregatorPad definitions */
96 #define PAD_LOCK(pad)   G_STMT_START {                                  \
97   GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p",              \
98         g_thread_self());                                               \
99   g_mutex_lock(&pad->priv->lock);                                       \
100   GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p",                \
101         g_thread_self());                                               \
102   } G_STMT_END
103
104 #define PAD_UNLOCK(pad)  G_STMT_START {                                 \
105   GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p",           \
106       g_thread_self());                                                 \
107   g_mutex_unlock(&pad->priv->lock);                                     \
108   GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p",             \
109         g_thread_self());                                               \
110   } G_STMT_END
111
112
113 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
114   GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p",   \
115         g_thread_self());                                               \
116   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),           \
117       (&((GstAggregatorPad*)pad)->priv->lock));                         \
118   GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \
119         g_thread_self());                                               \
120   } G_STMT_END
121
122 #define PAD_BROADCAST_EVENT(pad) G_STMT_START {                        \
123   GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p",     \
124         g_thread_self());                                              \
125   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond));    \
126   } G_STMT_END
127
128
129 #define PAD_FLUSH_LOCK(pad)     G_STMT_START {                          \
130   GST_TRACE_OBJECT (pad, "Taking lock from thread %p",                  \
131         g_thread_self());                                               \
132   g_mutex_lock(&pad->priv->flush_lock);                                 \
133   GST_TRACE_OBJECT (pad, "Took lock from thread %p",                    \
134         g_thread_self());                                               \
135   } G_STMT_END
136
137 #define PAD_FLUSH_UNLOCK(pad)   G_STMT_START {                          \
138   GST_TRACE_OBJECT (pad, "Releasing lock from thread %p",               \
139         g_thread_self());                                               \
140   g_mutex_unlock(&pad->priv->flush_lock);                               \
141   GST_TRACE_OBJECT (pad, "Release lock from thread %p",                 \
142         g_thread_self());                                               \
143   } G_STMT_END
144
145 #define SRC_LOCK(self)   G_STMT_START {                             \
146   GST_TRACE_OBJECT (self, "Taking src lock from thread %p",         \
147       g_thread_self());                                             \
148   g_mutex_lock(&self->priv->src_lock);                              \
149   GST_TRACE_OBJECT (self, "Took src lock from thread %p",           \
150         g_thread_self());                                           \
151   } G_STMT_END
152
153 #define SRC_UNLOCK(self)  G_STMT_START {                            \
154   GST_TRACE_OBJECT (self, "Releasing src lock from thread %p",      \
155         g_thread_self());                                           \
156   g_mutex_unlock(&self->priv->src_lock);                            \
157   GST_TRACE_OBJECT (self, "Released src lock from thread %p",       \
158         g_thread_self());                                           \
159   } G_STMT_END
160
161 #define SRC_WAIT(self) G_STMT_START {                               \
162   GST_LOG_OBJECT (self, "Waiting for src on thread %p",             \
163         g_thread_self());                                           \
164   g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock));    \
165   GST_LOG_OBJECT (self, "DONE Waiting for src on thread %p",        \
166         g_thread_self());                                           \
167   } G_STMT_END
168
169 #define SRC_BROADCAST(self) G_STMT_START {                          \
170     GST_LOG_OBJECT (self, "Signaling src from thread %p",           \
171         g_thread_self());                                           \
172     if (self->priv->aggregate_id)                                   \
173       gst_clock_id_unschedule (self->priv->aggregate_id);           \
174     g_cond_broadcast(&(self->priv->src_cond));                      \
175   } G_STMT_END
176
177 struct _GstAggregatorPadPrivate
178 {
179   /* Following fields are protected by the PAD_LOCK */
180   GstFlowReturn flow_return;
181   gboolean pending_flush_start;
182   gboolean pending_flush_stop;
183   gboolean pending_eos;
184
185   GstBuffer *buffer;
186   gboolean eos;
187
188   GMutex lock;
189   GCond event_cond;
190   /* This lock prevents a flush start processing happening while
191    * the chain function is also happening.
192    */
193   GMutex flush_lock;
194 };
195
196 static gboolean
197 gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
198 {
199   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
200
201   PAD_LOCK (aggpad);
202   aggpad->priv->pending_eos = FALSE;
203   aggpad->priv->eos = FALSE;
204   aggpad->priv->flow_return = GST_FLOW_OK;
205   PAD_UNLOCK (aggpad);
206
207   if (klass->flush)
208     return klass->flush (aggpad, agg);
209
210   return TRUE;
211 }
212
213 /*************************************
214  * GstAggregator implementation  *
215  *************************************/
216 static GstElementClass *aggregator_parent_class = NULL;
217
218 /* All members are protected by the object lock unless otherwise noted */
219
220 struct _GstAggregatorPrivate
221 {
222   gint padcount;
223
224   /* Our state is >= PAUSED */
225   gboolean running;             /* protected by src_lock */
226
227   gint seqnum;
228   gboolean send_stream_start;   /* protected by srcpad stream lock */
229   gboolean send_segment;
230   gboolean flush_seeking;
231   gboolean pending_flush_start;
232   gboolean send_eos;            /* protected by srcpad stream lock */
233
234   GstCaps *srccaps;             /* protected by the srcpad stream lock */
235
236   GstTagList *tags;
237   gboolean tags_changed;
238
239   gboolean peer_latency_live;   /* protected by src_lock */
240   GstClockTime peer_latency_min;        /* protected by src_lock */
241   GstClockTime peer_latency_max;        /* protected by src_lock */
242   gboolean has_peer_latency;
243
244   GstClockTime sub_latency_min; /* protected by src_lock */
245   GstClockTime sub_latency_max; /* protected by src_lock */
246
247   /* aggregate */
248   GstClockID aggregate_id;      /* protected by src_lock */
249   GMutex src_lock;
250   GCond src_cond;
251
252   /* properties */
253   gint64 latency;
254 };
255
256 typedef struct
257 {
258   GstEvent *event;
259   gboolean result;
260   gboolean flush;
261
262   gboolean one_actually_seeked;
263 } EventData;
264
265 #define DEFAULT_LATENCY        0
266
267 enum
268 {
269   PROP_0,
270   PROP_LATENCY,
271   PROP_LAST
272 };
273
274 /**
275  * gst_aggregator_iterate_sinkpads:
276  * @self: The #GstAggregator
277  * @func: (scope call): The function to call.
278  * @user_data: (closure): The data to pass to @func.
279  *
280  * Iterate the sinkpads of aggregator to call a function on them.
281  *
282  * This method guarantees that @func will be called only once for each
283  * sink pad.
284  */
285 gboolean
286 gst_aggregator_iterate_sinkpads (GstAggregator * self,
287     GstAggregatorPadForeachFunc func, gpointer user_data)
288 {
289   gboolean result = FALSE;
290   GstIterator *iter;
291   gboolean done = FALSE;
292   GValue item = { 0, };
293   GList *seen_pads = NULL;
294
295   iter = gst_element_iterate_sink_pads (GST_ELEMENT (self));
296
297   if (!iter)
298     goto no_iter;
299
300   while (!done) {
301     switch (gst_iterator_next (iter, &item)) {
302       case GST_ITERATOR_OK:
303       {
304         GstAggregatorPad *pad;
305
306         pad = g_value_get_object (&item);
307
308         /* if already pushed, skip. FIXME, find something faster to tag pads */
309         if (pad == NULL || g_list_find (seen_pads, pad)) {
310           g_value_reset (&item);
311           break;
312         }
313
314         GST_LOG_OBJECT (pad, "calling function %s on pad",
315             GST_DEBUG_FUNCPTR_NAME (func));
316
317         result = func (self, pad, user_data);
318
319         done = !result;
320
321         seen_pads = g_list_prepend (seen_pads, pad);
322
323         g_value_reset (&item);
324         break;
325       }
326       case GST_ITERATOR_RESYNC:
327         gst_iterator_resync (iter);
328         break;
329       case GST_ITERATOR_ERROR:
330         GST_ERROR_OBJECT (self,
331             "Could not iterate over internally linked pads");
332         done = TRUE;
333         break;
334       case GST_ITERATOR_DONE:
335         done = TRUE;
336         break;
337     }
338   }
339   g_value_unset (&item);
340   gst_iterator_free (iter);
341
342   if (seen_pads == NULL) {
343     GST_DEBUG_OBJECT (self, "No pad seen");
344     return FALSE;
345   }
346
347   g_list_free (seen_pads);
348
349 no_iter:
350   return result;
351 }
352
353 static gboolean
354 gst_aggregator_check_pads_ready (GstAggregator * self)
355 {
356   GstAggregatorPad *pad;
357   GList *l, *sinkpads;
358
359   GST_LOG_OBJECT (self, "checking pads");
360
361   GST_OBJECT_LOCK (self);
362
363   sinkpads = GST_ELEMENT_CAST (self)->sinkpads;
364   if (sinkpads == NULL)
365     goto no_sinkpads;
366
367   for (l = sinkpads; l != NULL; l = l->next) {
368     pad = l->data;
369
370     PAD_LOCK (pad);
371     if (pad->priv->buffer == NULL && !pad->priv->eos) {
372       PAD_UNLOCK (pad);
373       goto pad_not_ready;
374     }
375     PAD_UNLOCK (pad);
376
377   }
378
379   GST_OBJECT_UNLOCK (self);
380   GST_LOG_OBJECT (self, "pads are ready");
381   return TRUE;
382
383 no_sinkpads:
384   {
385     GST_LOG_OBJECT (self, "pads not ready: no sink pads");
386     GST_OBJECT_UNLOCK (self);
387     return FALSE;
388   }
389 pad_not_ready:
390   {
391     GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
392     GST_OBJECT_UNLOCK (self);
393     return FALSE;
394   }
395 }
396
397 static void
398 gst_aggregator_reset_flow_values (GstAggregator * self)
399 {
400   GST_OBJECT_LOCK (self);
401   self->priv->send_stream_start = TRUE;
402   self->priv->send_segment = TRUE;
403   gst_segment_init (&self->segment, GST_FORMAT_TIME);
404   GST_OBJECT_UNLOCK (self);
405 }
406
407 static inline void
408 gst_aggregator_push_mandatory_events (GstAggregator * self)
409 {
410   GstAggregatorPrivate *priv = self->priv;
411   GstEvent *segment = NULL;
412   GstEvent *tags = NULL;
413
414   if (self->priv->send_stream_start) {
415     gchar s_id[32];
416
417     GST_INFO_OBJECT (self, "pushing stream start");
418     /* stream-start (FIXME: create id based on input ids) */
419     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
420     if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) {
421       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
422     }
423     self->priv->send_stream_start = FALSE;
424   }
425
426   if (self->priv->srccaps) {
427
428     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
429         self->priv->srccaps);
430     if (!gst_pad_push_event (self->srcpad,
431             gst_event_new_caps (self->priv->srccaps))) {
432       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
433     }
434     gst_caps_unref (self->priv->srccaps);
435     self->priv->srccaps = NULL;
436   }
437
438   GST_OBJECT_LOCK (self);
439   if (self->priv->send_segment && !self->priv->flush_seeking) {
440     segment = gst_event_new_segment (&self->segment);
441
442     if (!self->priv->seqnum)
443       self->priv->seqnum = gst_event_get_seqnum (segment);
444     else
445       gst_event_set_seqnum (segment, self->priv->seqnum);
446     self->priv->send_segment = FALSE;
447
448     GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment);
449   }
450
451   if (priv->tags && priv->tags_changed && !self->priv->flush_seeking) {
452     tags = gst_event_new_tag (gst_tag_list_ref (priv->tags));
453     priv->tags_changed = FALSE;
454   }
455   GST_OBJECT_UNLOCK (self);
456
457   if (segment)
458     gst_pad_push_event (self->srcpad, segment);
459   if (tags)
460     gst_pad_push_event (self->srcpad, tags);
461
462 }
463
464 /**
465  * gst_aggregator_set_src_caps:
466  * @self: The #GstAggregator
467  * @caps: The #GstCaps to set on the src pad.
468  *
469  * Sets the caps to be used on the src pad.
470  */
471 void
472 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
473 {
474   GST_PAD_STREAM_LOCK (self->srcpad);
475   gst_caps_replace (&self->priv->srccaps, caps);
476   gst_aggregator_push_mandatory_events (self);
477   GST_PAD_STREAM_UNLOCK (self->srcpad);
478 }
479
480 /**
481  * gst_aggregator_finish_buffer:
482  * @self: The #GstAggregator
483  * @buffer: (transfer full): the #GstBuffer to push.
484  *
485  * This method will push the provided output buffer downstream. If needed,
486  * mandatory events such as stream-start, caps, and segment events will be
487  * sent before pushing the buffer.
488  */
489 GstFlowReturn
490 gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
491 {
492   gst_aggregator_push_mandatory_events (self);
493
494   GST_OBJECT_LOCK (self);
495   if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) {
496     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
497     GST_OBJECT_UNLOCK (self);
498     return gst_pad_push (self->srcpad, buffer);
499   } else {
500     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
501         self->priv->flush_seeking, gst_pad_is_active (self->srcpad));
502     GST_OBJECT_UNLOCK (self);
503     gst_buffer_unref (buffer);
504     return GST_FLOW_OK;
505   }
506 }
507
508 static void
509 gst_aggregator_push_eos (GstAggregator * self)
510 {
511   GstEvent *event;
512   gst_aggregator_push_mandatory_events (self);
513
514   event = gst_event_new_eos ();
515
516   GST_OBJECT_LOCK (self);
517   self->priv->send_eos = FALSE;
518   gst_event_set_seqnum (event, self->priv->seqnum);
519   GST_OBJECT_UNLOCK (self);
520
521   gst_pad_push_event (self->srcpad, event);
522 }
523
524 static GstClockTime
525 gst_aggregator_get_next_time (GstAggregator * self)
526 {
527   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
528
529   if (klass->get_next_time)
530     return klass->get_next_time (self);
531
532   return GST_CLOCK_TIME_NONE;
533 }
534
535 static gboolean
536 gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
537 {
538   GstClockTime latency;
539   GstClockTime start;
540   gboolean res;
541
542   *timeout = FALSE;
543
544   SRC_LOCK (self);
545
546   latency = gst_aggregator_get_latency_unlocked (self);
547
548   if (gst_aggregator_check_pads_ready (self)) {
549     GST_DEBUG_OBJECT (self, "all pads have data");
550     SRC_UNLOCK (self);
551
552     return TRUE;
553   }
554
555   /* Before waiting, check if we're actually still running */
556   if (!self->priv->running || !self->priv->send_eos) {
557     SRC_UNLOCK (self);
558
559     return FALSE;
560   }
561
562   start = gst_aggregator_get_next_time (self);
563
564   if (!GST_CLOCK_TIME_IS_VALID (latency) ||
565       !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) ||
566       !GST_CLOCK_TIME_IS_VALID (start)) {
567     /* We wake up here when something happened, and below
568      * then check if we're ready now. If we return FALSE,
569      * we will be directly called again.
570      */
571     SRC_WAIT (self);
572   } else {
573     GstClockTime base_time, time;
574     GstClock *clock;
575     GstClockReturn status;
576     GstClockTimeDiff jitter;
577
578     GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
579         GST_TIME_ARGS (start));
580
581     GST_OBJECT_LOCK (self);
582     base_time = GST_ELEMENT_CAST (self)->base_time;
583     clock = GST_ELEMENT_CLOCK (self);
584     if (clock)
585       gst_object_ref (clock);
586     GST_OBJECT_UNLOCK (self);
587
588     time = base_time + start;
589     time += latency;
590
591     GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
592         GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
593         " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")",
594         GST_TIME_ARGS (time),
595         GST_TIME_ARGS (GST_ELEMENT_CAST (self)->base_time),
596         GST_TIME_ARGS (start), GST_TIME_ARGS (latency),
597         GST_TIME_ARGS (gst_clock_get_time (clock)));
598
599
600     self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
601     gst_object_unref (clock);
602     SRC_UNLOCK (self);
603
604     jitter = 0;
605     status = gst_clock_id_wait (self->priv->aggregate_id, &jitter);
606
607     SRC_LOCK (self);
608     if (self->priv->aggregate_id) {
609       gst_clock_id_unref (self->priv->aggregate_id);
610       self->priv->aggregate_id = NULL;
611     }
612
613     GST_DEBUG_OBJECT (self,
614         "clock returned %d (jitter: %s%" GST_TIME_FORMAT ")",
615         status, (jitter < 0 ? "-" : " "),
616         GST_TIME_ARGS ((jitter < 0 ? -jitter : jitter)));
617
618     /* we timed out */
619     if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
620       SRC_UNLOCK (self);
621       *timeout = TRUE;
622       return TRUE;
623     }
624   }
625
626   res = gst_aggregator_check_pads_ready (self);
627   SRC_UNLOCK (self);
628
629   return res;
630 }
631
632 static void
633 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
634     GstFlowReturn flow_return)
635 {
636   PAD_LOCK (aggpad);
637   if (flow_return == GST_FLOW_NOT_LINKED)
638     aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
639   else
640     aggpad->priv->flow_return = flow_return;
641   gst_buffer_replace (&aggpad->priv->buffer, NULL);
642   PAD_BROADCAST_EVENT (aggpad);
643   PAD_UNLOCK (aggpad);
644 }
645
646 static void
647 gst_aggregator_aggregate_func (GstAggregator * self)
648 {
649   GstAggregatorPrivate *priv = self->priv;
650   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
651   gboolean timeout = FALSE;
652
653   if (self->priv->running == FALSE) {
654     GST_DEBUG_OBJECT (self, "Not running anymore");
655     return;
656   }
657
658   GST_LOG_OBJECT (self, "Checking aggregate");
659   while (priv->send_eos && priv->running) {
660     GstFlowReturn flow_return;
661
662     if (!gst_aggregator_wait_and_check (self, &timeout))
663       continue;
664
665     GST_TRACE_OBJECT (self, "Actually aggregating!");
666
667     flow_return = klass->aggregate (self, timeout);
668
669     GST_OBJECT_LOCK (self);
670     if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) {
671       /* We don't want to set the pads to flushing, but we want to
672        * stop the thread, so just break here */
673       GST_OBJECT_UNLOCK (self);
674       break;
675     }
676     GST_OBJECT_UNLOCK (self);
677
678     if (flow_return == GST_FLOW_EOS) {
679       gst_aggregator_push_eos (self);
680     }
681
682     GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
683
684     if (flow_return != GST_FLOW_OK) {
685       GList *item;
686
687       GST_OBJECT_LOCK (self);
688       for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
689         GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
690
691         gst_aggregator_pad_set_flushing (aggpad, flow_return);
692       }
693       GST_OBJECT_UNLOCK (self);
694       break;
695     }
696   }
697
698   /* Pause the task here, the only ways to get here are:
699    * 1) We're stopping, in which case the task is stopped anyway
700    * 2) We got a flow error above, in which case it might take
701    *    some time to forward the flow return upstream and we
702    *    would otherwise call the task function over and over
703    *    again without doing anything
704    */
705   gst_pad_pause_task (self->srcpad);
706 }
707
708 static gboolean
709 gst_aggregator_start (GstAggregator * self)
710 {
711   GstAggregatorClass *klass;
712   gboolean result;
713
714   self->priv->running = TRUE;
715   self->priv->send_stream_start = TRUE;
716   self->priv->send_segment = TRUE;
717   self->priv->send_eos = TRUE;
718   self->priv->srccaps = NULL;
719
720   klass = GST_AGGREGATOR_GET_CLASS (self);
721
722   if (klass->start)
723     result = klass->start (self);
724   else
725     result = TRUE;
726
727   return result;
728 }
729
730 static gboolean
731 _check_pending_flush_stop (GstAggregatorPad * pad)
732 {
733   gboolean res;
734
735   PAD_LOCK (pad);
736   res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
737   PAD_UNLOCK (pad);
738
739   return res;
740 }
741
742 static gboolean
743 gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
744 {
745   gboolean res = TRUE;
746
747   GST_INFO_OBJECT (self, "%s srcpad task",
748       flush_start ? "Pausing" : "Stopping");
749
750   SRC_LOCK (self);
751   self->priv->running = FALSE;
752   SRC_BROADCAST (self);
753   SRC_UNLOCK (self);
754
755   if (flush_start) {
756     res = gst_pad_push_event (self->srcpad, flush_start);
757   }
758
759   gst_pad_stop_task (self->srcpad);
760
761   return res;
762 }
763
764 static void
765 gst_aggregator_start_srcpad_task (GstAggregator * self)
766 {
767   GST_INFO_OBJECT (self, "Starting srcpad task");
768
769   self->priv->running = TRUE;
770   gst_pad_start_task (GST_PAD (self->srcpad),
771       (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL);
772 }
773
774 static GstFlowReturn
775 gst_aggregator_flush (GstAggregator * self)
776 {
777   GstFlowReturn ret = GST_FLOW_OK;
778   GstAggregatorPrivate *priv = self->priv;
779   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
780
781   GST_DEBUG_OBJECT (self, "Flushing everything");
782   GST_OBJECT_LOCK (self);
783   priv->send_segment = TRUE;
784   priv->flush_seeking = FALSE;
785   priv->tags_changed = FALSE;
786   GST_OBJECT_UNLOCK (self);
787   if (klass->flush)
788     ret = klass->flush (self);
789
790   return ret;
791 }
792
793
794 /* Called with GstAggregator's object lock held */
795
796 static gboolean
797 gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
798 {
799   GList *tmp;
800   GstAggregatorPad *tmppad;
801
802   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
803     tmppad = (GstAggregatorPad *) tmp->data;
804
805     if (_check_pending_flush_stop (tmppad) == FALSE) {
806       GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
807           tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
808       return FALSE;
809     }
810   }
811
812   return TRUE;
813 }
814
815 static void
816 gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
817     GstEvent * event)
818 {
819   GstAggregatorPrivate *priv = self->priv;
820   GstAggregatorPadPrivate *padpriv = aggpad->priv;
821
822   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
823
824   PAD_FLUSH_LOCK (aggpad);
825   PAD_LOCK (aggpad);
826   if (padpriv->pending_flush_start) {
827     GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
828
829     padpriv->pending_flush_start = FALSE;
830     padpriv->pending_flush_stop = TRUE;
831   }
832   PAD_UNLOCK (aggpad);
833
834   GST_OBJECT_LOCK (self);
835   if (priv->flush_seeking) {
836     /* If flush_seeking we forward the first FLUSH_START */
837     if (priv->pending_flush_start) {
838       priv->pending_flush_start = FALSE;
839       GST_OBJECT_UNLOCK (self);
840
841       GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
842       gst_aggregator_stop_srcpad_task (self, event);
843
844       GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
845       GST_PAD_STREAM_LOCK (self->srcpad);
846       GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
847       event = NULL;
848     } else {
849       GST_OBJECT_UNLOCK (self);
850       gst_event_unref (event);
851     }
852   } else {
853     GST_OBJECT_UNLOCK (self);
854     gst_event_unref (event);
855   }
856   PAD_FLUSH_UNLOCK (aggpad);
857
858   gst_aggregator_pad_drop_buffer (aggpad);
859 }
860
861 /* GstAggregator vmethods default implementations */
862 static gboolean
863 gst_aggregator_default_sink_event (GstAggregator * self,
864     GstAggregatorPad * aggpad, GstEvent * event)
865 {
866   gboolean res = TRUE;
867   GstPad *pad = GST_PAD (aggpad);
868   GstAggregatorPrivate *priv = self->priv;
869
870   switch (GST_EVENT_TYPE (event)) {
871     case GST_EVENT_FLUSH_START:
872     {
873       gst_aggregator_flush_start (self, aggpad, event);
874       /* We forward only in one case: right after flush_seeking */
875       event = NULL;
876       goto eat;
877     }
878     case GST_EVENT_FLUSH_STOP:
879     {
880       GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP");
881
882       gst_aggregator_pad_flush (aggpad, self);
883       GST_OBJECT_LOCK (self);
884       if (priv->flush_seeking) {
885         g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
886         if (gst_aggregator_all_flush_stop_received_locked (self)) {
887           GST_OBJECT_UNLOCK (self);
888           /* That means we received FLUSH_STOP/FLUSH_STOP on
889            * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
890           gst_aggregator_flush (self);
891           gst_pad_push_event (self->srcpad, event);
892           event = NULL;
893           SRC_LOCK (self);
894           priv->send_eos = TRUE;
895           SRC_BROADCAST (self);
896           SRC_UNLOCK (self);
897
898           GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
899           GST_PAD_STREAM_UNLOCK (self->srcpad);
900           gst_aggregator_start_srcpad_task (self);
901         } else {
902           GST_OBJECT_UNLOCK (self);
903         }
904       } else {
905         GST_OBJECT_UNLOCK (self);
906       }
907
908       /* We never forward the event */
909       goto eat;
910     }
911     case GST_EVENT_EOS:
912     {
913       GST_DEBUG_OBJECT (aggpad, "EOS");
914
915       /* We still have a buffer, and we don't want the subclass to have to
916        * check for it. Mark pending_eos, eos will be set when steal_buffer is
917        * called
918        */
919       SRC_LOCK (self);
920       PAD_LOCK (aggpad);
921       if (!aggpad->priv->buffer) {
922         aggpad->priv->eos = TRUE;
923       } else {
924         aggpad->priv->pending_eos = TRUE;
925       }
926       PAD_UNLOCK (aggpad);
927
928       SRC_BROADCAST (self);
929       SRC_UNLOCK (self);
930       goto eat;
931     }
932     case GST_EVENT_SEGMENT:
933     {
934       GST_OBJECT_LOCK (aggpad);
935       gst_event_copy_segment (event, &aggpad->segment);
936       GST_OBJECT_UNLOCK (aggpad);
937
938       GST_OBJECT_LOCK (self);
939       self->priv->seqnum = gst_event_get_seqnum (event);
940       GST_OBJECT_UNLOCK (self);
941       goto eat;
942     }
943     case GST_EVENT_STREAM_START:
944     {
945       goto eat;
946     }
947     case GST_EVENT_GAP:
948     {
949       /* FIXME: need API to handle GAP events properly */
950       GST_FIXME_OBJECT (self, "implement support for GAP events");
951       /* don't forward GAP events downstream */
952       goto eat;
953     }
954     case GST_EVENT_TAG:
955     {
956       GstTagList *tags;
957
958       gst_event_parse_tag (event, &tags);
959
960       if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) {
961         gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE);
962         gst_event_unref (event);
963         event = NULL;
964         goto eat;
965       }
966       break;
967     }
968     default:
969     {
970       break;
971     }
972   }
973
974   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
975   return gst_pad_event_default (pad, GST_OBJECT (self), event);
976
977 eat:
978   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
979   if (event)
980     gst_event_unref (event);
981
982   return res;
983 }
984
985 static inline gboolean
986 gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad,
987     gpointer unused_udata)
988 {
989   gst_aggregator_pad_flush (pad, self);
990
991   return TRUE;
992 }
993
994 static gboolean
995 gst_aggregator_stop (GstAggregator * agg)
996 {
997   GstAggregatorClass *klass;
998   gboolean result;
999
1000   gst_aggregator_reset_flow_values (agg);
1001
1002   gst_aggregator_iterate_sinkpads (agg, gst_aggregator_stop_pad, NULL);
1003
1004   klass = GST_AGGREGATOR_GET_CLASS (agg);
1005
1006   if (klass->stop)
1007     result = klass->stop (agg);
1008   else
1009     result = TRUE;
1010
1011   agg->priv->has_peer_latency = FALSE;
1012   agg->priv->peer_latency_live = FALSE;
1013   agg->priv->peer_latency_min = agg->priv->peer_latency_max = FALSE;
1014
1015   if (agg->priv->tags)
1016     gst_tag_list_unref (agg->priv->tags);
1017   agg->priv->tags = NULL;
1018
1019   return result;
1020 }
1021
1022 /* GstElement vmethods implementations */
1023 static GstStateChangeReturn
1024 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
1025 {
1026   GstStateChangeReturn ret;
1027   GstAggregator *self = GST_AGGREGATOR (element);
1028
1029   switch (transition) {
1030     case GST_STATE_CHANGE_READY_TO_PAUSED:
1031       if (!gst_aggregator_start (self))
1032         goto error_start;
1033       break;
1034     default:
1035       break;
1036   }
1037
1038   if ((ret =
1039           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
1040               transition)) == GST_STATE_CHANGE_FAILURE)
1041     goto failure;
1042
1043
1044   switch (transition) {
1045     case GST_STATE_CHANGE_PAUSED_TO_READY:
1046       if (!gst_aggregator_stop (self)) {
1047         /* What to do in this case? Error out? */
1048         GST_ERROR_OBJECT (self, "Subclass failed to stop.");
1049       }
1050       break;
1051     default:
1052       break;
1053   }
1054
1055   return ret;
1056
1057 /* ERRORS */
1058 failure:
1059   {
1060     GST_ERROR_OBJECT (element, "parent failed state change");
1061     return ret;
1062   }
1063 error_start:
1064   {
1065     GST_ERROR_OBJECT (element, "Subclass failed to start");
1066     return GST_STATE_CHANGE_FAILURE;
1067   }
1068 }
1069
1070 static void
1071 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
1072 {
1073   GstAggregator *self = GST_AGGREGATOR (element);
1074   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1075
1076   GST_INFO_OBJECT (pad, "Removing pad");
1077
1078   SRC_LOCK (self);
1079   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
1080   gst_element_remove_pad (element, pad);
1081
1082   SRC_BROADCAST (self);
1083   SRC_UNLOCK (self);
1084 }
1085
1086 static GstPad *
1087 gst_aggregator_request_new_pad (GstElement * element,
1088     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1089 {
1090   GstAggregator *self;
1091   GstAggregatorPad *agg_pad;
1092
1093   GstElementClass *klass = GST_ELEMENT_GET_CLASS (element);
1094   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
1095
1096   self = GST_AGGREGATOR (element);
1097
1098   if (templ == gst_element_class_get_pad_template (klass, "sink_%u")) {
1099     gint serial = 0;
1100     gchar *name = NULL;
1101
1102     GST_OBJECT_LOCK (element);
1103     if (req_name == NULL || strlen (req_name) < 6
1104         || !g_str_has_prefix (req_name, "sink_")) {
1105       /* no name given when requesting the pad, use next available int */
1106       priv->padcount++;
1107     } else {
1108       /* parse serial number from requested padname */
1109       serial = g_ascii_strtoull (&req_name[5], NULL, 10);
1110       if (serial >= priv->padcount)
1111         priv->padcount = serial;
1112     }
1113
1114     name = g_strdup_printf ("sink_%u", priv->padcount);
1115     agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
1116         "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
1117     g_free (name);
1118
1119     GST_OBJECT_UNLOCK (element);
1120
1121   } else {
1122     return NULL;
1123   }
1124
1125   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
1126
1127   if (priv->running)
1128     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
1129
1130   /* add the pad to the element */
1131   gst_element_add_pad (element, GST_PAD (agg_pad));
1132
1133   return GST_PAD (agg_pad);
1134 }
1135
1136 /* Must be called with SRC_LOCK held */
1137
1138 static gboolean
1139 gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
1140 {
1141   gboolean query_ret, live;
1142   GstClockTime our_latency, min, max;
1143
1144   query_ret = gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1145
1146   if (!query_ret) {
1147     GST_WARNING_OBJECT (self, "Latency query failed");
1148     return FALSE;
1149   }
1150
1151   gst_query_parse_latency (query, &live, &min, &max);
1152
1153   our_latency = self->priv->latency;
1154
1155   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
1156     GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
1157         ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
1158     return FALSE;
1159   }
1160
1161   if (min > max && GST_CLOCK_TIME_IS_VALID (max)) {
1162     GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL),
1163         ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
1164             GST_TIME_FORMAT ". Add queues or other buffering elements.",
1165             GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
1166     return FALSE;
1167   }
1168
1169   self->priv->peer_latency_live = live;
1170   self->priv->peer_latency_min = min;
1171   self->priv->peer_latency_max = max;
1172   self->priv->has_peer_latency = TRUE;
1173
1174   /* add our own */
1175   min += our_latency;
1176   min += self->priv->sub_latency_min;
1177   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1178       && GST_CLOCK_TIME_IS_VALID (max))
1179     max += self->priv->sub_latency_max;
1180   else
1181     max = GST_CLOCK_TIME_NONE;
1182
1183   if (live && min > max) {
1184     GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
1185         ("%s", "Latency too big"),
1186         ("The requested latency value is too big for the current pipeline.  "
1187             "Limiting to %" G_GINT64_FORMAT, max));
1188     min = max;
1189     /* FIXME: This could in theory become negative, but in
1190      * that case all is lost anyway */
1191     self->priv->latency -= min - max;
1192     /* FIXME: shouldn't we g_object_notify() the change here? */
1193   }
1194
1195   SRC_BROADCAST (self);
1196
1197   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
1198       " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
1199
1200   gst_query_set_latency (query, live, min, max);
1201
1202   return query_ret;
1203 }
1204
1205 /*
1206  * MUST be called with the src_lock held.
1207  *
1208  * See  gst_aggregator_get_latency() for doc
1209  */
1210 static GstClockTime
1211 gst_aggregator_get_latency_unlocked (GstAggregator * self)
1212 {
1213   GstClockTime latency;
1214
1215   g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0);
1216
1217   if (!self->priv->has_peer_latency) {
1218     GstQuery *query = gst_query_new_latency ();
1219     gboolean ret;
1220
1221     ret = gst_aggregator_query_latency_unlocked (self, query);
1222     gst_query_unref (query);
1223     if (!ret)
1224       return GST_CLOCK_TIME_NONE;
1225   }
1226
1227   if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
1228     return GST_CLOCK_TIME_NONE;
1229
1230   /* latency_min is never GST_CLOCK_TIME_NONE by construction */
1231   latency = self->priv->peer_latency_min;
1232
1233   /* add our own */
1234   latency += self->priv->latency;
1235   latency += self->priv->sub_latency_min;
1236
1237   return latency;
1238 }
1239
1240 /**
1241  * gst_aggregator_get_latency:
1242  * @self: a #GstAggregator
1243  *
1244  * Retrieves the latency values reported by @self in response to the latency
1245  * query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element
1246  * will not wait for the clock.
1247  *
1248  * Typically only called by subclasses.
1249  *
1250  * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync
1251  */
1252 GstClockTime
1253 gst_aggregator_get_latency (GstAggregator * self)
1254 {
1255   GstClockTime ret;
1256
1257   SRC_LOCK (self);
1258   ret = gst_aggregator_get_latency_unlocked (self);
1259   SRC_UNLOCK (self);
1260
1261   return ret;
1262 }
1263
1264 static gboolean
1265 gst_aggregator_send_event (GstElement * element, GstEvent * event)
1266 {
1267   GstAggregator *self = GST_AGGREGATOR (element);
1268
1269   GST_STATE_LOCK (element);
1270   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
1271       GST_STATE (element) < GST_STATE_PAUSED) {
1272     gdouble rate;
1273     GstFormat fmt;
1274     GstSeekFlags flags;
1275     GstSeekType start_type, stop_type;
1276     gint64 start, stop;
1277
1278     gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1279         &start, &stop_type, &stop);
1280
1281     GST_OBJECT_LOCK (self);
1282     gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1283         stop_type, stop, NULL);
1284     self->priv->seqnum = gst_event_get_seqnum (event);
1285     GST_OBJECT_UNLOCK (self);
1286
1287     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
1288   }
1289   GST_STATE_UNLOCK (element);
1290
1291
1292   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
1293       event);
1294 }
1295
1296 static gboolean
1297 gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
1298 {
1299   gboolean res = TRUE;
1300
1301   switch (GST_QUERY_TYPE (query)) {
1302     case GST_QUERY_SEEKING:
1303     {
1304       GstFormat format;
1305
1306       /* don't pass it along as some (file)sink might claim it does
1307        * whereas with a collectpads in between that will not likely work */
1308       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1309       gst_query_set_seeking (query, format, FALSE, 0, -1);
1310       res = TRUE;
1311
1312       break;
1313     }
1314     case GST_QUERY_LATENCY:
1315       SRC_LOCK (self);
1316       res = gst_aggregator_query_latency_unlocked (self, query);
1317       SRC_UNLOCK (self);
1318       break;
1319     default:
1320       return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1321   }
1322
1323   return res;
1324 }
1325
1326 static gboolean
1327 gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
1328 {
1329   EventData *evdata = user_data;
1330   gboolean ret = TRUE;
1331   GstPad *peer = gst_pad_get_peer (pad);
1332   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1333
1334   if (peer) {
1335     ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
1336     GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
1337     gst_object_unref (peer);
1338   }
1339
1340   if (ret == FALSE) {
1341     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK)
1342       GST_ERROR_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
1343
1344     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
1345       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
1346
1347       if (gst_pad_query (peer, seeking)) {
1348         gboolean seekable;
1349
1350         gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
1351
1352         if (seekable == FALSE) {
1353           GST_INFO_OBJECT (pad,
1354               "Source not seekable, We failed but it does not matter!");
1355
1356           ret = TRUE;
1357         }
1358       } else {
1359         GST_ERROR_OBJECT (pad, "Query seeking FAILED");
1360       }
1361
1362       gst_query_unref (seeking);
1363     }
1364
1365     if (evdata->flush) {
1366       PAD_LOCK (aggpad);
1367       aggpad->priv->pending_flush_start = FALSE;
1368       aggpad->priv->pending_flush_stop = FALSE;
1369       PAD_UNLOCK (aggpad);
1370     }
1371   } else {
1372     evdata->one_actually_seeked = TRUE;
1373   }
1374
1375   evdata->result &= ret;
1376
1377   /* Always send to all pads */
1378   return FALSE;
1379 }
1380
1381 static EventData
1382 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
1383     GstEvent * event, gboolean flush)
1384 {
1385   EventData evdata;
1386
1387   evdata.event = event;
1388   evdata.result = TRUE;
1389   evdata.flush = flush;
1390   evdata.one_actually_seeked = FALSE;
1391
1392   /* We first need to set all pads as flushing in a first pass
1393    * as flush_start flush_stop is sometimes sent synchronously
1394    * while we send the seek event */
1395   if (flush) {
1396     GList *l;
1397
1398     GST_OBJECT_LOCK (self);
1399     for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
1400       GstAggregatorPad *pad = l->data;
1401
1402       PAD_LOCK (pad);
1403       pad->priv->pending_flush_start = TRUE;
1404       pad->priv->pending_flush_stop = FALSE;
1405       PAD_UNLOCK (pad);
1406     }
1407     GST_OBJECT_UNLOCK (self);
1408   }
1409
1410   gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, &evdata);
1411
1412   gst_event_unref (event);
1413
1414   return evdata;
1415 }
1416
1417 static gboolean
1418 gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
1419 {
1420   gdouble rate;
1421   GstFormat fmt;
1422   GstSeekFlags flags;
1423   GstSeekType start_type, stop_type;
1424   gint64 start, stop;
1425   gboolean flush;
1426   EventData evdata;
1427   GstAggregatorPrivate *priv = self->priv;
1428
1429   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1430       &start, &stop_type, &stop);
1431
1432   GST_INFO_OBJECT (self, "starting SEEK");
1433
1434   flush = flags & GST_SEEK_FLAG_FLUSH;
1435
1436   GST_OBJECT_LOCK (self);
1437   if (flush) {
1438     priv->pending_flush_start = TRUE;
1439     priv->flush_seeking = TRUE;
1440   }
1441
1442   gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1443       stop_type, stop, NULL);
1444   GST_OBJECT_UNLOCK (self);
1445
1446   /* forward the seek upstream */
1447   evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, flush);
1448   event = NULL;
1449
1450   if (!evdata.result || !evdata.one_actually_seeked) {
1451     GST_OBJECT_LOCK (self);
1452     priv->flush_seeking = FALSE;
1453     priv->pending_flush_start = FALSE;
1454     GST_OBJECT_UNLOCK (self);
1455   }
1456
1457   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
1458
1459   return evdata.result;
1460 }
1461
1462 static gboolean
1463 gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
1464 {
1465   EventData evdata;
1466   gboolean res = TRUE;
1467
1468   switch (GST_EVENT_TYPE (event)) {
1469     case GST_EVENT_SEEK:
1470     {
1471       gst_event_ref (event);
1472       res = gst_aggregator_do_seek (self, event);
1473       gst_event_unref (event);
1474       event = NULL;
1475       goto done;
1476     }
1477     case GST_EVENT_NAVIGATION:
1478     {
1479       /* navigation is rather pointless. */
1480       res = FALSE;
1481       gst_event_unref (event);
1482       goto done;
1483     }
1484     default:
1485     {
1486       break;
1487     }
1488   }
1489
1490   evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, FALSE);
1491   res = evdata.result;
1492
1493 done:
1494   return res;
1495 }
1496
1497 static gboolean
1498 gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent,
1499     GstEvent * event)
1500 {
1501   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1502
1503   return klass->src_event (GST_AGGREGATOR (parent), event);
1504 }
1505
1506 static gboolean
1507 gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent,
1508     GstQuery * query)
1509 {
1510   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1511
1512   return klass->src_query (GST_AGGREGATOR (parent), query);
1513 }
1514
1515 static gboolean
1516 gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
1517     GstObject * parent, GstPadMode mode, gboolean active)
1518 {
1519   GstAggregator *self = GST_AGGREGATOR (parent);
1520   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1521
1522   if (klass->src_activate) {
1523     if (klass->src_activate (self, mode, active) == FALSE) {
1524       return FALSE;
1525     }
1526   }
1527
1528   if (active == TRUE) {
1529     switch (mode) {
1530       case GST_PAD_MODE_PUSH:
1531       {
1532         GST_INFO_OBJECT (pad, "Activating pad!");
1533         gst_aggregator_start_srcpad_task (self);
1534         return TRUE;
1535       }
1536       default:
1537       {
1538         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
1539         return FALSE;
1540       }
1541     }
1542   }
1543
1544   /* deactivating */
1545   GST_INFO_OBJECT (self, "Deactivating srcpad");
1546   gst_aggregator_stop_srcpad_task (self, FALSE);
1547
1548   return TRUE;
1549 }
1550
1551 static gboolean
1552 gst_aggregator_default_sink_query (GstAggregator * self,
1553     GstAggregatorPad * aggpad, GstQuery * query)
1554 {
1555   GstPad *pad = GST_PAD (aggpad);
1556
1557   return gst_pad_query_default (pad, GST_OBJECT (self), query);
1558 }
1559
1560 static void
1561 gst_aggregator_finalize (GObject * object)
1562 {
1563   GstAggregator *self = (GstAggregator *) object;
1564
1565   g_mutex_clear (&self->priv->src_lock);
1566   g_cond_clear (&self->priv->src_cond);
1567
1568   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
1569 }
1570
1571 /*
1572  * gst_aggregator_set_latency_property:
1573  * @agg: a #GstAggregator
1574  * @latency: the new latency value.
1575  *
1576  * Sets the new latency value to @latency. This value is used to limit the
1577  * amount of time a pad waits for data to appear before considering the pad
1578  * as unresponsive.
1579  */
1580 static void
1581 gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
1582 {
1583   gboolean changed;
1584   GstClockTime min, max;
1585
1586   g_return_if_fail (GST_IS_AGGREGATOR (self));
1587   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
1588
1589   SRC_LOCK (self);
1590   if (self->priv->peer_latency_live) {
1591     min = self->priv->peer_latency_min;
1592     max = self->priv->peer_latency_max;
1593     /* add our own */
1594     min += latency;
1595     min += self->priv->sub_latency_min;
1596     if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1597         && GST_CLOCK_TIME_IS_VALID (max))
1598       max += self->priv->sub_latency_max;
1599     else
1600       max = GST_CLOCK_TIME_NONE;
1601
1602     if (GST_CLOCK_TIME_IS_VALID (max) && min > max) {
1603       GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
1604           ("%s", "Latency too big"),
1605           ("The requested latency value is too big for the latency in the "
1606               "current pipeline.  Limiting to %" G_GINT64_FORMAT, max));
1607       /* FIXME: This could in theory become negative, but in
1608        * that case all is lost anyway */
1609       latency -= min - max;
1610       /* FIXME: shouldn't we g_object_notify() the change here? */
1611     }
1612   }
1613
1614   changed = (self->priv->latency != latency);
1615   self->priv->latency = latency;
1616
1617   if (changed)
1618     SRC_BROADCAST (self);
1619   SRC_UNLOCK (self);
1620
1621   if (changed)
1622     gst_element_post_message (GST_ELEMENT_CAST (self),
1623         gst_message_new_latency (GST_OBJECT_CAST (self)));
1624 }
1625
1626 /*
1627  * gst_aggregator_get_latency_property:
1628  * @agg: a #GstAggregator
1629  *
1630  * Gets the latency value. See gst_aggregator_set_latency for
1631  * more details.
1632  *
1633  * Returns: The time in nanoseconds to wait for data to arrive on a sink pad 
1634  * before a pad is deemed unresponsive. A value of -1 means an
1635  * unlimited time.
1636  */
1637 static gint64
1638 gst_aggregator_get_latency_property (GstAggregator * agg)
1639 {
1640   gint64 res;
1641
1642   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
1643
1644   GST_OBJECT_LOCK (agg);
1645   res = agg->priv->latency;
1646   GST_OBJECT_UNLOCK (agg);
1647
1648   return res;
1649 }
1650
1651 static void
1652 gst_aggregator_set_property (GObject * object, guint prop_id,
1653     const GValue * value, GParamSpec * pspec)
1654 {
1655   GstAggregator *agg = GST_AGGREGATOR (object);
1656
1657   switch (prop_id) {
1658     case PROP_LATENCY:
1659       gst_aggregator_set_latency_property (agg, g_value_get_int64 (value));
1660       break;
1661     default:
1662       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1663       break;
1664   }
1665 }
1666
1667 static void
1668 gst_aggregator_get_property (GObject * object, guint prop_id,
1669     GValue * value, GParamSpec * pspec)
1670 {
1671   GstAggregator *agg = GST_AGGREGATOR (object);
1672
1673   switch (prop_id) {
1674     case PROP_LATENCY:
1675       g_value_set_int64 (value, gst_aggregator_get_latency_property (agg));
1676       break;
1677     default:
1678       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1679       break;
1680   }
1681 }
1682
1683 /* GObject vmethods implementations */
1684 static void
1685 gst_aggregator_class_init (GstAggregatorClass * klass)
1686 {
1687   GObjectClass *gobject_class = (GObjectClass *) klass;
1688   GstElementClass *gstelement_class = (GstElementClass *) klass;
1689
1690   aggregator_parent_class = g_type_class_peek_parent (klass);
1691   g_type_class_add_private (klass, sizeof (GstAggregatorPrivate));
1692
1693   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
1694       GST_DEBUG_FG_MAGENTA, "GstAggregator");
1695
1696   klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD;
1697
1698   klass->sink_event = gst_aggregator_default_sink_event;
1699   klass->sink_query = gst_aggregator_default_sink_query;
1700
1701   klass->src_event = gst_aggregator_default_src_event;
1702   klass->src_query = gst_aggregator_default_src_query;
1703
1704   gstelement_class->request_new_pad =
1705       GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
1706   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
1707   gstelement_class->release_pad =
1708       GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
1709   gstelement_class->change_state =
1710       GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
1711
1712   gobject_class->set_property = gst_aggregator_set_property;
1713   gobject_class->get_property = gst_aggregator_get_property;
1714   gobject_class->finalize = gst_aggregator_finalize;
1715
1716   g_object_class_install_property (gobject_class, PROP_LATENCY,
1717       g_param_spec_int64 ("latency", "Buffer latency",
1718           "Additional latency in live mode to allow upstream "
1719           "to take longer to produce buffers for the current "
1720           "position", 0,
1721           (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1),
1722           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1723
1724   GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad);
1725 }
1726
1727 static void
1728 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
1729 {
1730   GstPadTemplate *pad_template;
1731   GstAggregatorPrivate *priv;
1732
1733   g_return_if_fail (klass->aggregate != NULL);
1734
1735   self->priv =
1736       G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR,
1737       GstAggregatorPrivate);
1738
1739   priv = self->priv;
1740
1741   pad_template =
1742       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
1743   g_return_if_fail (pad_template != NULL);
1744
1745   priv->padcount = -1;
1746   priv->tags_changed = FALSE;
1747
1748   self->priv->peer_latency_live = FALSE;
1749   self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
1750   self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
1751   self->priv->has_peer_latency = FALSE;
1752   gst_aggregator_reset_flow_values (self);
1753
1754   self->srcpad = gst_pad_new_from_template (pad_template, "src");
1755
1756   gst_pad_set_event_function (self->srcpad,
1757       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
1758   gst_pad_set_query_function (self->srcpad,
1759       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func));
1760   gst_pad_set_activatemode_function (self->srcpad,
1761       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func));
1762
1763   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
1764
1765   self->priv->latency = DEFAULT_LATENCY;
1766
1767   g_mutex_init (&self->priv->src_lock);
1768   g_cond_init (&self->priv->src_cond);
1769 }
1770
1771 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
1772  * method to get to the padtemplates */
1773 GType
1774 gst_aggregator_get_type (void)
1775 {
1776   static volatile gsize type = 0;
1777
1778   if (g_once_init_enter (&type)) {
1779     GType _type;
1780     static const GTypeInfo info = {
1781       sizeof (GstAggregatorClass),
1782       NULL,
1783       NULL,
1784       (GClassInitFunc) gst_aggregator_class_init,
1785       NULL,
1786       NULL,
1787       sizeof (GstAggregator),
1788       0,
1789       (GInstanceInitFunc) gst_aggregator_init,
1790     };
1791
1792     _type = g_type_register_static (GST_TYPE_ELEMENT,
1793         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
1794     g_once_init_leave (&type, _type);
1795   }
1796   return type;
1797 }
1798
1799 static GstFlowReturn
1800 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
1801 {
1802   GstBuffer *actual_buf = buffer;
1803   GstAggregator *self = GST_AGGREGATOR (object);
1804   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1805   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
1806   GstFlowReturn flow_return;
1807
1808   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
1809
1810   PAD_FLUSH_LOCK (aggpad);
1811
1812   PAD_LOCK (aggpad);
1813   flow_return = aggpad->priv->flow_return;
1814   if (flow_return != GST_FLOW_OK)
1815     goto flushing;
1816
1817   if (aggpad->priv->pending_eos == TRUE)
1818     goto eos;
1819
1820   while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
1821     PAD_WAIT_EVENT (aggpad);
1822
1823   flow_return = aggpad->priv->flow_return;
1824   if (flow_return != GST_FLOW_OK)
1825     goto flushing;
1826
1827   PAD_UNLOCK (aggpad);
1828
1829   if (aggclass->clip) {
1830     aggclass->clip (self, aggpad, buffer, &actual_buf);
1831   }
1832
1833   SRC_LOCK (self);
1834   PAD_LOCK (aggpad);
1835   if (aggpad->priv->buffer)
1836     gst_buffer_unref (aggpad->priv->buffer);
1837   aggpad->priv->buffer = actual_buf;
1838
1839   flow_return = aggpad->priv->flow_return;
1840
1841   PAD_UNLOCK (aggpad);
1842   PAD_FLUSH_UNLOCK (aggpad);
1843
1844   SRC_BROADCAST (self);
1845   SRC_UNLOCK (self);
1846
1847   GST_DEBUG_OBJECT (aggpad, "Done chaining");
1848
1849   return flow_return;
1850
1851 flushing:
1852   PAD_UNLOCK (aggpad);
1853   PAD_FLUSH_UNLOCK (aggpad);
1854
1855   gst_buffer_unref (buffer);
1856   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
1857       gst_flow_get_name (flow_return));
1858
1859   return flow_return;
1860
1861 eos:
1862   PAD_UNLOCK (aggpad);
1863   PAD_FLUSH_UNLOCK (aggpad);
1864
1865   gst_buffer_unref (buffer);
1866   GST_DEBUG_OBJECT (pad, "We are EOS already...");
1867
1868   return GST_FLOW_EOS;
1869 }
1870
1871 static gboolean
1872 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
1873     GstQuery * query)
1874 {
1875   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1876   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1877
1878   if (GST_QUERY_IS_SERIALIZED (query)) {
1879     PAD_LOCK (aggpad);
1880
1881     while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
1882       PAD_WAIT_EVENT (aggpad);
1883
1884     if (aggpad->priv->flow_return != GST_FLOW_OK)
1885       goto flushing;
1886
1887     PAD_UNLOCK (aggpad);
1888   }
1889
1890   return klass->sink_query (GST_AGGREGATOR (parent),
1891       GST_AGGREGATOR_PAD (pad), query);
1892
1893 flushing:
1894   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
1895       gst_flow_get_name (aggpad->priv->flow_return));
1896   PAD_UNLOCK (aggpad);
1897   return FALSE;
1898 }
1899
1900 static gboolean
1901 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
1902     GstEvent * event)
1903 {
1904   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1905   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1906
1907   if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS
1908       && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) {
1909     PAD_LOCK (aggpad);
1910
1911
1912     while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
1913       PAD_WAIT_EVENT (aggpad);
1914
1915     if (aggpad->priv->flow_return != GST_FLOW_OK
1916         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
1917       goto flushing;
1918
1919     PAD_UNLOCK (aggpad);
1920   }
1921
1922   return klass->sink_event (GST_AGGREGATOR (parent),
1923       GST_AGGREGATOR_PAD (pad), event);
1924
1925 flushing:
1926   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
1927       gst_flow_get_name (aggpad->priv->flow_return));
1928   PAD_UNLOCK (aggpad);
1929   if (GST_EVENT_IS_STICKY (event))
1930     gst_pad_store_sticky_event (pad, event);
1931   gst_event_unref (event);
1932   return FALSE;
1933 }
1934
1935 static gboolean
1936 gst_aggregator_pad_activate_mode_func (GstPad * pad,
1937     GstObject * parent, GstPadMode mode, gboolean active)
1938 {
1939   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1940
1941   if (active == FALSE) {
1942     gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
1943   } else {
1944     PAD_LOCK (aggpad);
1945     aggpad->priv->flow_return = GST_FLOW_OK;
1946     PAD_BROADCAST_EVENT (aggpad);
1947     PAD_UNLOCK (aggpad);
1948   }
1949
1950   return TRUE;
1951 }
1952
1953 /***********************************
1954  * GstAggregatorPad implementation  *
1955  ************************************/
1956 G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
1957
1958 static void
1959 gst_aggregator_pad_constructed (GObject * object)
1960 {
1961   GstPad *pad = GST_PAD (object);
1962
1963   gst_pad_set_chain_function (pad,
1964       GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
1965   gst_pad_set_event_function (pad,
1966       GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func));
1967   gst_pad_set_query_function (pad,
1968       GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
1969   gst_pad_set_activatemode_function (pad,
1970       GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
1971 }
1972
1973 static void
1974 gst_aggregator_pad_finalize (GObject * object)
1975 {
1976   GstAggregatorPad *pad = (GstAggregatorPad *) object;
1977
1978   g_cond_clear (&pad->priv->event_cond);
1979   g_mutex_clear (&pad->priv->flush_lock);
1980   g_mutex_clear (&pad->priv->lock);
1981
1982   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object);
1983 }
1984
1985 static void
1986 gst_aggregator_pad_dispose (GObject * object)
1987 {
1988   GstAggregatorPad *pad = (GstAggregatorPad *) object;
1989
1990   gst_aggregator_pad_drop_buffer (pad);
1991
1992   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
1993 }
1994
1995 static void
1996 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
1997 {
1998   GObjectClass *gobject_class = (GObjectClass *) klass;
1999
2000   g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate));
2001
2002   gobject_class->constructed = gst_aggregator_pad_constructed;
2003   gobject_class->finalize = gst_aggregator_pad_finalize;
2004   gobject_class->dispose = gst_aggregator_pad_dispose;
2005 }
2006
2007 static void
2008 gst_aggregator_pad_init (GstAggregatorPad * pad)
2009 {
2010   pad->priv =
2011       G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
2012       GstAggregatorPadPrivate);
2013
2014   pad->priv->buffer = NULL;
2015   g_cond_init (&pad->priv->event_cond);
2016
2017   g_mutex_init (&pad->priv->flush_lock);
2018   g_mutex_init (&pad->priv->lock);
2019 }
2020
2021 /**
2022  * gst_aggregator_pad_steal_buffer:
2023  * @pad: the pad to get buffer from
2024  *
2025  * Steal the ref to the buffer currently queued in @pad.
2026  *
2027  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
2028  *   queued. You should unref the buffer after usage.
2029  */
2030 GstBuffer *
2031 gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
2032 {
2033   GstBuffer *buffer = NULL;
2034
2035   PAD_LOCK (pad);
2036   if (pad->priv->buffer) {
2037     GST_TRACE_OBJECT (pad, "Consuming buffer");
2038     buffer = pad->priv->buffer;
2039     pad->priv->buffer = NULL;
2040     if (pad->priv->pending_eos) {
2041       pad->priv->pending_eos = FALSE;
2042       pad->priv->eos = TRUE;
2043     }
2044     PAD_BROADCAST_EVENT (pad);
2045     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
2046   }
2047   PAD_UNLOCK (pad);
2048
2049   return buffer;
2050 }
2051
2052 /**
2053  * gst_aggregator_pad_drop_buffer:
2054  * @pad: the pad where to drop any pending buffer
2055  *
2056  * Drop the buffer currently queued in @pad.
2057  *
2058  * Returns: TRUE if there was a buffer queued in @pad, or FALSE if not.
2059  */
2060 gboolean
2061 gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
2062 {
2063   GstBuffer *buf;
2064
2065   buf = gst_aggregator_pad_steal_buffer (pad);
2066
2067   if (buf == NULL)
2068     return FALSE;
2069
2070   gst_buffer_unref (buf);
2071   return TRUE;
2072 }
2073
2074 /**
2075  * gst_aggregator_pad_get_buffer:
2076  * @pad: the pad to get buffer from
2077  *
2078  * Returns: (transfer full): A reference to the buffer in @pad or
2079  * NULL if no buffer was queued. You should unref the buffer after
2080  * usage.
2081  */
2082 GstBuffer *
2083 gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
2084 {
2085   GstBuffer *buffer = NULL;
2086
2087   PAD_LOCK (pad);
2088   if (pad->priv->buffer)
2089     buffer = gst_buffer_ref (pad->priv->buffer);
2090   PAD_UNLOCK (pad);
2091
2092   return buffer;
2093 }
2094
2095 gboolean
2096 gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
2097 {
2098   gboolean is_eos;
2099
2100   PAD_LOCK (pad);
2101   is_eos = pad->priv->eos;
2102   PAD_UNLOCK (pad);
2103
2104   return is_eos;
2105 }
2106
2107 /**
2108  * gst_aggregator_merge_tags:
2109  * @self: a #GstAggregator
2110  * @tags: a #GstTagList to merge
2111  * @mode: the #GstTagMergeMode to use
2112  *
2113  * Adds tags to so-called pending tags, which will be processed
2114  * before pushing out data downstream.
2115  *
2116  * Note that this is provided for convenience, and the subclass is
2117  * not required to use this and can still do tag handling on its own.
2118  *
2119  * MT safe.
2120  */
2121 void
2122 gst_aggregator_merge_tags (GstAggregator * self,
2123     const GstTagList * tags, GstTagMergeMode mode)
2124 {
2125   GstTagList *otags;
2126
2127   g_return_if_fail (GST_IS_AGGREGATOR (self));
2128   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
2129
2130   /* FIXME Check if we can use OBJECT lock here! */
2131   GST_OBJECT_LOCK (self);
2132   if (tags)
2133     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
2134   otags = self->priv->tags;
2135   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
2136   if (otags)
2137     gst_tag_list_unref (otags);
2138   self->priv->tags_changed = TRUE;
2139   GST_OBJECT_UNLOCK (self);
2140 }
2141
2142 /**
2143  * gst_aggregator_set_latency:
2144  * @self: a #GstAggregator
2145  * @min_latency: minimum latency
2146  * @max_latency: maximum latency
2147  *
2148  * Lets #GstAggregator sub-classes tell the baseclass what their internal
2149  * latency is. Will also post a LATENCY message on the bus so the pipeline
2150  * can reconfigure its global latency.
2151  */
2152 void
2153 gst_aggregator_set_latency (GstAggregator * self,
2154     GstClockTime min_latency, GstClockTime max_latency)
2155 {
2156   gboolean changed = FALSE;
2157
2158   g_return_if_fail (GST_IS_AGGREGATOR (self));
2159   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
2160   g_return_if_fail (max_latency >= min_latency);
2161
2162   SRC_LOCK (self);
2163   if (self->priv->sub_latency_min != min_latency) {
2164     self->priv->sub_latency_min = min_latency;
2165     changed = TRUE;
2166   }
2167   if (self->priv->sub_latency_max != max_latency) {
2168     self->priv->sub_latency_max = max_latency;
2169     changed = TRUE;
2170   }
2171
2172   if (changed)
2173     SRC_BROADCAST (self);
2174   SRC_UNLOCK (self);
2175
2176   if (changed) {
2177     gst_element_post_message (GST_ELEMENT_CAST (self),
2178         gst_message_new_latency (GST_OBJECT_CAST (self)));
2179   }
2180 }