72cd8af073b2c19b4dd79b08061ed95158d95385
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
1 /* GStreamer aggregator base class
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @short_description: manages a set of pads with the purpose of
25  * aggregating their buffers.
26  * @see_also: gstcollectpads for historical reasons.
27  *
28  * Manages a set of pads with the purpose of aggregating their buffers.
29  * Control is given to the subclass when all pads have data.
30  * <itemizedlist>
31  *  <listitem><para>
32  *    Base class for mixers and muxers. Implementers should at least implement
33  *    the aggregate () vmethod.
34  *  </para></listitem>
35  *  <listitem><para>
36  *    When data is queued on all pads, tha aggregate vmethod is called.
37  *  </para></listitem>
38  *  <listitem><para>
39  *    One can peek at the data on any given GstAggregatorPad with the
40  *    gst_aggregator_pad_get_buffer () method, and take ownership of it
41  *    with the gst_aggregator_pad_steal_buffer () method. When a buffer
42  *    has been taken with steal_buffer (), a new buffer can be queued
43  *    on that pad.
44  *  </para></listitem>
45  *  <listitem><para>
46  *    If the subclass wishes to push a buffer downstream in its aggregate
47  *    implementation, it should do so through the
48  *    gst_aggregator_finish_buffer () method. This method will take care
49  *    of sending and ordering mandatory events such as stream start, caps
50  *    and segment.
51  *  </para></listitem>
52  *  <listitem><para>
53  *    Same goes for EOS events, which should not be pushed directly by the
54  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
55  *    implementation.
56  *  </para></listitem>
57  * </itemizedlist>
58  */
59
60 #ifdef HAVE_CONFIG_H
61 #  include "config.h"
62 #endif
63
64 #include <string.h>             /* strlen */
65
66 #include "gstaggregator.h"
67
68
69 /*  Might become API */
70 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
71     const GstTagList * tags, GstTagMergeMode mode);
72 static void gst_aggregator_set_latency_property (GstAggregator * agg,
73     gint64 latency);
74 static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
75
76
77 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
78 #define GST_CAT_DEFAULT aggregator_debug
79
80 /* GstAggregatorPad definitions */
81 #define PAD_LOCK_EVENT(pad)   G_STMT_START {                            \
82   GST_LOG_OBJECT (pad, "Taking EVENT lock from thread %p",              \
83         g_thread_self());                                               \
84   g_mutex_lock(&pad->priv->event_lock);                                 \
85   GST_LOG_OBJECT (pad, "Took EVENT lock from thread %p",              \
86         g_thread_self());                                               \
87   } G_STMT_END
88
89 #define PAD_UNLOCK_EVENT(pad)  G_STMT_START {                           \
90   GST_LOG_OBJECT (pad, "Releasing EVENT lock from thread %p",          \
91         g_thread_self());                                               \
92   g_mutex_unlock(&pad->priv->event_lock);                               \
93   GST_LOG_OBJECT (pad, "Release EVENT lock from thread %p",          \
94         g_thread_self());                                               \
95   } G_STMT_END
96
97
98 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
99   GST_LOG_OBJECT (pad, "Waiting for EVENT on thread %p",               \
100         g_thread_self());                                               \
101   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),       \
102       &(pad->priv->event_lock));                                        \
103   GST_LOG_OBJECT (pad, "DONE Waiting for EVENT on thread %p",               \
104         g_thread_self());                                               \
105   } G_STMT_END
106
107 #define PAD_BROADCAST_EVENT(pad) {                                          \
108   GST_LOG_OBJECT (pad, "Signaling EVENT from thread %p",               \
109         g_thread_self());                                                   \
110   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond)); \
111   }
112
113 #define GST_AGGREGATOR_SETCAPS_LOCK(self)   G_STMT_START {        \
114   GST_LOG_OBJECT (self, "Taking SETCAPS lock from thread %p",   \
115         g_thread_self());                                         \
116   g_mutex_lock(&self->priv->setcaps_lock);                         \
117   GST_LOG_OBJECT (self, "Took SETCAPS lock from thread %p",     \
118         g_thread_self());                                         \
119   } G_STMT_END
120
121 #define GST_AGGREGATOR_SETCAPS_UNLOCK(self)   G_STMT_START {        \
122   GST_LOG_OBJECT (self, "Releasing SETCAPS lock from thread %p",  \
123         g_thread_self());                                           \
124   g_mutex_unlock(&self->priv->setcaps_lock);                         \
125   GST_LOG_OBJECT (self, "Took SETCAPS lock from thread %p",       \
126         g_thread_self());                                           \
127   } G_STMT_END
128
129 #define PAD_STREAM_LOCK(pad)   G_STMT_START {                            \
130   GST_LOG_OBJECT (pad, "Taking lock from thread %p",              \
131         g_thread_self());                                               \
132   g_mutex_lock(&pad->priv->stream_lock);                                 \
133   GST_LOG_OBJECT (pad, "Took lock from thread %p",              \
134         g_thread_self());                                               \
135   } G_STMT_END
136
137 #define PAD_STREAM_UNLOCK(pad)  G_STMT_START {                           \
138   GST_LOG_OBJECT (pad, "Releasing lock from thread %p",          \
139         g_thread_self());                                               \
140   g_mutex_unlock(&pad->priv->stream_lock);                               \
141   GST_LOG_OBJECT (pad, "Release lock from thread %p",          \
142         g_thread_self());                                               \
143   } G_STMT_END
144
145 #define SRC_STREAM_LOCK(self)   G_STMT_START {                             \
146   GST_LOG_OBJECT (self, "Taking src STREAM lock from thread %p",           \
147         g_thread_self());                                                  \
148   g_mutex_lock(&self->priv->src_lock);                                     \
149   GST_LOG_OBJECT (self, "Took src STREAM lock from thread %p",             \
150         g_thread_self());                                                  \
151   } G_STMT_END
152
153 #define SRC_STREAM_UNLOCK(self)  G_STMT_START {                            \
154   GST_LOG_OBJECT (self, "Releasing src STREAM lock from thread %p",        \
155         g_thread_self());                                                  \
156   g_mutex_unlock(&self->priv->src_lock);                                   \
157   GST_LOG_OBJECT (self, "Release src STREAM lock from thread %p",          \
158         g_thread_self());                                                  \
159   } G_STMT_END
160
161 #define SRC_STREAM_WAIT(self) G_STMT_START {                               \
162   GST_LOG_OBJECT (self, "Waiting for src STREAM on thread %p",             \
163         g_thread_self());                                                  \
164   g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock));           \
165   GST_LOG_OBJECT (self, "DONE Waiting for src STREAM on thread %p",        \
166         g_thread_self());                                                  \
167   } G_STMT_END
168
169 #define SRC_STREAM_BROADCAST_UNLOCKED(self) G_STMT_START {                 \
170     GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p",           \
171         g_thread_self());                                                  \
172     if (self->priv->aggregate_id)                                          \
173       gst_clock_id_unschedule (self->priv->aggregate_id);                  \
174     g_cond_broadcast(&(self->priv->src_cond));                             \
175   } G_STMT_END
176
177 #define SRC_STREAM_BROADCAST(self) G_STMT_START {                          \
178     SRC_STREAM_LOCK (self);                                                \
179     SRC_STREAM_BROADCAST_UNLOCKED (self);                                  \
180     SRC_STREAM_UNLOCK (self);                                              \
181   } G_STMT_END
182
183 struct _GstAggregatorPadPrivate
184 {
185   gboolean pending_flush_start;
186   gboolean pending_flush_stop;
187   gboolean pending_eos;
188   gboolean flushing;
189
190   GMutex event_lock;
191   GCond event_cond;
192
193   GMutex stream_lock;
194 };
195
196 static gboolean
197 gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
198 {
199   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
200
201   aggpad->eos = FALSE;
202   aggpad->priv->flushing = FALSE;
203
204   if (klass->flush)
205     return klass->flush (aggpad, agg);
206
207   return TRUE;
208 }
209
210 /*************************************
211  * GstAggregator implementation  *
212  *************************************/
213 static GstElementClass *aggregator_parent_class = NULL;
214
215 struct _GstAggregatorPrivate
216 {
217   gint padcount;
218
219   /* Our state is >= PAUSED */
220   gboolean running;
221
222
223   gint seqnum;
224   gboolean send_stream_start;
225   gboolean send_segment;
226   gboolean flush_seeking;
227   gboolean pending_flush_start;
228   gboolean send_eos;
229   GstFlowReturn flow_return;
230
231   GstCaps *srccaps;
232
233   GstTagList *tags;
234   gboolean tags_changed;
235
236   /* Lock to prevent two src setcaps from happening at the same time  */
237   GMutex setcaps_lock;
238
239   gboolean latency_live;
240   GstClockTime latency_min;
241   GstClockTime latency_max;
242
243   GstClockTime sub_latency_min;
244   GstClockTime sub_latency_max;
245
246   /* aggregate */
247   GstClockID aggregate_id;
248   GMutex src_lock;
249   GCond src_cond;
250 };
251
252 typedef struct
253 {
254   GstEvent *event;
255   gboolean result;
256   gboolean flush;
257
258   gboolean one_actually_seeked;
259 } EventData;
260
261 #define DEFAULT_LATENCY        0
262
263 enum
264 {
265   PROP_0,
266   PROP_LATENCY,
267   PROP_LAST
268 };
269
270 /**
271  * gst_aggregator_iterate_sinkpads:
272  * @self: The #GstAggregator
273  * @func: The function to call.
274  * @user_data: The data to pass to @func.
275  *
276  * Iterate the sinkpads of aggregator to call a function on them.
277  *
278  * This method guarantees that @func will be called only once for each
279  * sink pad.
280  */
281 gboolean
282 gst_aggregator_iterate_sinkpads (GstAggregator * self,
283     GstAggregatorPadForeachFunc func, gpointer user_data)
284 {
285   gboolean result = FALSE;
286   GstIterator *iter;
287   gboolean done = FALSE;
288   GValue item = { 0, };
289   GList *seen_pads = NULL;
290
291   iter = gst_element_iterate_sink_pads (GST_ELEMENT (self));
292
293   if (!iter)
294     goto no_iter;
295
296   while (!done) {
297     switch (gst_iterator_next (iter, &item)) {
298       case GST_ITERATOR_OK:
299       {
300         GstAggregatorPad *pad;
301
302         pad = g_value_get_object (&item);
303
304         /* if already pushed, skip. FIXME, find something faster to tag pads */
305         if (pad == NULL || g_list_find (seen_pads, pad)) {
306           g_value_reset (&item);
307           break;
308         }
309
310         GST_LOG_OBJECT (pad, "calling function %s on pad",
311             GST_DEBUG_FUNCPTR_NAME (func));
312
313         result = func (self, pad, user_data);
314
315         done = !result;
316
317         seen_pads = g_list_prepend (seen_pads, pad);
318
319         g_value_reset (&item);
320         break;
321       }
322       case GST_ITERATOR_RESYNC:
323         gst_iterator_resync (iter);
324         break;
325       case GST_ITERATOR_ERROR:
326         GST_ERROR_OBJECT (self,
327             "Could not iterate over internally linked pads");
328         done = TRUE;
329         break;
330       case GST_ITERATOR_DONE:
331         done = TRUE;
332         break;
333     }
334   }
335   g_value_unset (&item);
336   gst_iterator_free (iter);
337
338   if (seen_pads == NULL) {
339     GST_DEBUG_OBJECT (self, "No pad seen");
340     return FALSE;
341   }
342
343   g_list_free (seen_pads);
344
345 no_iter:
346   return result;
347 }
348
349 static inline gboolean
350 gst_aggregator_check_all_pads_with_data_or_eos (GstAggregator * self,
351     GstAggregatorPad * aggpad, gpointer user_data)
352 {
353   if (aggpad->buffer || aggpad->eos) {
354     return TRUE;
355   }
356
357   GST_LOG_OBJECT (aggpad, "Not ready to be aggregated");
358
359   return FALSE;
360 }
361
362 static void
363 gst_aggregator_reset_flow_values (GstAggregator * self)
364 {
365   self->priv->flow_return = GST_FLOW_FLUSHING;
366   self->priv->send_stream_start = TRUE;
367   self->priv->send_segment = TRUE;
368   gst_segment_init (&self->segment, GST_FORMAT_TIME);
369 }
370
371 static inline void
372 gst_aggregator_push_mandatory_events (GstAggregator * self)
373 {
374   GstAggregatorPrivate *priv = self->priv;
375
376   if (g_atomic_int_get (&self->priv->send_stream_start)) {
377     gchar s_id[32];
378
379     GST_INFO_OBJECT (self, "pushing stream start");
380     /* stream-start (FIXME: create id based on input ids) */
381     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
382     if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) {
383       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
384     }
385     g_atomic_int_set (&self->priv->send_stream_start, FALSE);
386   }
387
388   if (self->priv->srccaps) {
389
390     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
391         self->priv->srccaps);
392     if (!gst_pad_push_event (self->srcpad,
393             gst_event_new_caps (self->priv->srccaps))) {
394       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
395     }
396     gst_caps_unref (self->priv->srccaps);
397     self->priv->srccaps = NULL;
398   }
399
400   if (g_atomic_int_get (&self->priv->send_segment)) {
401     if (!g_atomic_int_get (&self->priv->flush_seeking)) {
402       GstEvent *segev = gst_event_new_segment (&self->segment);
403
404       if (!self->priv->seqnum)
405         self->priv->seqnum = gst_event_get_seqnum (segev);
406       else
407         gst_event_set_seqnum (segev, self->priv->seqnum);
408
409       GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segev);
410       gst_pad_push_event (self->srcpad, segev);
411       g_atomic_int_set (&self->priv->send_segment, FALSE);
412     }
413   }
414
415   if (priv->tags && priv->tags_changed) {
416     gst_pad_push_event (self->srcpad,
417         gst_event_new_tag (gst_tag_list_ref (priv->tags)));
418     priv->tags_changed = FALSE;
419   }
420 }
421
422 /**
423  * gst_aggregator_set_src_caps:
424  * @self: The #GstAggregator
425  * @caps: The #GstCaps to set on the src pad.
426  *
427  * Sets the caps to be used on the src pad.
428  */
429 void
430 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
431 {
432   GST_AGGREGATOR_SETCAPS_LOCK (self);
433   gst_caps_replace (&self->priv->srccaps, caps);
434   gst_aggregator_push_mandatory_events (self);
435   GST_AGGREGATOR_SETCAPS_UNLOCK (self);
436 }
437
438 /**
439  * gst_aggregator_finish_buffer:
440  * @self: The #GstAggregator
441  * @buffer: the #GstBuffer to push.
442  *
443  * This method will take care of sending mandatory events before pushing
444  * the provided buffer.
445  */
446 GstFlowReturn
447 gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
448 {
449   gst_aggregator_push_mandatory_events (self);
450
451   if (!g_atomic_int_get (&self->priv->flush_seeking) &&
452       gst_pad_is_active (self->srcpad)) {
453     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
454     return gst_pad_push (self->srcpad, buffer);
455   } else {
456     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
457         g_atomic_int_get (&self->priv->flush_seeking),
458         gst_pad_is_active (self->srcpad));
459     gst_buffer_unref (buffer);
460     return GST_FLOW_OK;
461   }
462 }
463
464 static void
465 gst_aggregator_push_eos (GstAggregator * self)
466 {
467   GstEvent *event;
468   gst_aggregator_push_mandatory_events (self);
469
470   self->priv->send_eos = FALSE;
471   event = gst_event_new_eos ();
472   gst_event_set_seqnum (event, self->priv->seqnum);
473   gst_pad_push_event (self->srcpad, event);
474 }
475
476 static GstClockTime
477 gst_aggregator_get_next_time (GstAggregator * self)
478 {
479   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
480
481   if (klass->get_next_time)
482     return klass->get_next_time (self);
483
484   return GST_CLOCK_TIME_NONE;
485 }
486
487 /* called with the src STREAM lock */
488 static gboolean
489 gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
490 {
491   GstClockTime latency_max, latency_min;
492   GstClockTime start;
493   gboolean live, res;
494
495   *timeout = FALSE;
496
497   SRC_STREAM_LOCK (self);
498
499   gst_aggregator_get_latency (self, &live, &latency_min, &latency_max);
500
501   if (gst_aggregator_iterate_sinkpads (self,
502           gst_aggregator_check_all_pads_with_data_or_eos, NULL)) {
503     GST_DEBUG_OBJECT (self, "all pads have data");
504     SRC_STREAM_UNLOCK (self);
505
506     return TRUE;
507   }
508
509   /* Before waiting, check if we're actually still running */
510   if (!self->priv->running || !self->priv->send_eos) {
511     SRC_STREAM_UNLOCK (self);
512
513     return FALSE;
514   }
515
516   start = gst_aggregator_get_next_time (self);
517
518   if (!live || !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self))
519       || !GST_CLOCK_TIME_IS_VALID (start)) {
520     /* We wake up here when something happened, and below
521      * then check if we're ready now. If we return FALSE,
522      * we will be directly called again.
523      */
524     SRC_STREAM_WAIT (self);
525   } else {
526     GstClockTime base_time, time;
527     GstClock *clock;
528     GstClockReturn status;
529
530     GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
531         GST_TIME_ARGS (start));
532
533     GST_OBJECT_LOCK (self);
534     base_time = GST_ELEMENT_CAST (self)->base_time;
535     clock = GST_ELEMENT_CLOCK (self);
536     if (clock)
537       gst_object_ref (clock);
538     GST_OBJECT_UNLOCK (self);
539
540     time = base_time + start;
541
542     if (GST_CLOCK_TIME_IS_VALID (latency_min)) {
543       time += latency_min;
544     } else {
545       time += self->latency;
546     }
547
548     GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
549         GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
550         " latency max %" GST_TIME_FORMAT " latency min %" GST_TIME_FORMAT
551         " current %" GST_TIME_FORMAT ")", GST_TIME_ARGS (time),
552         GST_TIME_ARGS (GST_ELEMENT_CAST (self)->base_time),
553         GST_TIME_ARGS (start), GST_TIME_ARGS (latency_max),
554         GST_TIME_ARGS (latency_min),
555         GST_TIME_ARGS (gst_clock_get_time (clock)));
556
557     self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
558     gst_object_unref (clock);
559     SRC_STREAM_UNLOCK (self);
560
561     status = gst_clock_id_wait (self->priv->aggregate_id, NULL);
562
563     SRC_STREAM_LOCK (self);
564     if (self->priv->aggregate_id) {
565       gst_clock_id_unref (self->priv->aggregate_id);
566       self->priv->aggregate_id = NULL;
567     }
568
569     GST_DEBUG_OBJECT (self, "clock returned %d", status);
570
571     /* we timed out */
572     if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
573       SRC_STREAM_UNLOCK (self);
574       *timeout = TRUE;
575       return TRUE;
576     }
577   }
578
579   res = gst_aggregator_iterate_sinkpads (self,
580       gst_aggregator_check_all_pads_with_data_or_eos, NULL);
581   SRC_STREAM_UNLOCK (self);
582
583   return res;
584 }
585
586 static void
587 gst_aggregator_aggregate_func (GstAggregator * self)
588 {
589   GstAggregatorPrivate *priv = self->priv;
590   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
591   gboolean timeout = FALSE;
592
593   if (self->priv->running == FALSE) {
594     GST_DEBUG_OBJECT (self, "Not running anymore");
595     return;
596   }
597
598   GST_LOG_OBJECT (self, "Checking aggregate");
599   while (priv->send_eos && priv->running) {
600     if (!gst_aggregator_wait_and_check (self, &timeout))
601       continue;
602
603     GST_TRACE_OBJECT (self, "Actually aggregating!");
604
605     priv->flow_return = klass->aggregate (self, timeout);
606
607     if (priv->flow_return == GST_FLOW_EOS) {
608       gst_aggregator_push_eos (self);
609     }
610
611     if (priv->flow_return == GST_FLOW_FLUSHING &&
612         g_atomic_int_get (&priv->flush_seeking))
613       priv->flow_return = GST_FLOW_OK;
614
615     GST_LOG_OBJECT (self, "flow return is %s",
616         gst_flow_get_name (priv->flow_return));
617
618     if (priv->flow_return != GST_FLOW_OK)
619       break;
620   }
621 }
622
623 static gboolean
624 gst_aggregator_start (GstAggregator * self)
625 {
626   GstAggregatorClass *klass;
627   gboolean result;
628
629   self->priv->running = TRUE;
630   self->priv->send_stream_start = TRUE;
631   self->priv->send_segment = TRUE;
632   self->priv->send_eos = TRUE;
633   self->priv->srccaps = NULL;
634   self->priv->flow_return = GST_FLOW_OK;
635
636   klass = GST_AGGREGATOR_GET_CLASS (self);
637
638   if (klass->start)
639     result = klass->start (self);
640   else
641     result = TRUE;
642
643   return result;
644 }
645
646 static gboolean
647 _check_pending_flush_stop (GstAggregatorPad * pad)
648 {
649   return (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
650 }
651
652 static gboolean
653 gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
654 {
655   gboolean res = TRUE;
656
657   GST_INFO_OBJECT (self, "%s srcpad task",
658       flush_start ? "Pausing" : "Stopping");
659
660   self->priv->running = FALSE;
661   SRC_STREAM_BROADCAST (self);
662
663   if (flush_start) {
664     res = gst_pad_push_event (self->srcpad, flush_start);
665   }
666
667   gst_pad_stop_task (self->srcpad);
668   SRC_STREAM_BROADCAST (self);
669
670   return res;
671 }
672
673 static void
674 gst_aggregator_start_srcpad_task (GstAggregator * self)
675 {
676   GST_INFO_OBJECT (self, "Starting srcpad task");
677
678   self->priv->running = TRUE;
679   gst_pad_start_task (GST_PAD (self->srcpad),
680       (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL);
681 }
682
683 static GstFlowReturn
684 gst_aggregator_flush (GstAggregator * self)
685 {
686   GstFlowReturn ret = GST_FLOW_OK;
687   GstAggregatorPrivate *priv = self->priv;
688   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
689
690   GST_DEBUG_OBJECT (self, "Flushing everything");
691   g_atomic_int_set (&priv->send_segment, TRUE);
692   g_atomic_int_set (&priv->flush_seeking, FALSE);
693   g_atomic_int_set (&priv->tags_changed, FALSE);
694   if (klass->flush)
695     ret = klass->flush (self);
696
697   return ret;
698 }
699
700 static gboolean
701 gst_aggregator_all_flush_stop_received (GstAggregator * self)
702 {
703   GList *tmp;
704   GstAggregatorPad *tmppad;
705
706   GST_OBJECT_LOCK (self);
707   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
708     tmppad = (GstAggregatorPad *) tmp->data;
709
710     if (_check_pending_flush_stop (tmppad) == FALSE) {
711       GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
712           tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
713       GST_OBJECT_UNLOCK (self);
714       return FALSE;
715     }
716   }
717   GST_OBJECT_UNLOCK (self);
718
719   return TRUE;
720 }
721
722 static void
723 gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
724     GstEvent * event)
725 {
726   GstBuffer *tmpbuf;
727   GstAggregatorPrivate *priv = self->priv;
728   GstAggregatorPadPrivate *padpriv = aggpad->priv;
729
730   g_atomic_int_set (&aggpad->priv->flushing, TRUE);
731   /*  Remove pad buffer and wake up the streaming thread */
732   tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
733   gst_buffer_replace (&tmpbuf, NULL);
734   PAD_STREAM_LOCK (aggpad);
735   if (g_atomic_int_compare_and_exchange (&padpriv->pending_flush_start,
736           TRUE, FALSE) == TRUE) {
737     GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
738     g_atomic_int_set (&padpriv->pending_flush_stop, TRUE);
739   }
740
741   if (g_atomic_int_get (&priv->flush_seeking)) {
742     /* If flush_seeking we forward the first FLUSH_START */
743     if (g_atomic_int_compare_and_exchange (&priv->pending_flush_start,
744             TRUE, FALSE) == TRUE) {
745
746       GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
747       gst_aggregator_stop_srcpad_task (self, event);
748       priv->flow_return = GST_FLOW_OK;
749
750       GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
751       GST_PAD_STREAM_LOCK (self->srcpad);
752       GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
753       event = NULL;
754     } else {
755       gst_event_unref (event);
756     }
757   } else {
758     gst_event_unref (event);
759   }
760   PAD_STREAM_UNLOCK (aggpad);
761
762   tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
763   gst_buffer_replace (&tmpbuf, NULL);
764 }
765
766 /* GstAggregator vmethods default implementations */
767 static gboolean
768 gst_aggregator_default_sink_event (GstAggregator * self,
769     GstAggregatorPad * aggpad, GstEvent * event)
770 {
771   gboolean res = TRUE;
772   GstPad *pad = GST_PAD (aggpad);
773   GstAggregatorPrivate *priv = self->priv;
774
775   switch (GST_EVENT_TYPE (event)) {
776     case GST_EVENT_FLUSH_START:
777     {
778       gst_aggregator_flush_start (self, aggpad, event);
779       /* We forward only in one case: right after flush_seeking */
780       event = NULL;
781       goto eat;
782     }
783     case GST_EVENT_FLUSH_STOP:
784     {
785       GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP");
786
787       gst_aggregator_pad_flush (aggpad, self);
788       if (g_atomic_int_get (&priv->flush_seeking)) {
789         g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
790
791         if (g_atomic_int_get (&priv->flush_seeking)) {
792           if (gst_aggregator_all_flush_stop_received (self)) {
793             /* That means we received FLUSH_STOP/FLUSH_STOP on
794              * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
795             gst_aggregator_flush (self);
796             gst_pad_push_event (self->srcpad, event);
797             priv->send_eos = TRUE;
798             event = NULL;
799             SRC_STREAM_BROADCAST (self);
800
801             GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
802             GST_PAD_STREAM_UNLOCK (self->srcpad);
803             gst_aggregator_start_srcpad_task (self);
804           }
805         }
806       }
807
808       /* We never forward the event */
809       goto eat;
810     }
811     case GST_EVENT_EOS:
812     {
813       GST_DEBUG_OBJECT (aggpad, "EOS");
814
815       /* We still have a buffer, and we don't want the subclass to have to
816        * check for it. Mark pending_eos, eos will be set when steal_buffer is
817        * called
818        */
819       PAD_LOCK_EVENT (aggpad);
820       if (!aggpad->buffer) {
821         aggpad->eos = TRUE;
822       } else {
823         aggpad->priv->pending_eos = TRUE;
824       }
825       PAD_UNLOCK_EVENT (aggpad);
826
827       SRC_STREAM_BROADCAST (self);
828       goto eat;
829     }
830     case GST_EVENT_SEGMENT:
831     {
832       PAD_LOCK_EVENT (aggpad);
833       gst_event_copy_segment (event, &aggpad->segment);
834       self->priv->seqnum = gst_event_get_seqnum (event);
835       PAD_UNLOCK_EVENT (aggpad);
836       goto eat;
837     }
838     case GST_EVENT_STREAM_START:
839     {
840       goto eat;
841     }
842     case GST_EVENT_TAG:
843     {
844       GstTagList *tags;
845
846       gst_event_parse_tag (event, &tags);
847
848       if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) {
849         gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE);
850         gst_event_unref (event);
851         event = NULL;
852         goto eat;
853       }
854       break;
855     }
856     default:
857     {
858       break;
859     }
860   }
861
862   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
863   return gst_pad_event_default (pad, GST_OBJECT (self), event);
864
865 eat:
866   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
867   if (event)
868     gst_event_unref (event);
869
870   return res;
871 }
872
873 static inline gboolean
874 gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad,
875     gpointer unused_udata)
876 {
877   gst_aggregator_pad_flush (pad, self);
878
879   return TRUE;
880 }
881
882 static gboolean
883 gst_aggregator_stop (GstAggregator * agg)
884 {
885   GstAggregatorClass *klass;
886   gboolean result;
887
888   gst_aggregator_reset_flow_values (agg);
889
890   gst_aggregator_iterate_sinkpads (agg, gst_aggregator_stop_pad, NULL);
891
892   if (agg->priv->tags)
893     gst_tag_list_unref (agg->priv->tags);
894   agg->priv->tags = NULL;
895
896   klass = GST_AGGREGATOR_GET_CLASS (agg);
897
898   if (klass->stop)
899     result = klass->stop (agg);
900   else
901     result = TRUE;
902
903   return result;
904 }
905
906 /* GstElement vmethods implementations */
907 static GstStateChangeReturn
908 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
909 {
910   GstStateChangeReturn ret;
911   GstAggregator *self = GST_AGGREGATOR (element);
912
913   switch (transition) {
914     case GST_STATE_CHANGE_READY_TO_PAUSED:
915       if (!gst_aggregator_start (self))
916         goto error_start;
917       break;
918     default:
919       break;
920   }
921
922   if ((ret =
923           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
924               transition)) == GST_STATE_CHANGE_FAILURE)
925     goto failure;
926
927
928   switch (transition) {
929     case GST_STATE_CHANGE_PAUSED_TO_READY:
930       if (!gst_aggregator_stop (self)) {
931         /* What to do in this case? Error out? */
932         GST_ERROR_OBJECT (self, "Subclass failed to stop.");
933       }
934       break;
935     default:
936       break;
937   }
938
939   return ret;
940
941 /* ERRORS */
942 failure:
943   {
944     GST_ERROR_OBJECT (element, "parent failed state change");
945     return ret;
946   }
947 error_start:
948   {
949     GST_ERROR_OBJECT (element, "Subclass failed to start");
950     return GST_STATE_CHANGE_FAILURE;
951   }
952 }
953
954 static void
955 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
956 {
957   GstAggregator *self = GST_AGGREGATOR (element);
958   GstBuffer *tmpbuf;
959
960   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
961
962   GST_INFO_OBJECT (pad, "Removing pad");
963
964   g_atomic_int_set (&aggpad->priv->flushing, TRUE);
965   tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
966   gst_buffer_replace (&tmpbuf, NULL);
967   gst_element_remove_pad (element, pad);
968
969   SRC_STREAM_BROADCAST (self);
970 }
971
972 static GstPad *
973 gst_aggregator_request_new_pad (GstElement * element,
974     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
975 {
976   GstAggregator *self;
977   GstAggregatorPad *agg_pad;
978
979   GstElementClass *klass = GST_ELEMENT_GET_CLASS (element);
980   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
981
982   self = GST_AGGREGATOR (element);
983
984   if (templ == gst_element_class_get_pad_template (klass, "sink_%u")) {
985     gint serial = 0;
986     gchar *name = NULL;
987
988     GST_OBJECT_LOCK (element);
989     if (req_name == NULL || strlen (req_name) < 6
990         || !g_str_has_prefix (req_name, "sink_")) {
991       /* no name given when requesting the pad, use next available int */
992       priv->padcount++;
993     } else {
994       /* parse serial number from requested padname */
995       serial = g_ascii_strtoull (&req_name[5], NULL, 10);
996       if (serial >= priv->padcount)
997         priv->padcount = serial;
998     }
999
1000     name = g_strdup_printf ("sink_%u", priv->padcount);
1001     agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
1002         "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
1003     g_free (name);
1004
1005     GST_OBJECT_UNLOCK (element);
1006
1007   } else {
1008     return NULL;
1009   }
1010
1011   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
1012
1013   if (priv->running)
1014     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
1015
1016   /* add the pad to the element */
1017   gst_element_add_pad (element, GST_PAD (agg_pad));
1018
1019   return GST_PAD (agg_pad);
1020 }
1021
1022 typedef struct
1023 {
1024   GstClockTime min, max;
1025   gboolean live;
1026 } LatencyData;
1027
1028 static gboolean
1029 gst_aggregator_query_sink_latency_foreach (GstAggregator * self,
1030     GstAggregatorPad * pad, gpointer user_data)
1031 {
1032   LatencyData *data = user_data;
1033   GstClockTime min, max;
1034   GstQuery *query;
1035   gboolean live, res;
1036
1037   query = gst_query_new_latency ();
1038   res = gst_pad_peer_query (GST_PAD_CAST (pad), query);
1039
1040   if (res) {
1041     gst_query_parse_latency (query, &live, &min, &max);
1042
1043     GST_LOG_OBJECT (pad, "got latency live:%s min:%" G_GINT64_FORMAT
1044         " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
1045
1046     if (min != GST_CLOCK_TIME_NONE && min > data->min)
1047       data->min = min;
1048
1049     if (max != GST_CLOCK_TIME_NONE &&
1050         ((data->max != GST_CLOCK_TIME_NONE && max < data->max) ||
1051             (data->max == GST_CLOCK_TIME_NONE)))
1052       data->max = max;
1053
1054     data->live |= live;
1055   }
1056
1057   gst_query_unref (query);
1058
1059   return TRUE;
1060 }
1061
1062 /**
1063  * gst_aggregator_get_latency:
1064  * @self: a #GstAggregator
1065  * @live: (out) (allow-none): whether @self is live
1066  * @min_latency: (out) (allow-none): the configured minimum latency of @self
1067  * @max_latency: (out) (allow-none): the configured maximum latency of @self
1068  *
1069  * Retreives the latency values reported by @self in response to the latency
1070  * query.
1071  *
1072  * Typically only called by subclasses.
1073  */
1074 void
1075 gst_aggregator_get_latency (GstAggregator * self, gboolean * live,
1076     GstClockTime * min_latency, GstClockTime * max_latency)
1077 {
1078   GstClockTime min, max;
1079
1080   g_return_if_fail (GST_IS_AGGREGATOR (self));
1081
1082   /* latency_min is never GST_CLOCK_TIME_NONE by construction */
1083   min = self->priv->latency_min;
1084   max = self->priv->latency_max;
1085
1086   min += self->priv->sub_latency_min;
1087   if (GST_CLOCK_TIME_IS_VALID (max)
1088       && GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max))
1089     max += self->priv->sub_latency_max;
1090
1091   if (GST_CLOCK_TIME_IS_VALID (self->latency)) {
1092     min += self->latency;
1093     if (GST_CLOCK_TIME_IS_VALID (max))
1094       max += self->latency;
1095   }
1096
1097   if (live)
1098     *live = self->priv->latency_live;
1099   if (min_latency)
1100     *min_latency = min;
1101   if (max_latency)
1102     *max_latency = max;
1103 }
1104
1105 static gboolean
1106 gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
1107 {
1108   LatencyData data;
1109
1110   data.min = 0;
1111   data.max = GST_CLOCK_TIME_NONE;
1112   data.live = FALSE;
1113
1114   /* query upstream's latency */
1115   SRC_STREAM_LOCK (self);
1116   gst_aggregator_iterate_sinkpads (self,
1117       gst_aggregator_query_sink_latency_foreach, &data);
1118   SRC_STREAM_UNLOCK (self);
1119
1120   if (data.live && GST_CLOCK_TIME_IS_VALID (self->latency) &&
1121       self->latency > data.max) {
1122     GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
1123         ("%s", "Latency too big"),
1124         ("The requested latency value is too big for the current pipeline.  "
1125             "Limiting to %" G_GINT64_FORMAT, data.max));
1126     self->latency = data.max;
1127   }
1128
1129   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (data.min))) {
1130     GST_WARNING_OBJECT (self, "Invalid minimum latency, using 0");
1131     data.min = 0;
1132   }
1133
1134   if (G_UNLIKELY (data.min > data.max)) {
1135     GST_WARNING_OBJECT (self, "Minimum latency is greater than maximum latency "
1136         "(%" G_GINT64_FORMAT " > %" G_GINT64_FORMAT "). "
1137         "Clamping it at the maximum latency", data.min, data.max);
1138     data.min = data.max;
1139   }
1140
1141   self->priv->latency_live = data.live;
1142   self->priv->latency_min = data.min;
1143   self->priv->latency_max = data.max;
1144
1145   /* add our own */
1146   if (GST_CLOCK_TIME_IS_VALID (self->latency)) {
1147     if (GST_CLOCK_TIME_IS_VALID (data.min))
1148       data.min += self->latency;
1149     if (GST_CLOCK_TIME_IS_VALID (data.max))
1150       data.max += self->latency;
1151   }
1152
1153   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_min)
1154       && GST_CLOCK_TIME_IS_VALID (data.min))
1155     data.min += self->priv->sub_latency_min;
1156   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1157       && GST_CLOCK_TIME_IS_VALID (data.max))
1158     data.max += self->priv->sub_latency_max;
1159
1160   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
1161       " max:%" G_GINT64_FORMAT, data.live ? "true" : "false", data.min,
1162       data.max);
1163
1164   gst_query_set_latency (query, data.live, data.min, data.max);
1165
1166   return TRUE;
1167 }
1168
1169 static gboolean
1170 gst_aggregator_send_event (GstElement * element, GstEvent * event)
1171 {
1172   GstAggregator *self = GST_AGGREGATOR (element);
1173
1174   GST_STATE_LOCK (element);
1175   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
1176       GST_STATE (element) < GST_STATE_PAUSED) {
1177     gdouble rate;
1178     GstFormat fmt;
1179     GstSeekFlags flags;
1180     GstSeekType start_type, stop_type;
1181     gint64 start, stop;
1182
1183     gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1184         &start, &stop_type, &stop);
1185     gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1186         stop_type, stop, NULL);
1187
1188     self->priv->seqnum = gst_event_get_seqnum (event);
1189     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
1190   }
1191   GST_STATE_UNLOCK (element);
1192
1193
1194   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
1195       event);
1196 }
1197
1198 static gboolean
1199 gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
1200 {
1201   gboolean res = TRUE;
1202
1203   switch (GST_QUERY_TYPE (query)) {
1204     case GST_QUERY_SEEKING:
1205     {
1206       GstFormat format;
1207
1208       /* don't pass it along as some (file)sink might claim it does
1209        * whereas with a collectpads in between that will not likely work */
1210       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1211       gst_query_set_seeking (query, format, FALSE, 0, -1);
1212       res = TRUE;
1213
1214       goto discard;
1215     }
1216     case GST_QUERY_LATENCY:
1217     {
1218       gboolean ret;
1219
1220       ret = gst_aggregator_query_latency (self, query);
1221       /* Wake up the src thread again, due to changed latencies
1222        * or changed live-ness we might have to adjust if we wait
1223        * on a deadline at all and how long.
1224        */
1225       SRC_STREAM_BROADCAST (self);
1226       return ret;
1227     }
1228     default:
1229       break;
1230   }
1231
1232   return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1233
1234 discard:
1235   return res;
1236 }
1237
1238 static gboolean
1239 gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
1240 {
1241   EventData *evdata = user_data;
1242   gboolean ret = TRUE;
1243   GstPad *peer = gst_pad_get_peer (pad);
1244   GstAggregatorPadPrivate *padpriv = GST_AGGREGATOR_PAD (pad)->priv;
1245
1246   if (peer) {
1247     ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
1248     GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
1249     gst_object_unref (peer);
1250   }
1251
1252   if (ret == FALSE) {
1253     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK)
1254       GST_ERROR_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
1255
1256     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
1257       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
1258
1259       if (gst_pad_query (peer, seeking)) {
1260         gboolean seekable;
1261
1262         gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
1263
1264         if (seekable == FALSE) {
1265           GST_INFO_OBJECT (pad,
1266               "Source not seekable, We failed but it does not matter!");
1267
1268           ret = TRUE;
1269         }
1270       } else {
1271         GST_ERROR_OBJECT (pad, "Query seeking FAILED");
1272       }
1273     }
1274
1275     if (evdata->flush) {
1276       padpriv->pending_flush_start = FALSE;
1277       padpriv->pending_flush_stop = FALSE;
1278     }
1279   } else {
1280     evdata->one_actually_seeked = TRUE;
1281   }
1282
1283   evdata->result &= ret;
1284
1285   /* Always send to all pads */
1286   return FALSE;
1287 }
1288
1289 static gboolean
1290 gst_aggregator_set_flush_pending (GstAggregator * self, GstAggregatorPad * pad,
1291     gpointer udata)
1292 {
1293   pad->priv->pending_flush_start = TRUE;
1294   pad->priv->pending_flush_stop = FALSE;
1295
1296   return TRUE;
1297 }
1298
1299 static EventData
1300 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
1301     GstEvent * event, gboolean flush)
1302 {
1303   EventData evdata;
1304
1305   evdata.event = event;
1306   evdata.result = TRUE;
1307   evdata.flush = flush;
1308   evdata.one_actually_seeked = FALSE;
1309
1310   /* We first need to set all pads as flushing in a first pass
1311    * as flush_start flush_stop is sometimes sent synchronously
1312    * while we send the seek event */
1313   if (flush)
1314     gst_aggregator_iterate_sinkpads (self,
1315         gst_aggregator_set_flush_pending, NULL);
1316
1317   gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, &evdata);
1318
1319   gst_event_unref (event);
1320
1321   return evdata;
1322 }
1323
1324 static gboolean
1325 gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
1326 {
1327   gdouble rate;
1328   GstFormat fmt;
1329   GstSeekFlags flags;
1330   GstSeekType start_type, stop_type;
1331   gint64 start, stop;
1332   gboolean flush;
1333   EventData evdata;
1334   GstAggregatorPrivate *priv = self->priv;
1335
1336   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1337       &start, &stop_type, &stop);
1338
1339   GST_INFO_OBJECT (self, "starting SEEK");
1340
1341   flush = flags & GST_SEEK_FLAG_FLUSH;
1342
1343   if (flush) {
1344     g_atomic_int_set (&priv->pending_flush_start, TRUE);
1345     g_atomic_int_set (&priv->flush_seeking, TRUE);
1346   }
1347
1348   gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1349       stop_type, stop, NULL);
1350
1351   /* forward the seek upstream */
1352   evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, flush);
1353   event = NULL;
1354
1355   if (!evdata.result || !evdata.one_actually_seeked) {
1356     g_atomic_int_set (&priv->flush_seeking, FALSE);
1357     g_atomic_int_set (&priv->pending_flush_start, FALSE);
1358   }
1359
1360   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
1361
1362   return evdata.result;
1363 }
1364
1365 static gboolean
1366 gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
1367 {
1368   EventData evdata;
1369   gboolean res = TRUE;
1370
1371   switch (GST_EVENT_TYPE (event)) {
1372     case GST_EVENT_SEEK:
1373     {
1374       gst_event_ref (event);
1375       res = gst_aggregator_do_seek (self, event);
1376       gst_event_unref (event);
1377       event = NULL;
1378       goto done;
1379     }
1380     case GST_EVENT_NAVIGATION:
1381     {
1382       /* navigation is rather pointless. */
1383       res = FALSE;
1384       gst_event_unref (event);
1385       goto done;
1386     }
1387     default:
1388     {
1389       break;
1390     }
1391   }
1392
1393   evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, FALSE);
1394   res = evdata.result;
1395
1396 done:
1397   return res;
1398 }
1399
1400 static gboolean
1401 gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent,
1402     GstEvent * event)
1403 {
1404   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1405
1406   return klass->src_event (GST_AGGREGATOR (parent), event);
1407 }
1408
1409 static gboolean
1410 gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent,
1411     GstQuery * query)
1412 {
1413   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1414
1415   return klass->src_query (GST_AGGREGATOR (parent), query);
1416 }
1417
1418 static gboolean
1419 gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
1420     GstObject * parent, GstPadMode mode, gboolean active)
1421 {
1422   GstAggregator *self = GST_AGGREGATOR (parent);
1423   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1424
1425   if (klass->src_activate) {
1426     if (klass->src_activate (self, mode, active) == FALSE) {
1427       return FALSE;
1428     }
1429   }
1430
1431   if (active == TRUE) {
1432     switch (mode) {
1433       case GST_PAD_MODE_PUSH:
1434       {
1435         GST_INFO_OBJECT (pad, "Activating pad!");
1436         gst_aggregator_start_srcpad_task (self);
1437         return TRUE;
1438       }
1439       default:
1440       {
1441         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
1442         return FALSE;
1443       }
1444     }
1445   }
1446
1447   /* deactivating */
1448   GST_INFO_OBJECT (self, "Deactivating srcpad");
1449   gst_aggregator_stop_srcpad_task (self, FALSE);
1450
1451   return TRUE;
1452 }
1453
1454 static gboolean
1455 gst_aggregator_default_sink_query (GstAggregator * self,
1456     GstAggregatorPad * aggpad, GstQuery * query)
1457 {
1458   GstPad *pad = GST_PAD (aggpad);
1459
1460   return gst_pad_query_default (pad, GST_OBJECT (self), query);
1461 }
1462
1463 static void
1464 gst_aggregator_finalize (GObject * object)
1465 {
1466   GstAggregator *self = (GstAggregator *) object;
1467
1468   g_mutex_clear (&self->priv->setcaps_lock);
1469   g_mutex_clear (&self->priv->src_lock);
1470   g_cond_clear (&self->priv->src_cond);
1471
1472   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
1473 }
1474
1475 static void
1476 gst_aggregator_dispose (GObject * object)
1477 {
1478   G_OBJECT_CLASS (aggregator_parent_class)->dispose (object);
1479 }
1480
1481 /*
1482  * gst_aggregator_set_latency_property:
1483  * @agg: a #GstAggregator
1484  * @latency: the new latency value.
1485  *
1486  * Sets the new latency value to @latency. This value is used to limit the
1487  * amount of time a pad waits for data to appear before considering the pad
1488  * as unresponsive.
1489  */
1490 static void
1491 gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
1492 {
1493   gboolean changed;
1494
1495   g_return_if_fail (GST_IS_AGGREGATOR (self));
1496
1497   GST_OBJECT_LOCK (self);
1498
1499   if (self->priv->latency_live && self->priv->latency_max != 0 &&
1500       GST_CLOCK_TIME_IS_VALID (latency) && latency > self->priv->latency_max) {
1501     GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
1502         ("%s", "Latency too big"),
1503         ("The requested latency value is too big for the latency in the "
1504             "current pipeline.  Limiting to %" G_GINT64_FORMAT,
1505             self->priv->latency_max));
1506     latency = self->priv->latency_max;
1507   }
1508
1509   changed = self->latency != latency;
1510   self->latency = latency;
1511   GST_OBJECT_UNLOCK (self);
1512
1513   if (changed)
1514     gst_element_post_message (GST_ELEMENT_CAST (self),
1515         gst_message_new_latency (GST_OBJECT_CAST (self)));
1516 }
1517
1518 /*
1519  * gst_aggregator_get_latency_property:
1520  * @agg: a #GstAggregator
1521  *
1522  * Gets the latency value. See gst_aggregator_set_latency for
1523  * more details.
1524  *
1525  * Returns: The time in nanoseconds to wait for data to arrive on a sink pad 
1526  * before a pad is deemed unresponsive. A value of -1 means an
1527  * unlimited time.
1528  */
1529 static gint64
1530 gst_aggregator_get_latency_property (GstAggregator * agg)
1531 {
1532   gint64 res;
1533
1534   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
1535
1536   GST_OBJECT_LOCK (agg);
1537   res = agg->latency;
1538   GST_OBJECT_UNLOCK (agg);
1539
1540   return res;
1541 }
1542
1543 static void
1544 gst_aggregator_set_property (GObject * object, guint prop_id,
1545     const GValue * value, GParamSpec * pspec)
1546 {
1547   GstAggregator *agg = GST_AGGREGATOR (object);
1548
1549   switch (prop_id) {
1550     case PROP_LATENCY:
1551       gst_aggregator_set_latency_property (agg, g_value_get_int64 (value));
1552       break;
1553     default:
1554       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1555       break;
1556   }
1557 }
1558
1559 static void
1560 gst_aggregator_get_property (GObject * object, guint prop_id,
1561     GValue * value, GParamSpec * pspec)
1562 {
1563   GstAggregator *agg = GST_AGGREGATOR (object);
1564
1565   switch (prop_id) {
1566     case PROP_LATENCY:
1567       g_value_set_int64 (value, gst_aggregator_get_latency_property (agg));
1568       break;
1569     default:
1570       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1571       break;
1572   }
1573 }
1574
1575 /* GObject vmethods implementations */
1576 static void
1577 gst_aggregator_class_init (GstAggregatorClass * klass)
1578 {
1579   GObjectClass *gobject_class = (GObjectClass *) klass;
1580   GstElementClass *gstelement_class = (GstElementClass *) klass;
1581
1582   aggregator_parent_class = g_type_class_peek_parent (klass);
1583   g_type_class_add_private (klass, sizeof (GstAggregatorPrivate));
1584
1585   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
1586       GST_DEBUG_FG_MAGENTA, "GstAggregator");
1587
1588   klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD;
1589
1590   klass->sink_event = gst_aggregator_default_sink_event;
1591   klass->sink_query = gst_aggregator_default_sink_query;
1592
1593   klass->src_event = gst_aggregator_default_src_event;
1594   klass->src_query = gst_aggregator_default_src_query;
1595
1596   gstelement_class->request_new_pad =
1597       GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
1598   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
1599   gstelement_class->release_pad =
1600       GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
1601   gstelement_class->change_state =
1602       GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
1603
1604   gobject_class->set_property = gst_aggregator_set_property;
1605   gobject_class->get_property = gst_aggregator_get_property;
1606   gobject_class->finalize = gst_aggregator_finalize;
1607   gobject_class->dispose = gst_aggregator_dispose;
1608
1609   g_object_class_install_property (gobject_class, PROP_LATENCY,
1610       g_param_spec_int64 ("latency", "Buffer latency",
1611           "Additional latency in live mode to allow upstream "
1612           "to take longer to produce buffers for the current "
1613           "position", 0,
1614           (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1),
1615           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1616 }
1617
1618 static void
1619 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
1620 {
1621   GstPadTemplate *pad_template;
1622   GstAggregatorPrivate *priv;
1623
1624   g_return_if_fail (klass->aggregate != NULL);
1625
1626   self->priv =
1627       G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR,
1628       GstAggregatorPrivate);
1629
1630   priv = self->priv;
1631
1632   pad_template =
1633       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
1634   g_return_if_fail (pad_template != NULL);
1635
1636   priv->padcount = -1;
1637   priv->tags_changed = FALSE;
1638
1639   self->priv->latency_live = FALSE;
1640   self->priv->latency_min = self->priv->sub_latency_min = 0;
1641   self->priv->latency_max = self->priv->sub_latency_max = GST_CLOCK_TIME_NONE;
1642   gst_aggregator_reset_flow_values (self);
1643
1644   self->srcpad = gst_pad_new_from_template (pad_template, "src");
1645
1646   gst_pad_set_event_function (self->srcpad,
1647       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
1648   gst_pad_set_query_function (self->srcpad,
1649       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func));
1650   gst_pad_set_activatemode_function (self->srcpad,
1651       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func));
1652
1653   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
1654
1655   self->latency = 0;
1656
1657   g_mutex_init (&self->priv->setcaps_lock);
1658   g_mutex_init (&self->priv->src_lock);
1659   g_cond_init (&self->priv->src_cond);
1660 }
1661
1662 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
1663  * method to get to the padtemplates */
1664 GType
1665 gst_aggregator_get_type (void)
1666 {
1667   static volatile gsize type = 0;
1668
1669   if (g_once_init_enter (&type)) {
1670     GType _type;
1671     static const GTypeInfo info = {
1672       sizeof (GstAggregatorClass),
1673       NULL,
1674       NULL,
1675       (GClassInitFunc) gst_aggregator_class_init,
1676       NULL,
1677       NULL,
1678       sizeof (GstAggregator),
1679       0,
1680       (GInstanceInitFunc) gst_aggregator_init,
1681     };
1682
1683     _type = g_type_register_static (GST_TYPE_ELEMENT,
1684         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
1685     g_once_init_leave (&type, _type);
1686   }
1687   return type;
1688 }
1689
1690 static GstFlowReturn
1691 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
1692 {
1693   GstBuffer *actual_buf = buffer;
1694   GstAggregator *self = GST_AGGREGATOR (object);
1695   GstAggregatorPrivate *priv = self->priv;
1696   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1697   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
1698
1699   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
1700
1701   PAD_STREAM_LOCK (aggpad);
1702
1703   if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
1704     goto flushing;
1705
1706   if (g_atomic_int_get (&aggpad->priv->pending_eos) == TRUE)
1707     goto eos;
1708
1709   PAD_LOCK_EVENT (aggpad);
1710
1711   while (aggpad->buffer && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
1712     GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
1713     PAD_WAIT_EVENT (aggpad);
1714   }
1715   PAD_UNLOCK_EVENT (aggpad);
1716
1717   if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
1718     goto flushing;
1719
1720   if (aggclass->clip) {
1721     aggclass->clip (self, aggpad, buffer, &actual_buf);
1722   }
1723
1724   PAD_LOCK_EVENT (aggpad);
1725   if (aggpad->buffer)
1726     gst_buffer_unref (aggpad->buffer);
1727   aggpad->buffer = actual_buf;
1728   PAD_UNLOCK_EVENT (aggpad);
1729   PAD_STREAM_UNLOCK (aggpad);
1730
1731   SRC_STREAM_LOCK (self);
1732   if (gst_aggregator_iterate_sinkpads (self,
1733           gst_aggregator_check_all_pads_with_data_or_eos, NULL))
1734     SRC_STREAM_BROADCAST_UNLOCKED (self);
1735   SRC_STREAM_UNLOCK (self);
1736
1737   GST_DEBUG_OBJECT (aggpad, "Done chaining");
1738
1739   return priv->flow_return;
1740
1741 flushing:
1742   PAD_STREAM_UNLOCK (aggpad);
1743
1744   gst_buffer_unref (buffer);
1745   GST_DEBUG_OBJECT (aggpad, "We are flushing");
1746
1747   return GST_FLOW_FLUSHING;
1748
1749 eos:
1750   PAD_STREAM_UNLOCK (aggpad);
1751
1752   gst_buffer_unref (buffer);
1753   GST_DEBUG_OBJECT (pad, "We are EOS already...");
1754
1755   return GST_FLOW_EOS;
1756 }
1757
1758 static gboolean
1759 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
1760     GstQuery * query)
1761 {
1762   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1763   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1764
1765   if (GST_QUERY_IS_SERIALIZED (query)) {
1766     PAD_LOCK_EVENT (aggpad);
1767
1768     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) {
1769       PAD_UNLOCK_EVENT (aggpad);
1770       goto flushing;
1771     }
1772
1773     while (aggpad->buffer
1774         && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
1775       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
1776       PAD_WAIT_EVENT (aggpad);
1777     }
1778     PAD_UNLOCK_EVENT (aggpad);
1779
1780     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
1781       goto flushing;
1782   }
1783
1784   return klass->sink_query (GST_AGGREGATOR (parent),
1785       GST_AGGREGATOR_PAD (pad), query);
1786
1787 flushing:
1788   GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping query");
1789   return FALSE;
1790 }
1791
1792 static gboolean
1793 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
1794     GstEvent * event)
1795 {
1796   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1797   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1798
1799   if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS
1800       && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) {
1801     PAD_LOCK_EVENT (aggpad);
1802
1803     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE
1804         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
1805       PAD_UNLOCK_EVENT (aggpad);
1806       goto flushing;
1807     }
1808
1809     while (aggpad->buffer
1810         && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
1811       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
1812       PAD_WAIT_EVENT (aggpad);
1813     }
1814     PAD_UNLOCK_EVENT (aggpad);
1815
1816     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE
1817         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
1818       goto flushing;
1819   }
1820
1821   return klass->sink_event (GST_AGGREGATOR (parent),
1822       GST_AGGREGATOR_PAD (pad), event);
1823
1824 flushing:
1825   GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping event");
1826   if (GST_EVENT_IS_STICKY (event))
1827     gst_pad_store_sticky_event (pad, event);
1828   gst_event_unref (event);
1829   return FALSE;
1830 }
1831
1832 static gboolean
1833 gst_aggregator_pad_activate_mode_func (GstPad * pad,
1834     GstObject * parent, GstPadMode mode, gboolean active)
1835 {
1836   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1837
1838   if (active == FALSE) {
1839     PAD_LOCK_EVENT (aggpad);
1840     g_atomic_int_set (&aggpad->priv->flushing, TRUE);
1841     gst_buffer_replace (&aggpad->buffer, NULL);
1842     PAD_BROADCAST_EVENT (aggpad);
1843     PAD_UNLOCK_EVENT (aggpad);
1844   } else {
1845     g_atomic_int_set (&aggpad->priv->flushing, FALSE);
1846     PAD_LOCK_EVENT (aggpad);
1847     PAD_BROADCAST_EVENT (aggpad);
1848     PAD_UNLOCK_EVENT (aggpad);
1849   }
1850
1851   return TRUE;
1852 }
1853
1854 /***********************************
1855  * GstAggregatorPad implementation  *
1856  ************************************/
1857 G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
1858
1859 static void
1860 gst_aggregator_pad_constructed (GObject * object)
1861 {
1862   GstPad *pad = GST_PAD (object);
1863
1864   gst_pad_set_chain_function (pad,
1865       GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
1866   gst_pad_set_event_function (pad,
1867       GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func));
1868   gst_pad_set_query_function (pad,
1869       GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
1870   gst_pad_set_activatemode_function (pad,
1871       GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
1872 }
1873
1874 static void
1875 gst_aggregator_pad_finalize (GObject * object)
1876 {
1877   GstAggregatorPad *pad = (GstAggregatorPad *) object;
1878
1879   g_mutex_clear (&pad->priv->event_lock);
1880   g_cond_clear (&pad->priv->event_cond);
1881   g_mutex_clear (&pad->priv->stream_lock);
1882
1883   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object);
1884 }
1885
1886 static void
1887 gst_aggregator_pad_dispose (GObject * object)
1888 {
1889   GstAggregatorPad *pad = (GstAggregatorPad *) object;
1890   GstBuffer *buf;
1891
1892   buf = gst_aggregator_pad_steal_buffer (pad);
1893   if (buf)
1894     gst_buffer_unref (buf);
1895
1896   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
1897 }
1898
1899 static void
1900 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
1901 {
1902   GObjectClass *gobject_class = (GObjectClass *) klass;
1903
1904   g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate));
1905
1906   gobject_class->constructed = gst_aggregator_pad_constructed;
1907   gobject_class->finalize = gst_aggregator_pad_finalize;
1908   gobject_class->dispose = gst_aggregator_pad_dispose;
1909 }
1910
1911 static void
1912 gst_aggregator_pad_init (GstAggregatorPad * pad)
1913 {
1914   pad->priv =
1915       G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
1916       GstAggregatorPadPrivate);
1917
1918   pad->buffer = NULL;
1919   g_mutex_init (&pad->priv->event_lock);
1920   g_cond_init (&pad->priv->event_cond);
1921
1922   g_mutex_init (&pad->priv->stream_lock);
1923 }
1924
1925 /**
1926  * gst_aggregator_pad_steal_buffer:
1927  * @pad: the pad to get buffer from
1928  *
1929  * Steal the ref to the buffer currently queued in @pad.
1930  *
1931  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
1932  *   queued. You should unref the buffer after usage.
1933  */
1934 GstBuffer *
1935 gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
1936 {
1937   GstBuffer *buffer = NULL;
1938
1939   PAD_LOCK_EVENT (pad);
1940   if (pad->buffer) {
1941     GST_TRACE_OBJECT (pad, "Consuming buffer");
1942     buffer = pad->buffer;
1943     pad->buffer = NULL;
1944     if (pad->priv->pending_eos) {
1945       pad->priv->pending_eos = FALSE;
1946       pad->eos = TRUE;
1947     }
1948     PAD_BROADCAST_EVENT (pad);
1949     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
1950   }
1951   PAD_UNLOCK_EVENT (pad);
1952
1953   return buffer;
1954 }
1955
1956 /**
1957  * gst_aggregator_pad_get_buffer:
1958  * @pad: the pad to get buffer from
1959  *
1960  * Returns: (transfer full): A reference to the buffer in @pad or
1961  * NULL if no buffer was queued. You should unref the buffer after
1962  * usage.
1963  */
1964 GstBuffer *
1965 gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
1966 {
1967   GstBuffer *buffer = NULL;
1968
1969   PAD_LOCK_EVENT (pad);
1970   if (pad->buffer)
1971     buffer = gst_buffer_ref (pad->buffer);
1972   PAD_UNLOCK_EVENT (pad);
1973
1974   return buffer;
1975 }
1976
1977 /**
1978  * gst_aggregator_merge_tags:
1979  * @self: a #GstAggregator
1980  * @tags: a #GstTagList to merge
1981  * @mode: the #GstTagMergeMode to use
1982  *
1983  * Adds tags to so-called pending tags, which will be processed
1984  * before pushing out data downstream.
1985  *
1986  * Note that this is provided for convenience, and the subclass is
1987  * not required to use this and can still do tag handling on its own.
1988  *
1989  * MT safe.
1990  */
1991 void
1992 gst_aggregator_merge_tags (GstAggregator * self,
1993     const GstTagList * tags, GstTagMergeMode mode)
1994 {
1995   GstTagList *otags;
1996
1997   g_return_if_fail (GST_IS_AGGREGATOR (self));
1998   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
1999
2000   /* FIXME Check if we can use OBJECT lock here! */
2001   GST_OBJECT_LOCK (self);
2002   if (tags)
2003     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
2004   otags = self->priv->tags;
2005   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
2006   if (otags)
2007     gst_tag_list_unref (otags);
2008   self->priv->tags_changed = TRUE;
2009   GST_OBJECT_UNLOCK (self);
2010 }
2011
2012 /**
2013  * gst_aggregator_set_latency:
2014  * @self: a #GstAggregator
2015  * @min_latency: minimum latency
2016  * @max_latency: maximum latency
2017  *
2018  * Lets #GstAggregator sub-classes tell the baseclass what their internal
2019  * latency is. Will also post a LATENCY message on the bus so the pipeline
2020  * can reconfigure its global latency.
2021  */
2022 void
2023 gst_aggregator_set_latency (GstAggregator * self,
2024     GstClockTime min_latency, GstClockTime max_latency)
2025 {
2026   g_return_if_fail (GST_IS_AGGREGATOR (self));
2027   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
2028   g_return_if_fail (max_latency >= min_latency);
2029
2030   GST_OBJECT_LOCK (self);
2031   self->priv->sub_latency_min = min_latency;
2032   self->priv->sub_latency_max = max_latency;
2033   GST_OBJECT_UNLOCK (self);
2034
2035   gst_element_post_message (GST_ELEMENT_CAST (self),
2036       gst_message_new_latency (GST_OBJECT_CAST (self)));
2037 }