aggregator: Flushing is always in pad lock, no need to atomics
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
1 /* GStreamer aggregator base class
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @short_description: manages a set of pads with the purpose of
25  * aggregating their buffers.
26  * @see_also: gstcollectpads for historical reasons.
27  *
28  * Manages a set of pads with the purpose of aggregating their buffers.
29  * Control is given to the subclass when all pads have data.
30  * <itemizedlist>
31  *  <listitem><para>
32  *    Base class for mixers and muxers. Subclasses should at least implement
33  *    the #GstAggregatorClass.aggregate() virtual method.
34  *  </para></listitem>
35  *  <listitem><para>
36  *    When data is queued on all pads, tha aggregate vmethod is called.
37  *  </para></listitem>
38  *  <listitem><para>
39  *    One can peek at the data on any given GstAggregatorPad with the
40  *    gst_aggregator_pad_get_buffer () method, and take ownership of it
41  *    with the gst_aggregator_pad_steal_buffer () method. When a buffer
42  *    has been taken with steal_buffer (), a new buffer can be queued
43  *    on that pad.
44  *  </para></listitem>
45  *  <listitem><para>
46  *    If the subclass wishes to push a buffer downstream in its aggregate
47  *    implementation, it should do so through the
48  *    gst_aggregator_finish_buffer () method. This method will take care
49  *    of sending and ordering mandatory events such as stream start, caps
50  *    and segment.
51  *  </para></listitem>
52  *  <listitem><para>
53  *    Same goes for EOS events, which should not be pushed directly by the
54  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
55  *    implementation.
56  *  </para></listitem>
57  * </itemizedlist>
58  */
59
60 #ifdef HAVE_CONFIG_H
61 #  include "config.h"
62 #endif
63
64 #include <string.h>             /* strlen */
65
66 #include "gstaggregator.h"
67
68
69 /*  Might become API */
70 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
71     const GstTagList * tags, GstTagMergeMode mode);
72 static void gst_aggregator_set_latency_property (GstAggregator * agg,
73     gint64 latency);
74 static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
75
76
77 /* Locking order, locks in this element must always be taken in this order
78  *
79  * standard sink pad stream lock -> GST_PAD_STREAM_LOCK (aggpad)
80  * Aggregator pad flush lock -> PAD_FLUSH_LOCK(aggpad)
81  * standard src pad stream lock -> GST_PAD_STREAM_LOCK (srcpad)
82  * Aggregator src lock -> SRC_LOCK(agg) w/ SRC_WAIT/BROADCAST
83  * standard element object lock -> GST_OBJECT_LOCK(agg)
84  * Aggregator pad lock -> PAD_LOCK (aggpad) w/ PAD_WAIT/BROADCAST_EVENT(aggpad)
85  * standard src pad object lock -> GST_OBJECT_LOCK(srcpad)
86  * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad)
87  */
88
89
90 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
91
92 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
93 #define GST_CAT_DEFAULT aggregator_debug
94
95 /* GstAggregatorPad definitions */
96 #define PAD_LOCK(pad)   G_STMT_START {                                  \
97   GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p",              \
98         g_thread_self());                                               \
99   g_mutex_lock(&pad->priv->lock);                                       \
100   GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p",                \
101         g_thread_self());                                               \
102   } G_STMT_END
103
104 #define PAD_UNLOCK(pad)  G_STMT_START {                                 \
105   GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p",           \
106       g_thread_self());                                                 \
107   g_mutex_unlock(&pad->priv->lock);                                     \
108   GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p",             \
109         g_thread_self());                                               \
110   } G_STMT_END
111
112
113 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
114   GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p",   \
115         g_thread_self());                                               \
116   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),           \
117       (&((GstAggregatorPad*)pad)->priv->lock));                         \
118   GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \
119         g_thread_self());                                               \
120   } G_STMT_END
121
122 #define PAD_BROADCAST_EVENT(pad) G_STMT_START {                        \
123   GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p",     \
124         g_thread_self());                                              \
125   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond));    \
126   } G_STMT_END
127
128
129 #define PAD_FLUSH_LOCK(pad)     G_STMT_START {                          \
130   GST_TRACE_OBJECT (pad, "Taking lock from thread %p",                  \
131         g_thread_self());                                               \
132   g_mutex_lock(&pad->priv->flush_lock);                                 \
133   GST_TRACE_OBJECT (pad, "Took lock from thread %p",                    \
134         g_thread_self());                                               \
135   } G_STMT_END
136
137 #define PAD_FLUSH_UNLOCK(pad)   G_STMT_START {                          \
138   GST_TRACE_OBJECT (pad, "Releasing lock from thread %p",               \
139         g_thread_self());                                               \
140   g_mutex_unlock(&pad->priv->flush_lock);                               \
141   GST_TRACE_OBJECT (pad, "Release lock from thread %p",                 \
142         g_thread_self());                                               \
143   } G_STMT_END
144
145 #define SRC_LOCK(self)   G_STMT_START {                             \
146   GST_TRACE_OBJECT (self, "Taking src lock from thread %p",         \
147       g_thread_self());                                             \
148   g_mutex_lock(&self->priv->src_lock);                              \
149   GST_TRACE_OBJECT (self, "Took src lock from thread %p",           \
150         g_thread_self());                                           \
151   } G_STMT_END
152
153 #define SRC_UNLOCK(self)  G_STMT_START {                            \
154   GST_TRACE_OBJECT (self, "Releasing src lock from thread %p",      \
155         g_thread_self());                                           \
156   g_mutex_unlock(&self->priv->src_lock);                            \
157   GST_TRACE_OBJECT (self, "Released src lock from thread %p",       \
158         g_thread_self());                                           \
159   } G_STMT_END
160
161 #define SRC_WAIT(self) G_STMT_START {                               \
162   GST_LOG_OBJECT (self, "Waiting for src on thread %p",             \
163         g_thread_self());                                           \
164   g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock));    \
165   GST_LOG_OBJECT (self, "DONE Waiting for src on thread %p",        \
166         g_thread_self());                                           \
167   } G_STMT_END
168
169 #define SRC_BROADCAST(self) G_STMT_START {                          \
170     GST_LOG_OBJECT (self, "Signaling src from thread %p",           \
171         g_thread_self());                                           \
172     if (self->priv->aggregate_id)                                   \
173       gst_clock_id_unschedule (self->priv->aggregate_id);           \
174     g_cond_broadcast(&(self->priv->src_cond));                      \
175   } G_STMT_END
176
177 struct _GstAggregatorPadPrivate
178 {
179   /* Following fields are protected by the PAD_LOCK */
180   gboolean flushing;
181   gboolean pending_flush_start;
182   gboolean pending_flush_stop;
183   gboolean pending_eos;
184
185   GstBuffer *buffer;
186   gboolean eos;
187
188   GMutex lock;
189   GCond event_cond;
190   /* This lock prevents a flush start processing happening while
191    * the chain function is also happening.
192    */
193   GMutex flush_lock;
194 };
195
196 static gboolean
197 gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
198 {
199   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
200
201   PAD_LOCK (aggpad);
202   aggpad->priv->pending_eos = FALSE;
203   aggpad->priv->eos = FALSE;
204   aggpad->priv->flushing = FALSE;
205   PAD_UNLOCK (aggpad);
206
207   if (klass->flush)
208     return klass->flush (aggpad, agg);
209
210   return TRUE;
211 }
212
213 /*************************************
214  * GstAggregator implementation  *
215  *************************************/
216 static GstElementClass *aggregator_parent_class = NULL;
217
218 /* All members are protected by the object lock unless otherwise noted */
219
220 struct _GstAggregatorPrivate
221 {
222   gint padcount;
223
224   /* Our state is >= PAUSED */
225   gboolean running;             /* protected by src_lock */
226
227   gint seqnum;
228   gboolean send_stream_start;   /* protected by srcpad stream lock */
229   gboolean send_segment;
230   gboolean flush_seeking;
231   gboolean pending_flush_start;
232   gboolean send_eos;            /* protected by srcpad stream lock */
233   GstFlowReturn flow_return;
234
235   GstCaps *srccaps;             /* protected by the srcpad stream lock */
236
237   GstTagList *tags;
238   gboolean tags_changed;
239
240   gboolean peer_latency_live;   /* protected by src_lock */
241   GstClockTime peer_latency_min;        /* protected by src_lock */
242   GstClockTime peer_latency_max;        /* protected by src_lock */
243   gboolean has_peer_latency;
244
245   GstClockTime sub_latency_min; /* protected by src_lock */
246   GstClockTime sub_latency_max; /* protected by src_lock */
247
248   /* aggregate */
249   GstClockID aggregate_id;      /* protected by src_lock */
250   GMutex src_lock;
251   GCond src_cond;
252
253   /* properties */
254   gint64 latency;
255 };
256
257 typedef struct
258 {
259   GstEvent *event;
260   gboolean result;
261   gboolean flush;
262
263   gboolean one_actually_seeked;
264 } EventData;
265
266 #define DEFAULT_LATENCY        0
267
268 enum
269 {
270   PROP_0,
271   PROP_LATENCY,
272   PROP_LAST
273 };
274
275 /**
276  * gst_aggregator_iterate_sinkpads:
277  * @self: The #GstAggregator
278  * @func: (scope call): The function to call.
279  * @user_data: (closure): The data to pass to @func.
280  *
281  * Iterate the sinkpads of aggregator to call a function on them.
282  *
283  * This method guarantees that @func will be called only once for each
284  * sink pad.
285  */
286 gboolean
287 gst_aggregator_iterate_sinkpads (GstAggregator * self,
288     GstAggregatorPadForeachFunc func, gpointer user_data)
289 {
290   gboolean result = FALSE;
291   GstIterator *iter;
292   gboolean done = FALSE;
293   GValue item = { 0, };
294   GList *seen_pads = NULL;
295
296   iter = gst_element_iterate_sink_pads (GST_ELEMENT (self));
297
298   if (!iter)
299     goto no_iter;
300
301   while (!done) {
302     switch (gst_iterator_next (iter, &item)) {
303       case GST_ITERATOR_OK:
304       {
305         GstAggregatorPad *pad;
306
307         pad = g_value_get_object (&item);
308
309         /* if already pushed, skip. FIXME, find something faster to tag pads */
310         if (pad == NULL || g_list_find (seen_pads, pad)) {
311           g_value_reset (&item);
312           break;
313         }
314
315         GST_LOG_OBJECT (pad, "calling function %s on pad",
316             GST_DEBUG_FUNCPTR_NAME (func));
317
318         result = func (self, pad, user_data);
319
320         done = !result;
321
322         seen_pads = g_list_prepend (seen_pads, pad);
323
324         g_value_reset (&item);
325         break;
326       }
327       case GST_ITERATOR_RESYNC:
328         gst_iterator_resync (iter);
329         break;
330       case GST_ITERATOR_ERROR:
331         GST_ERROR_OBJECT (self,
332             "Could not iterate over internally linked pads");
333         done = TRUE;
334         break;
335       case GST_ITERATOR_DONE:
336         done = TRUE;
337         break;
338     }
339   }
340   g_value_unset (&item);
341   gst_iterator_free (iter);
342
343   if (seen_pads == NULL) {
344     GST_DEBUG_OBJECT (self, "No pad seen");
345     return FALSE;
346   }
347
348   g_list_free (seen_pads);
349
350 no_iter:
351   return result;
352 }
353
354 static gboolean
355 gst_aggregator_check_pads_ready (GstAggregator * self)
356 {
357   GstAggregatorPad *pad;
358   GList *l, *sinkpads;
359
360   GST_LOG_OBJECT (self, "checking pads");
361
362   GST_OBJECT_LOCK (self);
363
364   sinkpads = GST_ELEMENT_CAST (self)->sinkpads;
365   if (sinkpads == NULL)
366     goto no_sinkpads;
367
368   for (l = sinkpads; l != NULL; l = l->next) {
369     pad = l->data;
370
371     PAD_LOCK (pad);
372     if (pad->priv->buffer == NULL && !pad->priv->eos) {
373       PAD_UNLOCK (pad);
374       goto pad_not_ready;
375     }
376     PAD_UNLOCK (pad);
377
378   }
379
380   GST_OBJECT_UNLOCK (self);
381   GST_LOG_OBJECT (self, "pads are ready");
382   return TRUE;
383
384 no_sinkpads:
385   {
386     GST_LOG_OBJECT (self, "pads not ready: no sink pads");
387     GST_OBJECT_UNLOCK (self);
388     return FALSE;
389   }
390 pad_not_ready:
391   {
392     GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
393     GST_OBJECT_UNLOCK (self);
394     return FALSE;
395   }
396 }
397
398 static void
399 gst_aggregator_reset_flow_values (GstAggregator * self)
400 {
401   GST_OBJECT_LOCK (self);
402   self->priv->flow_return = GST_FLOW_FLUSHING;
403   self->priv->send_stream_start = TRUE;
404   self->priv->send_segment = TRUE;
405   gst_segment_init (&self->segment, GST_FORMAT_TIME);
406   GST_OBJECT_UNLOCK (self);
407 }
408
409 static inline void
410 gst_aggregator_push_mandatory_events (GstAggregator * self)
411 {
412   GstAggregatorPrivate *priv = self->priv;
413   GstEvent *segment = NULL;
414   GstEvent *tags = NULL;
415
416   if (self->priv->send_stream_start) {
417     gchar s_id[32];
418
419     GST_INFO_OBJECT (self, "pushing stream start");
420     /* stream-start (FIXME: create id based on input ids) */
421     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
422     if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) {
423       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
424     }
425     self->priv->send_stream_start = FALSE;
426   }
427
428   if (self->priv->srccaps) {
429
430     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
431         self->priv->srccaps);
432     if (!gst_pad_push_event (self->srcpad,
433             gst_event_new_caps (self->priv->srccaps))) {
434       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
435     }
436     gst_caps_unref (self->priv->srccaps);
437     self->priv->srccaps = NULL;
438   }
439
440   GST_OBJECT_LOCK (self);
441   if (self->priv->send_segment && !self->priv->flush_seeking) {
442     segment = gst_event_new_segment (&self->segment);
443
444     if (!self->priv->seqnum)
445       self->priv->seqnum = gst_event_get_seqnum (segment);
446     else
447       gst_event_set_seqnum (segment, self->priv->seqnum);
448     self->priv->send_segment = FALSE;
449
450     GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment);
451   }
452
453   if (priv->tags && priv->tags_changed && !self->priv->flush_seeking) {
454     tags = gst_event_new_tag (gst_tag_list_ref (priv->tags));
455     priv->tags_changed = FALSE;
456   }
457   GST_OBJECT_UNLOCK (self);
458
459   if (segment)
460     gst_pad_push_event (self->srcpad, segment);
461   if (tags)
462     gst_pad_push_event (self->srcpad, tags);
463
464 }
465
466 /**
467  * gst_aggregator_set_src_caps:
468  * @self: The #GstAggregator
469  * @caps: The #GstCaps to set on the src pad.
470  *
471  * Sets the caps to be used on the src pad.
472  */
473 void
474 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
475 {
476   GST_PAD_STREAM_LOCK (self->srcpad);
477   gst_caps_replace (&self->priv->srccaps, caps);
478   gst_aggregator_push_mandatory_events (self);
479   GST_PAD_STREAM_UNLOCK (self->srcpad);
480 }
481
482 /**
483  * gst_aggregator_finish_buffer:
484  * @self: The #GstAggregator
485  * @buffer: (transfer full): the #GstBuffer to push.
486  *
487  * This method will push the provided output buffer downstream. If needed,
488  * mandatory events such as stream-start, caps, and segment events will be
489  * sent before pushing the buffer.
490  */
491 GstFlowReturn
492 gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
493 {
494   gst_aggregator_push_mandatory_events (self);
495
496   GST_OBJECT_LOCK (self);
497   if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) {
498     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
499     GST_OBJECT_UNLOCK (self);
500     return gst_pad_push (self->srcpad, buffer);
501   } else {
502     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
503         self->priv->flush_seeking, gst_pad_is_active (self->srcpad));
504     GST_OBJECT_UNLOCK (self);
505     gst_buffer_unref (buffer);
506     return GST_FLOW_OK;
507   }
508 }
509
510 static void
511 gst_aggregator_push_eos (GstAggregator * self)
512 {
513   GstEvent *event;
514   gst_aggregator_push_mandatory_events (self);
515
516   event = gst_event_new_eos ();
517
518   GST_OBJECT_LOCK (self);
519   self->priv->send_eos = FALSE;
520   gst_event_set_seqnum (event, self->priv->seqnum);
521   GST_OBJECT_UNLOCK (self);
522
523   gst_pad_push_event (self->srcpad, event);
524 }
525
526 static GstClockTime
527 gst_aggregator_get_next_time (GstAggregator * self)
528 {
529   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
530
531   if (klass->get_next_time)
532     return klass->get_next_time (self);
533
534   return GST_CLOCK_TIME_NONE;
535 }
536
537 static gboolean
538 gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
539 {
540   GstClockTime latency;
541   GstClockTime start;
542   gboolean res;
543
544   *timeout = FALSE;
545
546   SRC_LOCK (self);
547
548   latency = gst_aggregator_get_latency_unlocked (self);
549
550   if (gst_aggregator_check_pads_ready (self)) {
551     GST_DEBUG_OBJECT (self, "all pads have data");
552     SRC_UNLOCK (self);
553
554     return TRUE;
555   }
556
557   /* Before waiting, check if we're actually still running */
558   if (!self->priv->running || !self->priv->send_eos) {
559     SRC_UNLOCK (self);
560
561     return FALSE;
562   }
563
564   start = gst_aggregator_get_next_time (self);
565
566   if (!GST_CLOCK_TIME_IS_VALID (latency) ||
567       !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) ||
568       !GST_CLOCK_TIME_IS_VALID (start)) {
569     /* We wake up here when something happened, and below
570      * then check if we're ready now. If we return FALSE,
571      * we will be directly called again.
572      */
573     SRC_WAIT (self);
574   } else {
575     GstClockTime base_time, time;
576     GstClock *clock;
577     GstClockReturn status;
578     GstClockTimeDiff jitter;
579
580     GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
581         GST_TIME_ARGS (start));
582
583     GST_OBJECT_LOCK (self);
584     base_time = GST_ELEMENT_CAST (self)->base_time;
585     clock = GST_ELEMENT_CLOCK (self);
586     if (clock)
587       gst_object_ref (clock);
588     GST_OBJECT_UNLOCK (self);
589
590     time = base_time + start;
591     time += latency;
592
593     GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
594         GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
595         " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")",
596         GST_TIME_ARGS (time),
597         GST_TIME_ARGS (GST_ELEMENT_CAST (self)->base_time),
598         GST_TIME_ARGS (start), GST_TIME_ARGS (latency),
599         GST_TIME_ARGS (gst_clock_get_time (clock)));
600
601
602     self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
603     gst_object_unref (clock);
604     SRC_UNLOCK (self);
605
606     jitter = 0;
607     status = gst_clock_id_wait (self->priv->aggregate_id, &jitter);
608
609     SRC_LOCK (self);
610     if (self->priv->aggregate_id) {
611       gst_clock_id_unref (self->priv->aggregate_id);
612       self->priv->aggregate_id = NULL;
613     }
614
615     GST_DEBUG_OBJECT (self,
616         "clock returned %d (jitter: %s%" GST_TIME_FORMAT ")",
617         status, (jitter < 0 ? "-" : " "),
618         GST_TIME_ARGS ((jitter < 0 ? -jitter : jitter)));
619
620     /* we timed out */
621     if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
622       SRC_UNLOCK (self);
623       *timeout = TRUE;
624       return TRUE;
625     }
626   }
627
628   res = gst_aggregator_check_pads_ready (self);
629   SRC_UNLOCK (self);
630
631   return res;
632 }
633
634 static void
635 gst_aggregator_aggregate_func (GstAggregator * self)
636 {
637   GstAggregatorPrivate *priv = self->priv;
638   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
639   gboolean timeout = FALSE;
640
641   if (self->priv->running == FALSE) {
642     GST_DEBUG_OBJECT (self, "Not running anymore");
643     return;
644   }
645
646   GST_LOG_OBJECT (self, "Checking aggregate");
647   while (priv->send_eos && priv->running) {
648     GstFlowReturn flow_return;
649
650     if (!gst_aggregator_wait_and_check (self, &timeout))
651       continue;
652
653     GST_TRACE_OBJECT (self, "Actually aggregating!");
654
655     flow_return = klass->aggregate (self, timeout);
656
657     GST_OBJECT_LOCK (self);
658     if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking)
659       priv->flow_return = GST_FLOW_OK;
660     else
661       priv->flow_return = flow_return;
662     GST_OBJECT_UNLOCK (self);
663
664     if (flow_return == GST_FLOW_EOS) {
665       gst_aggregator_push_eos (self);
666     }
667
668     GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
669
670     if (flow_return != GST_FLOW_OK)
671       break;
672   }
673
674   /* Pause the task here, the only ways to get here are:
675    * 1) We're stopping, in which case the task is stopped anyway
676    * 2) We got a flow error above, in which case it might take
677    *    some time to forward the flow return upstream and we
678    *    would otherwise call the task function over and over
679    *    again without doing anything
680    */
681   gst_pad_pause_task (self->srcpad);
682 }
683
684 static gboolean
685 gst_aggregator_start (GstAggregator * self)
686 {
687   GstAggregatorClass *klass;
688   gboolean result;
689
690   self->priv->running = TRUE;
691   self->priv->send_stream_start = TRUE;
692   self->priv->send_segment = TRUE;
693   self->priv->send_eos = TRUE;
694   self->priv->srccaps = NULL;
695   self->priv->flow_return = GST_FLOW_OK;
696
697   klass = GST_AGGREGATOR_GET_CLASS (self);
698
699   if (klass->start)
700     result = klass->start (self);
701   else
702     result = TRUE;
703
704   return result;
705 }
706
707 static gboolean
708 _check_pending_flush_stop (GstAggregatorPad * pad)
709 {
710   gboolean res;
711
712   PAD_LOCK (pad);
713   res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
714   PAD_UNLOCK (pad);
715
716   return res;
717 }
718
719 static gboolean
720 gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
721 {
722   gboolean res = TRUE;
723
724   GST_INFO_OBJECT (self, "%s srcpad task",
725       flush_start ? "Pausing" : "Stopping");
726
727   SRC_LOCK (self);
728   self->priv->running = FALSE;
729   SRC_BROADCAST (self);
730   SRC_UNLOCK (self);
731
732   if (flush_start) {
733     res = gst_pad_push_event (self->srcpad, flush_start);
734   }
735
736   gst_pad_stop_task (self->srcpad);
737
738   return res;
739 }
740
741 static void
742 gst_aggregator_start_srcpad_task (GstAggregator * self)
743 {
744   GST_INFO_OBJECT (self, "Starting srcpad task");
745
746   self->priv->running = TRUE;
747   gst_pad_start_task (GST_PAD (self->srcpad),
748       (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL);
749 }
750
751 static GstFlowReturn
752 gst_aggregator_flush (GstAggregator * self)
753 {
754   GstFlowReturn ret = GST_FLOW_OK;
755   GstAggregatorPrivate *priv = self->priv;
756   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
757
758   GST_DEBUG_OBJECT (self, "Flushing everything");
759   GST_OBJECT_LOCK (self);
760   priv->send_segment = TRUE;
761   priv->flush_seeking = FALSE;
762   priv->tags_changed = FALSE;
763   GST_OBJECT_UNLOCK (self);
764   if (klass->flush)
765     ret = klass->flush (self);
766
767   return ret;
768 }
769
770
771 /* Called with GstAggregator's object lock held */
772
773 static gboolean
774 gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
775 {
776   GList *tmp;
777   GstAggregatorPad *tmppad;
778
779   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
780     tmppad = (GstAggregatorPad *) tmp->data;
781
782     if (_check_pending_flush_stop (tmppad) == FALSE) {
783       GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
784           tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
785       return FALSE;
786     }
787   }
788
789   return TRUE;
790 }
791
792 static void
793 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad)
794 {
795   PAD_LOCK (aggpad);
796   aggpad->priv->flushing = TRUE;
797   gst_buffer_replace (&aggpad->priv->buffer, NULL);
798   PAD_BROADCAST_EVENT (aggpad);
799   PAD_UNLOCK (aggpad);
800 }
801
802
803 static void
804 gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
805     GstEvent * event)
806 {
807   GstAggregatorPrivate *priv = self->priv;
808   GstAggregatorPadPrivate *padpriv = aggpad->priv;
809
810   gst_aggregator_pad_set_flushing (aggpad);
811
812   PAD_FLUSH_LOCK (aggpad);
813   PAD_LOCK (aggpad);
814   if (padpriv->pending_flush_start) {
815     GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
816
817     padpriv->pending_flush_start = FALSE;
818     padpriv->pending_flush_stop = TRUE;
819   }
820   PAD_UNLOCK (aggpad);
821
822   GST_OBJECT_LOCK (self);
823   if (priv->flush_seeking) {
824     /* If flush_seeking we forward the first FLUSH_START */
825     if (priv->pending_flush_start) {
826       priv->pending_flush_start = FALSE;
827       GST_OBJECT_UNLOCK (self);
828
829       GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
830       gst_aggregator_stop_srcpad_task (self, event);
831       priv->flow_return = GST_FLOW_OK;
832
833       GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
834       GST_PAD_STREAM_LOCK (self->srcpad);
835       GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
836       event = NULL;
837     } else {
838       GST_OBJECT_UNLOCK (self);
839       gst_event_unref (event);
840     }
841   } else {
842     GST_OBJECT_UNLOCK (self);
843     gst_event_unref (event);
844   }
845   PAD_FLUSH_UNLOCK (aggpad);
846
847   gst_aggregator_pad_drop_buffer (aggpad);
848 }
849
850 /* GstAggregator vmethods default implementations */
851 static gboolean
852 gst_aggregator_default_sink_event (GstAggregator * self,
853     GstAggregatorPad * aggpad, GstEvent * event)
854 {
855   gboolean res = TRUE;
856   GstPad *pad = GST_PAD (aggpad);
857   GstAggregatorPrivate *priv = self->priv;
858
859   switch (GST_EVENT_TYPE (event)) {
860     case GST_EVENT_FLUSH_START:
861     {
862       gst_aggregator_flush_start (self, aggpad, event);
863       /* We forward only in one case: right after flush_seeking */
864       event = NULL;
865       goto eat;
866     }
867     case GST_EVENT_FLUSH_STOP:
868     {
869       GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP");
870
871       gst_aggregator_pad_flush (aggpad, self);
872       GST_OBJECT_LOCK (self);
873       if (priv->flush_seeking) {
874         g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
875         if (gst_aggregator_all_flush_stop_received_locked (self)) {
876           GST_OBJECT_UNLOCK (self);
877           /* That means we received FLUSH_STOP/FLUSH_STOP on
878            * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
879           gst_aggregator_flush (self);
880           gst_pad_push_event (self->srcpad, event);
881           event = NULL;
882           SRC_LOCK (self);
883           priv->send_eos = TRUE;
884           SRC_BROADCAST (self);
885           SRC_UNLOCK (self);
886
887           GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
888           GST_PAD_STREAM_UNLOCK (self->srcpad);
889           gst_aggregator_start_srcpad_task (self);
890         } else {
891           GST_OBJECT_UNLOCK (self);
892         }
893       } else {
894         GST_OBJECT_UNLOCK (self);
895       }
896
897       /* We never forward the event */
898       goto eat;
899     }
900     case GST_EVENT_EOS:
901     {
902       GST_DEBUG_OBJECT (aggpad, "EOS");
903
904       /* We still have a buffer, and we don't want the subclass to have to
905        * check for it. Mark pending_eos, eos will be set when steal_buffer is
906        * called
907        */
908       SRC_LOCK (self);
909       PAD_LOCK (aggpad);
910       if (!aggpad->priv->buffer) {
911         aggpad->priv->eos = TRUE;
912       } else {
913         aggpad->priv->pending_eos = TRUE;
914       }
915       PAD_UNLOCK (aggpad);
916
917       SRC_BROADCAST (self);
918       SRC_UNLOCK (self);
919       goto eat;
920     }
921     case GST_EVENT_SEGMENT:
922     {
923       GST_OBJECT_LOCK (aggpad);
924       gst_event_copy_segment (event, &aggpad->segment);
925       GST_OBJECT_UNLOCK (aggpad);
926
927       GST_OBJECT_LOCK (self);
928       self->priv->seqnum = gst_event_get_seqnum (event);
929       GST_OBJECT_UNLOCK (self);
930       goto eat;
931     }
932     case GST_EVENT_STREAM_START:
933     {
934       goto eat;
935     }
936     case GST_EVENT_GAP:
937     {
938       /* FIXME: need API to handle GAP events properly */
939       GST_FIXME_OBJECT (self, "implement support for GAP events");
940       /* don't forward GAP events downstream */
941       goto eat;
942     }
943     case GST_EVENT_TAG:
944     {
945       GstTagList *tags;
946
947       gst_event_parse_tag (event, &tags);
948
949       if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) {
950         gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE);
951         gst_event_unref (event);
952         event = NULL;
953         goto eat;
954       }
955       break;
956     }
957     default:
958     {
959       break;
960     }
961   }
962
963   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
964   return gst_pad_event_default (pad, GST_OBJECT (self), event);
965
966 eat:
967   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
968   if (event)
969     gst_event_unref (event);
970
971   return res;
972 }
973
974 static inline gboolean
975 gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad,
976     gpointer unused_udata)
977 {
978   gst_aggregator_pad_flush (pad, self);
979
980   return TRUE;
981 }
982
983 static gboolean
984 gst_aggregator_stop (GstAggregator * agg)
985 {
986   GstAggregatorClass *klass;
987   gboolean result;
988
989   gst_aggregator_reset_flow_values (agg);
990
991   gst_aggregator_iterate_sinkpads (agg, gst_aggregator_stop_pad, NULL);
992
993   klass = GST_AGGREGATOR_GET_CLASS (agg);
994
995   if (klass->stop)
996     result = klass->stop (agg);
997   else
998     result = TRUE;
999
1000   agg->priv->has_peer_latency = FALSE;
1001   agg->priv->peer_latency_live = FALSE;
1002   agg->priv->peer_latency_min = agg->priv->peer_latency_max = FALSE;
1003
1004   if (agg->priv->tags)
1005     gst_tag_list_unref (agg->priv->tags);
1006   agg->priv->tags = NULL;
1007
1008   return result;
1009 }
1010
1011 /* GstElement vmethods implementations */
1012 static GstStateChangeReturn
1013 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
1014 {
1015   GstStateChangeReturn ret;
1016   GstAggregator *self = GST_AGGREGATOR (element);
1017
1018   switch (transition) {
1019     case GST_STATE_CHANGE_READY_TO_PAUSED:
1020       if (!gst_aggregator_start (self))
1021         goto error_start;
1022       break;
1023     default:
1024       break;
1025   }
1026
1027   if ((ret =
1028           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
1029               transition)) == GST_STATE_CHANGE_FAILURE)
1030     goto failure;
1031
1032
1033   switch (transition) {
1034     case GST_STATE_CHANGE_PAUSED_TO_READY:
1035       if (!gst_aggregator_stop (self)) {
1036         /* What to do in this case? Error out? */
1037         GST_ERROR_OBJECT (self, "Subclass failed to stop.");
1038       }
1039       break;
1040     default:
1041       break;
1042   }
1043
1044   return ret;
1045
1046 /* ERRORS */
1047 failure:
1048   {
1049     GST_ERROR_OBJECT (element, "parent failed state change");
1050     return ret;
1051   }
1052 error_start:
1053   {
1054     GST_ERROR_OBJECT (element, "Subclass failed to start");
1055     return GST_STATE_CHANGE_FAILURE;
1056   }
1057 }
1058
1059 static void
1060 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
1061 {
1062   GstAggregator *self = GST_AGGREGATOR (element);
1063   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1064
1065   GST_INFO_OBJECT (pad, "Removing pad");
1066
1067   SRC_LOCK (self);
1068   gst_aggregator_pad_set_flushing (aggpad);
1069   gst_element_remove_pad (element, pad);
1070
1071   SRC_BROADCAST (self);
1072   SRC_UNLOCK (self);
1073 }
1074
1075 static GstPad *
1076 gst_aggregator_request_new_pad (GstElement * element,
1077     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1078 {
1079   GstAggregator *self;
1080   GstAggregatorPad *agg_pad;
1081
1082   GstElementClass *klass = GST_ELEMENT_GET_CLASS (element);
1083   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
1084
1085   self = GST_AGGREGATOR (element);
1086
1087   if (templ == gst_element_class_get_pad_template (klass, "sink_%u")) {
1088     gint serial = 0;
1089     gchar *name = NULL;
1090
1091     GST_OBJECT_LOCK (element);
1092     if (req_name == NULL || strlen (req_name) < 6
1093         || !g_str_has_prefix (req_name, "sink_")) {
1094       /* no name given when requesting the pad, use next available int */
1095       priv->padcount++;
1096     } else {
1097       /* parse serial number from requested padname */
1098       serial = g_ascii_strtoull (&req_name[5], NULL, 10);
1099       if (serial >= priv->padcount)
1100         priv->padcount = serial;
1101     }
1102
1103     name = g_strdup_printf ("sink_%u", priv->padcount);
1104     agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
1105         "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
1106     g_free (name);
1107
1108     GST_OBJECT_UNLOCK (element);
1109
1110   } else {
1111     return NULL;
1112   }
1113
1114   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
1115
1116   if (priv->running)
1117     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
1118
1119   /* add the pad to the element */
1120   gst_element_add_pad (element, GST_PAD (agg_pad));
1121
1122   return GST_PAD (agg_pad);
1123 }
1124
1125 /* Must be called with SRC_LOCK held */
1126
1127 static gboolean
1128 gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
1129 {
1130   gboolean query_ret, live;
1131   GstClockTime our_latency, min, max;
1132
1133   query_ret = gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1134
1135   if (!query_ret) {
1136     GST_WARNING_OBJECT (self, "Latency query failed");
1137     return FALSE;
1138   }
1139
1140   gst_query_parse_latency (query, &live, &min, &max);
1141
1142   our_latency = self->priv->latency;
1143
1144   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
1145     GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
1146         ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
1147     return FALSE;
1148   }
1149
1150   if (min > max && GST_CLOCK_TIME_IS_VALID (max)) {
1151     GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL),
1152         ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
1153             GST_TIME_FORMAT ". Add queues or other buffering elements.",
1154             GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
1155     return FALSE;
1156   }
1157
1158   self->priv->peer_latency_live = live;
1159   self->priv->peer_latency_min = min;
1160   self->priv->peer_latency_max = max;
1161   self->priv->has_peer_latency = TRUE;
1162
1163   /* add our own */
1164   min += our_latency;
1165   min += self->priv->sub_latency_min;
1166   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1167       && GST_CLOCK_TIME_IS_VALID (max))
1168     max += self->priv->sub_latency_max;
1169   else
1170     max = GST_CLOCK_TIME_NONE;
1171
1172   if (live && min > max) {
1173     GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
1174         ("%s", "Latency too big"),
1175         ("The requested latency value is too big for the current pipeline.  "
1176             "Limiting to %" G_GINT64_FORMAT, max));
1177     min = max;
1178     /* FIXME: This could in theory become negative, but in
1179      * that case all is lost anyway */
1180     self->priv->latency -= min - max;
1181     /* FIXME: shouldn't we g_object_notify() the change here? */
1182   }
1183
1184   SRC_BROADCAST (self);
1185
1186   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
1187       " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
1188
1189   gst_query_set_latency (query, live, min, max);
1190
1191   return query_ret;
1192 }
1193
1194 /*
1195  * MUST be called with the src_lock held.
1196  *
1197  * See  gst_aggregator_get_latency() for doc
1198  */
1199 static GstClockTime
1200 gst_aggregator_get_latency_unlocked (GstAggregator * self)
1201 {
1202   GstClockTime latency;
1203
1204   g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0);
1205
1206   if (!self->priv->has_peer_latency) {
1207     GstQuery *query = gst_query_new_latency ();
1208     gboolean ret;
1209
1210     ret = gst_aggregator_query_latency_unlocked (self, query);
1211     gst_query_unref (query);
1212     if (!ret)
1213       return GST_CLOCK_TIME_NONE;
1214   }
1215
1216   if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
1217     return GST_CLOCK_TIME_NONE;
1218
1219   /* latency_min is never GST_CLOCK_TIME_NONE by construction */
1220   latency = self->priv->peer_latency_min;
1221
1222   /* add our own */
1223   latency += self->priv->latency;
1224   latency += self->priv->sub_latency_min;
1225
1226   return latency;
1227 }
1228
1229 /**
1230  * gst_aggregator_get_latency:
1231  * @self: a #GstAggregator
1232  *
1233  * Retrieves the latency values reported by @self in response to the latency
1234  * query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element
1235  * will not wait for the clock.
1236  *
1237  * Typically only called by subclasses.
1238  *
1239  * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync
1240  */
1241 GstClockTime
1242 gst_aggregator_get_latency (GstAggregator * self)
1243 {
1244   GstClockTime ret;
1245
1246   SRC_LOCK (self);
1247   ret = gst_aggregator_get_latency_unlocked (self);
1248   SRC_UNLOCK (self);
1249
1250   return ret;
1251 }
1252
1253 static gboolean
1254 gst_aggregator_send_event (GstElement * element, GstEvent * event)
1255 {
1256   GstAggregator *self = GST_AGGREGATOR (element);
1257
1258   GST_STATE_LOCK (element);
1259   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
1260       GST_STATE (element) < GST_STATE_PAUSED) {
1261     gdouble rate;
1262     GstFormat fmt;
1263     GstSeekFlags flags;
1264     GstSeekType start_type, stop_type;
1265     gint64 start, stop;
1266
1267     gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1268         &start, &stop_type, &stop);
1269
1270     GST_OBJECT_LOCK (self);
1271     gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1272         stop_type, stop, NULL);
1273     self->priv->seqnum = gst_event_get_seqnum (event);
1274     GST_OBJECT_UNLOCK (self);
1275
1276     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
1277   }
1278   GST_STATE_UNLOCK (element);
1279
1280
1281   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
1282       event);
1283 }
1284
1285 static gboolean
1286 gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
1287 {
1288   gboolean res = TRUE;
1289
1290   switch (GST_QUERY_TYPE (query)) {
1291     case GST_QUERY_SEEKING:
1292     {
1293       GstFormat format;
1294
1295       /* don't pass it along as some (file)sink might claim it does
1296        * whereas with a collectpads in between that will not likely work */
1297       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1298       gst_query_set_seeking (query, format, FALSE, 0, -1);
1299       res = TRUE;
1300
1301       break;
1302     }
1303     case GST_QUERY_LATENCY:
1304       SRC_LOCK (self);
1305       res = gst_aggregator_query_latency_unlocked (self, query);
1306       SRC_UNLOCK (self);
1307       break;
1308     default:
1309       return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1310   }
1311
1312   return res;
1313 }
1314
1315 static gboolean
1316 gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
1317 {
1318   EventData *evdata = user_data;
1319   gboolean ret = TRUE;
1320   GstPad *peer = gst_pad_get_peer (pad);
1321   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1322
1323   if (peer) {
1324     ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
1325     GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
1326     gst_object_unref (peer);
1327   }
1328
1329   if (ret == FALSE) {
1330     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK)
1331       GST_ERROR_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
1332
1333     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
1334       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
1335
1336       if (gst_pad_query (peer, seeking)) {
1337         gboolean seekable;
1338
1339         gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
1340
1341         if (seekable == FALSE) {
1342           GST_INFO_OBJECT (pad,
1343               "Source not seekable, We failed but it does not matter!");
1344
1345           ret = TRUE;
1346         }
1347       } else {
1348         GST_ERROR_OBJECT (pad, "Query seeking FAILED");
1349       }
1350
1351       gst_query_unref (seeking);
1352     }
1353
1354     if (evdata->flush) {
1355       PAD_LOCK (aggpad);
1356       aggpad->priv->pending_flush_start = FALSE;
1357       aggpad->priv->pending_flush_stop = FALSE;
1358       PAD_UNLOCK (aggpad);
1359     }
1360   } else {
1361     evdata->one_actually_seeked = TRUE;
1362   }
1363
1364   evdata->result &= ret;
1365
1366   /* Always send to all pads */
1367   return FALSE;
1368 }
1369
1370 static EventData
1371 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
1372     GstEvent * event, gboolean flush)
1373 {
1374   EventData evdata;
1375
1376   evdata.event = event;
1377   evdata.result = TRUE;
1378   evdata.flush = flush;
1379   evdata.one_actually_seeked = FALSE;
1380
1381   /* We first need to set all pads as flushing in a first pass
1382    * as flush_start flush_stop is sometimes sent synchronously
1383    * while we send the seek event */
1384   if (flush) {
1385     GList *l;
1386
1387     GST_OBJECT_LOCK (self);
1388     for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
1389       GstAggregatorPad *pad = l->data;
1390
1391       PAD_LOCK (pad);
1392       pad->priv->pending_flush_start = TRUE;
1393       pad->priv->pending_flush_stop = FALSE;
1394       PAD_UNLOCK (pad);
1395     }
1396     GST_OBJECT_UNLOCK (self);
1397   }
1398
1399   gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, &evdata);
1400
1401   gst_event_unref (event);
1402
1403   return evdata;
1404 }
1405
1406 static gboolean
1407 gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
1408 {
1409   gdouble rate;
1410   GstFormat fmt;
1411   GstSeekFlags flags;
1412   GstSeekType start_type, stop_type;
1413   gint64 start, stop;
1414   gboolean flush;
1415   EventData evdata;
1416   GstAggregatorPrivate *priv = self->priv;
1417
1418   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1419       &start, &stop_type, &stop);
1420
1421   GST_INFO_OBJECT (self, "starting SEEK");
1422
1423   flush = flags & GST_SEEK_FLAG_FLUSH;
1424
1425   GST_OBJECT_LOCK (self);
1426   if (flush) {
1427     priv->pending_flush_start = TRUE;
1428     priv->flush_seeking = TRUE;
1429   }
1430
1431   gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1432       stop_type, stop, NULL);
1433   GST_OBJECT_UNLOCK (self);
1434
1435   /* forward the seek upstream */
1436   evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, flush);
1437   event = NULL;
1438
1439   if (!evdata.result || !evdata.one_actually_seeked) {
1440     GST_OBJECT_LOCK (self);
1441     priv->flush_seeking = FALSE;
1442     priv->pending_flush_start = FALSE;
1443     GST_OBJECT_UNLOCK (self);
1444   }
1445
1446   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
1447
1448   return evdata.result;
1449 }
1450
1451 static gboolean
1452 gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
1453 {
1454   EventData evdata;
1455   gboolean res = TRUE;
1456
1457   switch (GST_EVENT_TYPE (event)) {
1458     case GST_EVENT_SEEK:
1459     {
1460       gst_event_ref (event);
1461       res = gst_aggregator_do_seek (self, event);
1462       gst_event_unref (event);
1463       event = NULL;
1464       goto done;
1465     }
1466     case GST_EVENT_NAVIGATION:
1467     {
1468       /* navigation is rather pointless. */
1469       res = FALSE;
1470       gst_event_unref (event);
1471       goto done;
1472     }
1473     default:
1474     {
1475       break;
1476     }
1477   }
1478
1479   evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, FALSE);
1480   res = evdata.result;
1481
1482 done:
1483   return res;
1484 }
1485
1486 static gboolean
1487 gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent,
1488     GstEvent * event)
1489 {
1490   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1491
1492   return klass->src_event (GST_AGGREGATOR (parent), event);
1493 }
1494
1495 static gboolean
1496 gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent,
1497     GstQuery * query)
1498 {
1499   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1500
1501   return klass->src_query (GST_AGGREGATOR (parent), query);
1502 }
1503
1504 static gboolean
1505 gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
1506     GstObject * parent, GstPadMode mode, gboolean active)
1507 {
1508   GstAggregator *self = GST_AGGREGATOR (parent);
1509   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1510
1511   if (klass->src_activate) {
1512     if (klass->src_activate (self, mode, active) == FALSE) {
1513       return FALSE;
1514     }
1515   }
1516
1517   if (active == TRUE) {
1518     switch (mode) {
1519       case GST_PAD_MODE_PUSH:
1520       {
1521         GST_INFO_OBJECT (pad, "Activating pad!");
1522         gst_aggregator_start_srcpad_task (self);
1523         return TRUE;
1524       }
1525       default:
1526       {
1527         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
1528         return FALSE;
1529       }
1530     }
1531   }
1532
1533   /* deactivating */
1534   GST_INFO_OBJECT (self, "Deactivating srcpad");
1535   gst_aggregator_stop_srcpad_task (self, FALSE);
1536
1537   return TRUE;
1538 }
1539
1540 static gboolean
1541 gst_aggregator_default_sink_query (GstAggregator * self,
1542     GstAggregatorPad * aggpad, GstQuery * query)
1543 {
1544   GstPad *pad = GST_PAD (aggpad);
1545
1546   return gst_pad_query_default (pad, GST_OBJECT (self), query);
1547 }
1548
1549 static void
1550 gst_aggregator_finalize (GObject * object)
1551 {
1552   GstAggregator *self = (GstAggregator *) object;
1553
1554   g_mutex_clear (&self->priv->src_lock);
1555   g_cond_clear (&self->priv->src_cond);
1556
1557   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
1558 }
1559
1560 /*
1561  * gst_aggregator_set_latency_property:
1562  * @agg: a #GstAggregator
1563  * @latency: the new latency value.
1564  *
1565  * Sets the new latency value to @latency. This value is used to limit the
1566  * amount of time a pad waits for data to appear before considering the pad
1567  * as unresponsive.
1568  */
1569 static void
1570 gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
1571 {
1572   gboolean changed;
1573   GstClockTime min, max;
1574
1575   g_return_if_fail (GST_IS_AGGREGATOR (self));
1576   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
1577
1578   SRC_LOCK (self);
1579   if (self->priv->peer_latency_live) {
1580     min = self->priv->peer_latency_min;
1581     max = self->priv->peer_latency_max;
1582     /* add our own */
1583     min += latency;
1584     min += self->priv->sub_latency_min;
1585     if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1586         && GST_CLOCK_TIME_IS_VALID (max))
1587       max += self->priv->sub_latency_max;
1588     else
1589       max = GST_CLOCK_TIME_NONE;
1590
1591     if (GST_CLOCK_TIME_IS_VALID (max) && min > max) {
1592       GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
1593           ("%s", "Latency too big"),
1594           ("The requested latency value is too big for the latency in the "
1595               "current pipeline.  Limiting to %" G_GINT64_FORMAT, max));
1596       /* FIXME: This could in theory become negative, but in
1597        * that case all is lost anyway */
1598       latency -= min - max;
1599       /* FIXME: shouldn't we g_object_notify() the change here? */
1600     }
1601   }
1602
1603   changed = (self->priv->latency != latency);
1604   self->priv->latency = latency;
1605
1606   if (changed)
1607     SRC_BROADCAST (self);
1608   SRC_UNLOCK (self);
1609
1610   if (changed)
1611     gst_element_post_message (GST_ELEMENT_CAST (self),
1612         gst_message_new_latency (GST_OBJECT_CAST (self)));
1613 }
1614
1615 /*
1616  * gst_aggregator_get_latency_property:
1617  * @agg: a #GstAggregator
1618  *
1619  * Gets the latency value. See gst_aggregator_set_latency for
1620  * more details.
1621  *
1622  * Returns: The time in nanoseconds to wait for data to arrive on a sink pad 
1623  * before a pad is deemed unresponsive. A value of -1 means an
1624  * unlimited time.
1625  */
1626 static gint64
1627 gst_aggregator_get_latency_property (GstAggregator * agg)
1628 {
1629   gint64 res;
1630
1631   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
1632
1633   GST_OBJECT_LOCK (agg);
1634   res = agg->priv->latency;
1635   GST_OBJECT_UNLOCK (agg);
1636
1637   return res;
1638 }
1639
1640 static void
1641 gst_aggregator_set_property (GObject * object, guint prop_id,
1642     const GValue * value, GParamSpec * pspec)
1643 {
1644   GstAggregator *agg = GST_AGGREGATOR (object);
1645
1646   switch (prop_id) {
1647     case PROP_LATENCY:
1648       gst_aggregator_set_latency_property (agg, g_value_get_int64 (value));
1649       break;
1650     default:
1651       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1652       break;
1653   }
1654 }
1655
1656 static void
1657 gst_aggregator_get_property (GObject * object, guint prop_id,
1658     GValue * value, GParamSpec * pspec)
1659 {
1660   GstAggregator *agg = GST_AGGREGATOR (object);
1661
1662   switch (prop_id) {
1663     case PROP_LATENCY:
1664       g_value_set_int64 (value, gst_aggregator_get_latency_property (agg));
1665       break;
1666     default:
1667       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1668       break;
1669   }
1670 }
1671
1672 /* GObject vmethods implementations */
1673 static void
1674 gst_aggregator_class_init (GstAggregatorClass * klass)
1675 {
1676   GObjectClass *gobject_class = (GObjectClass *) klass;
1677   GstElementClass *gstelement_class = (GstElementClass *) klass;
1678
1679   aggregator_parent_class = g_type_class_peek_parent (klass);
1680   g_type_class_add_private (klass, sizeof (GstAggregatorPrivate));
1681
1682   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
1683       GST_DEBUG_FG_MAGENTA, "GstAggregator");
1684
1685   klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD;
1686
1687   klass->sink_event = gst_aggregator_default_sink_event;
1688   klass->sink_query = gst_aggregator_default_sink_query;
1689
1690   klass->src_event = gst_aggregator_default_src_event;
1691   klass->src_query = gst_aggregator_default_src_query;
1692
1693   gstelement_class->request_new_pad =
1694       GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
1695   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
1696   gstelement_class->release_pad =
1697       GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
1698   gstelement_class->change_state =
1699       GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
1700
1701   gobject_class->set_property = gst_aggregator_set_property;
1702   gobject_class->get_property = gst_aggregator_get_property;
1703   gobject_class->finalize = gst_aggregator_finalize;
1704
1705   g_object_class_install_property (gobject_class, PROP_LATENCY,
1706       g_param_spec_int64 ("latency", "Buffer latency",
1707           "Additional latency in live mode to allow upstream "
1708           "to take longer to produce buffers for the current "
1709           "position", 0,
1710           (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1),
1711           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1712
1713   GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad);
1714 }
1715
1716 static void
1717 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
1718 {
1719   GstPadTemplate *pad_template;
1720   GstAggregatorPrivate *priv;
1721
1722   g_return_if_fail (klass->aggregate != NULL);
1723
1724   self->priv =
1725       G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR,
1726       GstAggregatorPrivate);
1727
1728   priv = self->priv;
1729
1730   pad_template =
1731       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
1732   g_return_if_fail (pad_template != NULL);
1733
1734   priv->padcount = -1;
1735   priv->tags_changed = FALSE;
1736
1737   self->priv->peer_latency_live = FALSE;
1738   self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
1739   self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
1740   self->priv->has_peer_latency = FALSE;
1741   gst_aggregator_reset_flow_values (self);
1742
1743   self->srcpad = gst_pad_new_from_template (pad_template, "src");
1744
1745   gst_pad_set_event_function (self->srcpad,
1746       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
1747   gst_pad_set_query_function (self->srcpad,
1748       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func));
1749   gst_pad_set_activatemode_function (self->srcpad,
1750       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func));
1751
1752   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
1753
1754   self->priv->latency = DEFAULT_LATENCY;
1755
1756   g_mutex_init (&self->priv->src_lock);
1757   g_cond_init (&self->priv->src_cond);
1758 }
1759
1760 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
1761  * method to get to the padtemplates */
1762 GType
1763 gst_aggregator_get_type (void)
1764 {
1765   static volatile gsize type = 0;
1766
1767   if (g_once_init_enter (&type)) {
1768     GType _type;
1769     static const GTypeInfo info = {
1770       sizeof (GstAggregatorClass),
1771       NULL,
1772       NULL,
1773       (GClassInitFunc) gst_aggregator_class_init,
1774       NULL,
1775       NULL,
1776       sizeof (GstAggregator),
1777       0,
1778       (GInstanceInitFunc) gst_aggregator_init,
1779     };
1780
1781     _type = g_type_register_static (GST_TYPE_ELEMENT,
1782         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
1783     g_once_init_leave (&type, _type);
1784   }
1785   return type;
1786 }
1787
1788 static GstFlowReturn
1789 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
1790 {
1791   GstBuffer *actual_buf = buffer;
1792   GstAggregator *self = GST_AGGREGATOR (object);
1793   GstAggregatorPrivate *priv = self->priv;
1794   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1795   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
1796   GstFlowReturn flow_return;
1797
1798   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
1799
1800   PAD_FLUSH_LOCK (aggpad);
1801
1802   PAD_LOCK (aggpad);
1803   if (aggpad->priv->pending_eos == TRUE)
1804     goto eos;
1805
1806   while (aggpad->priv->buffer && !aggpad->priv->flushing)
1807     PAD_WAIT_EVENT (aggpad);
1808
1809   if (aggpad->priv->flushing)
1810     goto flushing;
1811
1812   PAD_UNLOCK (aggpad);
1813
1814   if (aggclass->clip) {
1815     aggclass->clip (self, aggpad, buffer, &actual_buf);
1816   }
1817
1818   SRC_LOCK (self);
1819   PAD_LOCK (aggpad);
1820   if (aggpad->priv->buffer)
1821     gst_buffer_unref (aggpad->priv->buffer);
1822   aggpad->priv->buffer = actual_buf;
1823   PAD_UNLOCK (aggpad);
1824   PAD_FLUSH_UNLOCK (aggpad);
1825
1826   SRC_BROADCAST (self);
1827   SRC_UNLOCK (self);
1828
1829   GST_DEBUG_OBJECT (aggpad, "Done chaining");
1830
1831   GST_OBJECT_LOCK (self);
1832   flow_return = priv->flow_return;
1833   GST_OBJECT_UNLOCK (self);
1834
1835   return flow_return;
1836
1837 flushing:
1838   PAD_UNLOCK (aggpad);
1839   PAD_FLUSH_UNLOCK (aggpad);
1840
1841   gst_buffer_unref (buffer);
1842   GST_DEBUG_OBJECT (aggpad, "We are flushing");
1843
1844   return GST_FLOW_FLUSHING;
1845
1846 eos:
1847   PAD_UNLOCK (aggpad);
1848   PAD_FLUSH_UNLOCK (aggpad);
1849
1850   gst_buffer_unref (buffer);
1851   GST_DEBUG_OBJECT (pad, "We are EOS already...");
1852
1853   return GST_FLOW_EOS;
1854 }
1855
1856 static gboolean
1857 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
1858     GstQuery * query)
1859 {
1860   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1861   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1862
1863   if (GST_QUERY_IS_SERIALIZED (query)) {
1864     PAD_LOCK (aggpad);
1865
1866     while (aggpad->priv->buffer && !aggpad->priv->flushing)
1867       PAD_WAIT_EVENT (aggpad);
1868
1869     if (aggpad->priv->flushing)
1870       goto flushing;
1871
1872     PAD_UNLOCK (aggpad);
1873   }
1874
1875   return klass->sink_query (GST_AGGREGATOR (parent),
1876       GST_AGGREGATOR_PAD (pad), query);
1877
1878 flushing:
1879   PAD_UNLOCK (aggpad);
1880   GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping query");
1881   return FALSE;
1882 }
1883
1884 static gboolean
1885 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
1886     GstEvent * event)
1887 {
1888   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1889   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1890
1891   if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS
1892       && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) {
1893     PAD_LOCK (aggpad);
1894
1895
1896     while (aggpad->priv->buffer && !aggpad->priv->flushing)
1897       PAD_WAIT_EVENT (aggpad);
1898
1899     if (aggpad->priv->flushing
1900         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
1901       goto flushing;
1902
1903     PAD_UNLOCK (aggpad);
1904   }
1905
1906   return klass->sink_event (GST_AGGREGATOR (parent),
1907       GST_AGGREGATOR_PAD (pad), event);
1908
1909 flushing:
1910   PAD_UNLOCK (aggpad);
1911   GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping event");
1912   if (GST_EVENT_IS_STICKY (event))
1913     gst_pad_store_sticky_event (pad, event);
1914   gst_event_unref (event);
1915   return FALSE;
1916 }
1917
1918 static gboolean
1919 gst_aggregator_pad_activate_mode_func (GstPad * pad,
1920     GstObject * parent, GstPadMode mode, gboolean active)
1921 {
1922   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1923
1924   if (active == FALSE) {
1925     gst_aggregator_pad_set_flushing (aggpad);
1926   } else {
1927     PAD_LOCK (aggpad);
1928     aggpad->priv->flushing = FALSE;
1929     PAD_BROADCAST_EVENT (aggpad);
1930     PAD_UNLOCK (aggpad);
1931   }
1932
1933   return TRUE;
1934 }
1935
1936 /***********************************
1937  * GstAggregatorPad implementation  *
1938  ************************************/
1939 G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
1940
1941 static void
1942 gst_aggregator_pad_constructed (GObject * object)
1943 {
1944   GstPad *pad = GST_PAD (object);
1945
1946   gst_pad_set_chain_function (pad,
1947       GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
1948   gst_pad_set_event_function (pad,
1949       GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func));
1950   gst_pad_set_query_function (pad,
1951       GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
1952   gst_pad_set_activatemode_function (pad,
1953       GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
1954 }
1955
1956 static void
1957 gst_aggregator_pad_finalize (GObject * object)
1958 {
1959   GstAggregatorPad *pad = (GstAggregatorPad *) object;
1960
1961   g_cond_clear (&pad->priv->event_cond);
1962   g_mutex_clear (&pad->priv->flush_lock);
1963   g_mutex_clear (&pad->priv->lock);
1964
1965   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object);
1966 }
1967
1968 static void
1969 gst_aggregator_pad_dispose (GObject * object)
1970 {
1971   GstAggregatorPad *pad = (GstAggregatorPad *) object;
1972
1973   gst_aggregator_pad_drop_buffer (pad);
1974
1975   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
1976 }
1977
1978 static void
1979 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
1980 {
1981   GObjectClass *gobject_class = (GObjectClass *) klass;
1982
1983   g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate));
1984
1985   gobject_class->constructed = gst_aggregator_pad_constructed;
1986   gobject_class->finalize = gst_aggregator_pad_finalize;
1987   gobject_class->dispose = gst_aggregator_pad_dispose;
1988 }
1989
1990 static void
1991 gst_aggregator_pad_init (GstAggregatorPad * pad)
1992 {
1993   pad->priv =
1994       G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
1995       GstAggregatorPadPrivate);
1996
1997   pad->priv->buffer = NULL;
1998   g_cond_init (&pad->priv->event_cond);
1999
2000   g_mutex_init (&pad->priv->flush_lock);
2001   g_mutex_init (&pad->priv->lock);
2002 }
2003
2004 /**
2005  * gst_aggregator_pad_steal_buffer:
2006  * @pad: the pad to get buffer from
2007  *
2008  * Steal the ref to the buffer currently queued in @pad.
2009  *
2010  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
2011  *   queued. You should unref the buffer after usage.
2012  */
2013 GstBuffer *
2014 gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
2015 {
2016   GstBuffer *buffer = NULL;
2017
2018   PAD_LOCK (pad);
2019   if (pad->priv->buffer) {
2020     GST_TRACE_OBJECT (pad, "Consuming buffer");
2021     buffer = pad->priv->buffer;
2022     pad->priv->buffer = NULL;
2023     if (pad->priv->pending_eos) {
2024       pad->priv->pending_eos = FALSE;
2025       pad->priv->eos = TRUE;
2026     }
2027     PAD_BROADCAST_EVENT (pad);
2028     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
2029   }
2030   PAD_UNLOCK (pad);
2031
2032   return buffer;
2033 }
2034
2035 /**
2036  * gst_aggregator_pad_drop_buffer:
2037  * @pad: the pad where to drop any pending buffer
2038  *
2039  * Drop the buffer currently queued in @pad.
2040  *
2041  * Returns: TRUE if there was a buffer queued in @pad, or FALSE if not.
2042  */
2043 gboolean
2044 gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
2045 {
2046   GstBuffer *buf;
2047
2048   buf = gst_aggregator_pad_steal_buffer (pad);
2049
2050   if (buf == NULL)
2051     return FALSE;
2052
2053   gst_buffer_unref (buf);
2054   return TRUE;
2055 }
2056
2057 /**
2058  * gst_aggregator_pad_get_buffer:
2059  * @pad: the pad to get buffer from
2060  *
2061  * Returns: (transfer full): A reference to the buffer in @pad or
2062  * NULL if no buffer was queued. You should unref the buffer after
2063  * usage.
2064  */
2065 GstBuffer *
2066 gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
2067 {
2068   GstBuffer *buffer = NULL;
2069
2070   PAD_LOCK (pad);
2071   if (pad->priv->buffer)
2072     buffer = gst_buffer_ref (pad->priv->buffer);
2073   PAD_UNLOCK (pad);
2074
2075   return buffer;
2076 }
2077
2078 gboolean
2079 gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
2080 {
2081   gboolean is_eos;
2082
2083   PAD_LOCK (pad);
2084   is_eos = pad->priv->eos;
2085   PAD_UNLOCK (pad);
2086
2087   return is_eos;
2088 }
2089
2090 /**
2091  * gst_aggregator_merge_tags:
2092  * @self: a #GstAggregator
2093  * @tags: a #GstTagList to merge
2094  * @mode: the #GstTagMergeMode to use
2095  *
2096  * Adds tags to so-called pending tags, which will be processed
2097  * before pushing out data downstream.
2098  *
2099  * Note that this is provided for convenience, and the subclass is
2100  * not required to use this and can still do tag handling on its own.
2101  *
2102  * MT safe.
2103  */
2104 void
2105 gst_aggregator_merge_tags (GstAggregator * self,
2106     const GstTagList * tags, GstTagMergeMode mode)
2107 {
2108   GstTagList *otags;
2109
2110   g_return_if_fail (GST_IS_AGGREGATOR (self));
2111   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
2112
2113   /* FIXME Check if we can use OBJECT lock here! */
2114   GST_OBJECT_LOCK (self);
2115   if (tags)
2116     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
2117   otags = self->priv->tags;
2118   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
2119   if (otags)
2120     gst_tag_list_unref (otags);
2121   self->priv->tags_changed = TRUE;
2122   GST_OBJECT_UNLOCK (self);
2123 }
2124
2125 /**
2126  * gst_aggregator_set_latency:
2127  * @self: a #GstAggregator
2128  * @min_latency: minimum latency
2129  * @max_latency: maximum latency
2130  *
2131  * Lets #GstAggregator sub-classes tell the baseclass what their internal
2132  * latency is. Will also post a LATENCY message on the bus so the pipeline
2133  * can reconfigure its global latency.
2134  */
2135 void
2136 gst_aggregator_set_latency (GstAggregator * self,
2137     GstClockTime min_latency, GstClockTime max_latency)
2138 {
2139   gboolean changed = FALSE;
2140
2141   g_return_if_fail (GST_IS_AGGREGATOR (self));
2142   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
2143   g_return_if_fail (max_latency >= min_latency);
2144
2145   SRC_LOCK (self);
2146   if (self->priv->sub_latency_min != min_latency) {
2147     self->priv->sub_latency_min = min_latency;
2148     changed = TRUE;
2149   }
2150   if (self->priv->sub_latency_max != max_latency) {
2151     self->priv->sub_latency_max = max_latency;
2152     changed = TRUE;
2153   }
2154
2155   if (changed)
2156     SRC_BROADCAST (self);
2157   SRC_UNLOCK (self);
2158
2159   if (changed) {
2160     gst_element_post_message (GST_ELEMENT_CAST (self),
2161         gst_message_new_latency (GST_OBJECT_CAST (self)));
2162   }
2163 }