73a655f2acba5bbea1eaff7e06ee4abbc269c782
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
1 /* GStreamer aggregator base class
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @short_description: manages a set of pads with the purpose of
25  * aggregating their buffers.
26  * @see_also: gstcollectpads for historical reasons.
27  *
28  * Manages a set of pads with the purpose of aggregating their buffers.
29  * Control is given to the subclass when all pads have data.
30  * <itemizedlist>
31  *  <listitem><para>
32  *    Base class for mixers and muxers. Subclasses should at least implement
33  *    the #GstAggregatorClass.aggregate() virtual method.
34  *  </para></listitem>
35  *  <listitem><para>
36  *    When data is queued on all pads, tha aggregate vmethod is called.
37  *  </para></listitem>
38  *  <listitem><para>
39  *    One can peek at the data on any given GstAggregatorPad with the
40  *    gst_aggregator_pad_get_buffer () method, and take ownership of it
41  *    with the gst_aggregator_pad_steal_buffer () method. When a buffer
42  *    has been taken with steal_buffer (), a new buffer can be queued
43  *    on that pad.
44  *  </para></listitem>
45  *  <listitem><para>
46  *    If the subclass wishes to push a buffer downstream in its aggregate
47  *    implementation, it should do so through the
48  *    gst_aggregator_finish_buffer () method. This method will take care
49  *    of sending and ordering mandatory events such as stream start, caps
50  *    and segment.
51  *  </para></listitem>
52  *  <listitem><para>
53  *    Same goes for EOS events, which should not be pushed directly by the
54  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
55  *    implementation.
56  *  </para></listitem>
57  * </itemizedlist>
58  */
59
60 #ifdef HAVE_CONFIG_H
61 #  include "config.h"
62 #endif
63
64 #include <string.h>             /* strlen */
65
66 #include "gstaggregator.h"
67
68
69 /*  Might become API */
70 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
71     const GstTagList * tags, GstTagMergeMode mode);
72 static void gst_aggregator_set_latency_property (GstAggregator * agg,
73     gint64 latency);
74 static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
75
76
77 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
78 #define GST_CAT_DEFAULT aggregator_debug
79
80 /* GstAggregatorPad definitions */
81 #define PAD_LOCK(pad)   G_STMT_START {                            \
82   GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p",            \
83         g_thread_self());                                               \
84   GST_OBJECT_LOCK (pad);                                                \
85   GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p",              \
86         g_thread_self());                                               \
87   } G_STMT_END
88
89 #define PAD_UNLOCK(pad)  G_STMT_START {                           \
90   GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p",         \
91         g_thread_self());                                               \
92   GST_OBJECT_UNLOCK (pad);                                              \
93   GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p",           \
94         g_thread_self());                                               \
95   } G_STMT_END
96
97
98 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
99   GST_LOG_OBJECT (pad, "Waiting for EVENT on thread %p",                \
100         g_thread_self());                                               \
101   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),           \
102       GST_OBJECT_GET_LOCK (pad));                                       \
103   GST_LOG_OBJECT (pad, "DONE Waiting for EVENT on thread %p",           \
104         g_thread_self());                                               \
105   } G_STMT_END
106
107 #define PAD_BROADCAST_EVENT(pad) G_STMT_START {                        \
108   GST_LOG_OBJECT (pad, "Signaling EVENT from thread %p",               \
109         g_thread_self());                                              \
110   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond));    \
111   } G_STMT_END
112
113
114 #define PAD_STREAM_LOCK(pad)   G_STMT_START {                           \
115   GST_TRACE_OBJECT (pad, "Taking lock from thread %p",                  \
116         g_thread_self());                                               \
117   g_mutex_lock(&pad->priv->stream_lock);                                \
118   GST_TRACE_OBJECT (pad, "Took lock from thread %p",                    \
119         g_thread_self());                                               \
120   } G_STMT_END
121
122 #define PAD_STREAM_UNLOCK(pad)  G_STMT_START {                          \
123   GST_TRACE_OBJECT (pad, "Releasing lock from thread %p",               \
124         g_thread_self());                                               \
125   g_mutex_unlock(&pad->priv->stream_lock);                              \
126   GST_TRACE_OBJECT (pad, "Release lock from thread %p",                 \
127         g_thread_self());                                               \
128   } G_STMT_END
129
130 #define SRC_STREAM_LOCK(self)   G_STMT_START {                             \
131   GST_TRACE_OBJECT (self, "Taking src STREAM lock from thread %p",         \
132         g_thread_self());                                                  \
133   g_mutex_lock(&self->priv->src_lock);                                     \
134   GST_TRACE_OBJECT (self, "Took src STREAM lock from thread %p",           \
135         g_thread_self());                                                  \
136   } G_STMT_END
137
138 #define SRC_STREAM_UNLOCK(self)  G_STMT_START {                            \
139   GST_TRACE_OBJECT (self, "Releasing src STREAM lock from thread %p",      \
140         g_thread_self());                                                  \
141   g_mutex_unlock(&self->priv->src_lock);                                   \
142   GST_TRACE_OBJECT (self, "Released src STREAM lock from thread %p",       \
143         g_thread_self());                                                  \
144   } G_STMT_END
145
146 #define SRC_STREAM_WAIT(self) G_STMT_START {                               \
147   GST_LOG_OBJECT (self, "Waiting for src STREAM on thread %p",             \
148         g_thread_self());                                                  \
149   g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock));           \
150   GST_LOG_OBJECT (self, "DONE Waiting for src STREAM on thread %p",        \
151         g_thread_self());                                                  \
152   } G_STMT_END
153
154 #define SRC_STREAM_BROADCAST(self) G_STMT_START {                 \
155     GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p",           \
156         g_thread_self());                                                  \
157     if (self->priv->aggregate_id)                                          \
158       gst_clock_id_unschedule (self->priv->aggregate_id);                  \
159     g_cond_broadcast(&(self->priv->src_cond));                             \
160   } G_STMT_END
161
162 struct _GstAggregatorPadPrivate
163 {
164   gboolean pending_flush_start;
165   gboolean pending_flush_stop;
166   gboolean pending_eos;
167   gboolean flushing;
168
169   GCond event_cond;
170
171   GMutex stream_lock;
172 };
173
174 static gboolean
175 gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
176 {
177   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
178
179   PAD_LOCK (aggpad);
180   aggpad->eos = FALSE;
181   aggpad->priv->flushing = FALSE;
182   PAD_UNLOCK (aggpad);
183
184   if (klass->flush)
185     return klass->flush (aggpad, agg);
186
187   return TRUE;
188 }
189
190 /*************************************
191  * GstAggregator implementation  *
192  *************************************/
193 static GstElementClass *aggregator_parent_class = NULL;
194
195 struct _GstAggregatorPrivate
196 {
197   gint padcount;
198
199   /* Our state is >= PAUSED */
200   gboolean running;             /* protected by SRC_STREAM_LOCK */
201
202   gint seqnum;
203   gboolean send_stream_start;
204   gboolean send_segment;
205   gboolean flush_seeking;
206   gboolean pending_flush_start;
207   gboolean send_eos;
208   GstFlowReturn flow_return;
209
210   gboolean send_stream_start;   /* protected by srcpad stream lock */
211   gboolean send_segment;        /* protected by object lock */
212   gboolean flush_seeking;       /* protected by object lock */
213   gboolean pending_flush_start; /* protected by object lock */
214   gboolean send_eos;            /* protected by srcpad stream lock */
215   GstFlowReturn flow_return;    /* protected by object lock */
216
217   GstCaps *srccaps;             /* protected by the srcpad stream lock */
218
219   /* protected by object lock */
220   GstTagList *tags;
221   gboolean tags_changed;
222
223   gboolean latency_live;
224   GstClockTime latency_min;
225   GstClockTime latency_max;
226
227   GstClockTime sub_latency_min;
228   GstClockTime sub_latency_max;
229
230   /* aggregate */
231   GstClockID aggregate_id;
232   GMutex src_lock;
233   GCond src_cond;
234
235   /* properties */
236   gint64 latency;
237 };
238
239 typedef struct
240 {
241   GstEvent *event;
242   gboolean result;
243   gboolean flush;
244
245   gboolean one_actually_seeked;
246 } EventData;
247
248 #define DEFAULT_LATENCY        0
249
250 enum
251 {
252   PROP_0,
253   PROP_LATENCY,
254   PROP_LAST
255 };
256
257 /**
258  * gst_aggregator_iterate_sinkpads:
259  * @self: The #GstAggregator
260  * @func: (scope call): The function to call.
261  * @user_data: (closure): The data to pass to @func.
262  *
263  * Iterate the sinkpads of aggregator to call a function on them.
264  *
265  * This method guarantees that @func will be called only once for each
266  * sink pad.
267  */
268 gboolean
269 gst_aggregator_iterate_sinkpads (GstAggregator * self,
270     GstAggregatorPadForeachFunc func, gpointer user_data)
271 {
272   gboolean result = FALSE;
273   GstIterator *iter;
274   gboolean done = FALSE;
275   GValue item = { 0, };
276   GList *seen_pads = NULL;
277
278   iter = gst_element_iterate_sink_pads (GST_ELEMENT (self));
279
280   if (!iter)
281     goto no_iter;
282
283   while (!done) {
284     switch (gst_iterator_next (iter, &item)) {
285       case GST_ITERATOR_OK:
286       {
287         GstAggregatorPad *pad;
288
289         pad = g_value_get_object (&item);
290
291         /* if already pushed, skip. FIXME, find something faster to tag pads */
292         if (pad == NULL || g_list_find (seen_pads, pad)) {
293           g_value_reset (&item);
294           break;
295         }
296
297         GST_LOG_OBJECT (pad, "calling function %s on pad",
298             GST_DEBUG_FUNCPTR_NAME (func));
299
300         result = func (self, pad, user_data);
301
302         done = !result;
303
304         seen_pads = g_list_prepend (seen_pads, pad);
305
306         g_value_reset (&item);
307         break;
308       }
309       case GST_ITERATOR_RESYNC:
310         gst_iterator_resync (iter);
311         break;
312       case GST_ITERATOR_ERROR:
313         GST_ERROR_OBJECT (self,
314             "Could not iterate over internally linked pads");
315         done = TRUE;
316         break;
317       case GST_ITERATOR_DONE:
318         done = TRUE;
319         break;
320     }
321   }
322   g_value_unset (&item);
323   gst_iterator_free (iter);
324
325   if (seen_pads == NULL) {
326     GST_DEBUG_OBJECT (self, "No pad seen");
327     return FALSE;
328   }
329
330   g_list_free (seen_pads);
331
332 no_iter:
333   return result;
334 }
335
336 static gboolean
337 gst_aggregator_check_pads_ready (GstAggregator * self)
338 {
339   GstAggregatorPad *pad;
340   GList *l, *sinkpads;
341
342   GST_LOG_OBJECT (self, "checking pads");
343
344   GST_OBJECT_LOCK (self);
345
346   sinkpads = GST_ELEMENT_CAST (self)->sinkpads;
347   if (sinkpads == NULL)
348     goto no_sinkpads;
349
350   for (l = sinkpads; l != NULL; l = l->next) {
351     pad = l->data;
352
353     PAD_LOCK (pad);
354     if (pad->buffer == NULL && !pad->eos) {
355       GST_OBJECT_UNLOCK (pad);
356       goto pad_not_ready;
357     }
358     PAD_UNLOCK (pad);
359
360   }
361
362   GST_OBJECT_UNLOCK (self);
363   GST_LOG_OBJECT (self, "pads are ready");
364   return TRUE;
365
366 no_sinkpads:
367   {
368     GST_LOG_OBJECT (self, "pads not ready: no sink pads");
369     GST_OBJECT_UNLOCK (self);
370     return FALSE;
371   }
372 pad_not_ready:
373   {
374     GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
375     GST_OBJECT_UNLOCK (self);
376     return FALSE;
377   }
378 }
379
380 static void
381 gst_aggregator_reset_flow_values (GstAggregator * self)
382 {
383   GST_OBJECT_LOCK (self);
384   self->priv->flow_return = GST_FLOW_FLUSHING;
385   self->priv->send_stream_start = TRUE;
386   self->priv->send_segment = TRUE;
387   gst_segment_init (&self->segment, GST_FORMAT_TIME);
388   GST_OBJECT_UNLOCK (self);
389 }
390
391 static inline void
392 gst_aggregator_push_mandatory_events (GstAggregator * self)
393 {
394   GstAggregatorPrivate *priv = self->priv;
395   GstEvent *segment = NULL;
396   GstEvent *tags = NULL;
397
398   if (self->priv->send_stream_start) {
399     gchar s_id[32];
400
401     GST_INFO_OBJECT (self, "pushing stream start");
402     /* stream-start (FIXME: create id based on input ids) */
403     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
404     if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) {
405       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
406     }
407     self->priv->send_stream_start = FALSE;
408   }
409
410   if (self->priv->srccaps) {
411
412     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
413         self->priv->srccaps);
414     if (!gst_pad_push_event (self->srcpad,
415             gst_event_new_caps (self->priv->srccaps))) {
416       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
417     }
418     gst_caps_unref (self->priv->srccaps);
419     self->priv->srccaps = NULL;
420   }
421
422   GST_OBJECT_LOCK (self);
423   if (self->priv->send_segment && !self->priv->flush_seeking) {
424     segment = gst_event_new_segment (&self->segment);
425
426     if (!self->priv->seqnum)
427       self->priv->seqnum = gst_event_get_seqnum (segment);
428     else
429       gst_event_set_seqnum (segment, self->priv->seqnum);
430     self->priv->send_segment = FALSE;
431
432     GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment);
433   }
434
435   if (priv->tags && priv->tags_changed) {
436     tags = gst_event_new_tag (gst_tag_list_ref (priv->tags));
437     priv->tags_changed = FALSE;
438   }
439   GST_OBJECT_UNLOCK (self);
440
441   if (segment)
442     gst_pad_push_event (self->srcpad, segment);
443   if (tags)
444     gst_pad_push_event (self->srcpad, tags);
445
446 }
447
448 /**
449  * gst_aggregator_set_src_caps:
450  * @self: The #GstAggregator
451  * @caps: The #GstCaps to set on the src pad.
452  *
453  * Sets the caps to be used on the src pad.
454  */
455 void
456 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
457 {
458   GST_PAD_STREAM_LOCK (self->srcpad);
459   gst_caps_replace (&self->priv->srccaps, caps);
460   gst_aggregator_push_mandatory_events (self);
461   GST_PAD_STREAM_UNLOCK (self->srcpad);
462 }
463
464 /**
465  * gst_aggregator_finish_buffer:
466  * @self: The #GstAggregator
467  * @buffer: (transfer full): the #GstBuffer to push.
468  *
469  * This method will push the provided output buffer downstream. If needed,
470  * mandatory events such as stream-start, caps, and segment events will be
471  * sent before pushing the buffer.
472  */
473 GstFlowReturn
474 gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
475 {
476   gst_aggregator_push_mandatory_events (self);
477
478   GST_OBJECT_LOCK (self);
479   if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) {
480     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
481     GST_OBJECT_UNLOCK (self);
482     return gst_pad_push (self->srcpad, buffer);
483   } else {
484     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
485         self->priv->flush_seeking, gst_pad_is_active (self->srcpad));
486     GST_OBJECT_UNLOCK (self);
487     gst_buffer_unref (buffer);
488     return GST_FLOW_OK;
489   }
490 }
491
492 static void
493 gst_aggregator_push_eos (GstAggregator * self)
494 {
495   GstEvent *event;
496   gst_aggregator_push_mandatory_events (self);
497
498   self->priv->send_eos = FALSE;
499   event = gst_event_new_eos ();
500   gst_event_set_seqnum (event, self->priv->seqnum);
501   gst_pad_push_event (self->srcpad, event);
502 }
503
504 static GstClockTime
505 gst_aggregator_get_next_time (GstAggregator * self)
506 {
507   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
508
509   if (klass->get_next_time)
510     return klass->get_next_time (self);
511
512   return GST_CLOCK_TIME_NONE;
513 }
514
515 /* called with the src STREAM lock */
516 static gboolean
517 gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
518 {
519   GstClockTime latency_max, latency_min;
520   GstClockTime start;
521   gboolean live, res;
522
523   *timeout = FALSE;
524
525   SRC_STREAM_LOCK (self);
526
527   GST_OBJECT_LOCK (self);
528   gst_aggregator_get_latency_unlocked (self, &live, &latency_min, &latency_max);
529   GST_OBJECT_UNLOCK (self);
530
531   if (gst_aggregator_check_pads_ready (self)) {
532     GST_DEBUG_OBJECT (self, "all pads have data");
533     SRC_STREAM_UNLOCK (self);
534
535     return TRUE;
536   }
537
538   /* Before waiting, check if we're actually still running */
539   if (!self->priv->running || !self->priv->send_eos) {
540     SRC_STREAM_UNLOCK (self);
541
542     return FALSE;
543   }
544
545   start = gst_aggregator_get_next_time (self);
546
547   if (!live || !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self))
548       || !GST_CLOCK_TIME_IS_VALID (start)) {
549     /* We wake up here when something happened, and below
550      * then check if we're ready now. If we return FALSE,
551      * we will be directly called again.
552      */
553     SRC_STREAM_WAIT (self);
554   } else {
555     GstClockTime base_time, time;
556     GstClock *clock;
557     GstClockReturn status;
558     GstClockTimeDiff jitter;
559
560     GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
561         GST_TIME_ARGS (start));
562
563     GST_OBJECT_LOCK (self);
564     base_time = GST_ELEMENT_CAST (self)->base_time;
565     clock = GST_ELEMENT_CLOCK (self);
566     if (clock)
567       gst_object_ref (clock);
568     GST_OBJECT_UNLOCK (self);
569
570     time = base_time + start;
571
572     if (GST_CLOCK_TIME_IS_VALID (latency_min)) {
573       time += latency_min;
574     } else {
575       time += self->priv->latency;
576     }
577
578     GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
579         GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
580         " latency max %" GST_TIME_FORMAT " latency min %" GST_TIME_FORMAT
581         " current %" GST_TIME_FORMAT ")", GST_TIME_ARGS (time),
582         GST_TIME_ARGS (GST_ELEMENT_CAST (self)->base_time),
583         GST_TIME_ARGS (start), GST_TIME_ARGS (latency_max),
584         GST_TIME_ARGS (latency_min),
585         GST_TIME_ARGS (gst_clock_get_time (clock)));
586
587     self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
588     gst_object_unref (clock);
589     SRC_STREAM_UNLOCK (self);
590
591     jitter = 0;
592     status = gst_clock_id_wait (self->priv->aggregate_id, &jitter);
593
594     SRC_STREAM_LOCK (self);
595     if (self->priv->aggregate_id) {
596       gst_clock_id_unref (self->priv->aggregate_id);
597       self->priv->aggregate_id = NULL;
598     }
599
600     GST_DEBUG_OBJECT (self,
601         "clock returned %d (jitter: %s%" GST_TIME_FORMAT ")",
602         status, (jitter < 0 ? "-" : " "),
603         GST_TIME_ARGS ((jitter < 0 ? -jitter : jitter)));
604
605     /* we timed out */
606     if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
607       SRC_STREAM_UNLOCK (self);
608       *timeout = TRUE;
609       return TRUE;
610     }
611   }
612
613   res = gst_aggregator_check_pads_ready (self);
614   SRC_STREAM_UNLOCK (self);
615
616   return res;
617 }
618
619 static void
620 gst_aggregator_aggregate_func (GstAggregator * self)
621 {
622   GstAggregatorPrivate *priv = self->priv;
623   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
624   gboolean timeout = FALSE;
625
626   if (self->priv->running == FALSE) {
627     GST_DEBUG_OBJECT (self, "Not running anymore");
628     return;
629   }
630
631   GST_LOG_OBJECT (self, "Checking aggregate");
632   while (priv->send_eos && priv->running) {
633     GstFlowReturn flow_return;
634
635     if (!gst_aggregator_wait_and_check (self, &timeout))
636       continue;
637
638     GST_TRACE_OBJECT (self, "Actually aggregating!");
639
640     flow_return = klass->aggregate (self, timeout);
641
642     GST_OBJECT_LOCK (self);
643     if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking)
644       priv->flow_return = GST_FLOW_OK;
645     else
646       priv->flow_return = flow_return;
647     GST_OBJECT_UNLOCK (self);
648
649     if (flow_return == GST_FLOW_EOS) {
650       gst_aggregator_push_eos (self);
651     }
652
653     GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
654
655     if (flow_return != GST_FLOW_OK)
656       break;
657   }
658 }
659
660 static gboolean
661 gst_aggregator_start (GstAggregator * self)
662 {
663   GstAggregatorClass *klass;
664   gboolean result;
665
666   self->priv->running = TRUE;
667   self->priv->send_stream_start = TRUE;
668   self->priv->send_segment = TRUE;
669   self->priv->send_eos = TRUE;
670   self->priv->srccaps = NULL;
671   self->priv->flow_return = GST_FLOW_OK;
672
673   klass = GST_AGGREGATOR_GET_CLASS (self);
674
675   if (klass->start)
676     result = klass->start (self);
677   else
678     result = TRUE;
679
680   return result;
681 }
682
683 static gboolean
684 _check_pending_flush_stop (GstAggregatorPad * pad)
685 {
686   return (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
687 }
688
689 static gboolean
690 gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
691 {
692   gboolean res = TRUE;
693
694   GST_INFO_OBJECT (self, "%s srcpad task",
695       flush_start ? "Pausing" : "Stopping");
696
697   SRC_STREAM_LOCK (self);
698   self->priv->running = FALSE;
699   SRC_STREAM_BROADCAST (self);
700   SRC_STREAM_UNLOCK (self);
701
702   if (flush_start) {
703     res = gst_pad_push_event (self->srcpad, flush_start);
704   }
705
706   gst_pad_stop_task (self->srcpad);
707
708   return res;
709 }
710
711 static void
712 gst_aggregator_start_srcpad_task (GstAggregator * self)
713 {
714   GST_INFO_OBJECT (self, "Starting srcpad task");
715
716   self->priv->running = TRUE;
717   gst_pad_start_task (GST_PAD (self->srcpad),
718       (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL);
719 }
720
721 static GstFlowReturn
722 gst_aggregator_flush (GstAggregator * self)
723 {
724   GstFlowReturn ret = GST_FLOW_OK;
725   GstAggregatorPrivate *priv = self->priv;
726   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
727
728   GST_DEBUG_OBJECT (self, "Flushing everything");
729   GST_OBJECT_LOCK (self);
730   priv->send_segment = TRUE;
731   priv->flush_seeking = FALSE;
732   priv->tags_changed = FALSE;
733   GST_OBJECT_UNLOCK (self);
734   if (klass->flush)
735     ret = klass->flush (self);
736
737   return ret;
738 }
739
740
741 /* Called with GstAggregator's object lock held */
742
743 static gboolean
744 gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
745 {
746   GList *tmp;
747   GstAggregatorPad *tmppad;
748
749   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
750     tmppad = (GstAggregatorPad *) tmp->data;
751
752     if (_check_pending_flush_stop (tmppad) == FALSE) {
753       GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
754           tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
755       return FALSE;
756     }
757   }
758
759   return TRUE;
760 }
761
762 static void
763 gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
764     GstEvent * event)
765 {
766   GstBuffer *tmpbuf;
767   GstAggregatorPrivate *priv = self->priv;
768   GstAggregatorPadPrivate *padpriv = aggpad->priv;
769
770   g_atomic_int_set (&aggpad->priv->flushing, TRUE);
771   /*  Remove pad buffer and wake up the streaming thread */
772   tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
773   gst_buffer_replace (&tmpbuf, NULL);
774   PAD_STREAM_LOCK (aggpad);
775   if (g_atomic_int_compare_and_exchange (&padpriv->pending_flush_start,
776           TRUE, FALSE) == TRUE) {
777     GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
778     g_atomic_int_set (&padpriv->pending_flush_stop, TRUE);
779   }
780
781   GST_OBJECT_LOCK (self);
782   if (priv->flush_seeking) {
783     /* If flush_seeking we forward the first FLUSH_START */
784     if (priv->pending_flush_start) {
785       priv->pending_flush_start = FALSE;
786       GST_OBJECT_UNLOCK (self);
787
788       GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
789       gst_aggregator_stop_srcpad_task (self, event);
790       priv->flow_return = GST_FLOW_OK;
791
792       GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
793       GST_PAD_STREAM_LOCK (self->srcpad);
794       GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
795       event = NULL;
796     } else {
797       GST_OBJECT_UNLOCK (self);
798       gst_event_unref (event);
799     }
800   } else {
801     GST_OBJECT_UNLOCK (self);
802     gst_event_unref (event);
803   }
804   PAD_STREAM_UNLOCK (aggpad);
805
806   tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
807   gst_buffer_replace (&tmpbuf, NULL);
808 }
809
810 /* GstAggregator vmethods default implementations */
811 static gboolean
812 gst_aggregator_default_sink_event (GstAggregator * self,
813     GstAggregatorPad * aggpad, GstEvent * event)
814 {
815   gboolean res = TRUE;
816   GstPad *pad = GST_PAD (aggpad);
817   GstAggregatorPrivate *priv = self->priv;
818
819   switch (GST_EVENT_TYPE (event)) {
820     case GST_EVENT_FLUSH_START:
821     {
822       gst_aggregator_flush_start (self, aggpad, event);
823       /* We forward only in one case: right after flush_seeking */
824       event = NULL;
825       goto eat;
826     }
827     case GST_EVENT_FLUSH_STOP:
828     {
829       GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP");
830
831       gst_aggregator_pad_flush (aggpad, self);
832       GST_OBJECT_LOCK (self);
833       if (priv->flush_seeking) {
834         g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
835         if (gst_aggregator_all_flush_stop_received_locked (self)) {
836           GST_OBJECT_UNLOCK (self);
837           /* That means we received FLUSH_STOP/FLUSH_STOP on
838            * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
839           gst_aggregator_flush (self);
840           gst_pad_push_event (self->srcpad, event);
841           event = NULL;
842           SRC_STREAM_LOCK (self);
843           priv->send_eos = TRUE;
844           SRC_STREAM_BROADCAST (self);
845           SRC_STREAM_UNLOCK (self);
846
847           GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
848           GST_PAD_STREAM_UNLOCK (self->srcpad);
849           gst_aggregator_start_srcpad_task (self);
850         } else {
851           GST_OBJECT_UNLOCK (self);
852         }
853       } else {
854         GST_OBJECT_UNLOCK (self);
855       }
856
857       /* We never forward the event */
858       goto eat;
859     }
860     case GST_EVENT_EOS:
861     {
862       GST_DEBUG_OBJECT (aggpad, "EOS");
863
864       /* We still have a buffer, and we don't want the subclass to have to
865        * check for it. Mark pending_eos, eos will be set when steal_buffer is
866        * called
867        */
868       SRC_STREAM_LOCK (self);
869       PAD_LOCK (aggpad);
870       if (!aggpad->buffer) {
871         aggpad->eos = TRUE;
872       } else {
873         aggpad->priv->pending_eos = TRUE;
874       }
875       PAD_UNLOCK (aggpad);
876
877       SRC_STREAM_BROADCAST (self);
878       SRC_STREAM_UNLOCK (self);
879       goto eat;
880     }
881     case GST_EVENT_SEGMENT:
882     {
883       PAD_LOCK (aggpad);
884       gst_event_copy_segment (event, &aggpad->segment);
885       self->priv->seqnum = gst_event_get_seqnum (event);
886       PAD_UNLOCK (aggpad);
887       goto eat;
888     }
889     case GST_EVENT_STREAM_START:
890     {
891       goto eat;
892     }
893     case GST_EVENT_TAG:
894     {
895       GstTagList *tags;
896
897       gst_event_parse_tag (event, &tags);
898
899       if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) {
900         gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE);
901         gst_event_unref (event);
902         event = NULL;
903         goto eat;
904       }
905       break;
906     }
907     default:
908     {
909       break;
910     }
911   }
912
913   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
914   return gst_pad_event_default (pad, GST_OBJECT (self), event);
915
916 eat:
917   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
918   if (event)
919     gst_event_unref (event);
920
921   return res;
922 }
923
924 static inline gboolean
925 gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad,
926     gpointer unused_udata)
927 {
928   gst_aggregator_pad_flush (pad, self);
929
930   return TRUE;
931 }
932
933 static gboolean
934 gst_aggregator_stop (GstAggregator * agg)
935 {
936   GstAggregatorClass *klass;
937   gboolean result;
938
939   gst_aggregator_reset_flow_values (agg);
940
941   gst_aggregator_iterate_sinkpads (agg, gst_aggregator_stop_pad, NULL);
942
943   klass = GST_AGGREGATOR_GET_CLASS (agg);
944
945   if (klass->stop)
946     result = klass->stop (agg);
947   else
948     result = TRUE;
949
950   if (agg->priv->tags)
951     gst_tag_list_unref (agg->priv->tags);
952   agg->priv->tags = NULL;
953
954   return result;
955 }
956
957 /* GstElement vmethods implementations */
958 static GstStateChangeReturn
959 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
960 {
961   GstStateChangeReturn ret;
962   GstAggregator *self = GST_AGGREGATOR (element);
963
964   switch (transition) {
965     case GST_STATE_CHANGE_READY_TO_PAUSED:
966       if (!gst_aggregator_start (self))
967         goto error_start;
968       break;
969     default:
970       break;
971   }
972
973   if ((ret =
974           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
975               transition)) == GST_STATE_CHANGE_FAILURE)
976     goto failure;
977
978
979   switch (transition) {
980     case GST_STATE_CHANGE_PAUSED_TO_READY:
981       if (!gst_aggregator_stop (self)) {
982         /* What to do in this case? Error out? */
983         GST_ERROR_OBJECT (self, "Subclass failed to stop.");
984       }
985       break;
986     default:
987       break;
988   }
989
990   return ret;
991
992 /* ERRORS */
993 failure:
994   {
995     GST_ERROR_OBJECT (element, "parent failed state change");
996     return ret;
997   }
998 error_start:
999   {
1000     GST_ERROR_OBJECT (element, "Subclass failed to start");
1001     return GST_STATE_CHANGE_FAILURE;
1002   }
1003 }
1004
1005 static void
1006 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
1007 {
1008   GstAggregator *self = GST_AGGREGATOR (element);
1009   GstBuffer *tmpbuf;
1010
1011   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1012
1013   GST_INFO_OBJECT (pad, "Removing pad");
1014
1015   SRC_STREAM_LOCK (self);
1016   g_atomic_int_set (&aggpad->priv->flushing, TRUE);
1017   tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
1018   gst_buffer_replace (&tmpbuf, NULL);
1019   gst_element_remove_pad (element, pad);
1020
1021   SRC_STREAM_BROADCAST (self);
1022   SRC_STREAM_UNLOCK (self);
1023 }
1024
1025 static GstPad *
1026 gst_aggregator_request_new_pad (GstElement * element,
1027     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1028 {
1029   GstAggregator *self;
1030   GstAggregatorPad *agg_pad;
1031
1032   GstElementClass *klass = GST_ELEMENT_GET_CLASS (element);
1033   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
1034
1035   self = GST_AGGREGATOR (element);
1036
1037   if (templ == gst_element_class_get_pad_template (klass, "sink_%u")) {
1038     gint serial = 0;
1039     gchar *name = NULL;
1040
1041     GST_OBJECT_LOCK (element);
1042     if (req_name == NULL || strlen (req_name) < 6
1043         || !g_str_has_prefix (req_name, "sink_")) {
1044       /* no name given when requesting the pad, use next available int */
1045       priv->padcount++;
1046     } else {
1047       /* parse serial number from requested padname */
1048       serial = g_ascii_strtoull (&req_name[5], NULL, 10);
1049       if (serial >= priv->padcount)
1050         priv->padcount = serial;
1051     }
1052
1053     name = g_strdup_printf ("sink_%u", priv->padcount);
1054     agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
1055         "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
1056     g_free (name);
1057
1058     GST_OBJECT_UNLOCK (element);
1059
1060   } else {
1061     return NULL;
1062   }
1063
1064   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
1065
1066   if (priv->running)
1067     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
1068
1069   /* add the pad to the element */
1070   gst_element_add_pad (element, GST_PAD (agg_pad));
1071
1072   return GST_PAD (agg_pad);
1073 }
1074
1075 typedef struct
1076 {
1077   GstClockTime min, max;
1078   gboolean live;
1079 } LatencyData;
1080
1081 static gboolean
1082 gst_aggregator_query_sink_latency_foreach (GstAggregator * self,
1083     GstAggregatorPad * pad, gpointer user_data)
1084 {
1085   LatencyData *data = user_data;
1086   GstClockTime min, max;
1087   GstQuery *query;
1088   gboolean live, res;
1089
1090   query = gst_query_new_latency ();
1091   res = gst_pad_peer_query (GST_PAD_CAST (pad), query);
1092
1093   if (res) {
1094     gst_query_parse_latency (query, &live, &min, &max);
1095
1096     GST_LOG_OBJECT (pad, "got latency live:%s min:%" G_GINT64_FORMAT
1097         " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
1098
1099     if (min != GST_CLOCK_TIME_NONE && min > data->min)
1100       data->min = min;
1101
1102     if (max != GST_CLOCK_TIME_NONE &&
1103         ((data->max != GST_CLOCK_TIME_NONE && max < data->max) ||
1104             (data->max == GST_CLOCK_TIME_NONE)))
1105       data->max = max;
1106
1107     data->live |= live;
1108   }
1109
1110   gst_query_unref (query);
1111
1112   return TRUE;
1113 }
1114
1115 /**
1116  * gst_aggregator_get_latency_unlocked:
1117  * @self: a #GstAggregator
1118  * @live: (out) (allow-none): whether @self is live
1119  * @min_latency: (out) (allow-none): the configured minimum latency of @self
1120  * @max_latency: (out) (allow-none): the configured maximum latency of @self
1121  *
1122  * Retreives the latency values reported by @self in response to the latency
1123  * query.
1124  *
1125  * Typically only called by subclasses.
1126  *
1127  * MUST be called with the object lock held.
1128  */
1129 void
1130 gst_aggregator_get_latency_unlocked (GstAggregator * self, gboolean * live,
1131     GstClockTime * min_latency, GstClockTime * max_latency)
1132 {
1133   GstClockTime our_latency;
1134   GstClockTime min, max;
1135
1136   g_return_if_fail (GST_IS_AGGREGATOR (self));
1137
1138   /* latency_min is never GST_CLOCK_TIME_NONE by construction */
1139   min = self->priv->latency_min;
1140   max = self->priv->latency_max;
1141
1142   min += self->priv->sub_latency_min;
1143   if (GST_CLOCK_TIME_IS_VALID (max)
1144       && GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max))
1145     max += self->priv->sub_latency_max;
1146
1147   our_latency = self->priv->latency;
1148   if (GST_CLOCK_TIME_IS_VALID (our_latency)) {
1149     min += our_latency;
1150     if (GST_CLOCK_TIME_IS_VALID (max))
1151       max += our_latency;
1152   }
1153
1154   if (live)
1155     *live = self->priv->latency_live;
1156   if (min_latency)
1157     *min_latency = min;
1158   if (max_latency)
1159     *max_latency = max;
1160 }
1161
1162 static gboolean
1163 gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
1164 {
1165   GstClockTime our_latency;
1166   LatencyData data;
1167
1168   data.min = 0;
1169   data.max = GST_CLOCK_TIME_NONE;
1170   data.live = FALSE;
1171
1172   /* query upstream's latency */
1173   SRC_STREAM_LOCK (self);
1174   gst_aggregator_iterate_sinkpads (self,
1175       gst_aggregator_query_sink_latency_foreach, &data);
1176   SRC_STREAM_UNLOCK (self);
1177
1178   our_latency = self->priv->latency;
1179
1180   if (data.live && GST_CLOCK_TIME_IS_VALID (our_latency) &&
1181       our_latency > data.max) {
1182     GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
1183         ("%s", "Latency too big"),
1184         ("The requested latency value is too big for the current pipeline.  "
1185             "Limiting to %" G_GINT64_FORMAT, data.max));
1186     self->priv->latency = data.max;
1187     /* FIXME: shouldn't we g_object_notify() the change here? */
1188   }
1189
1190   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (data.min))) {
1191     GST_WARNING_OBJECT (self, "Invalid minimum latency, using 0");
1192     data.min = 0;
1193   }
1194
1195   if (G_UNLIKELY (data.min > data.max)) {
1196     GST_WARNING_OBJECT (self, "Minimum latency is greater than maximum latency "
1197         "(%" G_GINT64_FORMAT " > %" G_GINT64_FORMAT "). "
1198         "Clamping it at the maximum latency", data.min, data.max);
1199     data.min = data.max;
1200   }
1201
1202   self->priv->latency_live = data.live;
1203   self->priv->latency_min = data.min;
1204   self->priv->latency_max = data.max;
1205
1206   /* add our own */
1207   if (GST_CLOCK_TIME_IS_VALID (our_latency)) {
1208     if (GST_CLOCK_TIME_IS_VALID (data.min))
1209       data.min += our_latency;
1210     if (GST_CLOCK_TIME_IS_VALID (data.max))
1211       data.max += our_latency;
1212   }
1213
1214   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_min)
1215       && GST_CLOCK_TIME_IS_VALID (data.min))
1216     data.min += self->priv->sub_latency_min;
1217   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1218       && GST_CLOCK_TIME_IS_VALID (data.max))
1219     data.max += self->priv->sub_latency_max;
1220
1221   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
1222       " max:%" G_GINT64_FORMAT, data.live ? "true" : "false", data.min,
1223       data.max);
1224
1225   gst_query_set_latency (query, data.live, data.min, data.max);
1226
1227   return TRUE;
1228 }
1229
1230 static gboolean
1231 gst_aggregator_send_event (GstElement * element, GstEvent * event)
1232 {
1233   GstAggregator *self = GST_AGGREGATOR (element);
1234
1235   GST_STATE_LOCK (element);
1236   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
1237       GST_STATE (element) < GST_STATE_PAUSED) {
1238     gdouble rate;
1239     GstFormat fmt;
1240     GstSeekFlags flags;
1241     GstSeekType start_type, stop_type;
1242     gint64 start, stop;
1243
1244     gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1245         &start, &stop_type, &stop);
1246     gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1247         stop_type, stop, NULL);
1248
1249     self->priv->seqnum = gst_event_get_seqnum (event);
1250     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
1251   }
1252   GST_STATE_UNLOCK (element);
1253
1254
1255   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
1256       event);
1257 }
1258
1259 static gboolean
1260 gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
1261 {
1262   gboolean res = TRUE;
1263
1264   switch (GST_QUERY_TYPE (query)) {
1265     case GST_QUERY_SEEKING:
1266     {
1267       GstFormat format;
1268
1269       /* don't pass it along as some (file)sink might claim it does
1270        * whereas with a collectpads in between that will not likely work */
1271       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1272       gst_query_set_seeking (query, format, FALSE, 0, -1);
1273       res = TRUE;
1274
1275       goto discard;
1276     }
1277     case GST_QUERY_LATENCY:
1278     {
1279       gboolean ret;
1280
1281       ret = gst_aggregator_query_latency (self, query);
1282       /* Wake up the src thread again, due to changed latencies
1283        * or changed live-ness we might have to adjust if we wait
1284        * on a deadline at all and how long.
1285        * This is only to unschedule the clock id, we don't really care
1286        * about the GCond here.
1287        */
1288       SRC_STREAM_LOCK (self);
1289       SRC_STREAM_BROADCAST (self);
1290       SRC_STREAM_UNLOCK (self);
1291       return ret;
1292     }
1293     default:
1294       break;
1295   }
1296
1297   return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1298
1299 discard:
1300   return res;
1301 }
1302
1303 static gboolean
1304 gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
1305 {
1306   EventData *evdata = user_data;
1307   gboolean ret = TRUE;
1308   GstPad *peer = gst_pad_get_peer (pad);
1309   GstAggregatorPadPrivate *padpriv = GST_AGGREGATOR_PAD (pad)->priv;
1310
1311   if (peer) {
1312     ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
1313     GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
1314     gst_object_unref (peer);
1315   }
1316
1317   if (ret == FALSE) {
1318     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK)
1319       GST_ERROR_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
1320
1321     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
1322       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
1323
1324       if (gst_pad_query (peer, seeking)) {
1325         gboolean seekable;
1326
1327         gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
1328
1329         if (seekable == FALSE) {
1330           GST_INFO_OBJECT (pad,
1331               "Source not seekable, We failed but it does not matter!");
1332
1333           ret = TRUE;
1334         }
1335       } else {
1336         GST_ERROR_OBJECT (pad, "Query seeking FAILED");
1337       }
1338
1339       gst_query_unref (seeking);
1340     }
1341
1342     if (evdata->flush) {
1343       padpriv->pending_flush_start = FALSE;
1344       padpriv->pending_flush_stop = FALSE;
1345     }
1346   } else {
1347     evdata->one_actually_seeked = TRUE;
1348   }
1349
1350   evdata->result &= ret;
1351
1352   /* Always send to all pads */
1353   return FALSE;
1354 }
1355
1356 static EventData
1357 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
1358     GstEvent * event, gboolean flush)
1359 {
1360   EventData evdata;
1361
1362   evdata.event = event;
1363   evdata.result = TRUE;
1364   evdata.flush = flush;
1365   evdata.one_actually_seeked = FALSE;
1366
1367   /* We first need to set all pads as flushing in a first pass
1368    * as flush_start flush_stop is sometimes sent synchronously
1369    * while we send the seek event */
1370   if (flush) {
1371     GList *l;
1372
1373     GST_OBJECT_LOCK (self);
1374     for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
1375       GstAggregatorPad *pad = l->data;
1376
1377       pad->priv->pending_flush_start = TRUE;
1378       pad->priv->pending_flush_stop = FALSE;
1379     }
1380     GST_OBJECT_UNLOCK (self);
1381   }
1382
1383   gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, &evdata);
1384
1385   gst_event_unref (event);
1386
1387   return evdata;
1388 }
1389
1390 static gboolean
1391 gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
1392 {
1393   gdouble rate;
1394   GstFormat fmt;
1395   GstSeekFlags flags;
1396   GstSeekType start_type, stop_type;
1397   gint64 start, stop;
1398   gboolean flush;
1399   EventData evdata;
1400   GstAggregatorPrivate *priv = self->priv;
1401
1402   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1403       &start, &stop_type, &stop);
1404
1405   GST_INFO_OBJECT (self, "starting SEEK");
1406
1407   flush = flags & GST_SEEK_FLAG_FLUSH;
1408
1409   GST_OBJECT_LOCK (self);
1410   if (flush) {
1411     priv->pending_flush_start = TRUE;
1412     priv->flush_seeking = TRUE;
1413   }
1414
1415   gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1416       stop_type, stop, NULL);
1417   GST_OBJECT_UNLOCK (self);
1418
1419   /* forward the seek upstream */
1420   evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, flush);
1421   event = NULL;
1422
1423   if (!evdata.result || !evdata.one_actually_seeked) {
1424     GST_OBJECT_LOCK (self);
1425     priv->flush_seeking = FALSE;
1426     priv->pending_flush_start = FALSE;
1427     GST_OBJECT_UNLOCK (self);
1428   }
1429
1430   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
1431
1432   return evdata.result;
1433 }
1434
1435 static gboolean
1436 gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
1437 {
1438   EventData evdata;
1439   gboolean res = TRUE;
1440
1441   switch (GST_EVENT_TYPE (event)) {
1442     case GST_EVENT_SEEK:
1443     {
1444       gst_event_ref (event);
1445       res = gst_aggregator_do_seek (self, event);
1446       gst_event_unref (event);
1447       event = NULL;
1448       goto done;
1449     }
1450     case GST_EVENT_NAVIGATION:
1451     {
1452       /* navigation is rather pointless. */
1453       res = FALSE;
1454       gst_event_unref (event);
1455       goto done;
1456     }
1457     default:
1458     {
1459       break;
1460     }
1461   }
1462
1463   evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, FALSE);
1464   res = evdata.result;
1465
1466 done:
1467   return res;
1468 }
1469
1470 static gboolean
1471 gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent,
1472     GstEvent * event)
1473 {
1474   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1475
1476   return klass->src_event (GST_AGGREGATOR (parent), event);
1477 }
1478
1479 static gboolean
1480 gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent,
1481     GstQuery * query)
1482 {
1483   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1484
1485   return klass->src_query (GST_AGGREGATOR (parent), query);
1486 }
1487
1488 static gboolean
1489 gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
1490     GstObject * parent, GstPadMode mode, gboolean active)
1491 {
1492   GstAggregator *self = GST_AGGREGATOR (parent);
1493   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1494
1495   if (klass->src_activate) {
1496     if (klass->src_activate (self, mode, active) == FALSE) {
1497       return FALSE;
1498     }
1499   }
1500
1501   if (active == TRUE) {
1502     switch (mode) {
1503       case GST_PAD_MODE_PUSH:
1504       {
1505         GST_INFO_OBJECT (pad, "Activating pad!");
1506         gst_aggregator_start_srcpad_task (self);
1507         return TRUE;
1508       }
1509       default:
1510       {
1511         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
1512         return FALSE;
1513       }
1514     }
1515   }
1516
1517   /* deactivating */
1518   GST_INFO_OBJECT (self, "Deactivating srcpad");
1519   gst_aggregator_stop_srcpad_task (self, FALSE);
1520
1521   return TRUE;
1522 }
1523
1524 static gboolean
1525 gst_aggregator_default_sink_query (GstAggregator * self,
1526     GstAggregatorPad * aggpad, GstQuery * query)
1527 {
1528   GstPad *pad = GST_PAD (aggpad);
1529
1530   return gst_pad_query_default (pad, GST_OBJECT (self), query);
1531 }
1532
1533 static void
1534 gst_aggregator_finalize (GObject * object)
1535 {
1536   GstAggregator *self = (GstAggregator *) object;
1537
1538   g_mutex_clear (&self->priv->src_lock);
1539   g_cond_clear (&self->priv->src_cond);
1540
1541   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
1542 }
1543
1544 /*
1545  * gst_aggregator_set_latency_property:
1546  * @agg: a #GstAggregator
1547  * @latency: the new latency value.
1548  *
1549  * Sets the new latency value to @latency. This value is used to limit the
1550  * amount of time a pad waits for data to appear before considering the pad
1551  * as unresponsive.
1552  */
1553 static void
1554 gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
1555 {
1556   gboolean changed;
1557
1558   g_return_if_fail (GST_IS_AGGREGATOR (self));
1559
1560   GST_OBJECT_LOCK (self);
1561
1562   if (self->priv->latency_live && self->priv->latency_max != 0 &&
1563       GST_CLOCK_TIME_IS_VALID (latency) && latency > self->priv->latency_max) {
1564     GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
1565         ("%s", "Latency too big"),
1566         ("The requested latency value is too big for the latency in the "
1567             "current pipeline.  Limiting to %" G_GINT64_FORMAT,
1568             self->priv->latency_max));
1569     latency = self->priv->latency_max;
1570   }
1571
1572   changed = (self->priv->latency != latency);
1573   self->priv->latency = latency;
1574   GST_OBJECT_UNLOCK (self);
1575
1576   if (changed)
1577     gst_element_post_message (GST_ELEMENT_CAST (self),
1578         gst_message_new_latency (GST_OBJECT_CAST (self)));
1579 }
1580
1581 /*
1582  * gst_aggregator_get_latency_property:
1583  * @agg: a #GstAggregator
1584  *
1585  * Gets the latency value. See gst_aggregator_set_latency for
1586  * more details.
1587  *
1588  * Returns: The time in nanoseconds to wait for data to arrive on a sink pad 
1589  * before a pad is deemed unresponsive. A value of -1 means an
1590  * unlimited time.
1591  */
1592 static gint64
1593 gst_aggregator_get_latency_property (GstAggregator * agg)
1594 {
1595   gint64 res;
1596
1597   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
1598
1599   GST_OBJECT_LOCK (agg);
1600   res = agg->priv->latency;
1601   GST_OBJECT_UNLOCK (agg);
1602
1603   return res;
1604 }
1605
1606 static void
1607 gst_aggregator_set_property (GObject * object, guint prop_id,
1608     const GValue * value, GParamSpec * pspec)
1609 {
1610   GstAggregator *agg = GST_AGGREGATOR (object);
1611
1612   switch (prop_id) {
1613     case PROP_LATENCY:
1614       gst_aggregator_set_latency_property (agg, g_value_get_int64 (value));
1615       break;
1616     default:
1617       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1618       break;
1619   }
1620 }
1621
1622 static void
1623 gst_aggregator_get_property (GObject * object, guint prop_id,
1624     GValue * value, GParamSpec * pspec)
1625 {
1626   GstAggregator *agg = GST_AGGREGATOR (object);
1627
1628   switch (prop_id) {
1629     case PROP_LATENCY:
1630       g_value_set_int64 (value, gst_aggregator_get_latency_property (agg));
1631       break;
1632     default:
1633       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1634       break;
1635   }
1636 }
1637
1638 /* GObject vmethods implementations */
1639 static void
1640 gst_aggregator_class_init (GstAggregatorClass * klass)
1641 {
1642   GObjectClass *gobject_class = (GObjectClass *) klass;
1643   GstElementClass *gstelement_class = (GstElementClass *) klass;
1644
1645   aggregator_parent_class = g_type_class_peek_parent (klass);
1646   g_type_class_add_private (klass, sizeof (GstAggregatorPrivate));
1647
1648   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
1649       GST_DEBUG_FG_MAGENTA, "GstAggregator");
1650
1651   klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD;
1652
1653   klass->sink_event = gst_aggregator_default_sink_event;
1654   klass->sink_query = gst_aggregator_default_sink_query;
1655
1656   klass->src_event = gst_aggregator_default_src_event;
1657   klass->src_query = gst_aggregator_default_src_query;
1658
1659   gstelement_class->request_new_pad =
1660       GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
1661   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
1662   gstelement_class->release_pad =
1663       GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
1664   gstelement_class->change_state =
1665       GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
1666
1667   gobject_class->set_property = gst_aggregator_set_property;
1668   gobject_class->get_property = gst_aggregator_get_property;
1669   gobject_class->finalize = gst_aggregator_finalize;
1670
1671   g_object_class_install_property (gobject_class, PROP_LATENCY,
1672       g_param_spec_int64 ("latency", "Buffer latency",
1673           "Additional latency in live mode to allow upstream "
1674           "to take longer to produce buffers for the current "
1675           "position", 0,
1676           (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1),
1677           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1678
1679   GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad);
1680   GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_query_sink_latency_foreach);
1681 }
1682
1683 static void
1684 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
1685 {
1686   GstPadTemplate *pad_template;
1687   GstAggregatorPrivate *priv;
1688
1689   g_return_if_fail (klass->aggregate != NULL);
1690
1691   self->priv =
1692       G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR,
1693       GstAggregatorPrivate);
1694
1695   priv = self->priv;
1696
1697   pad_template =
1698       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
1699   g_return_if_fail (pad_template != NULL);
1700
1701   priv->padcount = -1;
1702   priv->tags_changed = FALSE;
1703
1704   self->priv->latency_live = FALSE;
1705   self->priv->latency_min = self->priv->sub_latency_min = 0;
1706   self->priv->latency_max = self->priv->sub_latency_max = GST_CLOCK_TIME_NONE;
1707   gst_aggregator_reset_flow_values (self);
1708
1709   self->srcpad = gst_pad_new_from_template (pad_template, "src");
1710
1711   gst_pad_set_event_function (self->srcpad,
1712       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
1713   gst_pad_set_query_function (self->srcpad,
1714       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func));
1715   gst_pad_set_activatemode_function (self->srcpad,
1716       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func));
1717
1718   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
1719
1720   self->priv->latency = DEFAULT_LATENCY;
1721
1722   g_mutex_init (&self->priv->src_lock);
1723   g_cond_init (&self->priv->src_cond);
1724 }
1725
1726 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
1727  * method to get to the padtemplates */
1728 GType
1729 gst_aggregator_get_type (void)
1730 {
1731   static volatile gsize type = 0;
1732
1733   if (g_once_init_enter (&type)) {
1734     GType _type;
1735     static const GTypeInfo info = {
1736       sizeof (GstAggregatorClass),
1737       NULL,
1738       NULL,
1739       (GClassInitFunc) gst_aggregator_class_init,
1740       NULL,
1741       NULL,
1742       sizeof (GstAggregator),
1743       0,
1744       (GInstanceInitFunc) gst_aggregator_init,
1745     };
1746
1747     _type = g_type_register_static (GST_TYPE_ELEMENT,
1748         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
1749     g_once_init_leave (&type, _type);
1750   }
1751   return type;
1752 }
1753
1754 static GstFlowReturn
1755 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
1756 {
1757   GstBuffer *actual_buf = buffer;
1758   GstAggregator *self = GST_AGGREGATOR (object);
1759   GstAggregatorPrivate *priv = self->priv;
1760   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1761   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
1762   GstFlowReturn flow_return;
1763
1764   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
1765
1766   PAD_STREAM_LOCK (aggpad);
1767
1768   if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
1769     goto flushing;
1770
1771   if (g_atomic_int_get (&aggpad->priv->pending_eos) == TRUE)
1772     goto eos;
1773
1774   PAD_LOCK (aggpad);
1775
1776   while (aggpad->buffer && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
1777     GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
1778     PAD_WAIT_EVENT (aggpad);
1779   }
1780   PAD_UNLOCK (aggpad);
1781
1782   if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
1783     goto flushing;
1784
1785   if (aggclass->clip) {
1786     aggclass->clip (self, aggpad, buffer, &actual_buf);
1787   }
1788
1789   SRC_STREAM_LOCK (self);
1790   PAD_LOCK (aggpad);
1791   if (aggpad->buffer)
1792     gst_buffer_unref (aggpad->buffer);
1793   aggpad->buffer = actual_buf;
1794   PAD_UNLOCK (aggpad);
1795   PAD_STREAM_UNLOCK (aggpad);
1796
1797   if (gst_aggregator_check_pads_ready (self))
1798     SRC_STREAM_BROADCAST (self);
1799   SRC_STREAM_UNLOCK (self);
1800
1801   GST_DEBUG_OBJECT (aggpad, "Done chaining");
1802
1803   GST_OBJECT_LOCK (self);
1804   flow_return = priv->flow_return;
1805   GST_OBJECT_UNLOCK (self);
1806
1807   return flow_return;
1808
1809 flushing:
1810   PAD_STREAM_UNLOCK (aggpad);
1811
1812   gst_buffer_unref (buffer);
1813   GST_DEBUG_OBJECT (aggpad, "We are flushing");
1814
1815   return GST_FLOW_FLUSHING;
1816
1817 eos:
1818   PAD_STREAM_UNLOCK (aggpad);
1819
1820   gst_buffer_unref (buffer);
1821   GST_DEBUG_OBJECT (pad, "We are EOS already...");
1822
1823   return GST_FLOW_EOS;
1824 }
1825
1826 static gboolean
1827 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
1828     GstQuery * query)
1829 {
1830   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1831   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1832
1833   if (GST_QUERY_IS_SERIALIZED (query)) {
1834     PAD_LOCK (aggpad);
1835
1836     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) {
1837       PAD_UNLOCK (aggpad);
1838       goto flushing;
1839     }
1840
1841     while (aggpad->buffer
1842         && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
1843       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
1844       PAD_WAIT_EVENT (aggpad);
1845     }
1846     PAD_UNLOCK (aggpad);
1847
1848     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
1849       goto flushing;
1850   }
1851
1852   return klass->sink_query (GST_AGGREGATOR (parent),
1853       GST_AGGREGATOR_PAD (pad), query);
1854
1855 flushing:
1856   GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping query");
1857   return FALSE;
1858 }
1859
1860 static gboolean
1861 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
1862     GstEvent * event)
1863 {
1864   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1865   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1866
1867   if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS
1868       && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) {
1869     PAD_LOCK (aggpad);
1870
1871     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE
1872         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
1873       PAD_UNLOCK (aggpad);
1874       goto flushing;
1875     }
1876
1877     while (aggpad->buffer
1878         && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
1879       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
1880       PAD_WAIT_EVENT (aggpad);
1881     }
1882     PAD_UNLOCK (aggpad);
1883
1884     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE
1885         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
1886       goto flushing;
1887   }
1888
1889   return klass->sink_event (GST_AGGREGATOR (parent),
1890       GST_AGGREGATOR_PAD (pad), event);
1891
1892 flushing:
1893   GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping event");
1894   if (GST_EVENT_IS_STICKY (event))
1895     gst_pad_store_sticky_event (pad, event);
1896   gst_event_unref (event);
1897   return FALSE;
1898 }
1899
1900 static gboolean
1901 gst_aggregator_pad_activate_mode_func (GstPad * pad,
1902     GstObject * parent, GstPadMode mode, gboolean active)
1903 {
1904   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1905
1906   if (active == FALSE) {
1907     PAD_LOCK (aggpad);
1908     g_atomic_int_set (&aggpad->priv->flushing, TRUE);
1909     gst_buffer_replace (&aggpad->buffer, NULL);
1910     PAD_BROADCAST_EVENT (aggpad);
1911     PAD_UNLOCK (aggpad);
1912   } else {
1913     PAD_LOCK (aggpad);
1914     g_atomic_int_set (&aggpad->priv->flushing, FALSE);
1915     PAD_BROADCAST_EVENT (aggpad);
1916     PAD_UNLOCK (aggpad);
1917   }
1918
1919   return TRUE;
1920 }
1921
1922 /***********************************
1923  * GstAggregatorPad implementation  *
1924  ************************************/
1925 G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
1926
1927 static void
1928 gst_aggregator_pad_constructed (GObject * object)
1929 {
1930   GstPad *pad = GST_PAD (object);
1931
1932   gst_pad_set_chain_function (pad,
1933       GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
1934   gst_pad_set_event_function (pad,
1935       GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func));
1936   gst_pad_set_query_function (pad,
1937       GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
1938   gst_pad_set_activatemode_function (pad,
1939       GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
1940 }
1941
1942 static void
1943 gst_aggregator_pad_finalize (GObject * object)
1944 {
1945   GstAggregatorPad *pad = (GstAggregatorPad *) object;
1946
1947   g_cond_clear (&pad->priv->event_cond);
1948   g_mutex_clear (&pad->priv->stream_lock);
1949
1950   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object);
1951 }
1952
1953 static void
1954 gst_aggregator_pad_dispose (GObject * object)
1955 {
1956   GstAggregatorPad *pad = (GstAggregatorPad *) object;
1957   GstBuffer *buf;
1958
1959   buf = gst_aggregator_pad_steal_buffer (pad);
1960   if (buf)
1961     gst_buffer_unref (buf);
1962
1963   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
1964 }
1965
1966 static void
1967 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
1968 {
1969   GObjectClass *gobject_class = (GObjectClass *) klass;
1970
1971   g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate));
1972
1973   gobject_class->constructed = gst_aggregator_pad_constructed;
1974   gobject_class->finalize = gst_aggregator_pad_finalize;
1975   gobject_class->dispose = gst_aggregator_pad_dispose;
1976 }
1977
1978 static void
1979 gst_aggregator_pad_init (GstAggregatorPad * pad)
1980 {
1981   pad->priv =
1982       G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
1983       GstAggregatorPadPrivate);
1984
1985   pad->buffer = NULL;
1986   g_cond_init (&pad->priv->event_cond);
1987
1988   g_mutex_init (&pad->priv->stream_lock);
1989 }
1990
1991 /**
1992  * gst_aggregator_pad_steal_buffer_unlocked:
1993  * @pad: the pad to get buffer from
1994  *
1995  * Steal the ref to the buffer currently queued in @pad.
1996  *
1997  * MUST be called with the pad's object lock held.
1998  *
1999  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
2000  *   queued. You should unref the buffer after usage.
2001  */
2002 GstBuffer *
2003 gst_aggregator_pad_steal_buffer_unlocked (GstAggregatorPad * pad)
2004 {
2005   GstBuffer *buffer = NULL;
2006
2007   if (pad->buffer) {
2008     GST_TRACE_OBJECT (pad, "Consuming buffer");
2009     buffer = pad->buffer;
2010     pad->buffer = NULL;
2011     if (pad->priv->pending_eos) {
2012       pad->priv->pending_eos = FALSE;
2013       pad->eos = TRUE;
2014     }
2015     PAD_BROADCAST_EVENT (pad);
2016     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
2017   }
2018
2019   return buffer;
2020 }
2021
2022 /**
2023  * gst_aggregator_pad_steal_buffer:
2024  * @pad: the pad to get buffer from
2025  *
2026  * Steal the ref to the buffer currently queued in @pad.
2027  *
2028  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
2029  *   queued. You should unref the buffer after usage.
2030  */
2031 GstBuffer *
2032 gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
2033 {
2034   GstBuffer *buffer = NULL;
2035
2036   PAD_LOCK (pad);
2037   buffer = gst_aggregator_pad_steal_buffer_unlocked (pad);
2038   PAD_UNLOCK (pad);
2039
2040   return buffer;
2041 }
2042
2043 /**
2044  * gst_aggregator_pad_get_buffer:
2045  * @pad: the pad to get buffer from
2046  *
2047  * Returns: (transfer full): A reference to the buffer in @pad or
2048  * NULL if no buffer was queued. You should unref the buffer after
2049  * usage.
2050  */
2051 GstBuffer *
2052 gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
2053 {
2054   GstBuffer *buffer = NULL;
2055
2056   PAD_LOCK (pad);
2057   if (pad->buffer)
2058     buffer = gst_buffer_ref (pad->buffer);
2059   PAD_UNLOCK (pad);
2060
2061   return buffer;
2062 }
2063
2064 /**
2065  * gst_aggregator_merge_tags:
2066  * @self: a #GstAggregator
2067  * @tags: a #GstTagList to merge
2068  * @mode: the #GstTagMergeMode to use
2069  *
2070  * Adds tags to so-called pending tags, which will be processed
2071  * before pushing out data downstream.
2072  *
2073  * Note that this is provided for convenience, and the subclass is
2074  * not required to use this and can still do tag handling on its own.
2075  *
2076  * MT safe.
2077  */
2078 void
2079 gst_aggregator_merge_tags (GstAggregator * self,
2080     const GstTagList * tags, GstTagMergeMode mode)
2081 {
2082   GstTagList *otags;
2083
2084   g_return_if_fail (GST_IS_AGGREGATOR (self));
2085   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
2086
2087   /* FIXME Check if we can use OBJECT lock here! */
2088   GST_OBJECT_LOCK (self);
2089   if (tags)
2090     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
2091   otags = self->priv->tags;
2092   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
2093   if (otags)
2094     gst_tag_list_unref (otags);
2095   self->priv->tags_changed = TRUE;
2096   GST_OBJECT_UNLOCK (self);
2097 }
2098
2099 /**
2100  * gst_aggregator_set_latency:
2101  * @self: a #GstAggregator
2102  * @min_latency: minimum latency
2103  * @max_latency: maximum latency
2104  *
2105  * Lets #GstAggregator sub-classes tell the baseclass what their internal
2106  * latency is. Will also post a LATENCY message on the bus so the pipeline
2107  * can reconfigure its global latency.
2108  */
2109 void
2110 gst_aggregator_set_latency (GstAggregator * self,
2111     GstClockTime min_latency, GstClockTime max_latency)
2112 {
2113   g_return_if_fail (GST_IS_AGGREGATOR (self));
2114   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
2115   g_return_if_fail (max_latency >= min_latency);
2116
2117   GST_OBJECT_LOCK (self);
2118   self->priv->sub_latency_min = min_latency;
2119   self->priv->sub_latency_max = max_latency;
2120   GST_OBJECT_UNLOCK (self);
2121
2122   gst_element_post_message (GST_ELEMENT_CAST (self),
2123       gst_message_new_latency (GST_OBJECT_CAST (self)));
2124 }