7eab8063a0a55339b666c17825d58213327073b5
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
1 /* GStreamer
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @short_description: manages a set of pads with the purpose of
25  * aggregating their buffers.
26  * @see_also: gstcollectpads for historical reasons.
27  *
28  * Manages a set of pads with the purpose of aggregating their buffers.
29  * Control is given to the subclass when all pads have data.
30  * <itemizedlist>
31  *  <listitem><para>
32  *    Base class for mixers and muxers. Implementers should at least implement
33  *    the aggregate () vmethod.
34  *  </para></listitem>
35  *  <listitem><para>
36  *    When data is queued on all pads, tha aggregate vmethod is called.
37  *  </para></listitem>
38  *  <listitem><para>
39  *    One can peek at the data on any given GstAggregatorPad with the
40  *    gst_aggregator_pad_get_buffer () method, and take ownership of it
41  *    with the gst_aggregator_pad_steal_buffer () method. When a buffer
42  *    has been taken with steal_buffer (), a new buffer can be queued
43  *    on that pad.
44  *  </para></listitem>
45  *  <listitem><para>
46  *    If the subclass wishes to push a buffer downstream in its aggregate
47  *    implementation, it should do so through the
48  *    gst_aggregator_finish_buffer () method. This method will take care
49  *    of sending and ordering mandatory events such as stream start, caps
50  *    and segment.
51  *  </para></listitem>
52  *  <listitem><para>
53  *    Same goes for EOS events, which should not be pushed directly by the
54  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
55  *    implementation.
56  *  </para></listitem>
57  * </itemizedlist>
58  */
59
60 #ifdef HAVE_CONFIG_H
61 #  include "config.h"
62 #endif
63
64 #include <string.h>             /* strlen */
65
66 #include "gstaggregator.h"
67
68
69 /*  Might become API */
70 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
71     const GstTagList * tags, GstTagMergeMode mode);
72
73 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
74 #define GST_CAT_DEFAULT aggregator_debug
75
76 /* GstAggregatorPad definitions */
77 #define PAD_LOCK_EVENT(pad)   G_STMT_START {                            \
78   GST_LOG_OBJECT (pad, "Taking EVENT lock from thread %p",              \
79         g_thread_self());                                               \
80   g_mutex_lock(&pad->priv->event_lock);                                 \
81   GST_LOG_OBJECT (pad, "Took EVENT lock from thread %p",              \
82         g_thread_self());                                               \
83   } G_STMT_END
84
85 #define PAD_UNLOCK_EVENT(pad)  G_STMT_START {                           \
86   GST_LOG_OBJECT (pad, "Releasing EVENT lock from thread %p",          \
87         g_thread_self());                                               \
88   g_mutex_unlock(&pad->priv->event_lock);                               \
89   GST_LOG_OBJECT (pad, "Release EVENT lock from thread %p",          \
90         g_thread_self());                                               \
91   } G_STMT_END
92
93
94 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
95   GST_LOG_OBJECT (pad, "Waiting for EVENT on thread %p",               \
96         g_thread_self());                                               \
97   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),       \
98       &(pad->priv->event_lock));                                        \
99   GST_LOG_OBJECT (pad, "DONE Waiting for EVENT on thread %p",               \
100         g_thread_self());                                               \
101   } G_STMT_END
102
103 #define PAD_BROADCAST_EVENT(pad) {                                          \
104   GST_LOG_OBJECT (pad, "Signaling EVENT from thread %p",               \
105         g_thread_self());                                                   \
106   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond)); \
107   }
108
109 struct _GstAggregatorPadPrivate
110 {
111   gboolean pending_flush_start;
112   gboolean pending_flush_stop;
113   gboolean pending_eos;
114   gboolean flushing;
115
116   GMutex event_lock;
117   GCond event_cond;
118 };
119
120 static gboolean
121 _aggpad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
122 {
123   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
124
125   aggpad->eos = FALSE;
126   aggpad->priv->flushing = FALSE;
127
128   if (klass->flush)
129     return klass->flush (aggpad, agg);
130
131   return TRUE;
132 }
133
134 /*************************************
135  * GstAggregator implementation  *
136  *************************************/
137 static GstElementClass *aggregator_parent_class = NULL;
138
139 #define MAIN_CONTEXT_LOCK(self) G_STMT_START {                       \
140   GST_LOG_OBJECT (self, "Getting MAIN_CONTEXT_LOCK in thread %p",    \
141         g_thread_self());                                            \
142   g_mutex_lock(&((GstAggregator*)self)->priv->mcontext_lock);    \
143   GST_LOG_OBJECT (self, "Got MAIN_CONTEXT_LOCK in thread %p",        \
144         g_thread_self());                                            \
145 } G_STMT_END
146
147 #define MAIN_CONTEXT_UNLOCK(self) G_STMT_START {                     \
148   g_mutex_unlock(&((GstAggregator*)self)->priv->mcontext_lock);  \
149   GST_LOG_OBJECT (self, "Unlocked MAIN_CONTEXT_LOCK in thread %p",   \
150         g_thread_self());                                            \
151 } G_STMT_END
152
153 struct _GstAggregatorPrivate
154 {
155   gint padcount;
156
157   GMainContext *mcontext;
158
159   /* Our state is >= PAUSED */
160   gboolean running;
161
162   /* Ensure that when we remove all sources from the maincontext
163    * we can not add any source, avoiding:
164    * "g_source_attach: assertion '!SOURCE_DESTROYED (source)' failed" */
165   GMutex mcontext_lock;
166   GList *gsources;
167
168   gint seqnum;
169   gboolean send_stream_start;
170   gboolean send_segment;
171   gboolean flush_seeking;
172   gboolean pending_flush_start;
173   gboolean send_eos;
174   GstFlowReturn flow_return;
175
176   GstCaps *srccaps;
177
178   GstTagList *tags;
179   gboolean tags_changed;
180 };
181
182 typedef struct
183 {
184   GstEvent *event;
185   gboolean result;
186   gboolean flush;
187 } EventData;
188
189 /**
190  * gst_aggregator_iterate_sinkpads:
191  * @self: The #GstAggregator
192  * @func: The function to call.
193  * @user_data: The data to pass to @func.
194  *
195  * Iterate the sinkpads of aggregator to call a function on them.
196  *
197  * This method guarantees that @func will be called only once for each
198  * sink pad.
199  */
200 gboolean
201 gst_aggregator_iterate_sinkpads (GstAggregator * self,
202     GstAggregatorPadForeachFunc func, gpointer user_data)
203 {
204   gboolean result = FALSE;
205   GstIterator *iter;
206   gboolean done = FALSE;
207   GValue item = { 0, };
208   GList *seen_pads = NULL;
209
210   iter = gst_element_iterate_sink_pads (GST_ELEMENT (self));
211
212   if (!iter)
213     goto no_iter;
214
215   while (!done) {
216     switch (gst_iterator_next (iter, &item)) {
217       case GST_ITERATOR_OK:
218       {
219         GstPad *pad;
220
221         pad = g_value_get_object (&item);
222
223         /* if already pushed, skip. FIXME, find something faster to tag pads */
224         if (pad == NULL || g_list_find (seen_pads, pad)) {
225           g_value_reset (&item);
226           break;
227         }
228
229         GST_LOG_OBJECT (self, "calling function on pad %s:%s",
230             GST_DEBUG_PAD_NAME (pad));
231         result = func (self, pad, user_data);
232
233         done = !result;
234
235         seen_pads = g_list_prepend (seen_pads, pad);
236
237         g_value_reset (&item);
238         break;
239       }
240       case GST_ITERATOR_RESYNC:
241         gst_iterator_resync (iter);
242         break;
243       case GST_ITERATOR_ERROR:
244         GST_ERROR_OBJECT (self,
245             "Could not iterate over internally linked pads");
246         done = TRUE;
247         break;
248       case GST_ITERATOR_DONE:
249         done = TRUE;
250         break;
251     }
252   }
253   g_value_unset (&item);
254   gst_iterator_free (iter);
255
256   if (seen_pads == NULL) {
257     GST_DEBUG_OBJECT (self, "No pad seen");
258     return FALSE;
259   }
260
261   g_list_free (seen_pads);
262
263 no_iter:
264   return result;
265 }
266
267 static inline gboolean
268 _check_all_pads_with_data_or_eos (GstAggregator * self,
269     GstAggregatorPad * aggpad)
270 {
271   if (aggpad->buffer || aggpad->eos) {
272     return TRUE;
273   }
274
275   GST_LOG_OBJECT (aggpad, "Not ready to be aggregated");
276
277   return FALSE;
278 }
279
280 /**
281  * gst_aggregator_set_src_caps:
282  * @self: The #GstAggregator
283  * @caps: The #GstCaps to set later on the src pad.
284  *
285  * Sets the caps to be used on the src pad.
286  */
287 void
288 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
289 {
290   gst_caps_replace (&self->priv->srccaps, caps);
291 }
292
293 static void
294 _reset_flow_values (GstAggregator * self)
295 {
296   self->priv->flow_return = GST_FLOW_FLUSHING;
297   self->priv->send_stream_start = TRUE;
298   self->priv->send_segment = TRUE;
299   gst_segment_init (&self->segment, GST_FORMAT_TIME);
300 }
301
302 static inline void
303 _push_mandatory_events (GstAggregator * self)
304 {
305   GstAggregatorPrivate *priv = self->priv;
306
307   if (g_atomic_int_get (&self->priv->send_stream_start)) {
308     gchar s_id[32];
309
310     GST_INFO_OBJECT (self, "pushing stream start");
311     /* stream-start (FIXME: create id based on input ids) */
312     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
313     if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) {
314       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
315     }
316     g_atomic_int_set (&self->priv->send_stream_start, FALSE);
317   }
318
319   if (self->priv->srccaps) {
320
321     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
322         self->priv->srccaps);
323     if (!gst_pad_push_event (self->srcpad,
324             gst_event_new_caps (self->priv->srccaps))) {
325       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
326     }
327     gst_caps_unref (self->priv->srccaps);
328     self->priv->srccaps = NULL;
329   }
330
331   if (g_atomic_int_get (&self->priv->send_segment)) {
332     if (!g_atomic_int_get (&self->priv->flush_seeking)) {
333       GstEvent *segev = gst_event_new_segment (&self->segment);
334
335       if (!self->priv->seqnum)
336         self->priv->seqnum = gst_event_get_seqnum (segev);
337       else
338         gst_event_set_seqnum (segev, self->priv->seqnum);
339
340       GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segev);
341       gst_pad_push_event (self->srcpad, segev);
342       g_atomic_int_set (&self->priv->send_segment, FALSE);
343     }
344   }
345
346   if (priv->tags && priv->tags_changed) {
347     gst_pad_push_event (self->srcpad,
348         gst_event_new_tag (gst_tag_list_ref (priv->tags)));
349     priv->tags_changed = FALSE;
350   }
351 }
352
353 /**
354  * gst_aggregator_finish_buffer:
355  * @self: The #GstAggregator
356  * @buffer: the #GstBuffer to push.
357  *
358  * This method will take care of sending mandatory events before pushing
359  * the provided buffer.
360  */
361 GstFlowReturn
362 gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
363 {
364   _push_mandatory_events (self);
365
366   if (!g_atomic_int_get (&self->priv->flush_seeking) &&
367       gst_pad_is_active (self->srcpad)) {
368     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
369     return gst_pad_push (self->srcpad, buffer);
370   } else {
371     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
372         g_atomic_int_get (&self->priv->flush_seeking),
373         gst_pad_is_active (self->srcpad));
374     gst_buffer_unref (buffer);
375     return GST_FLOW_OK;
376   }
377 }
378
379 static void
380 _push_eos (GstAggregator * self)
381 {
382   GstEvent *event;
383   _push_mandatory_events (self);
384
385   self->priv->send_eos = FALSE;
386   event = gst_event_new_eos ();
387   gst_event_set_seqnum (event, self->priv->seqnum);
388   gst_pad_push_event (self->srcpad, event);
389 }
390
391
392 static void
393 _destroy_gsource (GSource * source)
394 {
395   g_source_destroy (source);
396   g_source_unref (source);
397 }
398
399 static void
400 _remove_all_sources (GstAggregator * self)
401 {
402   GstAggregatorPrivate *priv = self->priv;
403
404   MAIN_CONTEXT_LOCK (self);
405   g_list_free_full (priv->gsources, (GDestroyNotify) _destroy_gsource);
406   priv->gsources = NULL;
407   MAIN_CONTEXT_UNLOCK (self);
408 }
409
410 static gboolean
411 aggregate_func (GstAggregator * self)
412 {
413   GstAggregatorPrivate *priv = self->priv;
414   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
415
416   GST_LOG_OBJECT (self, "Checking aggregate");
417   while (priv->send_eos && gst_aggregator_iterate_sinkpads (self,
418           (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos,
419           NULL) && priv->running) {
420     GST_TRACE_OBJECT (self, "Actually aggregating!");
421
422     priv->flow_return = klass->aggregate (self);
423
424     if (priv->flow_return == GST_FLOW_EOS) {
425       g_main_context_wakeup (self->priv->mcontext);
426       _remove_all_sources (self);
427       _push_eos (self);
428     }
429
430     if (priv->flow_return == GST_FLOW_FLUSHING &&
431         g_atomic_int_get (&priv->flush_seeking))
432       priv->flow_return = GST_FLOW_OK;
433
434     GST_LOG_OBJECT (self, "flow return is %s",
435         gst_flow_get_name (priv->flow_return));
436
437     if (priv->flow_return != GST_FLOW_OK)
438       break;
439   }
440
441   return G_SOURCE_REMOVE;
442 }
443
444 static void
445 iterate_main_context_func (GstAggregator * self)
446 {
447   if (self->priv->running == FALSE) {
448     GST_DEBUG_OBJECT (self, "Not running anymore");
449
450     return;
451   }
452
453   g_main_context_iteration (self->priv->mcontext, TRUE);
454 }
455
456 static gboolean
457 _start (GstAggregator * self)
458 {
459   self->priv->running = TRUE;
460   self->priv->send_stream_start = TRUE;
461   self->priv->send_segment = TRUE;
462   self->priv->send_eos = TRUE;
463   self->priv->srccaps = NULL;
464   self->priv->flow_return = GST_FLOW_OK;
465
466   return TRUE;
467 }
468
469 static gboolean
470 _check_pending_flush_stop (GstAggregatorPad * pad)
471 {
472   return (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
473 }
474
475 static gboolean
476 _stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
477 {
478   gboolean res = TRUE;
479
480   GST_INFO_OBJECT (self, "%s srcpad task",
481       flush_start ? "Pausing" : "Stopping");
482
483   self->priv->running = FALSE;
484
485   /*  Clean the stack of GSource set on the MainContext */
486   g_main_context_wakeup (self->priv->mcontext);
487   _remove_all_sources (self);
488   if (flush_start) {
489     res = gst_pad_push_event (self->srcpad, flush_start);
490   }
491
492   gst_pad_stop_task (self->srcpad);
493
494   return res;
495 }
496
497 static void
498 _start_srcpad_task (GstAggregator * self)
499 {
500   GST_INFO_OBJECT (self, "Starting srcpad task");
501
502   self->priv->running = TRUE;
503   gst_pad_start_task (GST_PAD (self->srcpad),
504       (GstTaskFunction) iterate_main_context_func, self, NULL);
505 }
506
507 static inline void
508 _add_aggregate_gsource (GstAggregator * self)
509 {
510   GSource *source;
511   GstAggregatorPrivate *priv = self->priv;
512
513   MAIN_CONTEXT_LOCK (self);
514   source = g_idle_source_new ();
515   g_source_set_callback (source, (GSourceFunc) aggregate_func, self, NULL);
516   priv->gsources = g_list_prepend (priv->gsources, source);
517   g_source_attach (source, priv->mcontext);
518   MAIN_CONTEXT_UNLOCK (self);
519 }
520
521 static GstFlowReturn
522 _flush (GstAggregator * self)
523 {
524   GstFlowReturn ret = GST_FLOW_OK;
525   GstAggregatorPrivate *priv = self->priv;
526   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
527
528   GST_DEBUG_OBJECT (self, "Flushing everything");
529   g_atomic_int_set (&priv->send_segment, TRUE);
530   g_atomic_int_set (&priv->flush_seeking, FALSE);
531   g_atomic_int_set (&priv->tags_changed, FALSE);
532   if (klass->flush)
533     ret = klass->flush (self);
534
535   return ret;
536 }
537
538 static gboolean
539 _all_flush_stop_received (GstAggregator * self)
540 {
541   GList *tmp;
542   GstAggregatorPad *tmppad;
543
544   GST_OBJECT_LOCK (self);
545   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
546     tmppad = (GstAggregatorPad *) tmp->data;
547
548     if (_check_pending_flush_stop (tmppad) == FALSE) {
549       GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
550           tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
551       GST_OBJECT_UNLOCK (self);
552       return FALSE;
553     }
554   }
555   GST_OBJECT_UNLOCK (self);
556
557   return TRUE;
558 }
559
560 /* GstAggregator vmethods default implementations */
561 static gboolean
562 _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
563 {
564   gboolean res = TRUE;
565   GstPad *pad = GST_PAD (aggpad);
566   GstAggregatorPrivate *priv = self->priv;
567   GstAggregatorPadPrivate *padpriv = aggpad->priv;
568
569   switch (GST_EVENT_TYPE (event)) {
570     case GST_EVENT_FLUSH_START:
571     {
572       GstBuffer *tmpbuf;
573
574       g_atomic_int_set (&aggpad->priv->flushing, TRUE);
575       /*  Remove pad buffer and wake up the streaming thread */
576       tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
577       gst_buffer_replace (&tmpbuf, NULL);
578       if (g_atomic_int_compare_and_exchange (&padpriv->pending_flush_start,
579               TRUE, FALSE) == TRUE) {
580         GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
581         g_atomic_int_set (&padpriv->pending_flush_stop, TRUE);
582       }
583
584       if (g_atomic_int_get (&priv->flush_seeking)) {
585         /* If flush_seeking we forward the first FLUSH_START */
586         if (g_atomic_int_compare_and_exchange (&priv->pending_flush_start,
587                 TRUE, FALSE) == TRUE) {
588
589           GST_DEBUG_OBJECT (self, "Flushing, pausing srcpad task");
590           priv->flow_return = GST_FLOW_OK;
591           _stop_srcpad_task (self, event);
592
593           GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
594           GST_PAD_STREAM_LOCK (self->srcpad);
595           GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
596           event = NULL;
597           goto eat;
598         }
599       }
600
601       /* We forward only in one case: right after flush_seeking */
602       goto eat;
603     }
604     case GST_EVENT_FLUSH_STOP:
605     {
606       GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP");
607
608       _aggpad_flush (aggpad, self);
609       if (g_atomic_int_get (&priv->flush_seeking)) {
610         g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
611
612         if (g_atomic_int_get (&priv->flush_seeking)) {
613           if (_all_flush_stop_received (self)) {
614             /* That means we received FLUSH_STOP/FLUSH_STOP on
615              * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
616             _flush (self);
617             gst_pad_push_event (self->srcpad, event);
618             priv->send_eos = TRUE;
619             event = NULL;
620             _add_aggregate_gsource (self);
621
622             GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
623             GST_PAD_STREAM_UNLOCK (self->srcpad);
624             _start_srcpad_task (self);
625           }
626         }
627       }
628
629       /* We never forward the event */
630       goto eat;
631     }
632     case GST_EVENT_EOS:
633     {
634       GST_DEBUG_OBJECT (aggpad, "EOS");
635
636       /* We still have a buffer, and we don't want the subclass to have to
637        * check for it. Mark pending_eos, eos will be set when steal_buffer is
638        * called
639        */
640       PAD_LOCK_EVENT (aggpad);
641       if (!aggpad->buffer) {
642         aggpad->eos = TRUE;
643       } else {
644         aggpad->priv->pending_eos = TRUE;
645       }
646       PAD_UNLOCK_EVENT (aggpad);
647
648       _add_aggregate_gsource (self);
649       goto eat;
650     }
651     case GST_EVENT_SEGMENT:
652     {
653       PAD_LOCK_EVENT (aggpad);
654       gst_event_copy_segment (event, &aggpad->segment);
655       PAD_UNLOCK_EVENT (aggpad);
656       goto eat;
657     }
658     case GST_EVENT_STREAM_START:
659     {
660       goto eat;
661     }
662     case GST_EVENT_TAG:
663     {
664       GstTagList *tags;
665
666       gst_event_parse_tag (event, &tags);
667
668       if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) {
669         gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE);
670         gst_event_unref (event);
671         event = NULL;
672         goto eat;
673       }
674       break;
675     }
676     default:
677     {
678       break;
679     }
680   }
681
682   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
683   return gst_pad_event_default (pad, GST_OBJECT (self), event);
684
685 eat:
686   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
687   if (event)
688     gst_event_unref (event);
689
690   return res;
691 }
692
693 static gboolean
694 _flush_pad (GstAggregator * self, GstAggregatorPad * pad, gpointer unused_udata)
695 {
696   _aggpad_flush (pad, self);
697
698   return TRUE;
699 }
700
701 static gboolean
702 _stop (GstAggregator * agg)
703 {
704   _reset_flow_values (agg);
705
706   gst_aggregator_iterate_sinkpads (agg,
707       (GstAggregatorPadForeachFunc) _flush_pad, NULL);
708
709   return TRUE;
710 }
711
712 /* GstElement vmethods implementations */
713 static GstStateChangeReturn
714 _change_state (GstElement * element, GstStateChange transition)
715 {
716   GstStateChangeReturn ret;
717   GstAggregator *self = GST_AGGREGATOR (element);
718   GstAggregatorClass *agg_class = GST_AGGREGATOR_GET_CLASS (self);
719
720
721   switch (transition) {
722     case GST_STATE_CHANGE_READY_TO_PAUSED:
723       agg_class->start (self);
724       break;
725     default:
726       break;
727   }
728
729   if ((ret =
730           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
731               transition)) == GST_STATE_CHANGE_FAILURE)
732     goto failure;
733
734
735   switch (transition) {
736     case GST_STATE_CHANGE_PAUSED_TO_READY:
737       agg_class->stop (self);
738       break;
739     default:
740       break;
741   }
742
743   return ret;
744
745 failure:
746   {
747     GST_ERROR_OBJECT (element, "parent failed state change");
748     return ret;
749   }
750 }
751
752 static void
753 _release_pad (GstElement * element, GstPad * pad)
754 {
755   GstBuffer *tmpbuf;
756
757   GstAggregator *self = GST_AGGREGATOR (element);
758   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
759
760   GST_INFO_OBJECT (pad, "Removing pad");
761
762   g_atomic_int_set (&aggpad->priv->flushing, TRUE);
763   tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
764   gst_buffer_replace (&tmpbuf, NULL);
765   gst_element_remove_pad (element, pad);
766
767   /* Something changed make sure we try to aggregate */
768   _add_aggregate_gsource (self);
769 }
770
771 static GstPad *
772 _request_new_pad (GstElement * element,
773     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
774 {
775   GstAggregator *self;
776   GstAggregatorPad *agg_pad;
777
778   GstElementClass *klass = GST_ELEMENT_GET_CLASS (element);
779   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
780
781   self = GST_AGGREGATOR (element);
782
783   if (templ == gst_element_class_get_pad_template (klass, "sink_%u")) {
784     gint serial = 0;
785     gchar *name = NULL;
786
787     GST_OBJECT_LOCK (element);
788     if (req_name == NULL || strlen (req_name) < 6
789         || !g_str_has_prefix (req_name, "sink_")) {
790       /* no name given when requesting the pad, use next available int */
791       priv->padcount++;
792     } else {
793       /* parse serial number from requested padname */
794       serial = g_ascii_strtoull (&req_name[5], NULL, 10);
795       if (serial >= priv->padcount)
796         priv->padcount = serial;
797     }
798
799     name = g_strdup_printf ("sink_%u", priv->padcount);
800     agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
801         "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
802     g_free (name);
803     GST_OBJECT_UNLOCK (element);
804
805   } else {
806     return NULL;
807   }
808
809   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
810
811   if (priv->running)
812     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
813
814   /* add the pad to the element */
815   gst_element_add_pad (element, GST_PAD (agg_pad));
816
817   return GST_PAD (agg_pad);
818 }
819
820 static gboolean
821 _src_query (GstAggregator * self, GstQuery * query)
822 {
823   gboolean res = TRUE;
824
825   switch (GST_QUERY_TYPE (query)) {
826     case GST_QUERY_SEEKING:
827     {
828       GstFormat format;
829
830       /* don't pass it along as some (file)sink might claim it does
831        * whereas with a collectpads in between that will not likely work */
832       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
833       gst_query_set_seeking (query, format, FALSE, 0, -1);
834       res = TRUE;
835
836       goto discard;
837     }
838     default:
839       break;
840   }
841
842   return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
843
844 discard:
845   return res;
846 }
847
848 static gboolean
849 event_forward_func (GstPad * pad, EventData * evdata)
850 {
851   gboolean ret = TRUE;
852   GstPad *peer = gst_pad_get_peer (pad);
853   GstAggregatorPadPrivate *padpriv = GST_AGGREGATOR_PAD (pad)->priv;
854
855   if (peer) {
856     ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
857     GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
858     gst_object_unref (peer);
859   }
860
861   evdata->result &= ret;
862
863   if (ret == FALSE) {
864     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK)
865       GST_ERROR_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
866     else
867       GST_INFO_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
868
869     if (evdata->flush) {
870       padpriv->pending_flush_start = FALSE;
871       padpriv->pending_flush_stop = FALSE;
872     }
873   }
874
875   /* Always send to all pads */
876   return FALSE;
877 }
878
879 static gboolean
880 _set_flush_pending (GstAggregator * self, GstAggregatorPad * pad,
881     gpointer udata)
882 {
883   pad->priv->pending_flush_start = TRUE;
884   pad->priv->pending_flush_stop = FALSE;
885
886   return TRUE;
887 }
888
889 static gboolean
890 _forward_event_to_all_sinkpads (GstAggregator * self, GstEvent * event,
891     gboolean flush)
892 {
893   EventData evdata;
894
895   evdata.event = event;
896   evdata.result = TRUE;
897   evdata.flush = flush;
898
899   /* We first need to set all pads as flushing in a first pass
900    * as flush_start flush_stop is sometimes sent synchronously
901    * while we send the seek event */
902   if (flush)
903     gst_aggregator_iterate_sinkpads (self,
904         (GstAggregatorPadForeachFunc) _set_flush_pending, NULL);
905   gst_pad_forward (self->srcpad, (GstPadForwardFunction) event_forward_func,
906       &evdata);
907
908   gst_event_unref (event);
909
910   return evdata.result;
911 }
912
913 static gboolean
914 _do_seek (GstAggregator * self, GstEvent * event)
915 {
916   gdouble rate;
917   GstFormat fmt;
918   GstSeekFlags flags;
919   GstSeekType start_type, stop_type;
920   gint64 start, stop;
921   gboolean flush;
922   gboolean res;
923   GstAggregatorPrivate *priv = self->priv;
924
925   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
926       &start, &stop_type, &stop);
927
928   GST_INFO_OBJECT (self, "starting SEEK");
929
930   flush = flags & GST_SEEK_FLAG_FLUSH;
931
932   if (flush) {
933     g_atomic_int_set (&priv->pending_flush_start, TRUE);
934     g_atomic_int_set (&priv->flush_seeking, TRUE);
935   }
936
937   gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
938       stop_type, stop, NULL);
939
940   /* forward the seek upstream */
941   res = _forward_event_to_all_sinkpads (self, event, flush);
942   event = NULL;
943
944   if (!res) {
945     g_atomic_int_set (&priv->flush_seeking, FALSE);
946     g_atomic_int_set (&priv->pending_flush_start, FALSE);
947   }
948
949   GST_INFO_OBJECT (self, "seek done, result: %d", res);
950
951   return res;
952 }
953
954 static gboolean
955 _src_event (GstAggregator * self, GstEvent * event)
956 {
957   gboolean res = TRUE;
958
959   switch (GST_EVENT_TYPE (event)) {
960     case GST_EVENT_SEEK:
961     {
962       gst_event_ref (event);
963       res = _do_seek (self, event);
964       if (res)
965         self->priv->seqnum = gst_event_get_seqnum (event);
966       gst_event_unref (event);
967       event = NULL;
968       goto done;
969     }
970     case GST_EVENT_NAVIGATION:
971     {
972       /* navigation is rather pointless. */
973       res = FALSE;
974       gst_event_unref (event);
975       goto done;
976     }
977     default:
978     {
979       break;
980     }
981   }
982
983   return _forward_event_to_all_sinkpads (self, event, FALSE);
984
985 done:
986   return res;
987 }
988
989 static gboolean
990 src_event_func (GstPad * pad, GstObject * parent, GstEvent * event)
991 {
992   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
993
994   return klass->src_event (GST_AGGREGATOR (parent), event);
995 }
996
997 static gboolean
998 src_query_func (GstPad * pad, GstObject * parent, GstQuery * query)
999 {
1000   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1001
1002   return klass->src_query (GST_AGGREGATOR (parent), query);
1003 }
1004
1005 static gboolean
1006 src_activate_mode (GstPad * pad,
1007     GstObject * parent, GstPadMode mode, gboolean active)
1008 {
1009   GstAggregator *self = GST_AGGREGATOR (parent);
1010   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1011
1012   if (klass->src_activate) {
1013     if (klass->src_activate (self, mode, active) == FALSE) {
1014       return FALSE;
1015     }
1016   }
1017
1018   if (active == TRUE) {
1019     switch (mode) {
1020       case GST_PAD_MODE_PUSH:
1021       {
1022         GST_INFO_OBJECT (pad, "Activating pad!");
1023         _start_srcpad_task (self);
1024         return TRUE;
1025       }
1026       default:
1027       {
1028         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
1029         return FALSE;
1030       }
1031     }
1032   }
1033
1034   /* deactivating */
1035   GST_INFO_OBJECT (self, "Deactivating srcpad");
1036   _stop_srcpad_task (self, FALSE);
1037
1038   return TRUE;
1039 }
1040
1041 static gboolean
1042 _sink_query (GstAggregator * self, GstAggregatorPad * aggpad, GstQuery * query)
1043 {
1044   GstPad *pad = GST_PAD (aggpad);
1045
1046   return gst_pad_query_default (pad, GST_OBJECT (self), query);
1047 }
1048
1049 static void
1050 gst_aggregator_finalize (GObject * object)
1051 {
1052   GstAggregator *self = (GstAggregator *) object;
1053
1054   g_mutex_clear (&self->priv->mcontext_lock);
1055
1056   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
1057 }
1058
1059 static void
1060 gst_aggregator_dispose (GObject * object)
1061 {
1062   GstAggregator *self = (GstAggregator *) object;
1063
1064   G_OBJECT_CLASS (aggregator_parent_class)->dispose (object);
1065
1066   g_main_context_unref (self->priv->mcontext);
1067   _remove_all_sources (self);
1068 }
1069
1070 /* GObject vmethods implementations */
1071 static void
1072 gst_aggregator_class_init (GstAggregatorClass * klass)
1073 {
1074   GObjectClass *gobject_class = (GObjectClass *) klass;
1075   GstElementClass *gstelement_class = (GstElementClass *) klass;
1076
1077   aggregator_parent_class = g_type_class_peek_parent (klass);
1078   g_type_class_add_private (klass, sizeof (GstAggregatorPrivate));
1079
1080   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
1081       GST_DEBUG_FG_MAGENTA, "GstAggregator");
1082
1083   klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD;
1084   klass->start = _start;
1085   klass->stop = _stop;
1086
1087   klass->sink_event = _sink_event;
1088   klass->sink_query = _sink_query;
1089
1090   klass->src_event = _src_event;
1091   klass->src_query = _src_query;
1092
1093   gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR (_request_new_pad);
1094   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (_release_pad);
1095   gstelement_class->change_state = GST_DEBUG_FUNCPTR (_change_state);
1096
1097   gobject_class->finalize = gst_aggregator_finalize;
1098   gobject_class->dispose = gst_aggregator_dispose;
1099 }
1100
1101 static void
1102 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
1103 {
1104   GstPadTemplate *pad_template;
1105   GstAggregatorPrivate *priv;
1106
1107   g_return_if_fail (klass->aggregate != NULL);
1108
1109   self->priv =
1110       G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR,
1111       GstAggregatorPrivate);
1112
1113   priv = self->priv;
1114
1115   pad_template =
1116       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
1117   g_return_if_fail (pad_template != NULL);
1118
1119   priv->padcount = -1;
1120   priv->tags_changed = FALSE;
1121   _reset_flow_values (self);
1122
1123   priv->mcontext = g_main_context_new ();
1124   self->srcpad = gst_pad_new_from_template (pad_template, "src");
1125
1126   gst_pad_set_event_function (self->srcpad,
1127       GST_DEBUG_FUNCPTR ((GstPadEventFunction) src_event_func));
1128   gst_pad_set_query_function (self->srcpad,
1129       GST_DEBUG_FUNCPTR ((GstPadQueryFunction) src_query_func));
1130   gst_pad_set_activatemode_function (self->srcpad,
1131       GST_DEBUG_FUNCPTR ((GstPadActivateModeFunction) src_activate_mode));
1132
1133   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
1134
1135   g_mutex_init (&self->priv->mcontext_lock);
1136 }
1137
1138 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
1139  * method to get to the padtemplates */
1140 GType
1141 gst_aggregator_get_type (void)
1142 {
1143   static volatile gsize type = 0;
1144
1145   if (g_once_init_enter (&type)) {
1146     GType _type;
1147     static const GTypeInfo info = {
1148       sizeof (GstAggregatorClass),
1149       NULL,
1150       NULL,
1151       (GClassInitFunc) gst_aggregator_class_init,
1152       NULL,
1153       NULL,
1154       sizeof (GstAggregator),
1155       0,
1156       (GInstanceInitFunc) gst_aggregator_init,
1157     };
1158
1159     _type = g_type_register_static (GST_TYPE_ELEMENT,
1160         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
1161     g_once_init_leave (&type, _type);
1162   }
1163   return type;
1164 }
1165
1166 static GstFlowReturn
1167 _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
1168 {
1169   GstBuffer *actual_buf = buffer;
1170   GstAggregator *self = GST_AGGREGATOR (object);
1171   GstAggregatorPrivate *priv = self->priv;
1172   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1173   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
1174
1175   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
1176
1177   if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
1178     goto flushing;
1179
1180   if (g_atomic_int_get (&aggpad->priv->pending_eos) == TRUE)
1181     goto eos;
1182
1183   PAD_LOCK_EVENT (aggpad);
1184   if (aggpad->buffer) {
1185     GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
1186     PAD_WAIT_EVENT (aggpad);
1187   }
1188   PAD_UNLOCK_EVENT (aggpad);
1189
1190   if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
1191     goto flushing;
1192
1193
1194   if (aggclass->clip) {
1195     aggclass->clip (self, aggpad, buffer, &actual_buf);
1196   }
1197
1198   PAD_LOCK_EVENT (aggpad);
1199   if (aggpad->buffer)
1200     gst_buffer_unref (aggpad->buffer);
1201   aggpad->buffer = actual_buf;
1202   PAD_UNLOCK_EVENT (aggpad);
1203
1204   _add_aggregate_gsource (self);
1205
1206   GST_DEBUG_OBJECT (aggpad, "Done chaining");
1207
1208   return priv->flow_return;
1209
1210 flushing:
1211
1212   gst_buffer_unref (buffer);
1213   GST_DEBUG_OBJECT (aggpad, "We are flushing");
1214
1215   return GST_FLOW_FLUSHING;
1216
1217 eos:
1218
1219   gst_buffer_unref (buffer);
1220   GST_DEBUG_OBJECT (pad, "We are EOS already...");
1221
1222   return GST_FLOW_EOS;
1223 }
1224
1225 static gboolean
1226 pad_query_func (GstPad * pad, GstObject * parent, GstQuery * query)
1227 {
1228   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1229
1230   return klass->sink_query (GST_AGGREGATOR (parent),
1231       GST_AGGREGATOR_PAD (pad), query);
1232 }
1233
1234 static gboolean
1235 pad_event_func (GstPad * pad, GstObject * parent, GstEvent * event)
1236 {
1237   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1238
1239   return klass->sink_event (GST_AGGREGATOR (parent),
1240       GST_AGGREGATOR_PAD (pad), event);
1241 }
1242
1243 static gboolean
1244 pad_activate_mode_func (GstPad * pad,
1245     GstObject * parent, GstPadMode mode, gboolean active)
1246 {
1247   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1248
1249   if (active == FALSE) {
1250     PAD_LOCK_EVENT (aggpad);
1251     g_atomic_int_set (&aggpad->priv->flushing, TRUE);
1252     gst_buffer_replace (&aggpad->buffer, NULL);
1253     PAD_BROADCAST_EVENT (aggpad);
1254     PAD_UNLOCK_EVENT (aggpad);
1255   } else {
1256     g_atomic_int_set (&aggpad->priv->flushing, FALSE);
1257     PAD_LOCK_EVENT (aggpad);
1258     PAD_BROADCAST_EVENT (aggpad);
1259     PAD_UNLOCK_EVENT (aggpad);
1260   }
1261
1262   return TRUE;
1263 }
1264
1265 /***********************************
1266  * GstAggregatorPad implementation  *
1267  ************************************/
1268 static GstPadClass *aggregator_pad_parent_class = NULL;
1269 G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
1270
1271 static void
1272 _pad_constructed (GObject * object)
1273 {
1274   GstPad *pad = GST_PAD (object);
1275
1276   gst_pad_set_chain_function (pad,
1277       GST_DEBUG_FUNCPTR ((GstPadChainFunction) _chain));
1278   gst_pad_set_event_function (pad,
1279       GST_DEBUG_FUNCPTR ((GstPadEventFunction) pad_event_func));
1280   gst_pad_set_query_function (pad,
1281       GST_DEBUG_FUNCPTR ((GstPadQueryFunction) pad_query_func));
1282   gst_pad_set_activatemode_function (pad,
1283       GST_DEBUG_FUNCPTR ((GstPadActivateModeFunction) pad_activate_mode_func));
1284 }
1285
1286 static void
1287 gst_aggregator_pad_finalize (GObject * object)
1288 {
1289   GstAggregatorPad *pad = (GstAggregatorPad *) object;
1290
1291   g_mutex_clear (&pad->priv->event_lock);
1292   g_cond_clear (&pad->priv->event_cond);
1293
1294   G_OBJECT_CLASS (aggregator_pad_parent_class)->finalize (object);
1295 }
1296
1297 static void
1298 gst_aggregator_pad_dispose (GObject * object)
1299 {
1300   GstAggregatorPad *pad = (GstAggregatorPad *) object;
1301   GstBuffer *buf;
1302
1303   buf = gst_aggregator_pad_steal_buffer (pad);
1304   if (buf)
1305     gst_buffer_unref (buf);
1306
1307   G_OBJECT_CLASS (aggregator_pad_parent_class)->dispose (object);
1308 }
1309
1310 static void
1311 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
1312 {
1313   GObjectClass *gobject_class = (GObjectClass *) klass;
1314
1315   aggregator_pad_parent_class = g_type_class_peek_parent (klass);
1316   g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate));
1317
1318   gobject_class->constructed = GST_DEBUG_FUNCPTR (_pad_constructed);
1319   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_aggregator_pad_finalize);
1320   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_aggregator_pad_dispose);
1321 }
1322
1323 static void
1324 gst_aggregator_pad_init (GstAggregatorPad * pad)
1325 {
1326   pad->priv =
1327       G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
1328       GstAggregatorPadPrivate);
1329
1330   pad->buffer = NULL;
1331   g_mutex_init (&pad->priv->event_lock);
1332   g_cond_init (&pad->priv->event_cond);
1333
1334 }
1335
1336 /**
1337  * gst_aggregator_pad_steal_buffer:
1338  * @pad: the pad to get buffer from
1339  *
1340  * Steal the ref to the buffer currently queued in @pad.
1341  *
1342  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
1343  *   queued. You should unref the buffer after usage.
1344  */
1345 GstBuffer *
1346 gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
1347 {
1348   GstBuffer *buffer = NULL;
1349
1350   PAD_LOCK_EVENT (pad);
1351   if (pad->buffer) {
1352     GST_TRACE_OBJECT (pad, "Consuming buffer");
1353     buffer = pad->buffer;
1354     pad->buffer = NULL;
1355     if (pad->priv->pending_eos) {
1356       pad->priv->pending_eos = FALSE;
1357       pad->eos = TRUE;
1358     }
1359     PAD_BROADCAST_EVENT (pad);
1360     GST_DEBUG_OBJECT (pad, "Consummed: %" GST_PTR_FORMAT, buffer);
1361   }
1362   PAD_UNLOCK_EVENT (pad);
1363
1364   return buffer;
1365 }
1366
1367 /**
1368  * gst_aggregator_pad_get_buffer:
1369  * @pad: the pad to get buffer from
1370  *
1371  * Returns: (transfer full): A reference to the buffer in @pad or
1372  * NULL if no buffer was queued. You should unref the buffer after
1373  * usage.
1374  */
1375 GstBuffer *
1376 gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
1377 {
1378   GstBuffer *buffer = NULL;
1379
1380   PAD_LOCK_EVENT (pad);
1381   if (pad->buffer)
1382     buffer = gst_buffer_ref (pad->buffer);
1383   PAD_UNLOCK_EVENT (pad);
1384
1385   return buffer;
1386 }
1387
1388 /**
1389  * gst_aggregator_merge_tags:
1390  * @self: a #GstAggregator
1391  * @tags: a #GstTagList to merge
1392  * @mode: the #GstTagMergeMode to use
1393  *
1394  * Adds tags to so-called pending tags, which will be processed
1395  * before pushing out data downstream.
1396  *
1397  * Note that this is provided for convenience, and the subclass is
1398  * not required to use this and can still do tag handling on its own.
1399  *
1400  * MT safe.
1401  */
1402 void
1403 gst_aggregator_merge_tags (GstAggregator * self,
1404     const GstTagList * tags, GstTagMergeMode mode)
1405 {
1406   GstTagList *otags;
1407
1408   g_return_if_fail (GST_IS_AGGREGATOR (self));
1409   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
1410
1411   /* FIXME Check if we can use OBJECT lock here! */
1412   GST_OBJECT_LOCK (self);
1413   if (tags)
1414     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
1415   otags = self->priv->tags;
1416   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
1417   if (otags)
1418     gst_tag_list_unref (otags);
1419   self->priv->tags_changed = TRUE;
1420   GST_OBJECT_UNLOCK (self);
1421 }