66b1362d50e561740ec15b461a77f12abdfb10ea
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
1 /* GStreamer aggregator base class
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @short_description: manages a set of pads with the purpose of
25  * aggregating their buffers.
26  * @see_also: gstcollectpads for historical reasons.
27  *
28  * Manages a set of pads with the purpose of aggregating their buffers.
29  * Control is given to the subclass when all pads have data.
30  * <itemizedlist>
31  *  <listitem><para>
32  *    Base class for mixers and muxers. Implementers should at least implement
33  *    the aggregate () vmethod.
34  *  </para></listitem>
35  *  <listitem><para>
36  *    When data is queued on all pads, tha aggregate vmethod is called.
37  *  </para></listitem>
38  *  <listitem><para>
39  *    One can peek at the data on any given GstAggregatorPad with the
40  *    gst_aggregator_pad_get_buffer () method, and take ownership of it
41  *    with the gst_aggregator_pad_steal_buffer () method. When a buffer
42  *    has been taken with steal_buffer (), a new buffer can be queued
43  *    on that pad.
44  *  </para></listitem>
45  *  <listitem><para>
46  *    If the subclass wishes to push a buffer downstream in its aggregate
47  *    implementation, it should do so through the
48  *    gst_aggregator_finish_buffer () method. This method will take care
49  *    of sending and ordering mandatory events such as stream start, caps
50  *    and segment.
51  *  </para></listitem>
52  *  <listitem><para>
53  *    Same goes for EOS events, which should not be pushed directly by the
54  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
55  *    implementation.
56  *  </para></listitem>
57  * </itemizedlist>
58  */
59
60 #ifdef HAVE_CONFIG_H
61 #  include "config.h"
62 #endif
63
64 #include <string.h>             /* strlen */
65
66 #include "gstaggregator.h"
67
68
69 /*  Might become API */
70 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
71     const GstTagList * tags, GstTagMergeMode mode);
72 static void gst_aggregator_set_latency_property (GstAggregator * agg,
73     gint64 latency);
74 static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
75
76
77 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
78 #define GST_CAT_DEFAULT aggregator_debug
79
80 /* GstAggregatorPad definitions */
81 #define PAD_LOCK_EVENT(pad)   G_STMT_START {                            \
82   GST_TRACE_OBJECT (pad, "Taking EVENT lock from thread %p",            \
83         g_thread_self());                                               \
84   g_mutex_lock(&pad->priv->event_lock);                                 \
85   GST_TRACE_OBJECT (pad, "Took EVENT lock from thread %p",              \
86         g_thread_self());                                               \
87   } G_STMT_END
88
89 #define PAD_UNLOCK_EVENT(pad)  G_STMT_START {                           \
90   GST_TRACE_OBJECT (pad, "Releasing EVENT lock from thread %p",         \
91         g_thread_self());                                               \
92   g_mutex_unlock(&pad->priv->event_lock);                               \
93   GST_TRACE_OBJECT (pad, "Release EVENT lock from thread %p",           \
94         g_thread_self());                                               \
95   } G_STMT_END
96
97
98 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
99   GST_LOG_OBJECT (pad, "Waiting for EVENT on thread %p",                \
100         g_thread_self());                                               \
101   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),           \
102       &(pad->priv->event_lock));                                        \
103   GST_LOG_OBJECT (pad, "DONE Waiting for EVENT on thread %p",           \
104         g_thread_self());                                               \
105   } G_STMT_END
106
107 #define PAD_BROADCAST_EVENT(pad) G_STMT_START {                        \
108   GST_LOG_OBJECT (pad, "Signaling EVENT from thread %p",               \
109         g_thread_self());                                              \
110   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond));    \
111   } G_STMT_END
112
113 #define GST_AGGREGATOR_SETCAPS_LOCK(self)   G_STMT_START {        \
114   GST_TRACE_OBJECT (self, "Taking SETCAPS lock from thread %p",   \
115         g_thread_self());                                         \
116   g_mutex_lock(&self->priv->setcaps_lock);                        \
117   GST_TRACE_OBJECT (self, "Took SETCAPS lock from thread %p",     \
118         g_thread_self());                                         \
119   } G_STMT_END
120
121 #define GST_AGGREGATOR_SETCAPS_UNLOCK(self)   G_STMT_START {        \
122   GST_TRACE_OBJECT (self, "Releasing SETCAPS lock from thread %p",  \
123         g_thread_self());                                           \
124   g_mutex_unlock(&self->priv->setcaps_lock);                        \
125   GST_TRACE_OBJECT (self, "Took SETCAPS lock from thread %p",       \
126         g_thread_self());                                           \
127   } G_STMT_END
128
129 #define PAD_STREAM_LOCK(pad)   G_STMT_START {                           \
130   GST_TRACE_OBJECT (pad, "Taking lock from thread %p",                  \
131         g_thread_self());                                               \
132   g_mutex_lock(&pad->priv->stream_lock);                                \
133   GST_TRACE_OBJECT (pad, "Took lock from thread %p",                    \
134         g_thread_self());                                               \
135   } G_STMT_END
136
137 #define PAD_STREAM_UNLOCK(pad)  G_STMT_START {                          \
138   GST_TRACE_OBJECT (pad, "Releasing lock from thread %p",               \
139         g_thread_self());                                               \
140   g_mutex_unlock(&pad->priv->stream_lock);                              \
141   GST_TRACE_OBJECT (pad, "Release lock from thread %p",                 \
142         g_thread_self());                                               \
143   } G_STMT_END
144
145 #define SRC_STREAM_LOCK(self)   G_STMT_START {                             \
146   GST_TRACE_OBJECT (self, "Taking src STREAM lock from thread %p",         \
147         g_thread_self());                                                  \
148   g_mutex_lock(&self->priv->src_lock);                                     \
149   GST_TRACE_OBJECT (self, "Took src STREAM lock from thread %p",           \
150         g_thread_self());                                                  \
151   } G_STMT_END
152
153 #define SRC_STREAM_UNLOCK(self)  G_STMT_START {                            \
154   GST_TRACE_OBJECT (self, "Releasing src STREAM lock from thread %p",      \
155         g_thread_self());                                                  \
156   g_mutex_unlock(&self->priv->src_lock);                                   \
157   GST_TRACE_OBJECT (self, "Released src STREAM lock from thread %p",       \
158         g_thread_self());                                                  \
159   } G_STMT_END
160
161 #define SRC_STREAM_WAIT(self) G_STMT_START {                               \
162   GST_LOG_OBJECT (self, "Waiting for src STREAM on thread %p",             \
163         g_thread_self());                                                  \
164   g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock));           \
165   GST_LOG_OBJECT (self, "DONE Waiting for src STREAM on thread %p",        \
166         g_thread_self());                                                  \
167   } G_STMT_END
168
169 #define SRC_STREAM_BROADCAST_UNLOCKED(self) G_STMT_START {                 \
170     GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p",           \
171         g_thread_self());                                                  \
172     if (self->priv->aggregate_id)                                          \
173       gst_clock_id_unschedule (self->priv->aggregate_id);                  \
174     g_cond_broadcast(&(self->priv->src_cond));                             \
175   } G_STMT_END
176
177 #define SRC_STREAM_BROADCAST(self) G_STMT_START {                          \
178     SRC_STREAM_LOCK (self);                                                \
179     SRC_STREAM_BROADCAST_UNLOCKED (self);                                  \
180     SRC_STREAM_UNLOCK (self);                                              \
181   } G_STMT_END
182
183 struct _GstAggregatorPadPrivate
184 {
185   gboolean pending_flush_start;
186   gboolean pending_flush_stop;
187   gboolean pending_eos;
188   gboolean flushing;
189
190   GMutex event_lock;
191   GCond event_cond;
192
193   GMutex stream_lock;
194 };
195
196 static gboolean
197 gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
198 {
199   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
200
201   aggpad->eos = FALSE;
202   aggpad->priv->flushing = FALSE;
203
204   if (klass->flush)
205     return klass->flush (aggpad, agg);
206
207   return TRUE;
208 }
209
210 /*************************************
211  * GstAggregator implementation  *
212  *************************************/
213 static GstElementClass *aggregator_parent_class = NULL;
214
215 struct _GstAggregatorPrivate
216 {
217   gint padcount;
218
219   /* Our state is >= PAUSED */
220   gboolean running;
221
222
223   gint seqnum;
224   gboolean send_stream_start;
225   gboolean send_segment;
226   gboolean flush_seeking;
227   gboolean pending_flush_start;
228   gboolean send_eos;
229   GstFlowReturn flow_return;
230
231   GstCaps *srccaps;
232
233   GstTagList *tags;
234   gboolean tags_changed;
235
236   /* Lock to prevent two src setcaps from happening at the same time  */
237   GMutex setcaps_lock;
238
239   gboolean latency_live;
240   GstClockTime latency_min;
241   GstClockTime latency_max;
242
243   GstClockTime sub_latency_min;
244   GstClockTime sub_latency_max;
245
246   /* aggregate */
247   GstClockID aggregate_id;
248   GMutex src_lock;
249   GCond src_cond;
250
251   /* properties */
252   gint64 latency;
253 };
254
255 typedef struct
256 {
257   GstEvent *event;
258   gboolean result;
259   gboolean flush;
260
261   gboolean one_actually_seeked;
262 } EventData;
263
264 #define DEFAULT_LATENCY        0
265
266 enum
267 {
268   PROP_0,
269   PROP_LATENCY,
270   PROP_LAST
271 };
272
273 /**
274  * gst_aggregator_iterate_sinkpads:
275  * @self: The #GstAggregator
276  * @func: The function to call.
277  * @user_data: The data to pass to @func.
278  *
279  * Iterate the sinkpads of aggregator to call a function on them.
280  *
281  * This method guarantees that @func will be called only once for each
282  * sink pad.
283  */
284 gboolean
285 gst_aggregator_iterate_sinkpads (GstAggregator * self,
286     GstAggregatorPadForeachFunc func, gpointer user_data)
287 {
288   gboolean result = FALSE;
289   GstIterator *iter;
290   gboolean done = FALSE;
291   GValue item = { 0, };
292   GList *seen_pads = NULL;
293
294   iter = gst_element_iterate_sink_pads (GST_ELEMENT (self));
295
296   if (!iter)
297     goto no_iter;
298
299   while (!done) {
300     switch (gst_iterator_next (iter, &item)) {
301       case GST_ITERATOR_OK:
302       {
303         GstAggregatorPad *pad;
304
305         pad = g_value_get_object (&item);
306
307         /* if already pushed, skip. FIXME, find something faster to tag pads */
308         if (pad == NULL || g_list_find (seen_pads, pad)) {
309           g_value_reset (&item);
310           break;
311         }
312
313         GST_LOG_OBJECT (pad, "calling function %s on pad",
314             GST_DEBUG_FUNCPTR_NAME (func));
315
316         result = func (self, pad, user_data);
317
318         done = !result;
319
320         seen_pads = g_list_prepend (seen_pads, pad);
321
322         g_value_reset (&item);
323         break;
324       }
325       case GST_ITERATOR_RESYNC:
326         gst_iterator_resync (iter);
327         break;
328       case GST_ITERATOR_ERROR:
329         GST_ERROR_OBJECT (self,
330             "Could not iterate over internally linked pads");
331         done = TRUE;
332         break;
333       case GST_ITERATOR_DONE:
334         done = TRUE;
335         break;
336     }
337   }
338   g_value_unset (&item);
339   gst_iterator_free (iter);
340
341   if (seen_pads == NULL) {
342     GST_DEBUG_OBJECT (self, "No pad seen");
343     return FALSE;
344   }
345
346   g_list_free (seen_pads);
347
348 no_iter:
349   return result;
350 }
351
352 static inline gboolean
353 gst_aggregator_check_all_pads_with_data_or_eos (GstAggregator * self,
354     GstAggregatorPad * aggpad, gpointer user_data)
355 {
356   if (aggpad->buffer || aggpad->eos) {
357     return TRUE;
358   }
359
360   GST_LOG_OBJECT (aggpad, "Not ready to be aggregated");
361
362   return FALSE;
363 }
364
365 static void
366 gst_aggregator_reset_flow_values (GstAggregator * self)
367 {
368   self->priv->flow_return = GST_FLOW_FLUSHING;
369   self->priv->send_stream_start = TRUE;
370   self->priv->send_segment = TRUE;
371   gst_segment_init (&self->segment, GST_FORMAT_TIME);
372 }
373
374 static inline void
375 gst_aggregator_push_mandatory_events (GstAggregator * self)
376 {
377   GstAggregatorPrivate *priv = self->priv;
378
379   if (g_atomic_int_get (&self->priv->send_stream_start)) {
380     gchar s_id[32];
381
382     GST_INFO_OBJECT (self, "pushing stream start");
383     /* stream-start (FIXME: create id based on input ids) */
384     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
385     if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) {
386       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
387     }
388     g_atomic_int_set (&self->priv->send_stream_start, FALSE);
389   }
390
391   if (self->priv->srccaps) {
392
393     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
394         self->priv->srccaps);
395     if (!gst_pad_push_event (self->srcpad,
396             gst_event_new_caps (self->priv->srccaps))) {
397       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
398     }
399     gst_caps_unref (self->priv->srccaps);
400     self->priv->srccaps = NULL;
401   }
402
403   if (g_atomic_int_get (&self->priv->send_segment)) {
404     if (!g_atomic_int_get (&self->priv->flush_seeking)) {
405       GstEvent *segev = gst_event_new_segment (&self->segment);
406
407       if (!self->priv->seqnum)
408         self->priv->seqnum = gst_event_get_seqnum (segev);
409       else
410         gst_event_set_seqnum (segev, self->priv->seqnum);
411
412       GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segev);
413       gst_pad_push_event (self->srcpad, segev);
414       g_atomic_int_set (&self->priv->send_segment, FALSE);
415     }
416   }
417
418   if (priv->tags && priv->tags_changed) {
419     gst_pad_push_event (self->srcpad,
420         gst_event_new_tag (gst_tag_list_ref (priv->tags)));
421     priv->tags_changed = FALSE;
422   }
423 }
424
425 /**
426  * gst_aggregator_set_src_caps:
427  * @self: The #GstAggregator
428  * @caps: The #GstCaps to set on the src pad.
429  *
430  * Sets the caps to be used on the src pad.
431  */
432 void
433 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
434 {
435   GST_AGGREGATOR_SETCAPS_LOCK (self);
436   gst_caps_replace (&self->priv->srccaps, caps);
437   gst_aggregator_push_mandatory_events (self);
438   GST_AGGREGATOR_SETCAPS_UNLOCK (self);
439 }
440
441 /**
442  * gst_aggregator_finish_buffer:
443  * @self: The #GstAggregator
444  * @buffer: the #GstBuffer to push.
445  *
446  * This method will take care of sending mandatory events before pushing
447  * the provided buffer.
448  */
449 GstFlowReturn
450 gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
451 {
452   gst_aggregator_push_mandatory_events (self);
453
454   if (!g_atomic_int_get (&self->priv->flush_seeking) &&
455       gst_pad_is_active (self->srcpad)) {
456     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
457     return gst_pad_push (self->srcpad, buffer);
458   } else {
459     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
460         g_atomic_int_get (&self->priv->flush_seeking),
461         gst_pad_is_active (self->srcpad));
462     gst_buffer_unref (buffer);
463     return GST_FLOW_OK;
464   }
465 }
466
467 static void
468 gst_aggregator_push_eos (GstAggregator * self)
469 {
470   GstEvent *event;
471   gst_aggregator_push_mandatory_events (self);
472
473   self->priv->send_eos = FALSE;
474   event = gst_event_new_eos ();
475   gst_event_set_seqnum (event, self->priv->seqnum);
476   gst_pad_push_event (self->srcpad, event);
477 }
478
479 static GstClockTime
480 gst_aggregator_get_next_time (GstAggregator * self)
481 {
482   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
483
484   if (klass->get_next_time)
485     return klass->get_next_time (self);
486
487   return GST_CLOCK_TIME_NONE;
488 }
489
490 /* called with the src STREAM lock */
491 static gboolean
492 gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
493 {
494   GstClockTime latency_max, latency_min;
495   GstClockTime start;
496   gboolean live, res;
497
498   *timeout = FALSE;
499
500   SRC_STREAM_LOCK (self);
501
502   gst_aggregator_get_latency (self, &live, &latency_min, &latency_max);
503
504   if (gst_aggregator_iterate_sinkpads (self,
505           gst_aggregator_check_all_pads_with_data_or_eos, NULL)) {
506     GST_DEBUG_OBJECT (self, "all pads have data");
507     SRC_STREAM_UNLOCK (self);
508
509     return TRUE;
510   }
511
512   /* Before waiting, check if we're actually still running */
513   if (!self->priv->running || !self->priv->send_eos) {
514     SRC_STREAM_UNLOCK (self);
515
516     return FALSE;
517   }
518
519   start = gst_aggregator_get_next_time (self);
520
521   if (!live || !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self))
522       || !GST_CLOCK_TIME_IS_VALID (start)) {
523     /* We wake up here when something happened, and below
524      * then check if we're ready now. If we return FALSE,
525      * we will be directly called again.
526      */
527     SRC_STREAM_WAIT (self);
528   } else {
529     GstClockTime base_time, time;
530     GstClock *clock;
531     GstClockReturn status;
532
533     GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
534         GST_TIME_ARGS (start));
535
536     GST_OBJECT_LOCK (self);
537     base_time = GST_ELEMENT_CAST (self)->base_time;
538     clock = GST_ELEMENT_CLOCK (self);
539     if (clock)
540       gst_object_ref (clock);
541     GST_OBJECT_UNLOCK (self);
542
543     time = base_time + start;
544
545     if (GST_CLOCK_TIME_IS_VALID (latency_min)) {
546       time += latency_min;
547     } else {
548       time += self->priv->latency;
549     }
550
551     GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
552         GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
553         " latency max %" GST_TIME_FORMAT " latency min %" GST_TIME_FORMAT
554         " current %" GST_TIME_FORMAT ")", GST_TIME_ARGS (time),
555         GST_TIME_ARGS (GST_ELEMENT_CAST (self)->base_time),
556         GST_TIME_ARGS (start), GST_TIME_ARGS (latency_max),
557         GST_TIME_ARGS (latency_min),
558         GST_TIME_ARGS (gst_clock_get_time (clock)));
559
560     self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
561     gst_object_unref (clock);
562     SRC_STREAM_UNLOCK (self);
563
564     status = gst_clock_id_wait (self->priv->aggregate_id, NULL);
565
566     SRC_STREAM_LOCK (self);
567     if (self->priv->aggregate_id) {
568       gst_clock_id_unref (self->priv->aggregate_id);
569       self->priv->aggregate_id = NULL;
570     }
571
572     GST_DEBUG_OBJECT (self, "clock returned %d", status);
573
574     /* we timed out */
575     if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
576       SRC_STREAM_UNLOCK (self);
577       *timeout = TRUE;
578       return TRUE;
579     }
580   }
581
582   res = gst_aggregator_iterate_sinkpads (self,
583       gst_aggregator_check_all_pads_with_data_or_eos, NULL);
584   SRC_STREAM_UNLOCK (self);
585
586   return res;
587 }
588
589 static void
590 gst_aggregator_aggregate_func (GstAggregator * self)
591 {
592   GstAggregatorPrivate *priv = self->priv;
593   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
594   gboolean timeout = FALSE;
595
596   if (self->priv->running == FALSE) {
597     GST_DEBUG_OBJECT (self, "Not running anymore");
598     return;
599   }
600
601   GST_LOG_OBJECT (self, "Checking aggregate");
602   while (priv->send_eos && priv->running) {
603     if (!gst_aggregator_wait_and_check (self, &timeout))
604       continue;
605
606     GST_TRACE_OBJECT (self, "Actually aggregating!");
607
608     priv->flow_return = klass->aggregate (self, timeout);
609
610     if (priv->flow_return == GST_FLOW_EOS) {
611       gst_aggregator_push_eos (self);
612     }
613
614     if (priv->flow_return == GST_FLOW_FLUSHING &&
615         g_atomic_int_get (&priv->flush_seeking))
616       priv->flow_return = GST_FLOW_OK;
617
618     GST_LOG_OBJECT (self, "flow return is %s",
619         gst_flow_get_name (priv->flow_return));
620
621     if (priv->flow_return != GST_FLOW_OK)
622       break;
623   }
624 }
625
626 static gboolean
627 gst_aggregator_start (GstAggregator * self)
628 {
629   GstAggregatorClass *klass;
630   gboolean result;
631
632   self->priv->running = TRUE;
633   self->priv->send_stream_start = TRUE;
634   self->priv->send_segment = TRUE;
635   self->priv->send_eos = TRUE;
636   self->priv->srccaps = NULL;
637   self->priv->flow_return = GST_FLOW_OK;
638
639   klass = GST_AGGREGATOR_GET_CLASS (self);
640
641   if (klass->start)
642     result = klass->start (self);
643   else
644     result = TRUE;
645
646   return result;
647 }
648
649 static gboolean
650 _check_pending_flush_stop (GstAggregatorPad * pad)
651 {
652   return (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
653 }
654
655 static gboolean
656 gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
657 {
658   gboolean res = TRUE;
659
660   GST_INFO_OBJECT (self, "%s srcpad task",
661       flush_start ? "Pausing" : "Stopping");
662
663   self->priv->running = FALSE;
664   SRC_STREAM_BROADCAST (self);
665
666   if (flush_start) {
667     res = gst_pad_push_event (self->srcpad, flush_start);
668   }
669
670   gst_pad_stop_task (self->srcpad);
671   SRC_STREAM_BROADCAST (self);
672
673   return res;
674 }
675
676 static void
677 gst_aggregator_start_srcpad_task (GstAggregator * self)
678 {
679   GST_INFO_OBJECT (self, "Starting srcpad task");
680
681   self->priv->running = TRUE;
682   gst_pad_start_task (GST_PAD (self->srcpad),
683       (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL);
684 }
685
686 static GstFlowReturn
687 gst_aggregator_flush (GstAggregator * self)
688 {
689   GstFlowReturn ret = GST_FLOW_OK;
690   GstAggregatorPrivate *priv = self->priv;
691   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
692
693   GST_DEBUG_OBJECT (self, "Flushing everything");
694   g_atomic_int_set (&priv->send_segment, TRUE);
695   g_atomic_int_set (&priv->flush_seeking, FALSE);
696   g_atomic_int_set (&priv->tags_changed, FALSE);
697   if (klass->flush)
698     ret = klass->flush (self);
699
700   return ret;
701 }
702
703 static gboolean
704 gst_aggregator_all_flush_stop_received (GstAggregator * self)
705 {
706   GList *tmp;
707   GstAggregatorPad *tmppad;
708
709   GST_OBJECT_LOCK (self);
710   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
711     tmppad = (GstAggregatorPad *) tmp->data;
712
713     if (_check_pending_flush_stop (tmppad) == FALSE) {
714       GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
715           tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
716       GST_OBJECT_UNLOCK (self);
717       return FALSE;
718     }
719   }
720   GST_OBJECT_UNLOCK (self);
721
722   return TRUE;
723 }
724
725 static void
726 gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
727     GstEvent * event)
728 {
729   GstBuffer *tmpbuf;
730   GstAggregatorPrivate *priv = self->priv;
731   GstAggregatorPadPrivate *padpriv = aggpad->priv;
732
733   g_atomic_int_set (&aggpad->priv->flushing, TRUE);
734   /*  Remove pad buffer and wake up the streaming thread */
735   tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
736   gst_buffer_replace (&tmpbuf, NULL);
737   PAD_STREAM_LOCK (aggpad);
738   if (g_atomic_int_compare_and_exchange (&padpriv->pending_flush_start,
739           TRUE, FALSE) == TRUE) {
740     GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
741     g_atomic_int_set (&padpriv->pending_flush_stop, TRUE);
742   }
743
744   if (g_atomic_int_get (&priv->flush_seeking)) {
745     /* If flush_seeking we forward the first FLUSH_START */
746     if (g_atomic_int_compare_and_exchange (&priv->pending_flush_start,
747             TRUE, FALSE) == TRUE) {
748
749       GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
750       gst_aggregator_stop_srcpad_task (self, event);
751       priv->flow_return = GST_FLOW_OK;
752
753       GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
754       GST_PAD_STREAM_LOCK (self->srcpad);
755       GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
756       event = NULL;
757     } else {
758       gst_event_unref (event);
759     }
760   } else {
761     gst_event_unref (event);
762   }
763   PAD_STREAM_UNLOCK (aggpad);
764
765   tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
766   gst_buffer_replace (&tmpbuf, NULL);
767 }
768
769 /* GstAggregator vmethods default implementations */
770 static gboolean
771 gst_aggregator_default_sink_event (GstAggregator * self,
772     GstAggregatorPad * aggpad, GstEvent * event)
773 {
774   gboolean res = TRUE;
775   GstPad *pad = GST_PAD (aggpad);
776   GstAggregatorPrivate *priv = self->priv;
777
778   switch (GST_EVENT_TYPE (event)) {
779     case GST_EVENT_FLUSH_START:
780     {
781       gst_aggregator_flush_start (self, aggpad, event);
782       /* We forward only in one case: right after flush_seeking */
783       event = NULL;
784       goto eat;
785     }
786     case GST_EVENT_FLUSH_STOP:
787     {
788       GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP");
789
790       gst_aggregator_pad_flush (aggpad, self);
791       if (g_atomic_int_get (&priv->flush_seeking)) {
792         g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
793
794         if (g_atomic_int_get (&priv->flush_seeking)) {
795           if (gst_aggregator_all_flush_stop_received (self)) {
796             /* That means we received FLUSH_STOP/FLUSH_STOP on
797              * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
798             gst_aggregator_flush (self);
799             gst_pad_push_event (self->srcpad, event);
800             priv->send_eos = TRUE;
801             event = NULL;
802             SRC_STREAM_BROADCAST (self);
803
804             GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
805             GST_PAD_STREAM_UNLOCK (self->srcpad);
806             gst_aggregator_start_srcpad_task (self);
807           }
808         }
809       }
810
811       /* We never forward the event */
812       goto eat;
813     }
814     case GST_EVENT_EOS:
815     {
816       GST_DEBUG_OBJECT (aggpad, "EOS");
817
818       /* We still have a buffer, and we don't want the subclass to have to
819        * check for it. Mark pending_eos, eos will be set when steal_buffer is
820        * called
821        */
822       PAD_LOCK_EVENT (aggpad);
823       if (!aggpad->buffer) {
824         aggpad->eos = TRUE;
825       } else {
826         aggpad->priv->pending_eos = TRUE;
827       }
828       PAD_UNLOCK_EVENT (aggpad);
829
830       SRC_STREAM_BROADCAST (self);
831       goto eat;
832     }
833     case GST_EVENT_SEGMENT:
834     {
835       PAD_LOCK_EVENT (aggpad);
836       gst_event_copy_segment (event, &aggpad->segment);
837       self->priv->seqnum = gst_event_get_seqnum (event);
838       PAD_UNLOCK_EVENT (aggpad);
839       goto eat;
840     }
841     case GST_EVENT_STREAM_START:
842     {
843       goto eat;
844     }
845     case GST_EVENT_TAG:
846     {
847       GstTagList *tags;
848
849       gst_event_parse_tag (event, &tags);
850
851       if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) {
852         gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE);
853         gst_event_unref (event);
854         event = NULL;
855         goto eat;
856       }
857       break;
858     }
859     default:
860     {
861       break;
862     }
863   }
864
865   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
866   return gst_pad_event_default (pad, GST_OBJECT (self), event);
867
868 eat:
869   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
870   if (event)
871     gst_event_unref (event);
872
873   return res;
874 }
875
876 static inline gboolean
877 gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad,
878     gpointer unused_udata)
879 {
880   gst_aggregator_pad_flush (pad, self);
881
882   return TRUE;
883 }
884
885 static gboolean
886 gst_aggregator_stop (GstAggregator * agg)
887 {
888   GstAggregatorClass *klass;
889   gboolean result;
890
891   gst_aggregator_reset_flow_values (agg);
892
893   gst_aggregator_iterate_sinkpads (agg, gst_aggregator_stop_pad, NULL);
894
895   if (agg->priv->tags)
896     gst_tag_list_unref (agg->priv->tags);
897   agg->priv->tags = NULL;
898
899   klass = GST_AGGREGATOR_GET_CLASS (agg);
900
901   if (klass->stop)
902     result = klass->stop (agg);
903   else
904     result = TRUE;
905
906   return result;
907 }
908
909 /* GstElement vmethods implementations */
910 static GstStateChangeReturn
911 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
912 {
913   GstStateChangeReturn ret;
914   GstAggregator *self = GST_AGGREGATOR (element);
915
916   switch (transition) {
917     case GST_STATE_CHANGE_READY_TO_PAUSED:
918       if (!gst_aggregator_start (self))
919         goto error_start;
920       break;
921     default:
922       break;
923   }
924
925   if ((ret =
926           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
927               transition)) == GST_STATE_CHANGE_FAILURE)
928     goto failure;
929
930
931   switch (transition) {
932     case GST_STATE_CHANGE_PAUSED_TO_READY:
933       if (!gst_aggregator_stop (self)) {
934         /* What to do in this case? Error out? */
935         GST_ERROR_OBJECT (self, "Subclass failed to stop.");
936       }
937       break;
938     default:
939       break;
940   }
941
942   return ret;
943
944 /* ERRORS */
945 failure:
946   {
947     GST_ERROR_OBJECT (element, "parent failed state change");
948     return ret;
949   }
950 error_start:
951   {
952     GST_ERROR_OBJECT (element, "Subclass failed to start");
953     return GST_STATE_CHANGE_FAILURE;
954   }
955 }
956
957 static void
958 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
959 {
960   GstAggregator *self = GST_AGGREGATOR (element);
961   GstBuffer *tmpbuf;
962
963   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
964
965   GST_INFO_OBJECT (pad, "Removing pad");
966
967   g_atomic_int_set (&aggpad->priv->flushing, TRUE);
968   tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
969   gst_buffer_replace (&tmpbuf, NULL);
970   gst_element_remove_pad (element, pad);
971
972   SRC_STREAM_BROADCAST (self);
973 }
974
975 static GstPad *
976 gst_aggregator_request_new_pad (GstElement * element,
977     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
978 {
979   GstAggregator *self;
980   GstAggregatorPad *agg_pad;
981
982   GstElementClass *klass = GST_ELEMENT_GET_CLASS (element);
983   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
984
985   self = GST_AGGREGATOR (element);
986
987   if (templ == gst_element_class_get_pad_template (klass, "sink_%u")) {
988     gint serial = 0;
989     gchar *name = NULL;
990
991     GST_OBJECT_LOCK (element);
992     if (req_name == NULL || strlen (req_name) < 6
993         || !g_str_has_prefix (req_name, "sink_")) {
994       /* no name given when requesting the pad, use next available int */
995       priv->padcount++;
996     } else {
997       /* parse serial number from requested padname */
998       serial = g_ascii_strtoull (&req_name[5], NULL, 10);
999       if (serial >= priv->padcount)
1000         priv->padcount = serial;
1001     }
1002
1003     name = g_strdup_printf ("sink_%u", priv->padcount);
1004     agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
1005         "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
1006     g_free (name);
1007
1008     GST_OBJECT_UNLOCK (element);
1009
1010   } else {
1011     return NULL;
1012   }
1013
1014   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
1015
1016   if (priv->running)
1017     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
1018
1019   /* add the pad to the element */
1020   gst_element_add_pad (element, GST_PAD (agg_pad));
1021
1022   return GST_PAD (agg_pad);
1023 }
1024
1025 typedef struct
1026 {
1027   GstClockTime min, max;
1028   gboolean live;
1029 } LatencyData;
1030
1031 static gboolean
1032 gst_aggregator_query_sink_latency_foreach (GstAggregator * self,
1033     GstAggregatorPad * pad, gpointer user_data)
1034 {
1035   LatencyData *data = user_data;
1036   GstClockTime min, max;
1037   GstQuery *query;
1038   gboolean live, res;
1039
1040   query = gst_query_new_latency ();
1041   res = gst_pad_peer_query (GST_PAD_CAST (pad), query);
1042
1043   if (res) {
1044     gst_query_parse_latency (query, &live, &min, &max);
1045
1046     GST_LOG_OBJECT (pad, "got latency live:%s min:%" G_GINT64_FORMAT
1047         " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
1048
1049     if (min != GST_CLOCK_TIME_NONE && min > data->min)
1050       data->min = min;
1051
1052     if (max != GST_CLOCK_TIME_NONE &&
1053         ((data->max != GST_CLOCK_TIME_NONE && max < data->max) ||
1054             (data->max == GST_CLOCK_TIME_NONE)))
1055       data->max = max;
1056
1057     data->live |= live;
1058   }
1059
1060   gst_query_unref (query);
1061
1062   return TRUE;
1063 }
1064
1065 /**
1066  * gst_aggregator_get_latency:
1067  * @self: a #GstAggregator
1068  * @live: (out) (allow-none): whether @self is live
1069  * @min_latency: (out) (allow-none): the configured minimum latency of @self
1070  * @max_latency: (out) (allow-none): the configured maximum latency of @self
1071  *
1072  * Retreives the latency values reported by @self in response to the latency
1073  * query.
1074  *
1075  * Typically only called by subclasses.
1076  */
1077 void
1078 gst_aggregator_get_latency (GstAggregator * self, gboolean * live,
1079     GstClockTime * min_latency, GstClockTime * max_latency)
1080 {
1081   GstClockTime our_latency;
1082   GstClockTime min, max;
1083
1084   g_return_if_fail (GST_IS_AGGREGATOR (self));
1085
1086   /* latency_min is never GST_CLOCK_TIME_NONE by construction */
1087   min = self->priv->latency_min;
1088   max = self->priv->latency_max;
1089
1090   min += self->priv->sub_latency_min;
1091   if (GST_CLOCK_TIME_IS_VALID (max)
1092       && GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max))
1093     max += self->priv->sub_latency_max;
1094
1095   our_latency = self->priv->latency;
1096   if (GST_CLOCK_TIME_IS_VALID (our_latency)) {
1097     min += our_latency;
1098     if (GST_CLOCK_TIME_IS_VALID (max))
1099       max += our_latency;
1100   }
1101
1102   if (live)
1103     *live = self->priv->latency_live;
1104   if (min_latency)
1105     *min_latency = min;
1106   if (max_latency)
1107     *max_latency = max;
1108 }
1109
1110 static gboolean
1111 gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
1112 {
1113   GstClockTime our_latency;
1114   LatencyData data;
1115
1116   data.min = 0;
1117   data.max = GST_CLOCK_TIME_NONE;
1118   data.live = FALSE;
1119
1120   /* query upstream's latency */
1121   SRC_STREAM_LOCK (self);
1122   gst_aggregator_iterate_sinkpads (self,
1123       gst_aggregator_query_sink_latency_foreach, &data);
1124   SRC_STREAM_UNLOCK (self);
1125
1126   our_latency = self->priv->latency;
1127
1128   if (data.live && GST_CLOCK_TIME_IS_VALID (our_latency) &&
1129       our_latency > data.max) {
1130     GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
1131         ("%s", "Latency too big"),
1132         ("The requested latency value is too big for the current pipeline.  "
1133             "Limiting to %" G_GINT64_FORMAT, data.max));
1134     self->priv->latency = data.max;
1135     /* FIXME: shouldn't we g_object_notify() the change here? */
1136   }
1137
1138   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (data.min))) {
1139     GST_WARNING_OBJECT (self, "Invalid minimum latency, using 0");
1140     data.min = 0;
1141   }
1142
1143   if (G_UNLIKELY (data.min > data.max)) {
1144     GST_WARNING_OBJECT (self, "Minimum latency is greater than maximum latency "
1145         "(%" G_GINT64_FORMAT " > %" G_GINT64_FORMAT "). "
1146         "Clamping it at the maximum latency", data.min, data.max);
1147     data.min = data.max;
1148   }
1149
1150   self->priv->latency_live = data.live;
1151   self->priv->latency_min = data.min;
1152   self->priv->latency_max = data.max;
1153
1154   /* add our own */
1155   if (GST_CLOCK_TIME_IS_VALID (our_latency)) {
1156     if (GST_CLOCK_TIME_IS_VALID (data.min))
1157       data.min += our_latency;
1158     if (GST_CLOCK_TIME_IS_VALID (data.max))
1159       data.max += our_latency;
1160   }
1161
1162   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_min)
1163       && GST_CLOCK_TIME_IS_VALID (data.min))
1164     data.min += self->priv->sub_latency_min;
1165   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1166       && GST_CLOCK_TIME_IS_VALID (data.max))
1167     data.max += self->priv->sub_latency_max;
1168
1169   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
1170       " max:%" G_GINT64_FORMAT, data.live ? "true" : "false", data.min,
1171       data.max);
1172
1173   gst_query_set_latency (query, data.live, data.min, data.max);
1174
1175   return TRUE;
1176 }
1177
1178 static gboolean
1179 gst_aggregator_send_event (GstElement * element, GstEvent * event)
1180 {
1181   GstAggregator *self = GST_AGGREGATOR (element);
1182
1183   GST_STATE_LOCK (element);
1184   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
1185       GST_STATE (element) < GST_STATE_PAUSED) {
1186     gdouble rate;
1187     GstFormat fmt;
1188     GstSeekFlags flags;
1189     GstSeekType start_type, stop_type;
1190     gint64 start, stop;
1191
1192     gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1193         &start, &stop_type, &stop);
1194     gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1195         stop_type, stop, NULL);
1196
1197     self->priv->seqnum = gst_event_get_seqnum (event);
1198     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
1199   }
1200   GST_STATE_UNLOCK (element);
1201
1202
1203   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
1204       event);
1205 }
1206
1207 static gboolean
1208 gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
1209 {
1210   gboolean res = TRUE;
1211
1212   switch (GST_QUERY_TYPE (query)) {
1213     case GST_QUERY_SEEKING:
1214     {
1215       GstFormat format;
1216
1217       /* don't pass it along as some (file)sink might claim it does
1218        * whereas with a collectpads in between that will not likely work */
1219       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1220       gst_query_set_seeking (query, format, FALSE, 0, -1);
1221       res = TRUE;
1222
1223       goto discard;
1224     }
1225     case GST_QUERY_LATENCY:
1226     {
1227       gboolean ret;
1228
1229       ret = gst_aggregator_query_latency (self, query);
1230       /* Wake up the src thread again, due to changed latencies
1231        * or changed live-ness we might have to adjust if we wait
1232        * on a deadline at all and how long.
1233        */
1234       SRC_STREAM_BROADCAST (self);
1235       return ret;
1236     }
1237     default:
1238       break;
1239   }
1240
1241   return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1242
1243 discard:
1244   return res;
1245 }
1246
1247 static gboolean
1248 gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
1249 {
1250   EventData *evdata = user_data;
1251   gboolean ret = TRUE;
1252   GstPad *peer = gst_pad_get_peer (pad);
1253   GstAggregatorPadPrivate *padpriv = GST_AGGREGATOR_PAD (pad)->priv;
1254
1255   if (peer) {
1256     ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
1257     GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
1258     gst_object_unref (peer);
1259   }
1260
1261   if (ret == FALSE) {
1262     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK)
1263       GST_ERROR_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
1264
1265     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
1266       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
1267
1268       if (gst_pad_query (peer, seeking)) {
1269         gboolean seekable;
1270
1271         gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
1272
1273         if (seekable == FALSE) {
1274           GST_INFO_OBJECT (pad,
1275               "Source not seekable, We failed but it does not matter!");
1276
1277           ret = TRUE;
1278         }
1279       } else {
1280         GST_ERROR_OBJECT (pad, "Query seeking FAILED");
1281       }
1282     }
1283
1284     if (evdata->flush) {
1285       padpriv->pending_flush_start = FALSE;
1286       padpriv->pending_flush_stop = FALSE;
1287     }
1288   } else {
1289     evdata->one_actually_seeked = TRUE;
1290   }
1291
1292   evdata->result &= ret;
1293
1294   /* Always send to all pads */
1295   return FALSE;
1296 }
1297
1298 static gboolean
1299 gst_aggregator_set_flush_pending (GstAggregator * self, GstAggregatorPad * pad,
1300     gpointer udata)
1301 {
1302   pad->priv->pending_flush_start = TRUE;
1303   pad->priv->pending_flush_stop = FALSE;
1304
1305   return TRUE;
1306 }
1307
1308 static EventData
1309 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
1310     GstEvent * event, gboolean flush)
1311 {
1312   EventData evdata;
1313
1314   evdata.event = event;
1315   evdata.result = TRUE;
1316   evdata.flush = flush;
1317   evdata.one_actually_seeked = FALSE;
1318
1319   /* We first need to set all pads as flushing in a first pass
1320    * as flush_start flush_stop is sometimes sent synchronously
1321    * while we send the seek event */
1322   if (flush)
1323     gst_aggregator_iterate_sinkpads (self,
1324         gst_aggregator_set_flush_pending, NULL);
1325
1326   gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, &evdata);
1327
1328   gst_event_unref (event);
1329
1330   return evdata;
1331 }
1332
1333 static gboolean
1334 gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
1335 {
1336   gdouble rate;
1337   GstFormat fmt;
1338   GstSeekFlags flags;
1339   GstSeekType start_type, stop_type;
1340   gint64 start, stop;
1341   gboolean flush;
1342   EventData evdata;
1343   GstAggregatorPrivate *priv = self->priv;
1344
1345   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1346       &start, &stop_type, &stop);
1347
1348   GST_INFO_OBJECT (self, "starting SEEK");
1349
1350   flush = flags & GST_SEEK_FLAG_FLUSH;
1351
1352   if (flush) {
1353     g_atomic_int_set (&priv->pending_flush_start, TRUE);
1354     g_atomic_int_set (&priv->flush_seeking, TRUE);
1355   }
1356
1357   gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1358       stop_type, stop, NULL);
1359
1360   /* forward the seek upstream */
1361   evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, flush);
1362   event = NULL;
1363
1364   if (!evdata.result || !evdata.one_actually_seeked) {
1365     g_atomic_int_set (&priv->flush_seeking, FALSE);
1366     g_atomic_int_set (&priv->pending_flush_start, FALSE);
1367   }
1368
1369   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
1370
1371   return evdata.result;
1372 }
1373
1374 static gboolean
1375 gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
1376 {
1377   EventData evdata;
1378   gboolean res = TRUE;
1379
1380   switch (GST_EVENT_TYPE (event)) {
1381     case GST_EVENT_SEEK:
1382     {
1383       gst_event_ref (event);
1384       res = gst_aggregator_do_seek (self, event);
1385       gst_event_unref (event);
1386       event = NULL;
1387       goto done;
1388     }
1389     case GST_EVENT_NAVIGATION:
1390     {
1391       /* navigation is rather pointless. */
1392       res = FALSE;
1393       gst_event_unref (event);
1394       goto done;
1395     }
1396     default:
1397     {
1398       break;
1399     }
1400   }
1401
1402   evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, FALSE);
1403   res = evdata.result;
1404
1405 done:
1406   return res;
1407 }
1408
1409 static gboolean
1410 gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent,
1411     GstEvent * event)
1412 {
1413   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1414
1415   return klass->src_event (GST_AGGREGATOR (parent), event);
1416 }
1417
1418 static gboolean
1419 gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent,
1420     GstQuery * query)
1421 {
1422   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1423
1424   return klass->src_query (GST_AGGREGATOR (parent), query);
1425 }
1426
1427 static gboolean
1428 gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
1429     GstObject * parent, GstPadMode mode, gboolean active)
1430 {
1431   GstAggregator *self = GST_AGGREGATOR (parent);
1432   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1433
1434   if (klass->src_activate) {
1435     if (klass->src_activate (self, mode, active) == FALSE) {
1436       return FALSE;
1437     }
1438   }
1439
1440   if (active == TRUE) {
1441     switch (mode) {
1442       case GST_PAD_MODE_PUSH:
1443       {
1444         GST_INFO_OBJECT (pad, "Activating pad!");
1445         gst_aggregator_start_srcpad_task (self);
1446         return TRUE;
1447       }
1448       default:
1449       {
1450         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
1451         return FALSE;
1452       }
1453     }
1454   }
1455
1456   /* deactivating */
1457   GST_INFO_OBJECT (self, "Deactivating srcpad");
1458   gst_aggregator_stop_srcpad_task (self, FALSE);
1459
1460   return TRUE;
1461 }
1462
1463 static gboolean
1464 gst_aggregator_default_sink_query (GstAggregator * self,
1465     GstAggregatorPad * aggpad, GstQuery * query)
1466 {
1467   GstPad *pad = GST_PAD (aggpad);
1468
1469   return gst_pad_query_default (pad, GST_OBJECT (self), query);
1470 }
1471
1472 static void
1473 gst_aggregator_finalize (GObject * object)
1474 {
1475   GstAggregator *self = (GstAggregator *) object;
1476
1477   g_mutex_clear (&self->priv->setcaps_lock);
1478   g_mutex_clear (&self->priv->src_lock);
1479   g_cond_clear (&self->priv->src_cond);
1480
1481   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
1482 }
1483
1484 /*
1485  * gst_aggregator_set_latency_property:
1486  * @agg: a #GstAggregator
1487  * @latency: the new latency value.
1488  *
1489  * Sets the new latency value to @latency. This value is used to limit the
1490  * amount of time a pad waits for data to appear before considering the pad
1491  * as unresponsive.
1492  */
1493 static void
1494 gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
1495 {
1496   gboolean changed;
1497
1498   g_return_if_fail (GST_IS_AGGREGATOR (self));
1499
1500   GST_OBJECT_LOCK (self);
1501
1502   if (self->priv->latency_live && self->priv->latency_max != 0 &&
1503       GST_CLOCK_TIME_IS_VALID (latency) && latency > self->priv->latency_max) {
1504     GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
1505         ("%s", "Latency too big"),
1506         ("The requested latency value is too big for the latency in the "
1507             "current pipeline.  Limiting to %" G_GINT64_FORMAT,
1508             self->priv->latency_max));
1509     latency = self->priv->latency_max;
1510   }
1511
1512   changed = (self->priv->latency != latency);
1513   self->priv->latency = latency;
1514   GST_OBJECT_UNLOCK (self);
1515
1516   if (changed)
1517     gst_element_post_message (GST_ELEMENT_CAST (self),
1518         gst_message_new_latency (GST_OBJECT_CAST (self)));
1519 }
1520
1521 /*
1522  * gst_aggregator_get_latency_property:
1523  * @agg: a #GstAggregator
1524  *
1525  * Gets the latency value. See gst_aggregator_set_latency for
1526  * more details.
1527  *
1528  * Returns: The time in nanoseconds to wait for data to arrive on a sink pad 
1529  * before a pad is deemed unresponsive. A value of -1 means an
1530  * unlimited time.
1531  */
1532 static gint64
1533 gst_aggregator_get_latency_property (GstAggregator * agg)
1534 {
1535   gint64 res;
1536
1537   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
1538
1539   GST_OBJECT_LOCK (agg);
1540   res = agg->priv->latency;
1541   GST_OBJECT_UNLOCK (agg);
1542
1543   return res;
1544 }
1545
1546 static void
1547 gst_aggregator_set_property (GObject * object, guint prop_id,
1548     const GValue * value, GParamSpec * pspec)
1549 {
1550   GstAggregator *agg = GST_AGGREGATOR (object);
1551
1552   switch (prop_id) {
1553     case PROP_LATENCY:
1554       gst_aggregator_set_latency_property (agg, g_value_get_int64 (value));
1555       break;
1556     default:
1557       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1558       break;
1559   }
1560 }
1561
1562 static void
1563 gst_aggregator_get_property (GObject * object, guint prop_id,
1564     GValue * value, GParamSpec * pspec)
1565 {
1566   GstAggregator *agg = GST_AGGREGATOR (object);
1567
1568   switch (prop_id) {
1569     case PROP_LATENCY:
1570       g_value_set_int64 (value, gst_aggregator_get_latency_property (agg));
1571       break;
1572     default:
1573       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1574       break;
1575   }
1576 }
1577
1578 /* GObject vmethods implementations */
1579 static void
1580 gst_aggregator_class_init (GstAggregatorClass * klass)
1581 {
1582   GObjectClass *gobject_class = (GObjectClass *) klass;
1583   GstElementClass *gstelement_class = (GstElementClass *) klass;
1584
1585   aggregator_parent_class = g_type_class_peek_parent (klass);
1586   g_type_class_add_private (klass, sizeof (GstAggregatorPrivate));
1587
1588   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
1589       GST_DEBUG_FG_MAGENTA, "GstAggregator");
1590
1591   klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD;
1592
1593   klass->sink_event = gst_aggregator_default_sink_event;
1594   klass->sink_query = gst_aggregator_default_sink_query;
1595
1596   klass->src_event = gst_aggregator_default_src_event;
1597   klass->src_query = gst_aggregator_default_src_query;
1598
1599   gstelement_class->request_new_pad =
1600       GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
1601   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
1602   gstelement_class->release_pad =
1603       GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
1604   gstelement_class->change_state =
1605       GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
1606
1607   gobject_class->set_property = gst_aggregator_set_property;
1608   gobject_class->get_property = gst_aggregator_get_property;
1609   gobject_class->finalize = gst_aggregator_finalize;
1610
1611   g_object_class_install_property (gobject_class, PROP_LATENCY,
1612       g_param_spec_int64 ("latency", "Buffer latency",
1613           "Additional latency in live mode to allow upstream "
1614           "to take longer to produce buffers for the current "
1615           "position", 0,
1616           (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1),
1617           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1618 }
1619
1620 static void
1621 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
1622 {
1623   GstPadTemplate *pad_template;
1624   GstAggregatorPrivate *priv;
1625
1626   g_return_if_fail (klass->aggregate != NULL);
1627
1628   self->priv =
1629       G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR,
1630       GstAggregatorPrivate);
1631
1632   priv = self->priv;
1633
1634   pad_template =
1635       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
1636   g_return_if_fail (pad_template != NULL);
1637
1638   priv->padcount = -1;
1639   priv->tags_changed = FALSE;
1640
1641   self->priv->latency_live = FALSE;
1642   self->priv->latency_min = self->priv->sub_latency_min = 0;
1643   self->priv->latency_max = self->priv->sub_latency_max = GST_CLOCK_TIME_NONE;
1644   gst_aggregator_reset_flow_values (self);
1645
1646   self->srcpad = gst_pad_new_from_template (pad_template, "src");
1647
1648   gst_pad_set_event_function (self->srcpad,
1649       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
1650   gst_pad_set_query_function (self->srcpad,
1651       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func));
1652   gst_pad_set_activatemode_function (self->srcpad,
1653       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func));
1654
1655   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
1656
1657   self->priv->latency = DEFAULT_LATENCY;
1658
1659   g_mutex_init (&self->priv->setcaps_lock);
1660   g_mutex_init (&self->priv->src_lock);
1661   g_cond_init (&self->priv->src_cond);
1662 }
1663
1664 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
1665  * method to get to the padtemplates */
1666 GType
1667 gst_aggregator_get_type (void)
1668 {
1669   static volatile gsize type = 0;
1670
1671   if (g_once_init_enter (&type)) {
1672     GType _type;
1673     static const GTypeInfo info = {
1674       sizeof (GstAggregatorClass),
1675       NULL,
1676       NULL,
1677       (GClassInitFunc) gst_aggregator_class_init,
1678       NULL,
1679       NULL,
1680       sizeof (GstAggregator),
1681       0,
1682       (GInstanceInitFunc) gst_aggregator_init,
1683     };
1684
1685     _type = g_type_register_static (GST_TYPE_ELEMENT,
1686         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
1687     g_once_init_leave (&type, _type);
1688   }
1689   return type;
1690 }
1691
1692 static GstFlowReturn
1693 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
1694 {
1695   GstBuffer *actual_buf = buffer;
1696   GstAggregator *self = GST_AGGREGATOR (object);
1697   GstAggregatorPrivate *priv = self->priv;
1698   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1699   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
1700
1701   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
1702
1703   PAD_STREAM_LOCK (aggpad);
1704
1705   if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
1706     goto flushing;
1707
1708   if (g_atomic_int_get (&aggpad->priv->pending_eos) == TRUE)
1709     goto eos;
1710
1711   PAD_LOCK_EVENT (aggpad);
1712
1713   while (aggpad->buffer && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
1714     GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
1715     PAD_WAIT_EVENT (aggpad);
1716   }
1717   PAD_UNLOCK_EVENT (aggpad);
1718
1719   if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
1720     goto flushing;
1721
1722   if (aggclass->clip) {
1723     aggclass->clip (self, aggpad, buffer, &actual_buf);
1724   }
1725
1726   PAD_LOCK_EVENT (aggpad);
1727   if (aggpad->buffer)
1728     gst_buffer_unref (aggpad->buffer);
1729   aggpad->buffer = actual_buf;
1730   PAD_UNLOCK_EVENT (aggpad);
1731   PAD_STREAM_UNLOCK (aggpad);
1732
1733   SRC_STREAM_LOCK (self);
1734   if (gst_aggregator_iterate_sinkpads (self,
1735           gst_aggregator_check_all_pads_with_data_or_eos, NULL))
1736     SRC_STREAM_BROADCAST_UNLOCKED (self);
1737   SRC_STREAM_UNLOCK (self);
1738
1739   GST_DEBUG_OBJECT (aggpad, "Done chaining");
1740
1741   return priv->flow_return;
1742
1743 flushing:
1744   PAD_STREAM_UNLOCK (aggpad);
1745
1746   gst_buffer_unref (buffer);
1747   GST_DEBUG_OBJECT (aggpad, "We are flushing");
1748
1749   return GST_FLOW_FLUSHING;
1750
1751 eos:
1752   PAD_STREAM_UNLOCK (aggpad);
1753
1754   gst_buffer_unref (buffer);
1755   GST_DEBUG_OBJECT (pad, "We are EOS already...");
1756
1757   return GST_FLOW_EOS;
1758 }
1759
1760 static gboolean
1761 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
1762     GstQuery * query)
1763 {
1764   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1765   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1766
1767   if (GST_QUERY_IS_SERIALIZED (query)) {
1768     PAD_LOCK_EVENT (aggpad);
1769
1770     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) {
1771       PAD_UNLOCK_EVENT (aggpad);
1772       goto flushing;
1773     }
1774
1775     while (aggpad->buffer
1776         && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
1777       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
1778       PAD_WAIT_EVENT (aggpad);
1779     }
1780     PAD_UNLOCK_EVENT (aggpad);
1781
1782     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
1783       goto flushing;
1784   }
1785
1786   return klass->sink_query (GST_AGGREGATOR (parent),
1787       GST_AGGREGATOR_PAD (pad), query);
1788
1789 flushing:
1790   GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping query");
1791   return FALSE;
1792 }
1793
1794 static gboolean
1795 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
1796     GstEvent * event)
1797 {
1798   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1799   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1800
1801   if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS
1802       && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) {
1803     PAD_LOCK_EVENT (aggpad);
1804
1805     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE
1806         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
1807       PAD_UNLOCK_EVENT (aggpad);
1808       goto flushing;
1809     }
1810
1811     while (aggpad->buffer
1812         && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
1813       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
1814       PAD_WAIT_EVENT (aggpad);
1815     }
1816     PAD_UNLOCK_EVENT (aggpad);
1817
1818     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE
1819         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
1820       goto flushing;
1821   }
1822
1823   return klass->sink_event (GST_AGGREGATOR (parent),
1824       GST_AGGREGATOR_PAD (pad), event);
1825
1826 flushing:
1827   GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping event");
1828   if (GST_EVENT_IS_STICKY (event))
1829     gst_pad_store_sticky_event (pad, event);
1830   gst_event_unref (event);
1831   return FALSE;
1832 }
1833
1834 static gboolean
1835 gst_aggregator_pad_activate_mode_func (GstPad * pad,
1836     GstObject * parent, GstPadMode mode, gboolean active)
1837 {
1838   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1839
1840   if (active == FALSE) {
1841     PAD_LOCK_EVENT (aggpad);
1842     g_atomic_int_set (&aggpad->priv->flushing, TRUE);
1843     gst_buffer_replace (&aggpad->buffer, NULL);
1844     PAD_BROADCAST_EVENT (aggpad);
1845     PAD_UNLOCK_EVENT (aggpad);
1846   } else {
1847     g_atomic_int_set (&aggpad->priv->flushing, FALSE);
1848     PAD_LOCK_EVENT (aggpad);
1849     PAD_BROADCAST_EVENT (aggpad);
1850     PAD_UNLOCK_EVENT (aggpad);
1851   }
1852
1853   return TRUE;
1854 }
1855
1856 /***********************************
1857  * GstAggregatorPad implementation  *
1858  ************************************/
1859 G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
1860
1861 static void
1862 gst_aggregator_pad_constructed (GObject * object)
1863 {
1864   GstPad *pad = GST_PAD (object);
1865
1866   gst_pad_set_chain_function (pad,
1867       GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
1868   gst_pad_set_event_function (pad,
1869       GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func));
1870   gst_pad_set_query_function (pad,
1871       GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
1872   gst_pad_set_activatemode_function (pad,
1873       GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
1874 }
1875
1876 static void
1877 gst_aggregator_pad_finalize (GObject * object)
1878 {
1879   GstAggregatorPad *pad = (GstAggregatorPad *) object;
1880
1881   g_mutex_clear (&pad->priv->event_lock);
1882   g_cond_clear (&pad->priv->event_cond);
1883   g_mutex_clear (&pad->priv->stream_lock);
1884
1885   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object);
1886 }
1887
1888 static void
1889 gst_aggregator_pad_dispose (GObject * object)
1890 {
1891   GstAggregatorPad *pad = (GstAggregatorPad *) object;
1892   GstBuffer *buf;
1893
1894   buf = gst_aggregator_pad_steal_buffer (pad);
1895   if (buf)
1896     gst_buffer_unref (buf);
1897
1898   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
1899 }
1900
1901 static void
1902 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
1903 {
1904   GObjectClass *gobject_class = (GObjectClass *) klass;
1905
1906   g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate));
1907
1908   gobject_class->constructed = gst_aggregator_pad_constructed;
1909   gobject_class->finalize = gst_aggregator_pad_finalize;
1910   gobject_class->dispose = gst_aggregator_pad_dispose;
1911 }
1912
1913 static void
1914 gst_aggregator_pad_init (GstAggregatorPad * pad)
1915 {
1916   pad->priv =
1917       G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
1918       GstAggregatorPadPrivate);
1919
1920   pad->buffer = NULL;
1921   g_mutex_init (&pad->priv->event_lock);
1922   g_cond_init (&pad->priv->event_cond);
1923
1924   g_mutex_init (&pad->priv->stream_lock);
1925 }
1926
1927 /**
1928  * gst_aggregator_pad_steal_buffer:
1929  * @pad: the pad to get buffer from
1930  *
1931  * Steal the ref to the buffer currently queued in @pad.
1932  *
1933  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
1934  *   queued. You should unref the buffer after usage.
1935  */
1936 GstBuffer *
1937 gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
1938 {
1939   GstBuffer *buffer = NULL;
1940
1941   PAD_LOCK_EVENT (pad);
1942   if (pad->buffer) {
1943     GST_TRACE_OBJECT (pad, "Consuming buffer");
1944     buffer = pad->buffer;
1945     pad->buffer = NULL;
1946     if (pad->priv->pending_eos) {
1947       pad->priv->pending_eos = FALSE;
1948       pad->eos = TRUE;
1949     }
1950     PAD_BROADCAST_EVENT (pad);
1951     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
1952   }
1953   PAD_UNLOCK_EVENT (pad);
1954
1955   return buffer;
1956 }
1957
1958 /**
1959  * gst_aggregator_pad_get_buffer:
1960  * @pad: the pad to get buffer from
1961  *
1962  * Returns: (transfer full): A reference to the buffer in @pad or
1963  * NULL if no buffer was queued. You should unref the buffer after
1964  * usage.
1965  */
1966 GstBuffer *
1967 gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
1968 {
1969   GstBuffer *buffer = NULL;
1970
1971   PAD_LOCK_EVENT (pad);
1972   if (pad->buffer)
1973     buffer = gst_buffer_ref (pad->buffer);
1974   PAD_UNLOCK_EVENT (pad);
1975
1976   return buffer;
1977 }
1978
1979 /**
1980  * gst_aggregator_merge_tags:
1981  * @self: a #GstAggregator
1982  * @tags: a #GstTagList to merge
1983  * @mode: the #GstTagMergeMode to use
1984  *
1985  * Adds tags to so-called pending tags, which will be processed
1986  * before pushing out data downstream.
1987  *
1988  * Note that this is provided for convenience, and the subclass is
1989  * not required to use this and can still do tag handling on its own.
1990  *
1991  * MT safe.
1992  */
1993 void
1994 gst_aggregator_merge_tags (GstAggregator * self,
1995     const GstTagList * tags, GstTagMergeMode mode)
1996 {
1997   GstTagList *otags;
1998
1999   g_return_if_fail (GST_IS_AGGREGATOR (self));
2000   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
2001
2002   /* FIXME Check if we can use OBJECT lock here! */
2003   GST_OBJECT_LOCK (self);
2004   if (tags)
2005     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
2006   otags = self->priv->tags;
2007   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
2008   if (otags)
2009     gst_tag_list_unref (otags);
2010   self->priv->tags_changed = TRUE;
2011   GST_OBJECT_UNLOCK (self);
2012 }
2013
2014 /**
2015  * gst_aggregator_set_latency:
2016  * @self: a #GstAggregator
2017  * @min_latency: minimum latency
2018  * @max_latency: maximum latency
2019  *
2020  * Lets #GstAggregator sub-classes tell the baseclass what their internal
2021  * latency is. Will also post a LATENCY message on the bus so the pipeline
2022  * can reconfigure its global latency.
2023  */
2024 void
2025 gst_aggregator_set_latency (GstAggregator * self,
2026     GstClockTime min_latency, GstClockTime max_latency)
2027 {
2028   g_return_if_fail (GST_IS_AGGREGATOR (self));
2029   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
2030   g_return_if_fail (max_latency >= min_latency);
2031
2032   GST_OBJECT_LOCK (self);
2033   self->priv->sub_latency_min = min_latency;
2034   self->priv->sub_latency_max = max_latency;
2035   GST_OBJECT_UNLOCK (self);
2036
2037   gst_element_post_message (GST_ELEMENT_CAST (self),
2038       gst_message_new_latency (GST_OBJECT_CAST (self)));
2039 }