99767d0e9d8a608e91e4e99515ff01e2969b395a
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
1 /* GStreamer
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @short_description: manages a set of pads with the purpose of
25  * aggregating their buffers.
26  * @see_also: gstcollectpads for historical reasons.
27  *
28  * Manages a set of pads with the purpose of aggregating their buffers.
29  * Control is given to the subclass when all pads have data.
30  * <itemizedlist>
31  *  <listitem><para>
32  *    Base class for mixers and muxers. Implementers should at least implement
33  *    the aggregate () vmethod.
34  *  </para></listitem>
35  *  <listitem><para>
36  *    When data is queued on all pads, tha aggregate vmethod is called.
37  *  </para></listitem>
38  *  <listitem><para>
39  *    One can peek at the data on any given GstAggregatorPad with the
40  *    gst_aggregator_pad_get_buffer () method, and take ownership of it
41  *    with the gst_aggregator_pad_steal_buffer () method. When a buffer
42  *    has been taken with steal_buffer (), a new buffer can be queued
43  *    on that pad.
44  *  </para></listitem>
45  *  <listitem><para>
46  *    If the subclass wishes to push a buffer downstream in its aggregate
47  *    implementation, it should do so through the
48  *    gst_aggregator_finish_buffer () method. This method will take care
49  *    of sending and ordering mandatory events such as stream start, caps
50  *    and segment.
51  *  </para></listitem>
52  *  <listitem><para>
53  *    Same goes for EOS events, which should not be pushed directly by the
54  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
55  *    implementation.
56  *  </para></listitem>
57  * </itemizedlist>
58  */
59
60 #ifdef HAVE_CONFIG_H
61 #  include "config.h"
62 #endif
63
64 #include <string.h>             /* strlen */
65
66 #include "gstaggregator.h"
67
68
69 /*  Might become API */
70 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
71     const GstTagList * tags, GstTagMergeMode mode);
72
73 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
74 #define GST_CAT_DEFAULT aggregator_debug
75
76 /* GstAggregatorPad definitions */
77 #define PAD_LOCK_EVENT(pad)   G_STMT_START {                            \
78   GST_LOG_OBJECT (pad, "Taking EVENT lock from thread %p",              \
79         g_thread_self());                                               \
80   g_mutex_lock(&pad->priv->event_lock);                                 \
81   GST_LOG_OBJECT (pad, "Took EVENT lock from thread %p",              \
82         g_thread_self());                                               \
83   } G_STMT_END
84
85 #define PAD_UNLOCK_EVENT(pad)  G_STMT_START {                           \
86   GST_LOG_OBJECT (pad, "Releasing EVENT lock from thread %p",          \
87         g_thread_self());                                               \
88   g_mutex_unlock(&pad->priv->event_lock);                               \
89   GST_LOG_OBJECT (pad, "Release EVENT lock from thread %p",          \
90         g_thread_self());                                               \
91   } G_STMT_END
92
93
94 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
95   GST_LOG_OBJECT (pad, "Waiting for EVENT on thread %p",               \
96         g_thread_self());                                               \
97   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),       \
98       &(pad->priv->event_lock));                                        \
99   GST_LOG_OBJECT (pad, "DONE Waiting for EVENT on thread %p",               \
100         g_thread_self());                                               \
101   } G_STMT_END
102
103 #define PAD_BROADCAST_EVENT(pad) {                                          \
104   GST_LOG_OBJECT (pad, "Signaling EVENT from thread %p",               \
105         g_thread_self());                                                   \
106   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond)); \
107   }
108
109 #define GST_AGGREGATOR_SETCAPS_LOCK(self)   G_STMT_START {        \
110   GST_LOG_OBJECT (self, "Taking SETCAPS lock from thread %p",   \
111         g_thread_self());                                         \
112   g_mutex_lock(&self->priv->setcaps_lock);                         \
113   GST_LOG_OBJECT (self, "Took SETCAPS lock from thread %p",     \
114         g_thread_self());                                         \
115   } G_STMT_END
116
117 #define GST_AGGREGATOR_SETCAPS_UNLOCK(self)   G_STMT_START {        \
118   GST_LOG_OBJECT (self, "Releasing SETCAPS lock from thread %p",  \
119         g_thread_self());                                           \
120   g_mutex_unlock(&self->priv->setcaps_lock);                         \
121   GST_LOG_OBJECT (self, "Took SETCAPS lock from thread %p",       \
122         g_thread_self());                                           \
123   } G_STMT_END
124
125 #define PAD_STREAM_LOCK(pad)   G_STMT_START {                            \
126   GST_LOG_OBJECT (pad, "Taking lock from thread %p",              \
127         g_thread_self());                                               \
128   g_mutex_lock(&pad->priv->stream_lock);                                 \
129   GST_LOG_OBJECT (pad, "Took lock from thread %p",              \
130         g_thread_self());                                               \
131   } G_STMT_END
132
133 #define PAD_STREAM_UNLOCK(pad)  G_STMT_START {                           \
134   GST_LOG_OBJECT (pad, "Releasing lock from thread %p",          \
135         g_thread_self());                                               \
136   g_mutex_unlock(&pad->priv->stream_lock);                               \
137   GST_LOG_OBJECT (pad, "Release lock from thread %p",          \
138         g_thread_self());                                               \
139   } G_STMT_END
140
141 struct _GstAggregatorPadPrivate
142 {
143   gboolean pending_flush_start;
144   gboolean pending_flush_stop;
145   gboolean pending_eos;
146   gboolean flushing;
147
148   GMutex event_lock;
149   GCond event_cond;
150
151   GMutex stream_lock;
152 };
153
154 static gboolean
155 _aggpad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
156 {
157   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
158
159   aggpad->eos = FALSE;
160   aggpad->priv->flushing = FALSE;
161
162   if (klass->flush)
163     return klass->flush (aggpad, agg);
164
165   return TRUE;
166 }
167
168 /*************************************
169  * GstAggregator implementation  *
170  *************************************/
171 static GstElementClass *aggregator_parent_class = NULL;
172
173 #define MAIN_CONTEXT_LOCK(self) G_STMT_START {                       \
174   GST_LOG_OBJECT (self, "Getting MAIN_CONTEXT_LOCK in thread %p",    \
175         g_thread_self());                                            \
176   g_mutex_lock(&((GstAggregator*)self)->priv->mcontext_lock);    \
177   GST_LOG_OBJECT (self, "Got MAIN_CONTEXT_LOCK in thread %p",        \
178         g_thread_self());                                            \
179 } G_STMT_END
180
181 #define MAIN_CONTEXT_UNLOCK(self) G_STMT_START {                     \
182   g_mutex_unlock(&((GstAggregator*)self)->priv->mcontext_lock);  \
183   GST_LOG_OBJECT (self, "Unlocked MAIN_CONTEXT_LOCK in thread %p",   \
184         g_thread_self());                                            \
185 } G_STMT_END
186
187 struct _GstAggregatorPrivate
188 {
189   gint padcount;
190
191   GMainContext *mcontext;
192
193   /* Our state is >= PAUSED */
194   gboolean running;
195
196   /* Ensure that when we remove all sources from the maincontext
197    * we can not add any source, avoiding:
198    * "g_source_attach: assertion '!SOURCE_DESTROYED (source)' failed" */
199   GMutex mcontext_lock;
200   GList *gsources;
201
202   gint seqnum;
203   gboolean send_stream_start;
204   gboolean send_segment;
205   gboolean flush_seeking;
206   gboolean pending_flush_start;
207   gboolean send_eos;
208   GstFlowReturn flow_return;
209
210   GstCaps *srccaps;
211
212   GstTagList *tags;
213   gboolean tags_changed;
214
215   /* Lock to prevent two src setcaps from happening at the same time  */
216   GMutex setcaps_lock;
217 };
218
219 typedef struct
220 {
221   GstEvent *event;
222   gboolean result;
223   gboolean flush;
224
225   gboolean one_actually_seeked;
226 } EventData;
227
228 /**
229  * gst_aggregator_iterate_sinkpads:
230  * @self: The #GstAggregator
231  * @func: The function to call.
232  * @user_data: The data to pass to @func.
233  *
234  * Iterate the sinkpads of aggregator to call a function on them.
235  *
236  * This method guarantees that @func will be called only once for each
237  * sink pad.
238  */
239 gboolean
240 gst_aggregator_iterate_sinkpads (GstAggregator * self,
241     GstAggregatorPadForeachFunc func, gpointer user_data)
242 {
243   gboolean result = FALSE;
244   GstIterator *iter;
245   gboolean done = FALSE;
246   GValue item = { 0, };
247   GList *seen_pads = NULL;
248
249   iter = gst_element_iterate_sink_pads (GST_ELEMENT (self));
250
251   if (!iter)
252     goto no_iter;
253
254   while (!done) {
255     switch (gst_iterator_next (iter, &item)) {
256       case GST_ITERATOR_OK:
257       {
258         GstPad *pad;
259
260         pad = g_value_get_object (&item);
261
262         /* if already pushed, skip. FIXME, find something faster to tag pads */
263         if (pad == NULL || g_list_find (seen_pads, pad)) {
264           g_value_reset (&item);
265           break;
266         }
267
268         GST_LOG_OBJECT (self, "calling function on pad %s:%s",
269             GST_DEBUG_PAD_NAME (pad));
270         result = func (self, pad, user_data);
271
272         done = !result;
273
274         seen_pads = g_list_prepend (seen_pads, pad);
275
276         g_value_reset (&item);
277         break;
278       }
279       case GST_ITERATOR_RESYNC:
280         gst_iterator_resync (iter);
281         break;
282       case GST_ITERATOR_ERROR:
283         GST_ERROR_OBJECT (self,
284             "Could not iterate over internally linked pads");
285         done = TRUE;
286         break;
287       case GST_ITERATOR_DONE:
288         done = TRUE;
289         break;
290     }
291   }
292   g_value_unset (&item);
293   gst_iterator_free (iter);
294
295   if (seen_pads == NULL) {
296     GST_DEBUG_OBJECT (self, "No pad seen");
297     return FALSE;
298   }
299
300   g_list_free (seen_pads);
301
302 no_iter:
303   return result;
304 }
305
306 static inline gboolean
307 _check_all_pads_with_data_or_eos (GstAggregator * self,
308     GstAggregatorPad * aggpad)
309 {
310   if (aggpad->buffer || aggpad->eos) {
311     return TRUE;
312   }
313
314   GST_LOG_OBJECT (aggpad, "Not ready to be aggregated");
315
316   return FALSE;
317 }
318
319 static void
320 _reset_flow_values (GstAggregator * self)
321 {
322   self->priv->flow_return = GST_FLOW_FLUSHING;
323   self->priv->send_stream_start = TRUE;
324   self->priv->send_segment = TRUE;
325   gst_segment_init (&self->segment, GST_FORMAT_TIME);
326 }
327
328 static inline void
329 _push_mandatory_events (GstAggregator * self)
330 {
331   GstAggregatorPrivate *priv = self->priv;
332
333   if (g_atomic_int_get (&self->priv->send_stream_start)) {
334     gchar s_id[32];
335
336     GST_INFO_OBJECT (self, "pushing stream start");
337     /* stream-start (FIXME: create id based on input ids) */
338     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
339     if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) {
340       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
341     }
342     g_atomic_int_set (&self->priv->send_stream_start, FALSE);
343   }
344
345   if (self->priv->srccaps) {
346
347     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
348         self->priv->srccaps);
349     if (!gst_pad_push_event (self->srcpad,
350             gst_event_new_caps (self->priv->srccaps))) {
351       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
352     }
353     gst_caps_unref (self->priv->srccaps);
354     self->priv->srccaps = NULL;
355   }
356
357   if (g_atomic_int_get (&self->priv->send_segment)) {
358     if (!g_atomic_int_get (&self->priv->flush_seeking)) {
359       GstEvent *segev = gst_event_new_segment (&self->segment);
360
361       if (!self->priv->seqnum)
362         self->priv->seqnum = gst_event_get_seqnum (segev);
363       else
364         gst_event_set_seqnum (segev, self->priv->seqnum);
365
366       GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segev);
367       gst_pad_push_event (self->srcpad, segev);
368       g_atomic_int_set (&self->priv->send_segment, FALSE);
369     }
370   }
371
372   if (priv->tags && priv->tags_changed) {
373     gst_pad_push_event (self->srcpad,
374         gst_event_new_tag (gst_tag_list_ref (priv->tags)));
375     priv->tags_changed = FALSE;
376   }
377 }
378
379 /**
380  * gst_aggregator_set_src_caps:
381  * @self: The #GstAggregator
382  * @caps: The #GstCaps to set on the src pad.
383  *
384  * Sets the caps to be used on the src pad.
385  */
386 void
387 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
388 {
389   GST_AGGREGATOR_SETCAPS_LOCK (self);
390   gst_caps_replace (&self->priv->srccaps, caps);
391   _push_mandatory_events (self);
392   GST_AGGREGATOR_SETCAPS_UNLOCK (self);
393 }
394
395 /**
396  * gst_aggregator_finish_buffer:
397  * @self: The #GstAggregator
398  * @buffer: the #GstBuffer to push.
399  *
400  * This method will take care of sending mandatory events before pushing
401  * the provided buffer.
402  */
403 GstFlowReturn
404 gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
405 {
406   _push_mandatory_events (self);
407
408   if (!g_atomic_int_get (&self->priv->flush_seeking) &&
409       gst_pad_is_active (self->srcpad)) {
410     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
411     return gst_pad_push (self->srcpad, buffer);
412   } else {
413     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
414         g_atomic_int_get (&self->priv->flush_seeking),
415         gst_pad_is_active (self->srcpad));
416     gst_buffer_unref (buffer);
417     return GST_FLOW_OK;
418   }
419 }
420
421 static void
422 _push_eos (GstAggregator * self)
423 {
424   GstEvent *event;
425   _push_mandatory_events (self);
426
427   self->priv->send_eos = FALSE;
428   event = gst_event_new_eos ();
429   gst_event_set_seqnum (event, self->priv->seqnum);
430   gst_pad_push_event (self->srcpad, event);
431 }
432
433
434 static void
435 _destroy_gsource (GSource * source)
436 {
437   g_source_destroy (source);
438   g_source_unref (source);
439 }
440
441 static void
442 _remove_all_sources (GstAggregator * self)
443 {
444   GstAggregatorPrivate *priv = self->priv;
445
446   MAIN_CONTEXT_LOCK (self);
447   g_list_free_full (priv->gsources, (GDestroyNotify) _destroy_gsource);
448   priv->gsources = NULL;
449   MAIN_CONTEXT_UNLOCK (self);
450 }
451
452 static gboolean
453 aggregate_func (GstAggregator * self)
454 {
455   GstAggregatorPrivate *priv = self->priv;
456   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
457
458   GST_LOG_OBJECT (self, "Checking aggregate");
459   while (priv->send_eos && gst_aggregator_iterate_sinkpads (self,
460           (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos,
461           NULL) && priv->running) {
462     GST_TRACE_OBJECT (self, "Actually aggregating!");
463
464     priv->flow_return = klass->aggregate (self);
465
466     if (priv->flow_return == GST_FLOW_EOS) {
467       g_main_context_wakeup (self->priv->mcontext);
468       _remove_all_sources (self);
469       _push_eos (self);
470     }
471
472     if (priv->flow_return == GST_FLOW_FLUSHING &&
473         g_atomic_int_get (&priv->flush_seeking))
474       priv->flow_return = GST_FLOW_OK;
475
476     GST_LOG_OBJECT (self, "flow return is %s",
477         gst_flow_get_name (priv->flow_return));
478
479     if (priv->flow_return != GST_FLOW_OK)
480       break;
481   }
482
483   return G_SOURCE_REMOVE;
484 }
485
486 static void
487 iterate_main_context_func (GstAggregator * self)
488 {
489   if (self->priv->running == FALSE) {
490     GST_DEBUG_OBJECT (self, "Not running anymore");
491
492     return;
493   }
494
495   g_main_context_iteration (self->priv->mcontext, TRUE);
496 }
497
498 static gboolean
499 _start (GstAggregator * self)
500 {
501   self->priv->running = TRUE;
502   self->priv->send_stream_start = TRUE;
503   self->priv->send_segment = TRUE;
504   self->priv->send_eos = TRUE;
505   self->priv->srccaps = NULL;
506   self->priv->flow_return = GST_FLOW_OK;
507
508   return TRUE;
509 }
510
511 static gboolean
512 _check_pending_flush_stop (GstAggregatorPad * pad)
513 {
514   return (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
515 }
516
517 static gboolean
518 _stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
519 {
520   gboolean res = TRUE;
521
522   GST_INFO_OBJECT (self, "%s srcpad task",
523       flush_start ? "Pausing" : "Stopping");
524
525   self->priv->running = FALSE;
526
527   /*  Clean the stack of GSource set on the MainContext */
528   g_main_context_wakeup (self->priv->mcontext);
529   _remove_all_sources (self);
530   if (flush_start) {
531     res = gst_pad_push_event (self->srcpad, flush_start);
532   }
533
534   gst_pad_stop_task (self->srcpad);
535
536   return res;
537 }
538
539 static void
540 _start_srcpad_task (GstAggregator * self)
541 {
542   GST_INFO_OBJECT (self, "Starting srcpad task");
543
544   self->priv->running = TRUE;
545   gst_pad_start_task (GST_PAD (self->srcpad),
546       (GstTaskFunction) iterate_main_context_func, self, NULL);
547 }
548
549 static inline void
550 _add_aggregate_gsource (GstAggregator * self)
551 {
552   GSource *source;
553   GstAggregatorPrivate *priv = self->priv;
554
555   MAIN_CONTEXT_LOCK (self);
556   source = g_idle_source_new ();
557   g_source_set_callback (source, (GSourceFunc) aggregate_func, self, NULL);
558   priv->gsources = g_list_prepend (priv->gsources, source);
559   g_source_attach (source, priv->mcontext);
560   MAIN_CONTEXT_UNLOCK (self);
561 }
562
563 static GstFlowReturn
564 _flush (GstAggregator * self)
565 {
566   GstFlowReturn ret = GST_FLOW_OK;
567   GstAggregatorPrivate *priv = self->priv;
568   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
569
570   GST_DEBUG_OBJECT (self, "Flushing everything");
571   g_atomic_int_set (&priv->send_segment, TRUE);
572   g_atomic_int_set (&priv->flush_seeking, FALSE);
573   g_atomic_int_set (&priv->tags_changed, FALSE);
574   if (klass->flush)
575     ret = klass->flush (self);
576
577   return ret;
578 }
579
580 static gboolean
581 _all_flush_stop_received (GstAggregator * self)
582 {
583   GList *tmp;
584   GstAggregatorPad *tmppad;
585
586   GST_OBJECT_LOCK (self);
587   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
588     tmppad = (GstAggregatorPad *) tmp->data;
589
590     if (_check_pending_flush_stop (tmppad) == FALSE) {
591       GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
592           tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
593       GST_OBJECT_UNLOCK (self);
594       return FALSE;
595     }
596   }
597   GST_OBJECT_UNLOCK (self);
598
599   return TRUE;
600 }
601
602 static void
603 _flush_start (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
604 {
605   GstBuffer *tmpbuf;
606   GstAggregatorPrivate *priv = self->priv;
607   GstAggregatorPadPrivate *padpriv = aggpad->priv;
608
609   g_atomic_int_set (&aggpad->priv->flushing, TRUE);
610   /*  Remove pad buffer and wake up the streaming thread */
611   tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
612   gst_buffer_replace (&tmpbuf, NULL);
613   PAD_STREAM_LOCK (aggpad);
614   if (g_atomic_int_compare_and_exchange (&padpriv->pending_flush_start,
615           TRUE, FALSE) == TRUE) {
616     GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
617     g_atomic_int_set (&padpriv->pending_flush_stop, TRUE);
618   }
619
620   if (g_atomic_int_get (&priv->flush_seeking)) {
621     /* If flush_seeking we forward the first FLUSH_START */
622     if (g_atomic_int_compare_and_exchange (&priv->pending_flush_start,
623             TRUE, FALSE) == TRUE) {
624
625       GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
626       _stop_srcpad_task (self, event);
627       priv->flow_return = GST_FLOW_OK;
628
629       GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
630       GST_PAD_STREAM_LOCK (self->srcpad);
631       GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
632       event = NULL;
633     }
634   } else {
635     gst_event_unref (event);
636   }
637   PAD_STREAM_UNLOCK (aggpad);
638
639   tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
640   gst_buffer_replace (&tmpbuf, NULL);
641 }
642
643 /* GstAggregator vmethods default implementations */
644 static gboolean
645 _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
646 {
647   gboolean res = TRUE;
648   GstPad *pad = GST_PAD (aggpad);
649   GstAggregatorPrivate *priv = self->priv;
650
651   switch (GST_EVENT_TYPE (event)) {
652     case GST_EVENT_FLUSH_START:
653     {
654       _flush_start (self, aggpad, event);
655       /* We forward only in one case: right after flush_seeking */
656       event = NULL;
657       goto eat;
658     }
659     case GST_EVENT_FLUSH_STOP:
660     {
661       GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP");
662
663       _aggpad_flush (aggpad, self);
664       if (g_atomic_int_get (&priv->flush_seeking)) {
665         g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
666
667         if (g_atomic_int_get (&priv->flush_seeking)) {
668           if (_all_flush_stop_received (self)) {
669             /* That means we received FLUSH_STOP/FLUSH_STOP on
670              * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
671             _flush (self);
672             gst_pad_push_event (self->srcpad, event);
673             priv->send_eos = TRUE;
674             event = NULL;
675             _add_aggregate_gsource (self);
676
677             GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
678             GST_PAD_STREAM_UNLOCK (self->srcpad);
679             _start_srcpad_task (self);
680           }
681         }
682       }
683
684       /* We never forward the event */
685       goto eat;
686     }
687     case GST_EVENT_EOS:
688     {
689       GST_DEBUG_OBJECT (aggpad, "EOS");
690
691       /* We still have a buffer, and we don't want the subclass to have to
692        * check for it. Mark pending_eos, eos will be set when steal_buffer is
693        * called
694        */
695       PAD_LOCK_EVENT (aggpad);
696       if (!aggpad->buffer) {
697         aggpad->eos = TRUE;
698       } else {
699         aggpad->priv->pending_eos = TRUE;
700       }
701       PAD_UNLOCK_EVENT (aggpad);
702
703       _add_aggregate_gsource (self);
704       goto eat;
705     }
706     case GST_EVENT_SEGMENT:
707     {
708       PAD_LOCK_EVENT (aggpad);
709       gst_event_copy_segment (event, &aggpad->segment);
710       PAD_UNLOCK_EVENT (aggpad);
711       goto eat;
712     }
713     case GST_EVENT_STREAM_START:
714     {
715       goto eat;
716     }
717     case GST_EVENT_TAG:
718     {
719       GstTagList *tags;
720
721       gst_event_parse_tag (event, &tags);
722
723       if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) {
724         gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE);
725         gst_event_unref (event);
726         event = NULL;
727         goto eat;
728       }
729       break;
730     }
731     default:
732     {
733       break;
734     }
735   }
736
737   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
738   return gst_pad_event_default (pad, GST_OBJECT (self), event);
739
740 eat:
741   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
742   if (event)
743     gst_event_unref (event);
744
745   return res;
746 }
747
748 static gboolean
749 _flush_pad (GstAggregator * self, GstAggregatorPad * pad, gpointer unused_udata)
750 {
751   _aggpad_flush (pad, self);
752
753   return TRUE;
754 }
755
756 static gboolean
757 _stop (GstAggregator * agg)
758 {
759   _reset_flow_values (agg);
760
761   gst_aggregator_iterate_sinkpads (agg,
762       (GstAggregatorPadForeachFunc) _flush_pad, NULL);
763
764   if (agg->priv->tags)
765     gst_tag_list_unref (agg->priv->tags);
766   agg->priv->tags = NULL;
767
768   return TRUE;
769 }
770
771 /* GstElement vmethods implementations */
772 static GstStateChangeReturn
773 _change_state (GstElement * element, GstStateChange transition)
774 {
775   GstStateChangeReturn ret;
776   GstAggregator *self = GST_AGGREGATOR (element);
777   GstAggregatorClass *agg_class = GST_AGGREGATOR_GET_CLASS (self);
778
779
780   switch (transition) {
781     case GST_STATE_CHANGE_READY_TO_PAUSED:
782       agg_class->start (self);
783       break;
784     default:
785       break;
786   }
787
788   if ((ret =
789           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
790               transition)) == GST_STATE_CHANGE_FAILURE)
791     goto failure;
792
793
794   switch (transition) {
795     case GST_STATE_CHANGE_PAUSED_TO_READY:
796       agg_class->stop (self);
797       break;
798     default:
799       break;
800   }
801
802   return ret;
803
804 failure:
805   {
806     GST_ERROR_OBJECT (element, "parent failed state change");
807     return ret;
808   }
809 }
810
811 static void
812 _release_pad (GstElement * element, GstPad * pad)
813 {
814   GstBuffer *tmpbuf;
815
816   GstAggregator *self = GST_AGGREGATOR (element);
817   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
818
819   GST_INFO_OBJECT (pad, "Removing pad");
820
821   g_atomic_int_set (&aggpad->priv->flushing, TRUE);
822   tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
823   gst_buffer_replace (&tmpbuf, NULL);
824   gst_element_remove_pad (element, pad);
825
826   /* Something changed make sure we try to aggregate */
827   _add_aggregate_gsource (self);
828 }
829
830 static GstPad *
831 _request_new_pad (GstElement * element,
832     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
833 {
834   GstAggregator *self;
835   GstAggregatorPad *agg_pad;
836
837   GstElementClass *klass = GST_ELEMENT_GET_CLASS (element);
838   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
839
840   self = GST_AGGREGATOR (element);
841
842   if (templ == gst_element_class_get_pad_template (klass, "sink_%u")) {
843     gint serial = 0;
844     gchar *name = NULL;
845
846     GST_OBJECT_LOCK (element);
847     if (req_name == NULL || strlen (req_name) < 6
848         || !g_str_has_prefix (req_name, "sink_")) {
849       /* no name given when requesting the pad, use next available int */
850       priv->padcount++;
851     } else {
852       /* parse serial number from requested padname */
853       serial = g_ascii_strtoull (&req_name[5], NULL, 10);
854       if (serial >= priv->padcount)
855         priv->padcount = serial;
856     }
857
858     name = g_strdup_printf ("sink_%u", priv->padcount);
859     agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
860         "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
861     g_free (name);
862     GST_OBJECT_UNLOCK (element);
863
864   } else {
865     return NULL;
866   }
867
868   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
869
870   if (priv->running)
871     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
872
873   /* add the pad to the element */
874   gst_element_add_pad (element, GST_PAD (agg_pad));
875
876   return GST_PAD (agg_pad);
877 }
878
879 static gboolean
880 _send_event (GstElement * element, GstEvent * event)
881 {
882   GstAggregator *self = GST_AGGREGATOR (element);
883
884   GST_STATE_LOCK (element);
885   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
886       GST_STATE (element) < GST_STATE_PAUSED) {
887     gdouble rate;
888     GstFormat fmt;
889     GstSeekFlags flags;
890     GstSeekType start_type, stop_type;
891     gint64 start, stop;
892
893     gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
894         &start, &stop_type, &stop);
895     gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
896         stop_type, stop, NULL);
897
898     self->priv->seqnum = gst_event_get_seqnum (event);
899     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
900   }
901   GST_STATE_UNLOCK (element);
902
903
904   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
905       event);
906 }
907
908 static gboolean
909 _src_query (GstAggregator * self, GstQuery * query)
910 {
911   gboolean res = TRUE;
912
913   switch (GST_QUERY_TYPE (query)) {
914     case GST_QUERY_SEEKING:
915     {
916       GstFormat format;
917
918       /* don't pass it along as some (file)sink might claim it does
919        * whereas with a collectpads in between that will not likely work */
920       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
921       gst_query_set_seeking (query, format, FALSE, 0, -1);
922       res = TRUE;
923
924       goto discard;
925     }
926     default:
927       break;
928   }
929
930   return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
931
932 discard:
933   return res;
934 }
935
936 static gboolean
937 event_forward_func (GstPad * pad, EventData * evdata)
938 {
939   gboolean ret = TRUE;
940   GstPad *peer = gst_pad_get_peer (pad);
941   GstAggregatorPadPrivate *padpriv = GST_AGGREGATOR_PAD (pad)->priv;
942
943   if (peer) {
944     ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
945     GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
946     gst_object_unref (peer);
947   }
948
949   if (ret == FALSE) {
950     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK)
951       GST_ERROR_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
952
953     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
954       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
955
956       if (gst_pad_query (peer, seeking)) {
957         gboolean seekable;
958
959         gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
960
961         if (seekable == FALSE) {
962           GST_INFO_OBJECT (pad,
963               "Source not seekable, We failed but it does not matter!");
964
965           ret = TRUE;
966         }
967       } else {
968         GST_ERROR_OBJECT (pad, "Query seeking FAILED");
969       }
970     }
971
972     if (evdata->flush) {
973       padpriv->pending_flush_start = FALSE;
974       padpriv->pending_flush_stop = FALSE;
975     }
976   } else {
977     evdata->one_actually_seeked = TRUE;
978   }
979
980   evdata->result &= ret;
981
982   /* Always send to all pads */
983   return FALSE;
984 }
985
986 static gboolean
987 _set_flush_pending (GstAggregator * self, GstAggregatorPad * pad,
988     gpointer udata)
989 {
990   pad->priv->pending_flush_start = TRUE;
991   pad->priv->pending_flush_stop = FALSE;
992
993   return TRUE;
994 }
995
996 static EventData
997 _forward_event_to_all_sinkpads (GstAggregator * self, GstEvent * event,
998     gboolean flush)
999 {
1000   EventData evdata;
1001
1002   evdata.event = event;
1003   evdata.result = TRUE;
1004   evdata.flush = flush;
1005   evdata.one_actually_seeked = FALSE;
1006
1007   /* We first need to set all pads as flushing in a first pass
1008    * as flush_start flush_stop is sometimes sent synchronously
1009    * while we send the seek event */
1010   if (flush)
1011     gst_aggregator_iterate_sinkpads (self,
1012         (GstAggregatorPadForeachFunc) _set_flush_pending, NULL);
1013   gst_pad_forward (self->srcpad, (GstPadForwardFunction) event_forward_func,
1014       &evdata);
1015
1016   gst_event_unref (event);
1017
1018   return evdata;
1019 }
1020
1021 static gboolean
1022 _do_seek (GstAggregator * self, GstEvent * event)
1023 {
1024   gdouble rate;
1025   GstFormat fmt;
1026   GstSeekFlags flags;
1027   GstSeekType start_type, stop_type;
1028   gint64 start, stop;
1029   gboolean flush;
1030   EventData evdata;
1031   GstAggregatorPrivate *priv = self->priv;
1032
1033   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1034       &start, &stop_type, &stop);
1035
1036   GST_INFO_OBJECT (self, "starting SEEK");
1037
1038   flush = flags & GST_SEEK_FLAG_FLUSH;
1039
1040   if (flush) {
1041     g_atomic_int_set (&priv->pending_flush_start, TRUE);
1042     g_atomic_int_set (&priv->flush_seeking, TRUE);
1043   }
1044
1045   gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1046       stop_type, stop, NULL);
1047
1048   /* forward the seek upstream */
1049   evdata = _forward_event_to_all_sinkpads (self, event, flush);
1050   event = NULL;
1051
1052   if (!evdata.result || !evdata.one_actually_seeked) {
1053     g_atomic_int_set (&priv->flush_seeking, FALSE);
1054     g_atomic_int_set (&priv->pending_flush_start, FALSE);
1055   }
1056
1057   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
1058
1059   return evdata.result;
1060 }
1061
1062 static gboolean
1063 _src_event (GstAggregator * self, GstEvent * event)
1064 {
1065   EventData evdata;
1066   gboolean res = TRUE;
1067
1068   switch (GST_EVENT_TYPE (event)) {
1069     case GST_EVENT_SEEK:
1070     {
1071       gint old_seqnum = self->priv->seqnum;
1072       self->priv->seqnum = gst_event_get_seqnum (event);
1073       gst_event_ref (event);
1074       res = _do_seek (self, event);
1075       if (!res)
1076         self->priv->seqnum = old_seqnum;
1077       gst_event_unref (event);
1078       event = NULL;
1079       goto done;
1080     }
1081     case GST_EVENT_NAVIGATION:
1082     {
1083       /* navigation is rather pointless. */
1084       res = FALSE;
1085       gst_event_unref (event);
1086       goto done;
1087     }
1088     default:
1089     {
1090       break;
1091     }
1092   }
1093
1094   evdata = _forward_event_to_all_sinkpads (self, event, FALSE);
1095   res = evdata.result;
1096
1097 done:
1098   return res;
1099 }
1100
1101 static gboolean
1102 src_event_func (GstPad * pad, GstObject * parent, GstEvent * event)
1103 {
1104   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1105
1106   return klass->src_event (GST_AGGREGATOR (parent), event);
1107 }
1108
1109 static gboolean
1110 src_query_func (GstPad * pad, GstObject * parent, GstQuery * query)
1111 {
1112   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1113
1114   return klass->src_query (GST_AGGREGATOR (parent), query);
1115 }
1116
1117 static gboolean
1118 src_activate_mode (GstPad * pad,
1119     GstObject * parent, GstPadMode mode, gboolean active)
1120 {
1121   GstAggregator *self = GST_AGGREGATOR (parent);
1122   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1123
1124   if (klass->src_activate) {
1125     if (klass->src_activate (self, mode, active) == FALSE) {
1126       return FALSE;
1127     }
1128   }
1129
1130   if (active == TRUE) {
1131     switch (mode) {
1132       case GST_PAD_MODE_PUSH:
1133       {
1134         GST_INFO_OBJECT (pad, "Activating pad!");
1135         _start_srcpad_task (self);
1136         return TRUE;
1137       }
1138       default:
1139       {
1140         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
1141         return FALSE;
1142       }
1143     }
1144   }
1145
1146   /* deactivating */
1147   GST_INFO_OBJECT (self, "Deactivating srcpad");
1148   _stop_srcpad_task (self, FALSE);
1149
1150   return TRUE;
1151 }
1152
1153 static gboolean
1154 _sink_query (GstAggregator * self, GstAggregatorPad * aggpad, GstQuery * query)
1155 {
1156   GstPad *pad = GST_PAD (aggpad);
1157
1158   return gst_pad_query_default (pad, GST_OBJECT (self), query);
1159 }
1160
1161 static void
1162 gst_aggregator_finalize (GObject * object)
1163 {
1164   GstAggregator *self = (GstAggregator *) object;
1165
1166   g_mutex_clear (&self->priv->mcontext_lock);
1167   g_mutex_clear (&self->priv->setcaps_lock);
1168
1169   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
1170 }
1171
1172 static void
1173 gst_aggregator_dispose (GObject * object)
1174 {
1175   GstAggregator *self = (GstAggregator *) object;
1176
1177   G_OBJECT_CLASS (aggregator_parent_class)->dispose (object);
1178
1179   g_main_context_unref (self->priv->mcontext);
1180   _remove_all_sources (self);
1181 }
1182
1183 /* GObject vmethods implementations */
1184 static void
1185 gst_aggregator_class_init (GstAggregatorClass * klass)
1186 {
1187   GObjectClass *gobject_class = (GObjectClass *) klass;
1188   GstElementClass *gstelement_class = (GstElementClass *) klass;
1189
1190   aggregator_parent_class = g_type_class_peek_parent (klass);
1191   g_type_class_add_private (klass, sizeof (GstAggregatorPrivate));
1192
1193   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
1194       GST_DEBUG_FG_MAGENTA, "GstAggregator");
1195
1196   klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD;
1197   klass->start = _start;
1198   klass->stop = _stop;
1199
1200   klass->sink_event = _sink_event;
1201   klass->sink_query = _sink_query;
1202
1203   klass->src_event = _src_event;
1204   klass->src_query = _src_query;
1205
1206   gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR (_request_new_pad);
1207   gstelement_class->send_event = GST_DEBUG_FUNCPTR (_send_event);
1208   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (_release_pad);
1209   gstelement_class->change_state = GST_DEBUG_FUNCPTR (_change_state);
1210
1211   gobject_class->finalize = gst_aggregator_finalize;
1212   gobject_class->dispose = gst_aggregator_dispose;
1213 }
1214
1215 static void
1216 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
1217 {
1218   GstPadTemplate *pad_template;
1219   GstAggregatorPrivate *priv;
1220
1221   g_return_if_fail (klass->aggregate != NULL);
1222
1223   self->priv =
1224       G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR,
1225       GstAggregatorPrivate);
1226
1227   priv = self->priv;
1228
1229   pad_template =
1230       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
1231   g_return_if_fail (pad_template != NULL);
1232
1233   priv->padcount = -1;
1234   priv->tags_changed = FALSE;
1235   _reset_flow_values (self);
1236
1237   priv->mcontext = g_main_context_new ();
1238   self->srcpad = gst_pad_new_from_template (pad_template, "src");
1239
1240   gst_pad_set_event_function (self->srcpad,
1241       GST_DEBUG_FUNCPTR ((GstPadEventFunction) src_event_func));
1242   gst_pad_set_query_function (self->srcpad,
1243       GST_DEBUG_FUNCPTR ((GstPadQueryFunction) src_query_func));
1244   gst_pad_set_activatemode_function (self->srcpad,
1245       GST_DEBUG_FUNCPTR ((GstPadActivateModeFunction) src_activate_mode));
1246
1247   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
1248
1249   g_mutex_init (&self->priv->mcontext_lock);
1250   g_mutex_init (&self->priv->setcaps_lock);
1251 }
1252
1253 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
1254  * method to get to the padtemplates */
1255 GType
1256 gst_aggregator_get_type (void)
1257 {
1258   static volatile gsize type = 0;
1259
1260   if (g_once_init_enter (&type)) {
1261     GType _type;
1262     static const GTypeInfo info = {
1263       sizeof (GstAggregatorClass),
1264       NULL,
1265       NULL,
1266       (GClassInitFunc) gst_aggregator_class_init,
1267       NULL,
1268       NULL,
1269       sizeof (GstAggregator),
1270       0,
1271       (GInstanceInitFunc) gst_aggregator_init,
1272     };
1273
1274     _type = g_type_register_static (GST_TYPE_ELEMENT,
1275         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
1276     g_once_init_leave (&type, _type);
1277   }
1278   return type;
1279 }
1280
1281 static GstFlowReturn
1282 _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
1283 {
1284   GstBuffer *actual_buf = buffer;
1285   GstAggregator *self = GST_AGGREGATOR (object);
1286   GstAggregatorPrivate *priv = self->priv;
1287   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1288   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
1289
1290   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
1291
1292   PAD_STREAM_LOCK (aggpad);
1293   if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
1294     goto flushing;
1295
1296   if (g_atomic_int_get (&aggpad->priv->pending_eos) == TRUE)
1297     goto eos;
1298
1299   PAD_LOCK_EVENT (aggpad);
1300   if (aggpad->buffer) {
1301     GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
1302     PAD_WAIT_EVENT (aggpad);
1303   }
1304   PAD_UNLOCK_EVENT (aggpad);
1305
1306   if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
1307     goto flushing;
1308
1309   if (aggclass->clip) {
1310     aggclass->clip (self, aggpad, buffer, &actual_buf);
1311   }
1312
1313   PAD_LOCK_EVENT (aggpad);
1314   if (aggpad->buffer)
1315     gst_buffer_unref (aggpad->buffer);
1316   aggpad->buffer = actual_buf;
1317   PAD_UNLOCK_EVENT (aggpad);
1318   PAD_STREAM_UNLOCK (aggpad);
1319
1320   _add_aggregate_gsource (self);
1321
1322   GST_DEBUG_OBJECT (aggpad, "Done chaining");
1323
1324   return priv->flow_return;
1325
1326 flushing:
1327   PAD_STREAM_UNLOCK (aggpad);
1328
1329   gst_buffer_unref (buffer);
1330   GST_DEBUG_OBJECT (aggpad, "We are flushing");
1331
1332   return GST_FLOW_FLUSHING;
1333
1334 eos:
1335   PAD_STREAM_UNLOCK (aggpad);
1336
1337   gst_buffer_unref (buffer);
1338   GST_DEBUG_OBJECT (pad, "We are EOS already...");
1339
1340   return GST_FLOW_EOS;
1341 }
1342
1343 static gboolean
1344 pad_query_func (GstPad * pad, GstObject * parent, GstQuery * query)
1345 {
1346   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1347
1348   return klass->sink_query (GST_AGGREGATOR (parent),
1349       GST_AGGREGATOR_PAD (pad), query);
1350 }
1351
1352 static gboolean
1353 pad_event_func (GstPad * pad, GstObject * parent, GstEvent * event)
1354 {
1355   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1356
1357   return klass->sink_event (GST_AGGREGATOR (parent),
1358       GST_AGGREGATOR_PAD (pad), event);
1359 }
1360
1361 static gboolean
1362 pad_activate_mode_func (GstPad * pad,
1363     GstObject * parent, GstPadMode mode, gboolean active)
1364 {
1365   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1366
1367   if (active == FALSE) {
1368     PAD_LOCK_EVENT (aggpad);
1369     g_atomic_int_set (&aggpad->priv->flushing, TRUE);
1370     gst_buffer_replace (&aggpad->buffer, NULL);
1371     PAD_BROADCAST_EVENT (aggpad);
1372     PAD_UNLOCK_EVENT (aggpad);
1373   } else {
1374     g_atomic_int_set (&aggpad->priv->flushing, FALSE);
1375     PAD_LOCK_EVENT (aggpad);
1376     PAD_BROADCAST_EVENT (aggpad);
1377     PAD_UNLOCK_EVENT (aggpad);
1378   }
1379
1380   return TRUE;
1381 }
1382
1383 /***********************************
1384  * GstAggregatorPad implementation  *
1385  ************************************/
1386 static GstPadClass *aggregator_pad_parent_class = NULL;
1387 G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
1388
1389 static void
1390 _pad_constructed (GObject * object)
1391 {
1392   GstPad *pad = GST_PAD (object);
1393
1394   gst_pad_set_chain_function (pad,
1395       GST_DEBUG_FUNCPTR ((GstPadChainFunction) _chain));
1396   gst_pad_set_event_function (pad,
1397       GST_DEBUG_FUNCPTR ((GstPadEventFunction) pad_event_func));
1398   gst_pad_set_query_function (pad,
1399       GST_DEBUG_FUNCPTR ((GstPadQueryFunction) pad_query_func));
1400   gst_pad_set_activatemode_function (pad,
1401       GST_DEBUG_FUNCPTR ((GstPadActivateModeFunction) pad_activate_mode_func));
1402 }
1403
1404 static void
1405 gst_aggregator_pad_finalize (GObject * object)
1406 {
1407   GstAggregatorPad *pad = (GstAggregatorPad *) object;
1408
1409   g_mutex_clear (&pad->priv->event_lock);
1410   g_cond_clear (&pad->priv->event_cond);
1411   g_mutex_clear (&pad->priv->stream_lock);
1412
1413   G_OBJECT_CLASS (aggregator_pad_parent_class)->finalize (object);
1414 }
1415
1416 static void
1417 gst_aggregator_pad_dispose (GObject * object)
1418 {
1419   GstAggregatorPad *pad = (GstAggregatorPad *) object;
1420   GstBuffer *buf;
1421
1422   buf = gst_aggregator_pad_steal_buffer (pad);
1423   if (buf)
1424     gst_buffer_unref (buf);
1425
1426   G_OBJECT_CLASS (aggregator_pad_parent_class)->dispose (object);
1427 }
1428
1429 static void
1430 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
1431 {
1432   GObjectClass *gobject_class = (GObjectClass *) klass;
1433
1434   aggregator_pad_parent_class = g_type_class_peek_parent (klass);
1435   g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate));
1436
1437   gobject_class->constructed = GST_DEBUG_FUNCPTR (_pad_constructed);
1438   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_aggregator_pad_finalize);
1439   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_aggregator_pad_dispose);
1440 }
1441
1442 static void
1443 gst_aggregator_pad_init (GstAggregatorPad * pad)
1444 {
1445   pad->priv =
1446       G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
1447       GstAggregatorPadPrivate);
1448
1449   pad->buffer = NULL;
1450   g_mutex_init (&pad->priv->event_lock);
1451   g_cond_init (&pad->priv->event_cond);
1452
1453   g_mutex_init (&pad->priv->stream_lock);
1454 }
1455
1456 /**
1457  * gst_aggregator_pad_steal_buffer:
1458  * @pad: the pad to get buffer from
1459  *
1460  * Steal the ref to the buffer currently queued in @pad.
1461  *
1462  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
1463  *   queued. You should unref the buffer after usage.
1464  */
1465 GstBuffer *
1466 gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
1467 {
1468   GstBuffer *buffer = NULL;
1469
1470   PAD_LOCK_EVENT (pad);
1471   if (pad->buffer) {
1472     GST_TRACE_OBJECT (pad, "Consuming buffer");
1473     buffer = pad->buffer;
1474     pad->buffer = NULL;
1475     if (pad->priv->pending_eos) {
1476       pad->priv->pending_eos = FALSE;
1477       pad->eos = TRUE;
1478     }
1479     PAD_BROADCAST_EVENT (pad);
1480     GST_DEBUG_OBJECT (pad, "Consummed: %" GST_PTR_FORMAT, buffer);
1481   }
1482   PAD_UNLOCK_EVENT (pad);
1483
1484   return buffer;
1485 }
1486
1487 /**
1488  * gst_aggregator_pad_get_buffer:
1489  * @pad: the pad to get buffer from
1490  *
1491  * Returns: (transfer full): A reference to the buffer in @pad or
1492  * NULL if no buffer was queued. You should unref the buffer after
1493  * usage.
1494  */
1495 GstBuffer *
1496 gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
1497 {
1498   GstBuffer *buffer = NULL;
1499
1500   PAD_LOCK_EVENT (pad);
1501   if (pad->buffer)
1502     buffer = gst_buffer_ref (pad->buffer);
1503   PAD_UNLOCK_EVENT (pad);
1504
1505   return buffer;
1506 }
1507
1508 /**
1509  * gst_aggregator_merge_tags:
1510  * @self: a #GstAggregator
1511  * @tags: a #GstTagList to merge
1512  * @mode: the #GstTagMergeMode to use
1513  *
1514  * Adds tags to so-called pending tags, which will be processed
1515  * before pushing out data downstream.
1516  *
1517  * Note that this is provided for convenience, and the subclass is
1518  * not required to use this and can still do tag handling on its own.
1519  *
1520  * MT safe.
1521  */
1522 void
1523 gst_aggregator_merge_tags (GstAggregator * self,
1524     const GstTagList * tags, GstTagMergeMode mode)
1525 {
1526   GstTagList *otags;
1527
1528   g_return_if_fail (GST_IS_AGGREGATOR (self));
1529   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
1530
1531   /* FIXME Check if we can use OBJECT lock here! */
1532   GST_OBJECT_LOCK (self);
1533   if (tags)
1534     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
1535   otags = self->priv->tags;
1536   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
1537   if (otags)
1538     gst_tag_list_unref (otags);
1539   self->priv->tags_changed = TRUE;
1540   GST_OBJECT_UNLOCK (self);
1541 }