aggregator: Make sure that the minimum latencies are never GST_CLOCK_TIME_NONE
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
1 /* GStreamer
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @short_description: manages a set of pads with the purpose of
25  * aggregating their buffers.
26  * @see_also: gstcollectpads for historical reasons.
27  *
28  * Manages a set of pads with the purpose of aggregating their buffers.
29  * Control is given to the subclass when all pads have data.
30  * <itemizedlist>
31  *  <listitem><para>
32  *    Base class for mixers and muxers. Implementers should at least implement
33  *    the aggregate () vmethod.
34  *  </para></listitem>
35  *  <listitem><para>
36  *    When data is queued on all pads, tha aggregate vmethod is called.
37  *  </para></listitem>
38  *  <listitem><para>
39  *    One can peek at the data on any given GstAggregatorPad with the
40  *    gst_aggregator_pad_get_buffer () method, and take ownership of it
41  *    with the gst_aggregator_pad_steal_buffer () method. When a buffer
42  *    has been taken with steal_buffer (), a new buffer can be queued
43  *    on that pad.
44  *  </para></listitem>
45  *  <listitem><para>
46  *    If the subclass wishes to push a buffer downstream in its aggregate
47  *    implementation, it should do so through the
48  *    gst_aggregator_finish_buffer () method. This method will take care
49  *    of sending and ordering mandatory events such as stream start, caps
50  *    and segment.
51  *  </para></listitem>
52  *  <listitem><para>
53  *    Same goes for EOS events, which should not be pushed directly by the
54  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
55  *    implementation.
56  *  </para></listitem>
57  * </itemizedlist>
58  */
59
60 #ifdef HAVE_CONFIG_H
61 #  include "config.h"
62 #endif
63
64 #include <string.h>             /* strlen */
65
66 #include "gstaggregator.h"
67
68
69 /*  Might become API */
70 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
71     const GstTagList * tags, GstTagMergeMode mode);
72 static void gst_aggregator_set_latency_property (GstAggregator * agg,
73     gint64 latency);
74 static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
75
76
77 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
78 #define GST_CAT_DEFAULT aggregator_debug
79
80 /* GstAggregatorPad definitions */
81 #define PAD_LOCK_EVENT(pad)   G_STMT_START {                            \
82   GST_LOG_OBJECT (pad, "Taking EVENT lock from thread %p",              \
83         g_thread_self());                                               \
84   g_mutex_lock(&pad->priv->event_lock);                                 \
85   GST_LOG_OBJECT (pad, "Took EVENT lock from thread %p",              \
86         g_thread_self());                                               \
87   } G_STMT_END
88
89 #define PAD_UNLOCK_EVENT(pad)  G_STMT_START {                           \
90   GST_LOG_OBJECT (pad, "Releasing EVENT lock from thread %p",          \
91         g_thread_self());                                               \
92   g_mutex_unlock(&pad->priv->event_lock);                               \
93   GST_LOG_OBJECT (pad, "Release EVENT lock from thread %p",          \
94         g_thread_self());                                               \
95   } G_STMT_END
96
97
98 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
99   GST_LOG_OBJECT (pad, "Waiting for EVENT on thread %p",               \
100         g_thread_self());                                               \
101   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),       \
102       &(pad->priv->event_lock));                                        \
103   GST_LOG_OBJECT (pad, "DONE Waiting for EVENT on thread %p",               \
104         g_thread_self());                                               \
105   } G_STMT_END
106
107 #define PAD_BROADCAST_EVENT(pad) {                                          \
108   GST_LOG_OBJECT (pad, "Signaling EVENT from thread %p",               \
109         g_thread_self());                                                   \
110   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond)); \
111   }
112
113 #define GST_AGGREGATOR_SETCAPS_LOCK(self)   G_STMT_START {        \
114   GST_LOG_OBJECT (self, "Taking SETCAPS lock from thread %p",   \
115         g_thread_self());                                         \
116   g_mutex_lock(&self->priv->setcaps_lock);                         \
117   GST_LOG_OBJECT (self, "Took SETCAPS lock from thread %p",     \
118         g_thread_self());                                         \
119   } G_STMT_END
120
121 #define GST_AGGREGATOR_SETCAPS_UNLOCK(self)   G_STMT_START {        \
122   GST_LOG_OBJECT (self, "Releasing SETCAPS lock from thread %p",  \
123         g_thread_self());                                           \
124   g_mutex_unlock(&self->priv->setcaps_lock);                         \
125   GST_LOG_OBJECT (self, "Took SETCAPS lock from thread %p",       \
126         g_thread_self());                                           \
127   } G_STMT_END
128
129 #define PAD_STREAM_LOCK(pad)   G_STMT_START {                            \
130   GST_LOG_OBJECT (pad, "Taking lock from thread %p",              \
131         g_thread_self());                                               \
132   g_mutex_lock(&pad->priv->stream_lock);                                 \
133   GST_LOG_OBJECT (pad, "Took lock from thread %p",              \
134         g_thread_self());                                               \
135   } G_STMT_END
136
137 #define PAD_STREAM_UNLOCK(pad)  G_STMT_START {                           \
138   GST_LOG_OBJECT (pad, "Releasing lock from thread %p",          \
139         g_thread_self());                                               \
140   g_mutex_unlock(&pad->priv->stream_lock);                               \
141   GST_LOG_OBJECT (pad, "Release lock from thread %p",          \
142         g_thread_self());                                               \
143   } G_STMT_END
144
145 #define SRC_STREAM_LOCK(self)   G_STMT_START {                             \
146   GST_LOG_OBJECT (self, "Taking src STREAM lock from thread %p",           \
147         g_thread_self());                                                  \
148   g_mutex_lock(&self->priv->src_lock);                                     \
149   GST_LOG_OBJECT (self, "Took src STREAM lock from thread %p",             \
150         g_thread_self());                                                  \
151   } G_STMT_END
152
153 #define SRC_STREAM_UNLOCK(self)  G_STMT_START {                            \
154   GST_LOG_OBJECT (self, "Releasing src STREAM lock from thread %p",        \
155         g_thread_self());                                                  \
156   g_mutex_unlock(&self->priv->src_lock);                                   \
157   GST_LOG_OBJECT (self, "Release src STREAM lock from thread %p",          \
158         g_thread_self());                                                  \
159   } G_STMT_END
160
161 #define SRC_STREAM_WAIT(self) G_STMT_START {                               \
162   GST_LOG_OBJECT (self, "Waiting for src STREAM on thread %p",             \
163         g_thread_self());                                                  \
164   g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock));           \
165   GST_LOG_OBJECT (self, "DONE Waiting for src STREAM on thread %p",        \
166         g_thread_self());                                                  \
167   } G_STMT_END
168
169 #define SRC_STREAM_BROADCAST_UNLOCKED(self) G_STMT_START {                 \
170     GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p",           \
171         g_thread_self());                                                  \
172     if (self->priv->aggregate_id)                                          \
173       gst_clock_id_unschedule (self->priv->aggregate_id);                  \
174     g_cond_broadcast(&(self->priv->src_cond));                             \
175   } G_STMT_END
176
177 #define SRC_STREAM_BROADCAST(self) G_STMT_START {                          \
178     SRC_STREAM_LOCK (self);                                                \
179     SRC_STREAM_BROADCAST_UNLOCKED (self);                                  \
180     SRC_STREAM_UNLOCK (self);                                              \
181   } G_STMT_END
182
183 struct _GstAggregatorPadPrivate
184 {
185   gboolean pending_flush_start;
186   gboolean pending_flush_stop;
187   gboolean pending_eos;
188   gboolean flushing;
189
190   GMutex event_lock;
191   GCond event_cond;
192
193   GMutex stream_lock;
194 };
195
196 static gboolean
197 _aggpad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
198 {
199   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
200
201   aggpad->eos = FALSE;
202   aggpad->priv->flushing = FALSE;
203
204   if (klass->flush)
205     return klass->flush (aggpad, agg);
206
207   return TRUE;
208 }
209
210 /*************************************
211  * GstAggregator implementation  *
212  *************************************/
213 static GstElementClass *aggregator_parent_class = NULL;
214
215 struct _GstAggregatorPrivate
216 {
217   gint padcount;
218
219   /* Our state is >= PAUSED */
220   gboolean running;
221
222
223   gint seqnum;
224   gboolean send_stream_start;
225   gboolean send_segment;
226   gboolean flush_seeking;
227   gboolean pending_flush_start;
228   gboolean send_eos;
229   GstFlowReturn flow_return;
230
231   GstCaps *srccaps;
232
233   GstTagList *tags;
234   gboolean tags_changed;
235
236   /* Lock to prevent two src setcaps from happening at the same time  */
237   GMutex setcaps_lock;
238
239   gboolean latency_live;
240   GstClockTime latency_min;
241   GstClockTime latency_max;
242
243   GstClockTime sub_latency_min;
244   GstClockTime sub_latency_max;
245
246   /* aggregate */
247   GstClockID aggregate_id;
248   GMutex src_lock;
249   GCond src_cond;
250 };
251
252 typedef struct
253 {
254   GstEvent *event;
255   gboolean result;
256   gboolean flush;
257
258   gboolean one_actually_seeked;
259 } EventData;
260
261 #define DEFAULT_LATENCY        0
262
263 enum
264 {
265   PROP_0,
266   PROP_LATENCY,
267   PROP_LAST
268 };
269
270 /**
271  * gst_aggregator_iterate_sinkpads:
272  * @self: The #GstAggregator
273  * @func: The function to call.
274  * @user_data: The data to pass to @func.
275  *
276  * Iterate the sinkpads of aggregator to call a function on them.
277  *
278  * This method guarantees that @func will be called only once for each
279  * sink pad.
280  */
281 gboolean
282 gst_aggregator_iterate_sinkpads (GstAggregator * self,
283     GstAggregatorPadForeachFunc func, gpointer user_data)
284 {
285   gboolean result = FALSE;
286   GstIterator *iter;
287   gboolean done = FALSE;
288   GValue item = { 0, };
289   GList *seen_pads = NULL;
290
291   iter = gst_element_iterate_sink_pads (GST_ELEMENT (self));
292
293   if (!iter)
294     goto no_iter;
295
296   while (!done) {
297     switch (gst_iterator_next (iter, &item)) {
298       case GST_ITERATOR_OK:
299       {
300         GstPad *pad;
301
302         pad = g_value_get_object (&item);
303
304         /* if already pushed, skip. FIXME, find something faster to tag pads */
305         if (pad == NULL || g_list_find (seen_pads, pad)) {
306           g_value_reset (&item);
307           break;
308         }
309
310         GST_LOG_OBJECT (self, "calling function on pad %s:%s",
311             GST_DEBUG_PAD_NAME (pad));
312         result = func (self, pad, user_data);
313
314         done = !result;
315
316         seen_pads = g_list_prepend (seen_pads, pad);
317
318         g_value_reset (&item);
319         break;
320       }
321       case GST_ITERATOR_RESYNC:
322         gst_iterator_resync (iter);
323         break;
324       case GST_ITERATOR_ERROR:
325         GST_ERROR_OBJECT (self,
326             "Could not iterate over internally linked pads");
327         done = TRUE;
328         break;
329       case GST_ITERATOR_DONE:
330         done = TRUE;
331         break;
332     }
333   }
334   g_value_unset (&item);
335   gst_iterator_free (iter);
336
337   if (seen_pads == NULL) {
338     GST_DEBUG_OBJECT (self, "No pad seen");
339     return FALSE;
340   }
341
342   g_list_free (seen_pads);
343
344 no_iter:
345   return result;
346 }
347
348 static inline gboolean
349 _check_all_pads_with_data_or_eos (GstAggregator * self,
350     GstAggregatorPad * aggpad, gpointer user_data)
351 {
352   if (aggpad->buffer || aggpad->eos) {
353     return TRUE;
354   }
355
356   GST_LOG_OBJECT (aggpad, "Not ready to be aggregated");
357
358   return FALSE;
359 }
360
361 static void
362 _reset_flow_values (GstAggregator * self)
363 {
364   self->priv->flow_return = GST_FLOW_FLUSHING;
365   self->priv->send_stream_start = TRUE;
366   self->priv->send_segment = TRUE;
367   gst_segment_init (&self->segment, GST_FORMAT_TIME);
368 }
369
370 static inline void
371 _push_mandatory_events (GstAggregator * self)
372 {
373   GstAggregatorPrivate *priv = self->priv;
374
375   if (g_atomic_int_get (&self->priv->send_stream_start)) {
376     gchar s_id[32];
377
378     GST_INFO_OBJECT (self, "pushing stream start");
379     /* stream-start (FIXME: create id based on input ids) */
380     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
381     if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) {
382       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
383     }
384     g_atomic_int_set (&self->priv->send_stream_start, FALSE);
385   }
386
387   if (self->priv->srccaps) {
388
389     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
390         self->priv->srccaps);
391     if (!gst_pad_push_event (self->srcpad,
392             gst_event_new_caps (self->priv->srccaps))) {
393       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
394     }
395     gst_caps_unref (self->priv->srccaps);
396     self->priv->srccaps = NULL;
397   }
398
399   if (g_atomic_int_get (&self->priv->send_segment)) {
400     if (!g_atomic_int_get (&self->priv->flush_seeking)) {
401       GstEvent *segev = gst_event_new_segment (&self->segment);
402
403       if (!self->priv->seqnum)
404         self->priv->seqnum = gst_event_get_seqnum (segev);
405       else
406         gst_event_set_seqnum (segev, self->priv->seqnum);
407
408       GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segev);
409       gst_pad_push_event (self->srcpad, segev);
410       g_atomic_int_set (&self->priv->send_segment, FALSE);
411     }
412   }
413
414   if (priv->tags && priv->tags_changed) {
415     gst_pad_push_event (self->srcpad,
416         gst_event_new_tag (gst_tag_list_ref (priv->tags)));
417     priv->tags_changed = FALSE;
418   }
419 }
420
421 /**
422  * gst_aggregator_set_src_caps:
423  * @self: The #GstAggregator
424  * @caps: The #GstCaps to set on the src pad.
425  *
426  * Sets the caps to be used on the src pad.
427  */
428 void
429 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
430 {
431   GST_AGGREGATOR_SETCAPS_LOCK (self);
432   gst_caps_replace (&self->priv->srccaps, caps);
433   _push_mandatory_events (self);
434   GST_AGGREGATOR_SETCAPS_UNLOCK (self);
435 }
436
437 /**
438  * gst_aggregator_finish_buffer:
439  * @self: The #GstAggregator
440  * @buffer: the #GstBuffer to push.
441  *
442  * This method will take care of sending mandatory events before pushing
443  * the provided buffer.
444  */
445 GstFlowReturn
446 gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
447 {
448   _push_mandatory_events (self);
449
450   if (!g_atomic_int_get (&self->priv->flush_seeking) &&
451       gst_pad_is_active (self->srcpad)) {
452     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
453     return gst_pad_push (self->srcpad, buffer);
454   } else {
455     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
456         g_atomic_int_get (&self->priv->flush_seeking),
457         gst_pad_is_active (self->srcpad));
458     gst_buffer_unref (buffer);
459     return GST_FLOW_OK;
460   }
461 }
462
463 static void
464 _push_eos (GstAggregator * self)
465 {
466   GstEvent *event;
467   _push_mandatory_events (self);
468
469   self->priv->send_eos = FALSE;
470   event = gst_event_new_eos ();
471   gst_event_set_seqnum (event, self->priv->seqnum);
472   gst_pad_push_event (self->srcpad, event);
473 }
474
475 static GstClockTime
476 gst_aggregator_get_next_time (GstAggregator * self)
477 {
478   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
479
480   if (klass->get_next_time)
481     return klass->get_next_time (self);
482
483   return GST_CLOCK_TIME_NONE;
484 }
485
486 /* called with the src STREAM lock */
487 static gboolean
488 _wait_and_check (GstAggregator * self, gboolean * timeout)
489 {
490   GstClockTime latency_max, latency_min;
491   GstClockTime start;
492   gboolean live, res;
493
494   *timeout = FALSE;
495
496   SRC_STREAM_LOCK (self);
497
498   gst_aggregator_get_latency (self, &live, &latency_min, &latency_max);
499
500   if (gst_aggregator_iterate_sinkpads (self,
501           (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos,
502           NULL)) {
503     GST_DEBUG_OBJECT (self, "all pads have data");
504     SRC_STREAM_UNLOCK (self);
505
506     return TRUE;
507   }
508
509   /* Before waiting, check if we're actually still running */
510   if (!self->priv->running || !self->priv->send_eos) {
511     SRC_STREAM_UNLOCK (self);
512
513     return FALSE;
514   }
515
516   start = gst_aggregator_get_next_time (self);
517
518   if (!live || !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self))
519       || !GST_CLOCK_TIME_IS_VALID (start)) {
520     /* We wake up here when something happened, and below
521      * then check if we're ready now. If we return FALSE,
522      * we will be directly called again.
523      */
524     SRC_STREAM_WAIT (self);
525   } else {
526     GstClockTime base_time, time;
527     GstClock *clock;
528     GstClockReturn status;
529
530     GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
531         GST_TIME_ARGS (start));
532
533     GST_OBJECT_LOCK (self);
534     base_time = GST_ELEMENT_CAST (self)->base_time;
535     clock = GST_ELEMENT_CLOCK (self);
536     if (clock)
537       gst_object_ref (clock);
538     GST_OBJECT_UNLOCK (self);
539
540     time = base_time + start;
541
542     if (GST_CLOCK_TIME_IS_VALID (latency_min)) {
543       time += latency_min;
544     } else {
545       time += self->latency;
546     }
547
548     GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
549         GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
550         " latency max %" GST_TIME_FORMAT " latency min %" GST_TIME_FORMAT
551         " current %" GST_TIME_FORMAT ")", GST_TIME_ARGS (time),
552         GST_TIME_ARGS (GST_ELEMENT_CAST (self)->base_time),
553         GST_TIME_ARGS (start), GST_TIME_ARGS (latency_max),
554         GST_TIME_ARGS (latency_min),
555         GST_TIME_ARGS (gst_clock_get_time (clock)));
556
557     self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
558     gst_object_unref (clock);
559     SRC_STREAM_UNLOCK (self);
560
561     status = gst_clock_id_wait (self->priv->aggregate_id, NULL);
562
563     SRC_STREAM_LOCK (self);
564     if (self->priv->aggregate_id) {
565       gst_clock_id_unref (self->priv->aggregate_id);
566       self->priv->aggregate_id = NULL;
567     }
568
569     GST_DEBUG_OBJECT (self, "clock returned %d", status);
570
571     /* we timed out */
572     if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
573       SRC_STREAM_UNLOCK (self);
574       *timeout = TRUE;
575       return TRUE;
576     }
577   }
578
579   res = gst_aggregator_iterate_sinkpads (self,
580       (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL);
581   SRC_STREAM_UNLOCK (self);
582
583   return res;
584 }
585
586 static void
587 aggregate_func (GstAggregator * self)
588 {
589   GstAggregatorPrivate *priv = self->priv;
590   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
591   gboolean timeout = FALSE;
592
593   if (self->priv->running == FALSE) {
594     GST_DEBUG_OBJECT (self, "Not running anymore");
595     return;
596   }
597
598   GST_LOG_OBJECT (self, "Checking aggregate");
599   while (priv->send_eos && priv->running) {
600     if (!_wait_and_check (self, &timeout))
601       continue;
602
603     GST_TRACE_OBJECT (self, "Actually aggregating!");
604
605     priv->flow_return = klass->aggregate (self, timeout);
606
607     if (priv->flow_return == GST_FLOW_EOS) {
608       _push_eos (self);
609     }
610
611     if (priv->flow_return == GST_FLOW_FLUSHING &&
612         g_atomic_int_get (&priv->flush_seeking))
613       priv->flow_return = GST_FLOW_OK;
614
615     GST_LOG_OBJECT (self, "flow return is %s",
616         gst_flow_get_name (priv->flow_return));
617
618     if (priv->flow_return != GST_FLOW_OK)
619       break;
620   }
621 }
622
623 static gboolean
624 _start (GstAggregator * self)
625 {
626   self->priv->running = TRUE;
627   self->priv->send_stream_start = TRUE;
628   self->priv->send_segment = TRUE;
629   self->priv->send_eos = TRUE;
630   self->priv->srccaps = NULL;
631   self->priv->flow_return = GST_FLOW_OK;
632
633   return TRUE;
634 }
635
636 static gboolean
637 _check_pending_flush_stop (GstAggregatorPad * pad)
638 {
639   return (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
640 }
641
642 static gboolean
643 _stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
644 {
645   gboolean res = TRUE;
646
647   GST_INFO_OBJECT (self, "%s srcpad task",
648       flush_start ? "Pausing" : "Stopping");
649
650   self->priv->running = FALSE;
651   SRC_STREAM_BROADCAST (self);
652
653   if (flush_start) {
654     res = gst_pad_push_event (self->srcpad, flush_start);
655   }
656
657   gst_pad_stop_task (self->srcpad);
658   SRC_STREAM_BROADCAST (self);
659
660   return res;
661 }
662
663 static void
664 _start_srcpad_task (GstAggregator * self)
665 {
666   GST_INFO_OBJECT (self, "Starting srcpad task");
667
668   self->priv->running = TRUE;
669   gst_pad_start_task (GST_PAD (self->srcpad),
670       (GstTaskFunction) aggregate_func, self, NULL);
671 }
672
673 static GstFlowReturn
674 _flush (GstAggregator * self)
675 {
676   GstFlowReturn ret = GST_FLOW_OK;
677   GstAggregatorPrivate *priv = self->priv;
678   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
679
680   GST_DEBUG_OBJECT (self, "Flushing everything");
681   g_atomic_int_set (&priv->send_segment, TRUE);
682   g_atomic_int_set (&priv->flush_seeking, FALSE);
683   g_atomic_int_set (&priv->tags_changed, FALSE);
684   if (klass->flush)
685     ret = klass->flush (self);
686
687   return ret;
688 }
689
690 static gboolean
691 _all_flush_stop_received (GstAggregator * self)
692 {
693   GList *tmp;
694   GstAggregatorPad *tmppad;
695
696   GST_OBJECT_LOCK (self);
697   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
698     tmppad = (GstAggregatorPad *) tmp->data;
699
700     if (_check_pending_flush_stop (tmppad) == FALSE) {
701       GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
702           tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
703       GST_OBJECT_UNLOCK (self);
704       return FALSE;
705     }
706   }
707   GST_OBJECT_UNLOCK (self);
708
709   return TRUE;
710 }
711
712 static void
713 _flush_start (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
714 {
715   GstBuffer *tmpbuf;
716   GstAggregatorPrivate *priv = self->priv;
717   GstAggregatorPadPrivate *padpriv = aggpad->priv;
718
719   g_atomic_int_set (&aggpad->priv->flushing, TRUE);
720   /*  Remove pad buffer and wake up the streaming thread */
721   tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
722   gst_buffer_replace (&tmpbuf, NULL);
723   PAD_STREAM_LOCK (aggpad);
724   if (g_atomic_int_compare_and_exchange (&padpriv->pending_flush_start,
725           TRUE, FALSE) == TRUE) {
726     GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
727     g_atomic_int_set (&padpriv->pending_flush_stop, TRUE);
728   }
729
730   if (g_atomic_int_get (&priv->flush_seeking)) {
731     /* If flush_seeking we forward the first FLUSH_START */
732     if (g_atomic_int_compare_and_exchange (&priv->pending_flush_start,
733             TRUE, FALSE) == TRUE) {
734
735       GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
736       _stop_srcpad_task (self, event);
737       priv->flow_return = GST_FLOW_OK;
738
739       GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
740       GST_PAD_STREAM_LOCK (self->srcpad);
741       GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
742       event = NULL;
743     } else {
744       gst_event_unref (event);
745     }
746   } else {
747     gst_event_unref (event);
748   }
749   PAD_STREAM_UNLOCK (aggpad);
750
751   tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
752   gst_buffer_replace (&tmpbuf, NULL);
753 }
754
755 /* GstAggregator vmethods default implementations */
756 static gboolean
757 _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
758 {
759   gboolean res = TRUE;
760   GstPad *pad = GST_PAD (aggpad);
761   GstAggregatorPrivate *priv = self->priv;
762
763   switch (GST_EVENT_TYPE (event)) {
764     case GST_EVENT_FLUSH_START:
765     {
766       _flush_start (self, aggpad, event);
767       /* We forward only in one case: right after flush_seeking */
768       event = NULL;
769       goto eat;
770     }
771     case GST_EVENT_FLUSH_STOP:
772     {
773       GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP");
774
775       _aggpad_flush (aggpad, self);
776       if (g_atomic_int_get (&priv->flush_seeking)) {
777         g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
778
779         if (g_atomic_int_get (&priv->flush_seeking)) {
780           if (_all_flush_stop_received (self)) {
781             /* That means we received FLUSH_STOP/FLUSH_STOP on
782              * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
783             _flush (self);
784             gst_pad_push_event (self->srcpad, event);
785             priv->send_eos = TRUE;
786             event = NULL;
787             SRC_STREAM_BROADCAST (self);
788
789             GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
790             GST_PAD_STREAM_UNLOCK (self->srcpad);
791             _start_srcpad_task (self);
792           }
793         }
794       }
795
796       /* We never forward the event */
797       goto eat;
798     }
799     case GST_EVENT_EOS:
800     {
801       GST_DEBUG_OBJECT (aggpad, "EOS");
802
803       /* We still have a buffer, and we don't want the subclass to have to
804        * check for it. Mark pending_eos, eos will be set when steal_buffer is
805        * called
806        */
807       PAD_LOCK_EVENT (aggpad);
808       if (!aggpad->buffer) {
809         aggpad->eos = TRUE;
810       } else {
811         aggpad->priv->pending_eos = TRUE;
812       }
813       PAD_UNLOCK_EVENT (aggpad);
814
815       SRC_STREAM_BROADCAST (self);
816       goto eat;
817     }
818     case GST_EVENT_SEGMENT:
819     {
820       PAD_LOCK_EVENT (aggpad);
821       gst_event_copy_segment (event, &aggpad->segment);
822       self->priv->seqnum = gst_event_get_seqnum (event);
823       PAD_UNLOCK_EVENT (aggpad);
824       goto eat;
825     }
826     case GST_EVENT_STREAM_START:
827     {
828       goto eat;
829     }
830     case GST_EVENT_TAG:
831     {
832       GstTagList *tags;
833
834       gst_event_parse_tag (event, &tags);
835
836       if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) {
837         gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE);
838         gst_event_unref (event);
839         event = NULL;
840         goto eat;
841       }
842       break;
843     }
844     default:
845     {
846       break;
847     }
848   }
849
850   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
851   return gst_pad_event_default (pad, GST_OBJECT (self), event);
852
853 eat:
854   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
855   if (event)
856     gst_event_unref (event);
857
858   return res;
859 }
860
861 static gboolean
862 _stop_pad (GstAggregator * self, GstAggregatorPad * pad, gpointer unused_udata)
863 {
864   _aggpad_flush (pad, self);
865
866   return TRUE;
867 }
868
869 static gboolean
870 _stop (GstAggregator * agg)
871 {
872   _reset_flow_values (agg);
873
874   gst_aggregator_iterate_sinkpads (agg,
875       (GstAggregatorPadForeachFunc) _stop_pad, NULL);
876
877   if (agg->priv->tags)
878     gst_tag_list_unref (agg->priv->tags);
879   agg->priv->tags = NULL;
880
881   return TRUE;
882 }
883
884 /* GstElement vmethods implementations */
885 static GstStateChangeReturn
886 _change_state (GstElement * element, GstStateChange transition)
887 {
888   GstStateChangeReturn ret;
889   GstAggregator *self = GST_AGGREGATOR (element);
890   GstAggregatorClass *agg_class = GST_AGGREGATOR_GET_CLASS (self);
891
892
893   switch (transition) {
894     case GST_STATE_CHANGE_READY_TO_PAUSED:
895       agg_class->start (self);
896       break;
897     default:
898       break;
899   }
900
901   if ((ret =
902           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
903               transition)) == GST_STATE_CHANGE_FAILURE)
904     goto failure;
905
906
907   switch (transition) {
908     case GST_STATE_CHANGE_PAUSED_TO_READY:
909       agg_class->stop (self);
910       break;
911     default:
912       break;
913   }
914
915   return ret;
916
917 failure:
918   {
919     GST_ERROR_OBJECT (element, "parent failed state change");
920     return ret;
921   }
922 }
923
924 static void
925 _release_pad (GstElement * element, GstPad * pad)
926 {
927   GstAggregator *self = GST_AGGREGATOR (element);
928   GstBuffer *tmpbuf;
929
930   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
931
932   GST_INFO_OBJECT (pad, "Removing pad");
933
934   g_atomic_int_set (&aggpad->priv->flushing, TRUE);
935   tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
936   gst_buffer_replace (&tmpbuf, NULL);
937   gst_element_remove_pad (element, pad);
938
939   SRC_STREAM_BROADCAST (self);
940 }
941
942 static GstPad *
943 _request_new_pad (GstElement * element,
944     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
945 {
946   GstAggregator *self;
947   GstAggregatorPad *agg_pad;
948
949   GstElementClass *klass = GST_ELEMENT_GET_CLASS (element);
950   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
951
952   self = GST_AGGREGATOR (element);
953
954   if (templ == gst_element_class_get_pad_template (klass, "sink_%u")) {
955     gint serial = 0;
956     gchar *name = NULL;
957
958     GST_OBJECT_LOCK (element);
959     if (req_name == NULL || strlen (req_name) < 6
960         || !g_str_has_prefix (req_name, "sink_")) {
961       /* no name given when requesting the pad, use next available int */
962       priv->padcount++;
963     } else {
964       /* parse serial number from requested padname */
965       serial = g_ascii_strtoull (&req_name[5], NULL, 10);
966       if (serial >= priv->padcount)
967         priv->padcount = serial;
968     }
969
970     name = g_strdup_printf ("sink_%u", priv->padcount);
971     agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
972         "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
973     g_free (name);
974
975     GST_OBJECT_UNLOCK (element);
976
977   } else {
978     return NULL;
979   }
980
981   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
982
983   if (priv->running)
984     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
985
986   /* add the pad to the element */
987   gst_element_add_pad (element, GST_PAD (agg_pad));
988
989   return GST_PAD (agg_pad);
990 }
991
992 typedef struct
993 {
994   GstClockTime min, max;
995   gboolean live;
996 } LatencyData;
997
998 static gboolean
999 _latency_query (GstAggregator * self, GstPad * pad, gpointer user_data)
1000 {
1001   LatencyData *data = user_data;
1002   GstClockTime min, max;
1003   GstQuery *query;
1004   gboolean live, res;
1005
1006   query = gst_query_new_latency ();
1007   res = gst_pad_peer_query (pad, query);
1008
1009   if (res) {
1010     gst_query_parse_latency (query, &live, &min, &max);
1011
1012     GST_LOG_OBJECT (self, "%s: got latency live:%s min:%" G_GINT64_FORMAT
1013         " max:%" G_GINT64_FORMAT, GST_PAD_NAME (pad),
1014         live ? "true" : "false", min, max);
1015
1016     if (min != GST_CLOCK_TIME_NONE && min > data->min)
1017       data->min = min;
1018
1019     if (max != GST_CLOCK_TIME_NONE &&
1020         ((data->max != GST_CLOCK_TIME_NONE && max < data->max) ||
1021             (data->max == GST_CLOCK_TIME_NONE)))
1022       data->max = max;
1023
1024     data->live |= live;
1025   }
1026
1027   gst_query_unref (query);
1028
1029   return TRUE;
1030 }
1031
1032 /**
1033  * gst_aggregator_get_latency:
1034  * @self: a #GstAggregator
1035  * @live: (out) (allow-none): whether @self is live
1036  * @min_latency: (out) (allow-none): the configured minimum latency of @self
1037  * @max_latency: (out) (allow-none): the configured maximum latency of @self
1038  *
1039  * Retreives the latency values reported by @self in response to the latency
1040  * query.
1041  *
1042  * Typically only called by subclasses.
1043  */
1044 void
1045 gst_aggregator_get_latency (GstAggregator * self, gboolean * live,
1046     GstClockTime * min_latency, GstClockTime * max_latency)
1047 {
1048   GstClockTime min, max;
1049
1050   g_return_if_fail (GST_IS_AGGREGATOR (self));
1051
1052   /* latency_min is never GST_CLOCK_TIME_NONE by construction */
1053   min = self->priv->latency_min;
1054   max = self->priv->latency_max;
1055
1056   min += self->priv->sub_latency_min;
1057   if (GST_CLOCK_TIME_IS_VALID (max)
1058       && GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max))
1059     max += self->priv->sub_latency_max;
1060
1061   if (GST_CLOCK_TIME_IS_VALID (self->latency)) {
1062     min += self->latency;
1063     if (GST_CLOCK_TIME_IS_VALID (max))
1064       max += self->latency;
1065   }
1066
1067   if (live)
1068     *live = self->priv->latency_live;
1069   if (min_latency)
1070     *min_latency = min;
1071   if (max_latency)
1072     *max_latency = max;
1073 }
1074
1075 static gboolean
1076 gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
1077 {
1078   LatencyData data;
1079
1080   data.min = 0;
1081   data.max = GST_CLOCK_TIME_NONE;
1082   data.live = FALSE;
1083
1084   /* query upstream's latency */
1085   SRC_STREAM_LOCK (self);
1086   gst_aggregator_iterate_sinkpads (self,
1087       (GstAggregatorPadForeachFunc) _latency_query, &data);
1088   SRC_STREAM_UNLOCK (self);
1089
1090   if (data.live && GST_CLOCK_TIME_IS_VALID (self->latency) &&
1091       self->latency > data.max) {
1092     GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
1093         ("%s", "Latency too big"),
1094         ("The requested latency value is too big for the current pipeline.  "
1095             "Limiting to %" G_GINT64_FORMAT, data.max));
1096     self->latency = data.max;
1097   }
1098
1099   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (data.min))) {
1100     GST_WARNING_OBJECT (self, "Invalid minimum latency, using 0");
1101     data.min = 0;
1102   }
1103
1104   if (G_UNLIKELY (data.min > data.max)) {
1105     GST_WARNING_OBJECT (self, "Minimum latency is greater than maximum latency "
1106         "(%" G_GINT64_FORMAT " > %" G_GINT64_FORMAT "). "
1107         "Clamping it at the maximum latency", data.min, data.max);
1108     data.min = data.max;
1109   }
1110
1111   self->priv->latency_live = data.live;
1112   self->priv->latency_min = data.min;
1113   self->priv->latency_max = data.max;
1114
1115   /* add our own */
1116   if (GST_CLOCK_TIME_IS_VALID (self->latency)) {
1117     if (GST_CLOCK_TIME_IS_VALID (data.min))
1118       data.min += self->latency;
1119     if (GST_CLOCK_TIME_IS_VALID (data.max))
1120       data.max += self->latency;
1121   }
1122
1123   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_min)
1124       && GST_CLOCK_TIME_IS_VALID (data.min))
1125     data.min += self->priv->sub_latency_min;
1126   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1127       && GST_CLOCK_TIME_IS_VALID (data.max))
1128     data.max += self->priv->sub_latency_max;
1129
1130   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
1131       " max:%" G_GINT64_FORMAT, data.live ? "true" : "false", data.min,
1132       data.max);
1133
1134   gst_query_set_latency (query, data.live, data.min, data.max);
1135
1136   return TRUE;
1137 }
1138
1139 static gboolean
1140 _send_event (GstElement * element, GstEvent * event)
1141 {
1142   GstAggregator *self = GST_AGGREGATOR (element);
1143
1144   GST_STATE_LOCK (element);
1145   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
1146       GST_STATE (element) < GST_STATE_PAUSED) {
1147     gdouble rate;
1148     GstFormat fmt;
1149     GstSeekFlags flags;
1150     GstSeekType start_type, stop_type;
1151     gint64 start, stop;
1152
1153     gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1154         &start, &stop_type, &stop);
1155     gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1156         stop_type, stop, NULL);
1157
1158     self->priv->seqnum = gst_event_get_seqnum (event);
1159     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
1160   }
1161   GST_STATE_UNLOCK (element);
1162
1163
1164   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
1165       event);
1166 }
1167
1168 static gboolean
1169 _src_query (GstAggregator * self, GstQuery * query)
1170 {
1171   gboolean res = TRUE;
1172
1173   switch (GST_QUERY_TYPE (query)) {
1174     case GST_QUERY_SEEKING:
1175     {
1176       GstFormat format;
1177
1178       /* don't pass it along as some (file)sink might claim it does
1179        * whereas with a collectpads in between that will not likely work */
1180       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1181       gst_query_set_seeking (query, format, FALSE, 0, -1);
1182       res = TRUE;
1183
1184       goto discard;
1185     }
1186     case GST_QUERY_LATENCY:
1187     {
1188       gboolean ret;
1189
1190       ret = gst_aggregator_query_latency (self, query);
1191       /* Wake up the src thread again, due to changed latencies
1192        * or changed live-ness we might have to adjust if we wait
1193        * on a deadline at all and how long.
1194        */
1195       SRC_STREAM_BROADCAST (self);
1196       return ret;
1197     }
1198     default:
1199       break;
1200   }
1201
1202   return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1203
1204 discard:
1205   return res;
1206 }
1207
1208 static gboolean
1209 event_forward_func (GstPad * pad, EventData * evdata)
1210 {
1211   gboolean ret = TRUE;
1212   GstPad *peer = gst_pad_get_peer (pad);
1213   GstAggregatorPadPrivate *padpriv = GST_AGGREGATOR_PAD (pad)->priv;
1214
1215   if (peer) {
1216     ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
1217     GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
1218     gst_object_unref (peer);
1219   }
1220
1221   if (ret == FALSE) {
1222     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK)
1223       GST_ERROR_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
1224
1225     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
1226       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
1227
1228       if (gst_pad_query (peer, seeking)) {
1229         gboolean seekable;
1230
1231         gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
1232
1233         if (seekable == FALSE) {
1234           GST_INFO_OBJECT (pad,
1235               "Source not seekable, We failed but it does not matter!");
1236
1237           ret = TRUE;
1238         }
1239       } else {
1240         GST_ERROR_OBJECT (pad, "Query seeking FAILED");
1241       }
1242     }
1243
1244     if (evdata->flush) {
1245       padpriv->pending_flush_start = FALSE;
1246       padpriv->pending_flush_stop = FALSE;
1247     }
1248   } else {
1249     evdata->one_actually_seeked = TRUE;
1250   }
1251
1252   evdata->result &= ret;
1253
1254   /* Always send to all pads */
1255   return FALSE;
1256 }
1257
1258 static gboolean
1259 _set_flush_pending (GstAggregator * self, GstAggregatorPad * pad,
1260     gpointer udata)
1261 {
1262   pad->priv->pending_flush_start = TRUE;
1263   pad->priv->pending_flush_stop = FALSE;
1264
1265   return TRUE;
1266 }
1267
1268 static EventData
1269 _forward_event_to_all_sinkpads (GstAggregator * self, GstEvent * event,
1270     gboolean flush)
1271 {
1272   EventData evdata;
1273
1274   evdata.event = event;
1275   evdata.result = TRUE;
1276   evdata.flush = flush;
1277   evdata.one_actually_seeked = FALSE;
1278
1279   /* We first need to set all pads as flushing in a first pass
1280    * as flush_start flush_stop is sometimes sent synchronously
1281    * while we send the seek event */
1282   if (flush)
1283     gst_aggregator_iterate_sinkpads (self,
1284         (GstAggregatorPadForeachFunc) _set_flush_pending, NULL);
1285   gst_pad_forward (self->srcpad, (GstPadForwardFunction) event_forward_func,
1286       &evdata);
1287
1288   gst_event_unref (event);
1289
1290   return evdata;
1291 }
1292
1293 static gboolean
1294 _do_seek (GstAggregator * self, GstEvent * event)
1295 {
1296   gdouble rate;
1297   GstFormat fmt;
1298   GstSeekFlags flags;
1299   GstSeekType start_type, stop_type;
1300   gint64 start, stop;
1301   gboolean flush;
1302   EventData evdata;
1303   GstAggregatorPrivate *priv = self->priv;
1304
1305   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1306       &start, &stop_type, &stop);
1307
1308   GST_INFO_OBJECT (self, "starting SEEK");
1309
1310   flush = flags & GST_SEEK_FLAG_FLUSH;
1311
1312   if (flush) {
1313     g_atomic_int_set (&priv->pending_flush_start, TRUE);
1314     g_atomic_int_set (&priv->flush_seeking, TRUE);
1315   }
1316
1317   gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1318       stop_type, stop, NULL);
1319
1320   /* forward the seek upstream */
1321   evdata = _forward_event_to_all_sinkpads (self, event, flush);
1322   event = NULL;
1323
1324   if (!evdata.result || !evdata.one_actually_seeked) {
1325     g_atomic_int_set (&priv->flush_seeking, FALSE);
1326     g_atomic_int_set (&priv->pending_flush_start, FALSE);
1327   }
1328
1329   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
1330
1331   return evdata.result;
1332 }
1333
1334 static gboolean
1335 _src_event (GstAggregator * self, GstEvent * event)
1336 {
1337   EventData evdata;
1338   gboolean res = TRUE;
1339
1340   switch (GST_EVENT_TYPE (event)) {
1341     case GST_EVENT_SEEK:
1342     {
1343       gst_event_ref (event);
1344       res = _do_seek (self, event);
1345       gst_event_unref (event);
1346       event = NULL;
1347       goto done;
1348     }
1349     case GST_EVENT_NAVIGATION:
1350     {
1351       /* navigation is rather pointless. */
1352       res = FALSE;
1353       gst_event_unref (event);
1354       goto done;
1355     }
1356     default:
1357     {
1358       break;
1359     }
1360   }
1361
1362   evdata = _forward_event_to_all_sinkpads (self, event, FALSE);
1363   res = evdata.result;
1364
1365 done:
1366   return res;
1367 }
1368
1369 static gboolean
1370 src_event_func (GstPad * pad, GstObject * parent, GstEvent * event)
1371 {
1372   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1373
1374   return klass->src_event (GST_AGGREGATOR (parent), event);
1375 }
1376
1377 static gboolean
1378 src_query_func (GstPad * pad, GstObject * parent, GstQuery * query)
1379 {
1380   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1381
1382   return klass->src_query (GST_AGGREGATOR (parent), query);
1383 }
1384
1385 static gboolean
1386 src_activate_mode (GstPad * pad,
1387     GstObject * parent, GstPadMode mode, gboolean active)
1388 {
1389   GstAggregator *self = GST_AGGREGATOR (parent);
1390   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1391
1392   if (klass->src_activate) {
1393     if (klass->src_activate (self, mode, active) == FALSE) {
1394       return FALSE;
1395     }
1396   }
1397
1398   if (active == TRUE) {
1399     switch (mode) {
1400       case GST_PAD_MODE_PUSH:
1401       {
1402         GST_INFO_OBJECT (pad, "Activating pad!");
1403         _start_srcpad_task (self);
1404         return TRUE;
1405       }
1406       default:
1407       {
1408         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
1409         return FALSE;
1410       }
1411     }
1412   }
1413
1414   /* deactivating */
1415   GST_INFO_OBJECT (self, "Deactivating srcpad");
1416   _stop_srcpad_task (self, FALSE);
1417
1418   return TRUE;
1419 }
1420
1421 static gboolean
1422 _sink_query (GstAggregator * self, GstAggregatorPad * aggpad, GstQuery * query)
1423 {
1424   GstPad *pad = GST_PAD (aggpad);
1425
1426   return gst_pad_query_default (pad, GST_OBJECT (self), query);
1427 }
1428
1429 static void
1430 gst_aggregator_finalize (GObject * object)
1431 {
1432   GstAggregator *self = (GstAggregator *) object;
1433
1434   gst_object_unref (self->clock);
1435   g_mutex_clear (&self->priv->setcaps_lock);
1436   g_mutex_clear (&self->priv->src_lock);
1437   g_cond_clear (&self->priv->src_cond);
1438
1439   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
1440 }
1441
1442 static void
1443 gst_aggregator_dispose (GObject * object)
1444 {
1445   G_OBJECT_CLASS (aggregator_parent_class)->dispose (object);
1446 }
1447
1448 /*
1449  * gst_aggregator_set_latency_property:
1450  * @agg: a #GstAggregator
1451  * @latency: the new latency value.
1452  *
1453  * Sets the new latency value to @latency. This value is used to limit the
1454  * amount of time a pad waits for data to appear before considering the pad
1455  * as unresponsive.
1456  */
1457 static void
1458 gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
1459 {
1460   gboolean changed;
1461
1462   g_return_if_fail (GST_IS_AGGREGATOR (self));
1463
1464   GST_OBJECT_LOCK (self);
1465
1466   if (self->priv->latency_live && self->priv->latency_max != 0 &&
1467       GST_CLOCK_TIME_IS_VALID (latency) && latency > self->priv->latency_max) {
1468     GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
1469         ("%s", "Latency too big"),
1470         ("The requested latency value is too big for the latency in the "
1471             "current pipeline.  Limiting to %" G_GINT64_FORMAT,
1472             self->priv->latency_max));
1473     latency = self->priv->latency_max;
1474   }
1475
1476   changed = self->latency != latency;
1477   self->latency = latency;
1478   GST_OBJECT_UNLOCK (self);
1479
1480   if (changed)
1481     gst_element_post_message (GST_ELEMENT_CAST (self),
1482         gst_message_new_latency (GST_OBJECT_CAST (self)));
1483 }
1484
1485 /*
1486  * gst_aggregator_get_latency_property:
1487  * @agg: a #GstAggregator
1488  *
1489  * Gets the latency value. See gst_aggregator_set_latency for
1490  * more details.
1491  *
1492  * Returns: The time in nanoseconds to wait for data to arrive on a sink pad 
1493  * before a pad is deemed unresponsive. A value of -1 means an
1494  * unlimited time.
1495  */
1496 static gint64
1497 gst_aggregator_get_latency_property (GstAggregator * agg)
1498 {
1499   gint64 res;
1500
1501   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
1502
1503   GST_OBJECT_LOCK (agg);
1504   res = agg->latency;
1505   GST_OBJECT_UNLOCK (agg);
1506
1507   return res;
1508 }
1509
1510 static void
1511 gst_aggregator_set_property (GObject * object, guint prop_id,
1512     const GValue * value, GParamSpec * pspec)
1513 {
1514   GstAggregator *agg = GST_AGGREGATOR (object);
1515
1516   switch (prop_id) {
1517     case PROP_LATENCY:
1518       gst_aggregator_set_latency_property (agg, g_value_get_int64 (value));
1519       break;
1520     default:
1521       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1522       break;
1523   }
1524 }
1525
1526 static void
1527 gst_aggregator_get_property (GObject * object, guint prop_id,
1528     GValue * value, GParamSpec * pspec)
1529 {
1530   GstAggregator *agg = GST_AGGREGATOR (object);
1531
1532   switch (prop_id) {
1533     case PROP_LATENCY:
1534       g_value_set_int64 (value, gst_aggregator_get_latency_property (agg));
1535       break;
1536     default:
1537       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1538       break;
1539   }
1540 }
1541
1542 /* GObject vmethods implementations */
1543 static void
1544 gst_aggregator_class_init (GstAggregatorClass * klass)
1545 {
1546   GObjectClass *gobject_class = (GObjectClass *) klass;
1547   GstElementClass *gstelement_class = (GstElementClass *) klass;
1548
1549   aggregator_parent_class = g_type_class_peek_parent (klass);
1550   g_type_class_add_private (klass, sizeof (GstAggregatorPrivate));
1551
1552   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
1553       GST_DEBUG_FG_MAGENTA, "GstAggregator");
1554
1555   klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD;
1556   klass->start = _start;
1557   klass->stop = _stop;
1558
1559   klass->sink_event = _sink_event;
1560   klass->sink_query = _sink_query;
1561
1562   klass->src_event = _src_event;
1563   klass->src_query = _src_query;
1564
1565   gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR (_request_new_pad);
1566   gstelement_class->send_event = GST_DEBUG_FUNCPTR (_send_event);
1567   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (_release_pad);
1568   gstelement_class->change_state = GST_DEBUG_FUNCPTR (_change_state);
1569
1570   gobject_class->set_property = gst_aggregator_set_property;
1571   gobject_class->get_property = gst_aggregator_get_property;
1572   gobject_class->finalize = gst_aggregator_finalize;
1573   gobject_class->dispose = gst_aggregator_dispose;
1574
1575   g_object_class_install_property (gobject_class, PROP_LATENCY,
1576       g_param_spec_int64 ("latency", "Buffer latency",
1577           "Additional latency in live mode to allow upstream "
1578           "to take longer to produce buffers for the current "
1579           "position", 0,
1580           (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1),
1581           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1582 }
1583
1584 static void
1585 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
1586 {
1587   GstPadTemplate *pad_template;
1588   GstAggregatorPrivate *priv;
1589
1590   g_return_if_fail (klass->aggregate != NULL);
1591
1592   self->priv =
1593       G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR,
1594       GstAggregatorPrivate);
1595
1596   priv = self->priv;
1597
1598   pad_template =
1599       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
1600   g_return_if_fail (pad_template != NULL);
1601
1602   priv->padcount = -1;
1603   priv->tags_changed = FALSE;
1604
1605   self->priv->latency_live = FALSE;
1606   self->priv->latency_min = self->priv->sub_latency_min = 0;
1607   self->priv->latency_max = self->priv->sub_latency_max = GST_CLOCK_TIME_NONE;
1608   _reset_flow_values (self);
1609
1610   self->srcpad = gst_pad_new_from_template (pad_template, "src");
1611
1612   gst_pad_set_event_function (self->srcpad,
1613       GST_DEBUG_FUNCPTR ((GstPadEventFunction) src_event_func));
1614   gst_pad_set_query_function (self->srcpad,
1615       GST_DEBUG_FUNCPTR ((GstPadQueryFunction) src_query_func));
1616   gst_pad_set_activatemode_function (self->srcpad,
1617       GST_DEBUG_FUNCPTR ((GstPadActivateModeFunction) src_activate_mode));
1618
1619   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
1620
1621   self->clock = gst_system_clock_obtain ();
1622   self->latency = 0;
1623
1624   g_mutex_init (&self->priv->setcaps_lock);
1625   g_mutex_init (&self->priv->src_lock);
1626   g_cond_init (&self->priv->src_cond);
1627 }
1628
1629 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
1630  * method to get to the padtemplates */
1631 GType
1632 gst_aggregator_get_type (void)
1633 {
1634   static volatile gsize type = 0;
1635
1636   if (g_once_init_enter (&type)) {
1637     GType _type;
1638     static const GTypeInfo info = {
1639       sizeof (GstAggregatorClass),
1640       NULL,
1641       NULL,
1642       (GClassInitFunc) gst_aggregator_class_init,
1643       NULL,
1644       NULL,
1645       sizeof (GstAggregator),
1646       0,
1647       (GInstanceInitFunc) gst_aggregator_init,
1648     };
1649
1650     _type = g_type_register_static (GST_TYPE_ELEMENT,
1651         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
1652     g_once_init_leave (&type, _type);
1653   }
1654   return type;
1655 }
1656
1657 static GstFlowReturn
1658 _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
1659 {
1660   GstBuffer *actual_buf = buffer;
1661   GstAggregator *self = GST_AGGREGATOR (object);
1662   GstAggregatorPrivate *priv = self->priv;
1663   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1664   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
1665
1666   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
1667
1668   PAD_STREAM_LOCK (aggpad);
1669
1670   if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
1671     goto flushing;
1672
1673   if (g_atomic_int_get (&aggpad->priv->pending_eos) == TRUE)
1674     goto eos;
1675
1676   PAD_LOCK_EVENT (aggpad);
1677
1678   while (aggpad->buffer && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
1679     GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
1680     PAD_WAIT_EVENT (aggpad);
1681   }
1682   PAD_UNLOCK_EVENT (aggpad);
1683
1684   if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
1685     goto flushing;
1686
1687   if (aggclass->clip) {
1688     aggclass->clip (self, aggpad, buffer, &actual_buf);
1689   }
1690
1691   PAD_LOCK_EVENT (aggpad);
1692   if (aggpad->buffer)
1693     gst_buffer_unref (aggpad->buffer);
1694   aggpad->buffer = actual_buf;
1695   PAD_UNLOCK_EVENT (aggpad);
1696   PAD_STREAM_UNLOCK (aggpad);
1697
1698   SRC_STREAM_LOCK (self);
1699   if (gst_aggregator_iterate_sinkpads (self,
1700           (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL))
1701     SRC_STREAM_BROADCAST_UNLOCKED (self);
1702   SRC_STREAM_UNLOCK (self);
1703
1704   GST_DEBUG_OBJECT (aggpad, "Done chaining");
1705
1706   return priv->flow_return;
1707
1708 flushing:
1709   PAD_STREAM_UNLOCK (aggpad);
1710
1711   gst_buffer_unref (buffer);
1712   GST_DEBUG_OBJECT (aggpad, "We are flushing");
1713
1714   return GST_FLOW_FLUSHING;
1715
1716 eos:
1717   PAD_STREAM_UNLOCK (aggpad);
1718
1719   gst_buffer_unref (buffer);
1720   GST_DEBUG_OBJECT (pad, "We are EOS already...");
1721
1722   return GST_FLOW_EOS;
1723 }
1724
1725 static gboolean
1726 pad_query_func (GstPad * pad, GstObject * parent, GstQuery * query)
1727 {
1728   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1729   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1730
1731   if (GST_QUERY_IS_SERIALIZED (query)) {
1732     PAD_LOCK_EVENT (aggpad);
1733
1734     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) {
1735       PAD_UNLOCK_EVENT (aggpad);
1736       goto flushing;
1737     }
1738
1739     while (aggpad->buffer
1740         && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
1741       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
1742       PAD_WAIT_EVENT (aggpad);
1743     }
1744     PAD_UNLOCK_EVENT (aggpad);
1745
1746     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
1747       goto flushing;
1748   }
1749
1750   return klass->sink_query (GST_AGGREGATOR (parent),
1751       GST_AGGREGATOR_PAD (pad), query);
1752
1753 flushing:
1754   GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping query");
1755   return FALSE;
1756 }
1757
1758 static gboolean
1759 pad_event_func (GstPad * pad, GstObject * parent, GstEvent * event)
1760 {
1761   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1762   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1763
1764   if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS
1765       && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) {
1766     PAD_LOCK_EVENT (aggpad);
1767
1768     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE
1769         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
1770       PAD_UNLOCK_EVENT (aggpad);
1771       goto flushing;
1772     }
1773
1774     while (aggpad->buffer
1775         && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
1776       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
1777       PAD_WAIT_EVENT (aggpad);
1778     }
1779     PAD_UNLOCK_EVENT (aggpad);
1780
1781     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE
1782         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
1783       goto flushing;
1784   }
1785
1786   return klass->sink_event (GST_AGGREGATOR (parent),
1787       GST_AGGREGATOR_PAD (pad), event);
1788
1789 flushing:
1790   GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping event");
1791   if (GST_EVENT_IS_STICKY (event))
1792     gst_pad_store_sticky_event (pad, event);
1793   gst_event_unref (event);
1794   return FALSE;
1795 }
1796
1797 static gboolean
1798 pad_activate_mode_func (GstPad * pad,
1799     GstObject * parent, GstPadMode mode, gboolean active)
1800 {
1801   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1802
1803   if (active == FALSE) {
1804     PAD_LOCK_EVENT (aggpad);
1805     g_atomic_int_set (&aggpad->priv->flushing, TRUE);
1806     gst_buffer_replace (&aggpad->buffer, NULL);
1807     PAD_BROADCAST_EVENT (aggpad);
1808     PAD_UNLOCK_EVENT (aggpad);
1809   } else {
1810     g_atomic_int_set (&aggpad->priv->flushing, FALSE);
1811     PAD_LOCK_EVENT (aggpad);
1812     PAD_BROADCAST_EVENT (aggpad);
1813     PAD_UNLOCK_EVENT (aggpad);
1814   }
1815
1816   return TRUE;
1817 }
1818
1819 /***********************************
1820  * GstAggregatorPad implementation  *
1821  ************************************/
1822 static GstPadClass *aggregator_pad_parent_class = NULL;
1823 G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
1824
1825 static void
1826 _pad_constructed (GObject * object)
1827 {
1828   GstPad *pad = GST_PAD (object);
1829
1830   gst_pad_set_chain_function (pad,
1831       GST_DEBUG_FUNCPTR ((GstPadChainFunction) _chain));
1832   gst_pad_set_event_function (pad,
1833       GST_DEBUG_FUNCPTR ((GstPadEventFunction) pad_event_func));
1834   gst_pad_set_query_function (pad,
1835       GST_DEBUG_FUNCPTR ((GstPadQueryFunction) pad_query_func));
1836   gst_pad_set_activatemode_function (pad,
1837       GST_DEBUG_FUNCPTR ((GstPadActivateModeFunction) pad_activate_mode_func));
1838 }
1839
1840 static void
1841 gst_aggregator_pad_finalize (GObject * object)
1842 {
1843   GstAggregatorPad *pad = (GstAggregatorPad *) object;
1844
1845   g_mutex_clear (&pad->priv->event_lock);
1846   g_cond_clear (&pad->priv->event_cond);
1847   g_mutex_clear (&pad->priv->stream_lock);
1848
1849   G_OBJECT_CLASS (aggregator_pad_parent_class)->finalize (object);
1850 }
1851
1852 static void
1853 gst_aggregator_pad_dispose (GObject * object)
1854 {
1855   GstAggregatorPad *pad = (GstAggregatorPad *) object;
1856   GstBuffer *buf;
1857
1858   buf = gst_aggregator_pad_steal_buffer (pad);
1859   if (buf)
1860     gst_buffer_unref (buf);
1861
1862   G_OBJECT_CLASS (aggregator_pad_parent_class)->dispose (object);
1863 }
1864
1865 static void
1866 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
1867 {
1868   GObjectClass *gobject_class = (GObjectClass *) klass;
1869
1870   aggregator_pad_parent_class = g_type_class_peek_parent (klass);
1871   g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate));
1872
1873   gobject_class->constructed = GST_DEBUG_FUNCPTR (_pad_constructed);
1874   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_aggregator_pad_finalize);
1875   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_aggregator_pad_dispose);
1876 }
1877
1878 static void
1879 gst_aggregator_pad_init (GstAggregatorPad * pad)
1880 {
1881   pad->priv =
1882       G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
1883       GstAggregatorPadPrivate);
1884
1885   pad->buffer = NULL;
1886   g_mutex_init (&pad->priv->event_lock);
1887   g_cond_init (&pad->priv->event_cond);
1888
1889   g_mutex_init (&pad->priv->stream_lock);
1890 }
1891
1892 /**
1893  * gst_aggregator_pad_steal_buffer:
1894  * @pad: the pad to get buffer from
1895  *
1896  * Steal the ref to the buffer currently queued in @pad.
1897  *
1898  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
1899  *   queued. You should unref the buffer after usage.
1900  */
1901 GstBuffer *
1902 gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
1903 {
1904   GstBuffer *buffer = NULL;
1905
1906   PAD_LOCK_EVENT (pad);
1907   if (pad->buffer) {
1908     GST_TRACE_OBJECT (pad, "Consuming buffer");
1909     buffer = pad->buffer;
1910     pad->buffer = NULL;
1911     if (pad->priv->pending_eos) {
1912       pad->priv->pending_eos = FALSE;
1913       pad->eos = TRUE;
1914     }
1915     PAD_BROADCAST_EVENT (pad);
1916     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
1917   }
1918   PAD_UNLOCK_EVENT (pad);
1919
1920   return buffer;
1921 }
1922
1923 /**
1924  * gst_aggregator_pad_get_buffer:
1925  * @pad: the pad to get buffer from
1926  *
1927  * Returns: (transfer full): A reference to the buffer in @pad or
1928  * NULL if no buffer was queued. You should unref the buffer after
1929  * usage.
1930  */
1931 GstBuffer *
1932 gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
1933 {
1934   GstBuffer *buffer = NULL;
1935
1936   PAD_LOCK_EVENT (pad);
1937   if (pad->buffer)
1938     buffer = gst_buffer_ref (pad->buffer);
1939   PAD_UNLOCK_EVENT (pad);
1940
1941   return buffer;
1942 }
1943
1944 /**
1945  * gst_aggregator_merge_tags:
1946  * @self: a #GstAggregator
1947  * @tags: a #GstTagList to merge
1948  * @mode: the #GstTagMergeMode to use
1949  *
1950  * Adds tags to so-called pending tags, which will be processed
1951  * before pushing out data downstream.
1952  *
1953  * Note that this is provided for convenience, and the subclass is
1954  * not required to use this and can still do tag handling on its own.
1955  *
1956  * MT safe.
1957  */
1958 void
1959 gst_aggregator_merge_tags (GstAggregator * self,
1960     const GstTagList * tags, GstTagMergeMode mode)
1961 {
1962   GstTagList *otags;
1963
1964   g_return_if_fail (GST_IS_AGGREGATOR (self));
1965   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
1966
1967   /* FIXME Check if we can use OBJECT lock here! */
1968   GST_OBJECT_LOCK (self);
1969   if (tags)
1970     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
1971   otags = self->priv->tags;
1972   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
1973   if (otags)
1974     gst_tag_list_unref (otags);
1975   self->priv->tags_changed = TRUE;
1976   GST_OBJECT_UNLOCK (self);
1977 }
1978
1979 /**
1980  * gst_aggregator_set_latency:
1981  * @self: a #GstAggregator
1982  * @min_latency: minimum latency
1983  * @max_latency: maximum latency
1984  *
1985  * Lets #GstAggregator sub-classes tell the baseclass what their internal
1986  * latency is. Will also post a LATENCY message on the bus so the pipeline
1987  * can reconfigure its global latency.
1988  */
1989 void
1990 gst_aggregator_set_latency (GstAggregator * self,
1991     GstClockTime min_latency, GstClockTime max_latency)
1992 {
1993   g_return_if_fail (GST_IS_AGGREGATOR (self));
1994   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
1995   g_return_if_fail (max_latency >= min_latency);
1996
1997   GST_OBJECT_LOCK (self);
1998   self->priv->sub_latency_min = min_latency;
1999   self->priv->sub_latency_max = max_latency;
2000   GST_OBJECT_UNLOCK (self);
2001
2002   gst_element_post_message (GST_ELEMENT_CAST (self),
2003       gst_message_new_latency (GST_OBJECT_CAST (self)));
2004 }