aggregator: add simple support for caps handling
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
1 /* GStreamer aggregator base class
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @title: GstAggregator
25  * @short_description: manages a set of pads with the purpose of
26  * aggregating their buffers.
27  * @see_also: gstcollectpads for historical reasons.
28  *
29  * Manages a set of pads with the purpose of aggregating their buffers.
30  * Control is given to the subclass when all pads have data.
31  *
32  *  * Base class for mixers and muxers. Subclasses should at least implement
33  *    the #GstAggregatorClass.aggregate() virtual method.
34  *
35  *  * When data is queued on all pads, tha aggregate vmethod is called.
36  *
37  *  * One can peek at the data on any given GstAggregatorPad with the
38  *    gst_aggregator_pad_get_buffer () method, and take ownership of it
39  *    with the gst_aggregator_pad_steal_buffer () method. When a buffer
40  *    has been taken with steal_buffer (), a new buffer can be queued
41  *    on that pad.
42  *
43  *  * If the subclass wishes to push a buffer downstream in its aggregate
44  *    implementation, it should do so through the
45  *    gst_aggregator_finish_buffer () method. This method will take care
46  *    of sending and ordering mandatory events such as stream start, caps
47  *    and segment.
48  *
49  *  * Same goes for EOS events, which should not be pushed directly by the
50  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
51  *    implementation.
52  *
53  *  * Note that the aggregator logic regarding gap event handling is to turn
54  *    these into gap buffers with matching PTS and duration. It will also
55  *    flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE
56  *    to ease their identification and subsequent processing.
57  *
58  */
59
60 #ifdef HAVE_CONFIG_H
61 #  include "config.h"
62 #endif
63
64 #include <string.h>             /* strlen */
65
66 #include "gstaggregator.h"
67
68 typedef enum
69 {
70   GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
71   GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
72   GST_AGGREGATOR_START_TIME_SELECTION_SET
73 } GstAggregatorStartTimeSelection;
74
75 static GType
76 gst_aggregator_start_time_selection_get_type (void)
77 {
78   static GType gtype = 0;
79
80   if (gtype == 0) {
81     static const GEnumValue values[] = {
82       {GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
83           "Start at 0 running time (default)", "zero"},
84       {GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
85           "Start at first observed input running time", "first"},
86       {GST_AGGREGATOR_START_TIME_SELECTION_SET,
87           "Set start time with start-time property", "set"},
88       {0, NULL, NULL}
89     };
90
91     gtype = g_enum_register_static ("GstAggregatorStartTimeSelection", values);
92   }
93   return gtype;
94 }
95
96 /*  Might become API */
97 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
98     const GstTagList * tags, GstTagMergeMode mode);
99 static void gst_aggregator_set_latency_property (GstAggregator * agg,
100     gint64 latency);
101 static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
102
103
104 /* Locking order, locks in this element must always be taken in this order
105  *
106  * standard sink pad stream lock -> GST_PAD_STREAM_LOCK (aggpad)
107  * Aggregator pad flush lock -> PAD_FLUSH_LOCK(aggpad)
108  * standard src pad stream lock -> GST_PAD_STREAM_LOCK (srcpad)
109  * Aggregator src lock -> SRC_LOCK(agg) w/ SRC_WAIT/BROADCAST
110  * standard element object lock -> GST_OBJECT_LOCK(agg)
111  * Aggregator pad lock -> PAD_LOCK (aggpad) w/ PAD_WAIT/BROADCAST_EVENT(aggpad)
112  * standard src pad object lock -> GST_OBJECT_LOCK(srcpad)
113  * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad)
114  */
115
116
117 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
118
119 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
120 #define GST_CAT_DEFAULT aggregator_debug
121
122 /* GstAggregatorPad definitions */
123 #define PAD_LOCK(pad)   G_STMT_START {                                  \
124   GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p",              \
125         g_thread_self());                                               \
126   g_mutex_lock(&pad->priv->lock);                                       \
127   GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p",                \
128         g_thread_self());                                               \
129   } G_STMT_END
130
131 #define PAD_UNLOCK(pad)  G_STMT_START {                                 \
132   GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p",           \
133       g_thread_self());                                                 \
134   g_mutex_unlock(&pad->priv->lock);                                     \
135   GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p",             \
136         g_thread_self());                                               \
137   } G_STMT_END
138
139
140 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
141   GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p",   \
142         g_thread_self());                                               \
143   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),           \
144       (&((GstAggregatorPad*)pad)->priv->lock));                         \
145   GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \
146         g_thread_self());                                               \
147   } G_STMT_END
148
149 #define PAD_BROADCAST_EVENT(pad) G_STMT_START {                        \
150   GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p",     \
151         g_thread_self());                                              \
152   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond));    \
153   } G_STMT_END
154
155
156 #define PAD_FLUSH_LOCK(pad)     G_STMT_START {                          \
157   GST_TRACE_OBJECT (pad, "Taking lock from thread %p",                  \
158         g_thread_self());                                               \
159   g_mutex_lock(&pad->priv->flush_lock);                                 \
160   GST_TRACE_OBJECT (pad, "Took lock from thread %p",                    \
161         g_thread_self());                                               \
162   } G_STMT_END
163
164 #define PAD_FLUSH_UNLOCK(pad)   G_STMT_START {                          \
165   GST_TRACE_OBJECT (pad, "Releasing lock from thread %p",               \
166         g_thread_self());                                               \
167   g_mutex_unlock(&pad->priv->flush_lock);                               \
168   GST_TRACE_OBJECT (pad, "Release lock from thread %p",                 \
169         g_thread_self());                                               \
170   } G_STMT_END
171
172 #define SRC_LOCK(self)   G_STMT_START {                             \
173   GST_TRACE_OBJECT (self, "Taking src lock from thread %p",         \
174       g_thread_self());                                             \
175   g_mutex_lock(&self->priv->src_lock);                              \
176   GST_TRACE_OBJECT (self, "Took src lock from thread %p",           \
177         g_thread_self());                                           \
178   } G_STMT_END
179
180 #define SRC_UNLOCK(self)  G_STMT_START {                            \
181   GST_TRACE_OBJECT (self, "Releasing src lock from thread %p",      \
182         g_thread_self());                                           \
183   g_mutex_unlock(&self->priv->src_lock);                            \
184   GST_TRACE_OBJECT (self, "Released src lock from thread %p",       \
185         g_thread_self());                                           \
186   } G_STMT_END
187
188 #define SRC_WAIT(self) G_STMT_START {                               \
189   GST_LOG_OBJECT (self, "Waiting for src on thread %p",             \
190         g_thread_self());                                           \
191   g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock));    \
192   GST_LOG_OBJECT (self, "DONE Waiting for src on thread %p",        \
193         g_thread_self());                                           \
194   } G_STMT_END
195
196 #define SRC_BROADCAST(self) G_STMT_START {                          \
197     GST_LOG_OBJECT (self, "Signaling src from thread %p",           \
198         g_thread_self());                                           \
199     if (self->priv->aggregate_id)                                   \
200       gst_clock_id_unschedule (self->priv->aggregate_id);           \
201     g_cond_broadcast(&(self->priv->src_cond));                      \
202   } G_STMT_END
203
204 struct _GstAggregatorPadPrivate
205 {
206   /* Following fields are protected by the PAD_LOCK */
207   GstFlowReturn flow_return;
208   gboolean pending_flush_start;
209   gboolean pending_flush_stop;
210   gboolean pending_eos;
211
212   gboolean first_buffer;
213
214   GQueue buffers;
215   GstBuffer *clipped_buffer;
216   guint num_buffers;
217   GstClockTime head_position;
218   GstClockTime tail_position;
219   GstClockTime head_time;
220   GstClockTime tail_time;
221   GstClockTime time_level;
222   GstSegment head_segment;      /* segment before the queue */
223
224   gboolean eos;
225
226   GMutex lock;
227   GCond event_cond;
228   /* This lock prevents a flush start processing happening while
229    * the chain function is also happening.
230    */
231   GMutex flush_lock;
232 };
233
234 /* Must be called with PAD_LOCK held */
235 static void
236 gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
237 {
238   aggpad->priv->pending_eos = FALSE;
239   aggpad->priv->eos = FALSE;
240   aggpad->priv->flow_return = GST_FLOW_OK;
241   GST_OBJECT_LOCK (aggpad);
242   gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED);
243   gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED);
244   GST_OBJECT_UNLOCK (aggpad);
245   aggpad->priv->head_position = GST_CLOCK_TIME_NONE;
246   aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
247   aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
248   aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
249   aggpad->priv->time_level = 0;
250   aggpad->priv->first_buffer = TRUE;
251 }
252
253 static gboolean
254 gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
255 {
256   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
257
258   PAD_LOCK (aggpad);
259   gst_aggregator_pad_reset_unlocked (aggpad);
260   PAD_UNLOCK (aggpad);
261
262   if (klass->flush)
263     return klass->flush (aggpad, agg);
264
265   return TRUE;
266 }
267
268 /*************************************
269  * GstAggregator implementation  *
270  *************************************/
271 static GstElementClass *aggregator_parent_class = NULL;
272
273 /* All members are protected by the object lock unless otherwise noted */
274
275 struct _GstAggregatorPrivate
276 {
277   gint max_padserial;
278
279   /* Our state is >= PAUSED */
280   gboolean running;             /* protected by src_lock */
281
282   gint seqnum;
283   gboolean send_stream_start;   /* protected by srcpad stream lock */
284   gboolean send_segment;
285   gboolean flush_seeking;
286   gboolean pending_flush_start;
287   gboolean send_eos;            /* protected by srcpad stream lock */
288
289   GstCaps *srccaps;             /* protected by the srcpad stream lock */
290
291   GstTagList *tags;
292   gboolean tags_changed;
293
294   gboolean peer_latency_live;   /* protected by src_lock */
295   GstClockTime peer_latency_min;        /* protected by src_lock */
296   GstClockTime peer_latency_max;        /* protected by src_lock */
297   gboolean has_peer_latency;    /* protected by src_lock */
298
299   GstClockTime sub_latency_min; /* protected by src_lock */
300   GstClockTime sub_latency_max; /* protected by src_lock */
301
302   /* aggregate */
303   GstClockID aggregate_id;      /* protected by src_lock */
304   GMutex src_lock;
305   GCond src_cond;
306
307   gboolean first_buffer;        /* protected by object lock */
308   GstAggregatorStartTimeSelection start_time_selection;
309   GstClockTime start_time;
310
311   /* properties */
312   gint64 latency;               /* protected by both src_lock and all pad locks */
313 };
314
315 typedef struct
316 {
317   GstEvent *event;
318   gboolean result;
319   gboolean flush;
320   gboolean only_to_active_pads;
321
322   gboolean one_actually_seeked;
323 } EventData;
324
325 #define DEFAULT_LATENCY              0
326 #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO
327 #define DEFAULT_START_TIME           (-1)
328
329 enum
330 {
331   PROP_0,
332   PROP_LATENCY,
333   PROP_START_TIME_SELECTION,
334   PROP_START_TIME,
335   PROP_LAST
336 };
337
338 static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
339     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
340
341 /**
342  * gst_aggregator_iterate_sinkpads:
343  * @self: The #GstAggregator
344  * @func: (scope call): The function to call.
345  * @user_data: (closure): The data to pass to @func.
346  *
347  * Iterate the sinkpads of aggregator to call a function on them.
348  *
349  * This method guarantees that @func will be called only once for each
350  * sink pad.
351  */
352 gboolean
353 gst_aggregator_iterate_sinkpads (GstAggregator * self,
354     GstAggregatorPadForeachFunc func, gpointer user_data)
355 {
356   gboolean result = FALSE;
357   GstIterator *iter;
358   gboolean done = FALSE;
359   GValue item = { 0, };
360   GList *seen_pads = NULL;
361
362   iter = gst_element_iterate_sink_pads (GST_ELEMENT (self));
363
364   if (!iter)
365     goto no_iter;
366
367   while (!done) {
368     switch (gst_iterator_next (iter, &item)) {
369       case GST_ITERATOR_OK:
370       {
371         GstAggregatorPad *pad;
372
373         pad = g_value_get_object (&item);
374
375         /* if already pushed, skip. FIXME, find something faster to tag pads */
376         if (pad == NULL || g_list_find (seen_pads, pad)) {
377           g_value_reset (&item);
378           break;
379         }
380
381         GST_LOG_OBJECT (pad, "calling function %s on pad",
382             GST_DEBUG_FUNCPTR_NAME (func));
383
384         result = func (self, pad, user_data);
385
386         done = !result;
387
388         seen_pads = g_list_prepend (seen_pads, pad);
389
390         g_value_reset (&item);
391         break;
392       }
393       case GST_ITERATOR_RESYNC:
394         gst_iterator_resync (iter);
395         break;
396       case GST_ITERATOR_ERROR:
397         GST_ERROR_OBJECT (self,
398             "Could not iterate over internally linked pads");
399         done = TRUE;
400         break;
401       case GST_ITERATOR_DONE:
402         done = TRUE;
403         break;
404     }
405   }
406   g_value_unset (&item);
407   gst_iterator_free (iter);
408
409   if (seen_pads == NULL) {
410     GST_DEBUG_OBJECT (self, "No pad seen");
411     return FALSE;
412   }
413
414   g_list_free (seen_pads);
415
416 no_iter:
417   return result;
418 }
419
420 static gboolean
421 gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
422 {
423   return (g_queue_peek_tail (&pad->priv->buffers) == NULL &&
424       pad->priv->clipped_buffer == NULL);
425 }
426
427 static gboolean
428 gst_aggregator_check_pads_ready (GstAggregator * self)
429 {
430   GstAggregatorPad *pad;
431   GList *l, *sinkpads;
432   gboolean have_buffer = TRUE;
433   gboolean have_event = FALSE;
434
435   GST_LOG_OBJECT (self, "checking pads");
436
437   GST_OBJECT_LOCK (self);
438
439   sinkpads = GST_ELEMENT_CAST (self)->sinkpads;
440   if (sinkpads == NULL)
441     goto no_sinkpads;
442
443   for (l = sinkpads; l != NULL; l = l->next) {
444     pad = l->data;
445
446     PAD_LOCK (pad);
447
448     if (pad->priv->num_buffers == 0) {
449       if (!gst_aggregator_pad_queue_is_empty (pad))
450         have_event = TRUE;
451       if (!pad->priv->eos) {
452         have_buffer = FALSE;
453
454         /* If not live we need data on all pads, so leave the loop */
455         if (!self->priv->peer_latency_live) {
456           PAD_UNLOCK (pad);
457           goto pad_not_ready;
458         }
459       }
460     } else if (self->priv->peer_latency_live) {
461       /* In live mode, having a single pad with buffers is enough to
462        * generate a start time from it. In non-live mode all pads need
463        * to have a buffer
464        */
465       self->priv->first_buffer = FALSE;
466     }
467
468     PAD_UNLOCK (pad);
469   }
470
471   if (!have_buffer && !have_event)
472     goto pad_not_ready;
473
474   if (have_buffer)
475     self->priv->first_buffer = FALSE;
476
477   GST_OBJECT_UNLOCK (self);
478   GST_LOG_OBJECT (self, "pads are ready");
479   return TRUE;
480
481 no_sinkpads:
482   {
483     GST_LOG_OBJECT (self, "pads not ready: no sink pads");
484     GST_OBJECT_UNLOCK (self);
485     return FALSE;
486   }
487 pad_not_ready:
488   {
489     if (have_event)
490       GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet,"
491           " but waking up for serialized event");
492     else
493       GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
494     GST_OBJECT_UNLOCK (self);
495     return have_event;
496   }
497 }
498
499 static void
500 gst_aggregator_reset_flow_values (GstAggregator * self)
501 {
502   GST_OBJECT_LOCK (self);
503   self->priv->send_stream_start = TRUE;
504   self->priv->send_segment = TRUE;
505   gst_segment_init (&self->segment, GST_FORMAT_TIME);
506   self->priv->first_buffer = TRUE;
507   GST_OBJECT_UNLOCK (self);
508 }
509
510 static inline void
511 gst_aggregator_push_mandatory_events (GstAggregator * self)
512 {
513   GstAggregatorPrivate *priv = self->priv;
514   GstEvent *segment = NULL;
515   GstEvent *tags = NULL;
516
517   if (self->priv->send_stream_start) {
518     gchar s_id[32];
519
520     GST_INFO_OBJECT (self, "pushing stream start");
521     /* stream-start (FIXME: create id based on input ids) */
522     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
523     if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) {
524       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
525     }
526     self->priv->send_stream_start = FALSE;
527   }
528
529   if (self->priv->srccaps) {
530
531     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
532         self->priv->srccaps);
533     if (!gst_pad_push_event (self->srcpad,
534             gst_event_new_caps (self->priv->srccaps))) {
535       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
536     }
537     gst_caps_unref (self->priv->srccaps);
538     self->priv->srccaps = NULL;
539   }
540
541   GST_OBJECT_LOCK (self);
542   if (self->priv->send_segment && !self->priv->flush_seeking) {
543     segment = gst_event_new_segment (&self->segment);
544
545     if (!self->priv->seqnum)
546       self->priv->seqnum = gst_event_get_seqnum (segment);
547     else
548       gst_event_set_seqnum (segment, self->priv->seqnum);
549     self->priv->send_segment = FALSE;
550
551     GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment);
552   }
553
554   if (priv->tags && priv->tags_changed && !self->priv->flush_seeking) {
555     tags = gst_event_new_tag (gst_tag_list_ref (priv->tags));
556     priv->tags_changed = FALSE;
557   }
558   GST_OBJECT_UNLOCK (self);
559
560   if (segment)
561     gst_pad_push_event (self->srcpad, segment);
562   if (tags)
563     gst_pad_push_event (self->srcpad, tags);
564
565 }
566
567 /**
568  * gst_aggregator_set_src_caps:
569  * @self: The #GstAggregator
570  * @caps: The #GstCaps to set on the src pad.
571  *
572  * Sets the caps to be used on the src pad.
573  */
574 void
575 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
576 {
577   GST_PAD_STREAM_LOCK (self->srcpad);
578   gst_caps_replace (&self->priv->srccaps, caps);
579   gst_aggregator_push_mandatory_events (self);
580   GST_PAD_STREAM_UNLOCK (self->srcpad);
581 }
582
583 /**
584  * gst_aggregator_finish_buffer:
585  * @self: The #GstAggregator
586  * @buffer: (transfer full): the #GstBuffer to push.
587  *
588  * This method will push the provided output buffer downstream. If needed,
589  * mandatory events such as stream-start, caps, and segment events will be
590  * sent before pushing the buffer.
591  */
592 GstFlowReturn
593 gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
594 {
595   gst_aggregator_push_mandatory_events (self);
596
597   GST_OBJECT_LOCK (self);
598   if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) {
599     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
600     GST_OBJECT_UNLOCK (self);
601     return gst_pad_push (self->srcpad, buffer);
602   } else {
603     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
604         self->priv->flush_seeking, gst_pad_is_active (self->srcpad));
605     GST_OBJECT_UNLOCK (self);
606     gst_buffer_unref (buffer);
607     return GST_FLOW_OK;
608   }
609 }
610
611 static void
612 gst_aggregator_push_eos (GstAggregator * self)
613 {
614   GstEvent *event;
615   gst_aggregator_push_mandatory_events (self);
616
617   event = gst_event_new_eos ();
618
619   GST_OBJECT_LOCK (self);
620   self->priv->send_eos = FALSE;
621   gst_event_set_seqnum (event, self->priv->seqnum);
622   GST_OBJECT_UNLOCK (self);
623
624   gst_pad_push_event (self->srcpad, event);
625 }
626
627 static GstClockTime
628 gst_aggregator_get_next_time (GstAggregator * self)
629 {
630   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
631
632   if (klass->get_next_time)
633     return klass->get_next_time (self);
634
635   return GST_CLOCK_TIME_NONE;
636 }
637
638 static gboolean
639 gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
640 {
641   GstClockTime latency;
642   GstClockTime start;
643   gboolean res;
644
645   *timeout = FALSE;
646
647   SRC_LOCK (self);
648
649   latency = gst_aggregator_get_latency_unlocked (self);
650
651   if (gst_aggregator_check_pads_ready (self)) {
652     GST_DEBUG_OBJECT (self, "all pads have data");
653     SRC_UNLOCK (self);
654
655     return TRUE;
656   }
657
658   /* Before waiting, check if we're actually still running */
659   if (!self->priv->running || !self->priv->send_eos) {
660     SRC_UNLOCK (self);
661
662     return FALSE;
663   }
664
665   start = gst_aggregator_get_next_time (self);
666
667   /* If we're not live, or if we use the running time
668    * of the first buffer as start time, we wait until
669    * all pads have buffers.
670    * Otherwise (i.e. if we are live!), we wait on the clock
671    * and if a pad does not have a buffer in time we ignore
672    * that pad.
673    */
674   GST_OBJECT_LOCK (self);
675   if (!GST_CLOCK_TIME_IS_VALID (latency) ||
676       !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) ||
677       !GST_CLOCK_TIME_IS_VALID (start) ||
678       (self->priv->first_buffer
679           && self->priv->start_time_selection ==
680           GST_AGGREGATOR_START_TIME_SELECTION_FIRST)) {
681     /* We wake up here when something happened, and below
682      * then check if we're ready now. If we return FALSE,
683      * we will be directly called again.
684      */
685     GST_OBJECT_UNLOCK (self);
686     SRC_WAIT (self);
687   } else {
688     GstClockTime base_time, time;
689     GstClock *clock;
690     GstClockReturn status;
691     GstClockTimeDiff jitter;
692
693     GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
694         GST_TIME_ARGS (start));
695
696     base_time = GST_ELEMENT_CAST (self)->base_time;
697     clock = gst_object_ref (GST_ELEMENT_CLOCK (self));
698     GST_OBJECT_UNLOCK (self);
699
700     time = base_time + start;
701     time += latency;
702
703     GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
704         GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
705         " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")",
706         GST_TIME_ARGS (time),
707         GST_TIME_ARGS (base_time),
708         GST_TIME_ARGS (start), GST_TIME_ARGS (latency),
709         GST_TIME_ARGS (gst_clock_get_time (clock)));
710
711     self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
712     gst_object_unref (clock);
713     SRC_UNLOCK (self);
714
715     jitter = 0;
716     status = gst_clock_id_wait (self->priv->aggregate_id, &jitter);
717
718     SRC_LOCK (self);
719     if (self->priv->aggregate_id) {
720       gst_clock_id_unref (self->priv->aggregate_id);
721       self->priv->aggregate_id = NULL;
722     }
723
724     GST_DEBUG_OBJECT (self,
725         "clock returned %d (jitter: %" GST_STIME_FORMAT ")",
726         status, GST_STIME_ARGS (jitter));
727
728     /* we timed out */
729     if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
730       SRC_UNLOCK (self);
731       *timeout = TRUE;
732       return TRUE;
733     }
734   }
735
736   res = gst_aggregator_check_pads_ready (self);
737   SRC_UNLOCK (self);
738
739   return res;
740 }
741
742 static gboolean
743 check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data)
744 {
745   GstEvent *event = NULL;
746   GstAggregatorClass *klass = NULL;
747   gboolean *processed_event = user_data;
748
749   do {
750     event = NULL;
751
752     PAD_LOCK (pad);
753     if (pad->priv->num_buffers == 0 && pad->priv->pending_eos) {
754       pad->priv->pending_eos = FALSE;
755       pad->priv->eos = TRUE;
756     }
757     if (pad->priv->clipped_buffer == NULL &&
758         GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) {
759       event = g_queue_pop_tail (&pad->priv->buffers);
760       PAD_BROADCAST_EVENT (pad);
761     }
762     PAD_UNLOCK (pad);
763     if (event) {
764       if (processed_event)
765         *processed_event = TRUE;
766       if (klass == NULL)
767         klass = GST_AGGREGATOR_GET_CLASS (self);
768
769       GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
770       klass->sink_event (self, pad, event);
771     }
772   } while (event != NULL);
773
774   return TRUE;
775 }
776
777 static void
778 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
779     GstFlowReturn flow_return, gboolean full)
780 {
781   GList *item;
782
783   PAD_LOCK (aggpad);
784   if (flow_return == GST_FLOW_NOT_LINKED)
785     aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
786   else
787     aggpad->priv->flow_return = flow_return;
788
789   item = g_queue_peek_head_link (&aggpad->priv->buffers);
790   while (item) {
791     GList *next = item->next;
792
793     /* In partial flush, we do like the pad, we get rid of non-sticky events
794      * and EOS/SEGMENT.
795      */
796     if (full || GST_IS_BUFFER (item->data) ||
797         GST_EVENT_TYPE (item->data) == GST_EVENT_EOS ||
798         GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT ||
799         !GST_EVENT_IS_STICKY (item->data)) {
800       gst_mini_object_unref (item->data);
801       g_queue_delete_link (&aggpad->priv->buffers, item);
802     }
803     item = next;
804   }
805   aggpad->priv->num_buffers = 0;
806   gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL);
807
808   PAD_BROADCAST_EVENT (aggpad);
809   PAD_UNLOCK (aggpad);
810 }
811
812 static GstFlowReturn
813 gst_aggregator_default_update_src_caps (GstAggregator * agg, GstCaps * caps,
814     GstCaps ** ret)
815 {
816   *ret = gst_caps_ref (caps);
817
818   return GST_FLOW_OK;
819 }
820
821 static GstCaps *
822 gst_aggregator_default_fixate_src_caps (GstAggregator * agg, GstCaps * caps)
823 {
824   caps = gst_caps_fixate (caps);
825
826   return caps;
827 }
828
829 static gboolean
830 gst_aggregator_default_negotiated_src_caps (GstAggregator * agg, GstCaps * caps)
831 {
832   return TRUE;
833 }
834
835 /* WITH SRC_LOCK held */
836 static GstFlowReturn
837 gst_aggregator_update_src_caps (GstAggregator * self)
838 {
839   GstAggregatorClass *agg_klass = GST_AGGREGATOR_GET_CLASS (self);
840   GstCaps *downstream_caps, *template_caps, *caps = NULL;
841   GstFlowReturn ret = GST_FLOW_OK;
842
843   template_caps = gst_pad_get_pad_template_caps (self->srcpad);
844   downstream_caps = gst_pad_peer_query_caps (self->srcpad, template_caps);
845
846   if (gst_caps_is_empty (downstream_caps)) {
847     GST_INFO_OBJECT (self, "Downstream caps (%"
848         GST_PTR_FORMAT ") not compatible with pad template caps (%"
849         GST_PTR_FORMAT ")", downstream_caps, template_caps);
850     ret = GST_FLOW_NOT_NEGOTIATED;
851     goto done;
852   }
853
854   g_assert (agg_klass->update_src_caps);
855   GST_DEBUG_OBJECT (self, "updating caps from %" GST_PTR_FORMAT,
856       downstream_caps);
857   ret = agg_klass->update_src_caps (self, downstream_caps, &caps);
858   if (ret < GST_FLOW_OK) {
859     GST_WARNING_OBJECT (self, "Subclass failed to update provided caps");
860     goto done;
861   }
862   if ((caps == NULL || gst_caps_is_empty (caps)) && ret >= GST_FLOW_OK) {
863     ret = GST_FLOW_NOT_NEGOTIATED;
864     goto done;
865   }
866   GST_DEBUG_OBJECT (self, "               to %" GST_PTR_FORMAT, caps);
867
868 #ifdef GST_ENABLE_EXTRA_CHECKS
869   if (!gst_caps_is_subset (caps, template_caps)) {
870     GstCaps *intersection;
871
872     GST_ERROR_OBJECT (self,
873         "update_src_caps returned caps %" GST_PTR_FORMAT
874         " which are not a real subset of the template caps %"
875         GST_PTR_FORMAT, caps, template_caps);
876     g_warning ("%s: update_src_caps returned caps which are not a real "
877         "subset of the filter caps", GST_ELEMENT_NAME (self));
878
879     intersection =
880         gst_caps_intersect_full (template_caps, caps, GST_CAPS_INTERSECT_FIRST);
881     gst_caps_unref (caps);
882     caps = intersection;
883   }
884 #endif
885
886   if (gst_caps_is_any (caps)) {
887     goto done;
888   }
889
890   if (!gst_caps_is_fixed (caps)) {
891     g_assert (agg_klass->fixate_src_caps);
892
893     GST_DEBUG_OBJECT (self, "fixate caps from %" GST_PTR_FORMAT, caps);
894     if (!(caps = agg_klass->fixate_src_caps (self, caps))) {
895       GST_WARNING_OBJECT (self, "Subclass failed to fixate provided caps");
896       ret = GST_FLOW_NOT_NEGOTIATED;
897       goto done;
898     }
899     GST_DEBUG_OBJECT (self, "             to %" GST_PTR_FORMAT, caps);
900   }
901
902   if (agg_klass->negotiated_src_caps) {
903     if (!agg_klass->negotiated_src_caps (self, caps)) {
904       GST_WARNING_OBJECT (self, "Subclass failed to accept negotiated caps");
905       ret = GST_FLOW_NOT_NEGOTIATED;
906       goto done;
907     }
908   }
909
910   gst_aggregator_set_src_caps (self, caps);
911
912 done:
913   gst_caps_unref (downstream_caps);
914   gst_caps_unref (template_caps);
915
916   if (caps)
917     gst_caps_unref (caps);
918
919   return ret;
920 }
921
922 static void
923 gst_aggregator_aggregate_func (GstAggregator * self)
924 {
925   GstAggregatorPrivate *priv = self->priv;
926   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
927   gboolean timeout = FALSE;
928
929   if (self->priv->running == FALSE) {
930     GST_DEBUG_OBJECT (self, "Not running anymore");
931     return;
932   }
933
934   GST_LOG_OBJECT (self, "Checking aggregate");
935   while (priv->send_eos && priv->running) {
936     GstFlowReturn flow_return = GST_FLOW_OK;
937     gboolean processed_event = FALSE;
938
939     gst_aggregator_iterate_sinkpads (self, check_events, NULL);
940
941     if (!gst_aggregator_wait_and_check (self, &timeout))
942       continue;
943
944     gst_aggregator_iterate_sinkpads (self, check_events, &processed_event);
945     if (processed_event)
946       continue;
947
948     if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
949       flow_return = gst_aggregator_update_src_caps (self);
950       if (flow_return != GST_FLOW_OK)
951         gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self));
952     }
953
954     if (timeout || flow_return >= GST_FLOW_OK) {
955       GST_TRACE_OBJECT (self, "Actually aggregating!");
956       flow_return = klass->aggregate (self, timeout);
957     }
958
959     if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA)
960       continue;
961
962     GST_OBJECT_LOCK (self);
963     if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) {
964       /* We don't want to set the pads to flushing, but we want to
965        * stop the thread, so just break here */
966       GST_OBJECT_UNLOCK (self);
967       break;
968     }
969     GST_OBJECT_UNLOCK (self);
970
971     if (flow_return == GST_FLOW_EOS || flow_return == GST_FLOW_ERROR) {
972       gst_aggregator_push_eos (self);
973     }
974
975     GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
976
977     if (flow_return != GST_FLOW_OK) {
978       GList *item;
979
980       GST_OBJECT_LOCK (self);
981       for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
982         GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
983
984         gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE);
985       }
986       GST_OBJECT_UNLOCK (self);
987       break;
988     }
989   }
990
991   /* Pause the task here, the only ways to get here are:
992    * 1) We're stopping, in which case the task is stopped anyway
993    * 2) We got a flow error above, in which case it might take
994    *    some time to forward the flow return upstream and we
995    *    would otherwise call the task function over and over
996    *    again without doing anything
997    */
998   gst_pad_pause_task (self->srcpad);
999 }
1000
1001 static gboolean
1002 gst_aggregator_start (GstAggregator * self)
1003 {
1004   GstAggregatorClass *klass;
1005   gboolean result;
1006
1007   self->priv->send_stream_start = TRUE;
1008   self->priv->send_segment = TRUE;
1009   self->priv->send_eos = TRUE;
1010   self->priv->srccaps = NULL;
1011
1012   klass = GST_AGGREGATOR_GET_CLASS (self);
1013
1014   if (klass->start)
1015     result = klass->start (self);
1016   else
1017     result = TRUE;
1018
1019   return result;
1020 }
1021
1022 static gboolean
1023 _check_pending_flush_stop (GstAggregatorPad * pad)
1024 {
1025   gboolean res;
1026
1027   PAD_LOCK (pad);
1028   res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
1029   PAD_UNLOCK (pad);
1030
1031   return res;
1032 }
1033
1034 static gboolean
1035 gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
1036 {
1037   gboolean res = TRUE;
1038
1039   GST_INFO_OBJECT (self, "%s srcpad task",
1040       flush_start ? "Pausing" : "Stopping");
1041
1042   SRC_LOCK (self);
1043   self->priv->running = FALSE;
1044   SRC_BROADCAST (self);
1045   SRC_UNLOCK (self);
1046
1047   if (flush_start) {
1048     res = gst_pad_push_event (self->srcpad, flush_start);
1049   }
1050
1051   gst_pad_stop_task (self->srcpad);
1052
1053   return res;
1054 }
1055
1056 static void
1057 gst_aggregator_start_srcpad_task (GstAggregator * self)
1058 {
1059   GST_INFO_OBJECT (self, "Starting srcpad task");
1060
1061   self->priv->running = TRUE;
1062   gst_pad_start_task (GST_PAD (self->srcpad),
1063       (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL);
1064 }
1065
1066 static GstFlowReturn
1067 gst_aggregator_flush (GstAggregator * self)
1068 {
1069   GstFlowReturn ret = GST_FLOW_OK;
1070   GstAggregatorPrivate *priv = self->priv;
1071   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1072
1073   GST_DEBUG_OBJECT (self, "Flushing everything");
1074   GST_OBJECT_LOCK (self);
1075   priv->send_segment = TRUE;
1076   priv->flush_seeking = FALSE;
1077   priv->tags_changed = FALSE;
1078   GST_OBJECT_UNLOCK (self);
1079   if (klass->flush)
1080     ret = klass->flush (self);
1081
1082   return ret;
1083 }
1084
1085
1086 /* Called with GstAggregator's object lock held */
1087
1088 static gboolean
1089 gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
1090 {
1091   GList *tmp;
1092   GstAggregatorPad *tmppad;
1093
1094   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
1095     tmppad = (GstAggregatorPad *) tmp->data;
1096
1097     if (_check_pending_flush_stop (tmppad) == FALSE) {
1098       GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
1099           tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
1100       return FALSE;
1101     }
1102   }
1103
1104   return TRUE;
1105 }
1106
1107 static void
1108 gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
1109     GstEvent * event)
1110 {
1111   GstAggregatorPrivate *priv = self->priv;
1112   GstAggregatorPadPrivate *padpriv = aggpad->priv;
1113
1114   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE);
1115
1116   PAD_FLUSH_LOCK (aggpad);
1117   PAD_LOCK (aggpad);
1118   if (padpriv->pending_flush_start) {
1119     GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
1120
1121     padpriv->pending_flush_start = FALSE;
1122     padpriv->pending_flush_stop = TRUE;
1123   }
1124   PAD_UNLOCK (aggpad);
1125
1126   GST_OBJECT_LOCK (self);
1127   if (priv->flush_seeking) {
1128     /* If flush_seeking we forward the first FLUSH_START */
1129     if (priv->pending_flush_start) {
1130       priv->pending_flush_start = FALSE;
1131       GST_OBJECT_UNLOCK (self);
1132
1133       GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
1134       gst_aggregator_stop_srcpad_task (self, event);
1135
1136       GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
1137       GST_PAD_STREAM_LOCK (self->srcpad);
1138       GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
1139       event = NULL;
1140     } else {
1141       GST_OBJECT_UNLOCK (self);
1142       gst_event_unref (event);
1143     }
1144   } else {
1145     GST_OBJECT_UNLOCK (self);
1146     gst_event_unref (event);
1147   }
1148   PAD_FLUSH_UNLOCK (aggpad);
1149 }
1150
1151 /* Must be called with the the PAD_LOCK held */
1152 static void
1153 update_time_level (GstAggregatorPad * aggpad, gboolean head)
1154 {
1155   if (head) {
1156     if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) &&
1157         aggpad->priv->head_segment.format == GST_FORMAT_TIME)
1158       aggpad->priv->head_time =
1159           gst_segment_to_running_time (&aggpad->priv->head_segment,
1160           GST_FORMAT_TIME, aggpad->priv->head_position);
1161     else
1162       aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
1163
1164     if (!GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_time))
1165       aggpad->priv->tail_time = aggpad->priv->head_time;
1166   } else {
1167     if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_position) &&
1168         aggpad->segment.format == GST_FORMAT_TIME)
1169       aggpad->priv->tail_time =
1170           gst_segment_to_running_time (&aggpad->segment,
1171           GST_FORMAT_TIME, aggpad->priv->tail_position);
1172     else
1173       aggpad->priv->tail_time = aggpad->priv->head_time;
1174   }
1175
1176   if (aggpad->priv->head_time == GST_CLOCK_TIME_NONE ||
1177       aggpad->priv->tail_time == GST_CLOCK_TIME_NONE) {
1178     aggpad->priv->time_level = 0;
1179     return;
1180   }
1181
1182   if (aggpad->priv->tail_time > aggpad->priv->head_time)
1183     aggpad->priv->time_level = 0;
1184   else
1185     aggpad->priv->time_level = aggpad->priv->head_time -
1186         aggpad->priv->tail_time;
1187 }
1188
1189
1190 /* GstAggregator vmethods default implementations */
1191 static gboolean
1192 gst_aggregator_default_sink_event (GstAggregator * self,
1193     GstAggregatorPad * aggpad, GstEvent * event)
1194 {
1195   gboolean res = TRUE;
1196   GstPad *pad = GST_PAD (aggpad);
1197   GstAggregatorPrivate *priv = self->priv;
1198
1199   switch (GST_EVENT_TYPE (event)) {
1200     case GST_EVENT_FLUSH_START:
1201     {
1202       gst_aggregator_flush_start (self, aggpad, event);
1203       /* We forward only in one case: right after flush_seeking */
1204       event = NULL;
1205       goto eat;
1206     }
1207     case GST_EVENT_FLUSH_STOP:
1208     {
1209       GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP");
1210
1211       gst_aggregator_pad_flush (aggpad, self);
1212       GST_OBJECT_LOCK (self);
1213       if (priv->flush_seeking) {
1214         g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
1215         if (gst_aggregator_all_flush_stop_received_locked (self)) {
1216           GST_OBJECT_UNLOCK (self);
1217           /* That means we received FLUSH_STOP/FLUSH_STOP on
1218            * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
1219           gst_aggregator_flush (self);
1220           gst_pad_push_event (self->srcpad, event);
1221           event = NULL;
1222           SRC_LOCK (self);
1223           priv->send_eos = TRUE;
1224           SRC_BROADCAST (self);
1225           SRC_UNLOCK (self);
1226
1227           GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
1228           GST_PAD_STREAM_UNLOCK (self->srcpad);
1229           gst_aggregator_start_srcpad_task (self);
1230         } else {
1231           GST_OBJECT_UNLOCK (self);
1232         }
1233       } else {
1234         GST_OBJECT_UNLOCK (self);
1235       }
1236
1237       /* We never forward the event */
1238       goto eat;
1239     }
1240     case GST_EVENT_EOS:
1241     {
1242       GST_DEBUG_OBJECT (aggpad, "EOS");
1243
1244       /* We still have a buffer, and we don't want the subclass to have to
1245        * check for it. Mark pending_eos, eos will be set when steal_buffer is
1246        * called
1247        */
1248       SRC_LOCK (self);
1249       PAD_LOCK (aggpad);
1250       if (aggpad->priv->num_buffers == 0) {
1251         aggpad->priv->eos = TRUE;
1252       } else {
1253         aggpad->priv->pending_eos = TRUE;
1254       }
1255       PAD_UNLOCK (aggpad);
1256
1257       SRC_BROADCAST (self);
1258       SRC_UNLOCK (self);
1259       goto eat;
1260     }
1261     case GST_EVENT_SEGMENT:
1262     {
1263       PAD_LOCK (aggpad);
1264       GST_OBJECT_LOCK (aggpad);
1265       gst_event_copy_segment (event, &aggpad->segment);
1266       update_time_level (aggpad, FALSE);
1267       GST_OBJECT_UNLOCK (aggpad);
1268       PAD_UNLOCK (aggpad);
1269
1270       GST_OBJECT_LOCK (self);
1271       self->priv->seqnum = gst_event_get_seqnum (event);
1272       GST_OBJECT_UNLOCK (self);
1273       goto eat;
1274     }
1275     case GST_EVENT_STREAM_START:
1276     {
1277       goto eat;
1278     }
1279     case GST_EVENT_GAP:
1280     {
1281       GstClockTime pts, endpts;
1282       GstClockTime duration;
1283       GstBuffer *gapbuf;
1284
1285       gst_event_parse_gap (event, &pts, &duration);
1286       gapbuf = gst_buffer_new ();
1287
1288       if (GST_CLOCK_TIME_IS_VALID (duration))
1289         endpts = pts + duration;
1290       else
1291         endpts = GST_CLOCK_TIME_NONE;
1292
1293       GST_OBJECT_LOCK (aggpad);
1294       res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts,
1295           &pts, &endpts);
1296       GST_OBJECT_UNLOCK (aggpad);
1297
1298       if (!res) {
1299         GST_WARNING_OBJECT (self, "GAP event outside segment, dropping");
1300         goto eat;
1301       }
1302
1303       if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts))
1304         duration = endpts - pts;
1305       else
1306         duration = GST_CLOCK_TIME_NONE;
1307
1308       GST_BUFFER_PTS (gapbuf) = pts;
1309       GST_BUFFER_DURATION (gapbuf) = duration;
1310       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
1311       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
1312
1313       if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
1314           GST_FLOW_OK) {
1315         GST_WARNING_OBJECT (self, "Failed to chain gap buffer");
1316         res = FALSE;
1317       }
1318
1319       goto eat;
1320     }
1321     case GST_EVENT_TAG:
1322     {
1323       GstTagList *tags;
1324
1325       gst_event_parse_tag (event, &tags);
1326
1327       if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) {
1328         gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE);
1329         gst_event_unref (event);
1330         event = NULL;
1331         goto eat;
1332       }
1333       break;
1334     }
1335     default:
1336     {
1337       break;
1338     }
1339   }
1340
1341   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
1342   return gst_pad_event_default (pad, GST_OBJECT (self), event);
1343
1344 eat:
1345   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
1346   if (event)
1347     gst_event_unref (event);
1348
1349   return res;
1350 }
1351
1352 static inline gboolean
1353 gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad,
1354     gpointer unused_udata)
1355 {
1356   gst_aggregator_pad_flush (pad, self);
1357
1358   return TRUE;
1359 }
1360
1361 static gboolean
1362 gst_aggregator_stop (GstAggregator * agg)
1363 {
1364   GstAggregatorClass *klass;
1365   gboolean result;
1366
1367   gst_aggregator_reset_flow_values (agg);
1368
1369   gst_aggregator_iterate_sinkpads (agg, gst_aggregator_stop_pad, NULL);
1370
1371   klass = GST_AGGREGATOR_GET_CLASS (agg);
1372
1373   if (klass->stop)
1374     result = klass->stop (agg);
1375   else
1376     result = TRUE;
1377
1378   agg->priv->has_peer_latency = FALSE;
1379   agg->priv->peer_latency_live = FALSE;
1380   agg->priv->peer_latency_min = agg->priv->peer_latency_max = FALSE;
1381
1382   if (agg->priv->tags)
1383     gst_tag_list_unref (agg->priv->tags);
1384   agg->priv->tags = NULL;
1385
1386   return result;
1387 }
1388
1389 /* GstElement vmethods implementations */
1390 static GstStateChangeReturn
1391 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
1392 {
1393   GstStateChangeReturn ret;
1394   GstAggregator *self = GST_AGGREGATOR (element);
1395
1396   switch (transition) {
1397     case GST_STATE_CHANGE_READY_TO_PAUSED:
1398       if (!gst_aggregator_start (self))
1399         goto error_start;
1400       break;
1401     default:
1402       break;
1403   }
1404
1405   if ((ret =
1406           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
1407               transition)) == GST_STATE_CHANGE_FAILURE)
1408     goto failure;
1409
1410
1411   switch (transition) {
1412     case GST_STATE_CHANGE_PAUSED_TO_READY:
1413       if (!gst_aggregator_stop (self)) {
1414         /* What to do in this case? Error out? */
1415         GST_ERROR_OBJECT (self, "Subclass failed to stop.");
1416       }
1417       break;
1418     default:
1419       break;
1420   }
1421
1422   return ret;
1423
1424 /* ERRORS */
1425 failure:
1426   {
1427     GST_ERROR_OBJECT (element, "parent failed state change");
1428     return ret;
1429   }
1430 error_start:
1431   {
1432     GST_ERROR_OBJECT (element, "Subclass failed to start");
1433     return GST_STATE_CHANGE_FAILURE;
1434   }
1435 }
1436
1437 static void
1438 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
1439 {
1440   GstAggregator *self = GST_AGGREGATOR (element);
1441   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1442
1443   GST_INFO_OBJECT (pad, "Removing pad");
1444
1445   SRC_LOCK (self);
1446   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
1447   gst_element_remove_pad (element, pad);
1448
1449   self->priv->has_peer_latency = FALSE;
1450   SRC_BROADCAST (self);
1451   SRC_UNLOCK (self);
1452 }
1453
1454 static GstAggregatorPad *
1455 gst_aggregator_default_create_new_pad (GstAggregator * self,
1456     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1457 {
1458   GstAggregatorPad *agg_pad;
1459   GstAggregatorPrivate *priv = self->priv;
1460   gint serial = 0;
1461   gchar *name = NULL;
1462
1463   if (templ->direction != GST_PAD_SINK ||
1464       g_strcmp0 (templ->name_template, "sink_%u") != 0)
1465     goto not_sink;
1466
1467   GST_OBJECT_LOCK (self);
1468   if (req_name == NULL || strlen (req_name) < 6
1469       || !g_str_has_prefix (req_name, "sink_")) {
1470     /* no name given when requesting the pad, use next available int */
1471     serial = ++priv->max_padserial;
1472   } else {
1473     /* parse serial number from requested padname */
1474     serial = g_ascii_strtoull (&req_name[5], NULL, 10);
1475     if (serial > priv->max_padserial)
1476       priv->max_padserial = serial;
1477   }
1478
1479   name = g_strdup_printf ("sink_%u", serial);
1480   agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
1481       "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
1482   g_free (name);
1483
1484   GST_OBJECT_UNLOCK (self);
1485
1486   return agg_pad;
1487
1488   /* errors */
1489 not_sink:
1490   {
1491     GST_WARNING_OBJECT (self, "request new pad that is not a SINK pad\n");
1492     return NULL;
1493   }
1494 }
1495
1496 static GstPad *
1497 gst_aggregator_request_new_pad (GstElement * element,
1498     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1499 {
1500   GstAggregator *self;
1501   GstAggregatorPad *agg_pad;
1502   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (element);
1503   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
1504
1505   self = GST_AGGREGATOR (element);
1506
1507   agg_pad = klass->create_new_pad (self, templ, req_name, caps);
1508   if (!agg_pad) {
1509     GST_ERROR_OBJECT (element, "Couldn't create new pad");
1510     return NULL;
1511   }
1512
1513   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
1514
1515   if (priv->running)
1516     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
1517
1518   /* add the pad to the element */
1519   gst_element_add_pad (element, GST_PAD (agg_pad));
1520
1521   return GST_PAD (agg_pad);
1522 }
1523
1524 /* Must be called with SRC_LOCK held */
1525
1526 static gboolean
1527 gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
1528 {
1529   gboolean query_ret, live;
1530   GstClockTime our_latency, min, max;
1531
1532   query_ret = gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1533
1534   if (!query_ret) {
1535     GST_WARNING_OBJECT (self, "Latency query failed");
1536     return FALSE;
1537   }
1538
1539   gst_query_parse_latency (query, &live, &min, &max);
1540
1541   our_latency = self->priv->latency;
1542
1543   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
1544     GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
1545         ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
1546     return FALSE;
1547   }
1548
1549   if (min > max && GST_CLOCK_TIME_IS_VALID (max)) {
1550     GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL),
1551         ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
1552             GST_TIME_FORMAT ". Add queues or other buffering elements.",
1553             GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
1554     return FALSE;
1555   }
1556
1557   self->priv->peer_latency_live = live;
1558   self->priv->peer_latency_min = min;
1559   self->priv->peer_latency_max = max;
1560   self->priv->has_peer_latency = TRUE;
1561
1562   /* add our own */
1563   min += our_latency;
1564   min += self->priv->sub_latency_min;
1565   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1566       && GST_CLOCK_TIME_IS_VALID (max))
1567     max += self->priv->sub_latency_max + our_latency;
1568   else
1569     max = GST_CLOCK_TIME_NONE;
1570
1571   SRC_BROADCAST (self);
1572
1573   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
1574       " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
1575
1576   gst_query_set_latency (query, live, min, max);
1577
1578   return query_ret;
1579 }
1580
1581 /*
1582  * MUST be called with the src_lock held.
1583  *
1584  * See  gst_aggregator_get_latency() for doc
1585  */
1586 static GstClockTime
1587 gst_aggregator_get_latency_unlocked (GstAggregator * self)
1588 {
1589   GstClockTime latency;
1590
1591   g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0);
1592
1593   if (!self->priv->has_peer_latency) {
1594     GstQuery *query = gst_query_new_latency ();
1595     gboolean ret;
1596
1597     ret = gst_aggregator_query_latency_unlocked (self, query);
1598     gst_query_unref (query);
1599     if (!ret)
1600       return GST_CLOCK_TIME_NONE;
1601   }
1602
1603   if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
1604     return GST_CLOCK_TIME_NONE;
1605
1606   /* latency_min is never GST_CLOCK_TIME_NONE by construction */
1607   latency = self->priv->peer_latency_min;
1608
1609   /* add our own */
1610   latency += self->priv->latency;
1611   latency += self->priv->sub_latency_min;
1612
1613   return latency;
1614 }
1615
1616 /**
1617  * gst_aggregator_get_latency:
1618  * @self: a #GstAggregator
1619  *
1620  * Retrieves the latency values reported by @self in response to the latency
1621  * query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element
1622  * will not wait for the clock.
1623  *
1624  * Typically only called by subclasses.
1625  *
1626  * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync
1627  */
1628 GstClockTime
1629 gst_aggregator_get_latency (GstAggregator * self)
1630 {
1631   GstClockTime ret;
1632
1633   SRC_LOCK (self);
1634   ret = gst_aggregator_get_latency_unlocked (self);
1635   SRC_UNLOCK (self);
1636
1637   return ret;
1638 }
1639
1640 static gboolean
1641 gst_aggregator_send_event (GstElement * element, GstEvent * event)
1642 {
1643   GstAggregator *self = GST_AGGREGATOR (element);
1644
1645   GST_STATE_LOCK (element);
1646   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
1647       GST_STATE (element) < GST_STATE_PAUSED) {
1648     gdouble rate;
1649     GstFormat fmt;
1650     GstSeekFlags flags;
1651     GstSeekType start_type, stop_type;
1652     gint64 start, stop;
1653
1654     gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1655         &start, &stop_type, &stop);
1656
1657     GST_OBJECT_LOCK (self);
1658     gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1659         stop_type, stop, NULL);
1660     self->priv->seqnum = gst_event_get_seqnum (event);
1661     self->priv->first_buffer = FALSE;
1662     GST_OBJECT_UNLOCK (self);
1663
1664     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
1665   }
1666   GST_STATE_UNLOCK (element);
1667
1668
1669   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
1670       event);
1671 }
1672
1673 static gboolean
1674 gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
1675 {
1676   gboolean res = TRUE;
1677
1678   switch (GST_QUERY_TYPE (query)) {
1679     case GST_QUERY_SEEKING:
1680     {
1681       GstFormat format;
1682
1683       /* don't pass it along as some (file)sink might claim it does
1684        * whereas with a collectpads in between that will not likely work */
1685       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1686       gst_query_set_seeking (query, format, FALSE, 0, -1);
1687       res = TRUE;
1688
1689       break;
1690     }
1691     case GST_QUERY_LATENCY:
1692       SRC_LOCK (self);
1693       res = gst_aggregator_query_latency_unlocked (self, query);
1694       SRC_UNLOCK (self);
1695       break;
1696     default:
1697       return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1698   }
1699
1700   return res;
1701 }
1702
1703 static gboolean
1704 gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
1705 {
1706   EventData *evdata = user_data;
1707   gboolean ret = TRUE;
1708   GstPad *peer = gst_pad_get_peer (pad);
1709   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1710
1711   if (peer) {
1712     if (evdata->only_to_active_pads && aggpad->priv->first_buffer) {
1713       GST_DEBUG_OBJECT (pad, "not sending event to inactive pad");
1714       ret = TRUE;
1715     } else {
1716       ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
1717       GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
1718       gst_object_unref (peer);
1719     }
1720   }
1721
1722   if (ret == FALSE) {
1723     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
1724       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
1725
1726       GST_DEBUG_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
1727
1728       if (gst_pad_query (peer, seeking)) {
1729         gboolean seekable;
1730
1731         gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
1732
1733         if (seekable == FALSE) {
1734           GST_INFO_OBJECT (pad,
1735               "Source not seekable, We failed but it does not matter!");
1736
1737           ret = TRUE;
1738         }
1739       } else {
1740         GST_ERROR_OBJECT (pad, "Query seeking FAILED");
1741       }
1742
1743       gst_query_unref (seeking);
1744     }
1745
1746     if (evdata->flush) {
1747       PAD_LOCK (aggpad);
1748       aggpad->priv->pending_flush_start = FALSE;
1749       aggpad->priv->pending_flush_stop = FALSE;
1750       PAD_UNLOCK (aggpad);
1751     }
1752   } else {
1753     evdata->one_actually_seeked = TRUE;
1754   }
1755
1756   evdata->result &= ret;
1757
1758   /* Always send to all pads */
1759   return FALSE;
1760 }
1761
1762 static EventData
1763 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
1764     GstEvent * event, gboolean flush, gboolean only_to_active_pads)
1765 {
1766   EventData evdata;
1767
1768   evdata.event = event;
1769   evdata.result = TRUE;
1770   evdata.flush = flush;
1771   evdata.one_actually_seeked = FALSE;
1772   evdata.only_to_active_pads = only_to_active_pads;
1773
1774   /* We first need to set all pads as flushing in a first pass
1775    * as flush_start flush_stop is sometimes sent synchronously
1776    * while we send the seek event */
1777   if (flush) {
1778     GList *l;
1779
1780     GST_OBJECT_LOCK (self);
1781     for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
1782       GstAggregatorPad *pad = l->data;
1783
1784       PAD_LOCK (pad);
1785       pad->priv->pending_flush_start = TRUE;
1786       pad->priv->pending_flush_stop = FALSE;
1787       PAD_UNLOCK (pad);
1788     }
1789     GST_OBJECT_UNLOCK (self);
1790   }
1791
1792   gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, &evdata);
1793
1794   gst_event_unref (event);
1795
1796   return evdata;
1797 }
1798
1799 static gboolean
1800 gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
1801 {
1802   gdouble rate;
1803   GstFormat fmt;
1804   GstSeekFlags flags;
1805   GstSeekType start_type, stop_type;
1806   gint64 start, stop;
1807   gboolean flush;
1808   EventData evdata;
1809   GstAggregatorPrivate *priv = self->priv;
1810
1811   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1812       &start, &stop_type, &stop);
1813
1814   GST_INFO_OBJECT (self, "starting SEEK");
1815
1816   flush = flags & GST_SEEK_FLAG_FLUSH;
1817
1818   GST_OBJECT_LOCK (self);
1819   if (flush) {
1820     priv->pending_flush_start = TRUE;
1821     priv->flush_seeking = TRUE;
1822   }
1823
1824   gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1825       stop_type, stop, NULL);
1826
1827   /* Seeking sets a position */
1828   self->priv->first_buffer = FALSE;
1829   GST_OBJECT_UNLOCK (self);
1830
1831   /* forward the seek upstream */
1832   evdata =
1833       gst_aggregator_forward_event_to_all_sinkpads (self, event, flush, FALSE);
1834   event = NULL;
1835
1836   if (!evdata.result || !evdata.one_actually_seeked) {
1837     GST_OBJECT_LOCK (self);
1838     priv->flush_seeking = FALSE;
1839     priv->pending_flush_start = FALSE;
1840     GST_OBJECT_UNLOCK (self);
1841   }
1842
1843   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
1844
1845   return evdata.result;
1846 }
1847
1848 static gboolean
1849 gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
1850 {
1851   EventData evdata;
1852   gboolean res = TRUE;
1853
1854   switch (GST_EVENT_TYPE (event)) {
1855     case GST_EVENT_SEEK:
1856     {
1857       gst_event_ref (event);
1858       res = gst_aggregator_do_seek (self, event);
1859       gst_event_unref (event);
1860       event = NULL;
1861       goto done;
1862     }
1863     case GST_EVENT_NAVIGATION:
1864     {
1865       /* navigation is rather pointless. */
1866       res = FALSE;
1867       gst_event_unref (event);
1868       goto done;
1869     }
1870     default:
1871     {
1872       break;
1873     }
1874   }
1875
1876   /* Don't forward QOS events to pads that had no active buffer yet. Otherwise
1877    * they will receive a QOS event that has earliest_time=0 (because we can't
1878    * have negative timestamps), and consider their buffer as too late */
1879   evdata =
1880       gst_aggregator_forward_event_to_all_sinkpads (self, event, FALSE,
1881       GST_EVENT_TYPE (event) == GST_EVENT_QOS);
1882   res = evdata.result;
1883
1884 done:
1885   return res;
1886 }
1887
1888 static gboolean
1889 gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent,
1890     GstEvent * event)
1891 {
1892   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1893
1894   return klass->src_event (GST_AGGREGATOR (parent), event);
1895 }
1896
1897 static gboolean
1898 gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent,
1899     GstQuery * query)
1900 {
1901   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1902
1903   return klass->src_query (GST_AGGREGATOR (parent), query);
1904 }
1905
1906 static gboolean
1907 gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
1908     GstObject * parent, GstPadMode mode, gboolean active)
1909 {
1910   GstAggregator *self = GST_AGGREGATOR (parent);
1911   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1912
1913   if (klass->src_activate) {
1914     if (klass->src_activate (self, mode, active) == FALSE) {
1915       return FALSE;
1916     }
1917   }
1918
1919   if (active == TRUE) {
1920     switch (mode) {
1921       case GST_PAD_MODE_PUSH:
1922       {
1923         GST_INFO_OBJECT (pad, "Activating pad!");
1924         gst_aggregator_start_srcpad_task (self);
1925         return TRUE;
1926       }
1927       default:
1928       {
1929         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
1930         return FALSE;
1931       }
1932     }
1933   }
1934
1935   /* deactivating */
1936   GST_INFO_OBJECT (self, "Deactivating srcpad");
1937   gst_aggregator_stop_srcpad_task (self, FALSE);
1938
1939   return TRUE;
1940 }
1941
1942 static gboolean
1943 gst_aggregator_default_sink_query (GstAggregator * self,
1944     GstAggregatorPad * aggpad, GstQuery * query)
1945 {
1946   GstPad *pad = GST_PAD (aggpad);
1947
1948   return gst_pad_query_default (pad, GST_OBJECT (self), query);
1949 }
1950
1951 static void
1952 gst_aggregator_finalize (GObject * object)
1953 {
1954   GstAggregator *self = (GstAggregator *) object;
1955
1956   g_mutex_clear (&self->priv->src_lock);
1957   g_cond_clear (&self->priv->src_cond);
1958
1959   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
1960 }
1961
1962 /*
1963  * gst_aggregator_set_latency_property:
1964  * @agg: a #GstAggregator
1965  * @latency: the new latency value (in nanoseconds).
1966  *
1967  * Sets the new latency value to @latency. This value is used to limit the
1968  * amount of time a pad waits for data to appear before considering the pad
1969  * as unresponsive.
1970  */
1971 static void
1972 gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
1973 {
1974   gboolean changed;
1975
1976   g_return_if_fail (GST_IS_AGGREGATOR (self));
1977   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
1978
1979   SRC_LOCK (self);
1980   changed = (self->priv->latency != latency);
1981
1982   if (changed) {
1983     GList *item;
1984
1985     GST_OBJECT_LOCK (self);
1986     /* First lock all the pads */
1987     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
1988       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
1989       PAD_LOCK (aggpad);
1990     }
1991
1992     self->priv->latency = latency;
1993
1994     SRC_BROADCAST (self);
1995
1996     /* Now wake up the pads */
1997     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
1998       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
1999       PAD_BROADCAST_EVENT (aggpad);
2000       PAD_UNLOCK (aggpad);
2001     }
2002     GST_OBJECT_UNLOCK (self);
2003   }
2004
2005   SRC_UNLOCK (self);
2006
2007   if (changed)
2008     gst_element_post_message (GST_ELEMENT_CAST (self),
2009         gst_message_new_latency (GST_OBJECT_CAST (self)));
2010 }
2011
2012 /*
2013  * gst_aggregator_get_latency_property:
2014  * @agg: a #GstAggregator
2015  *
2016  * Gets the latency value. See gst_aggregator_set_latency for
2017  * more details.
2018  *
2019  * Returns: The time in nanoseconds to wait for data to arrive on a sink pad
2020  * before a pad is deemed unresponsive. A value of -1 means an
2021  * unlimited time.
2022  */
2023 static gint64
2024 gst_aggregator_get_latency_property (GstAggregator * agg)
2025 {
2026   gint64 res;
2027
2028   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
2029
2030   GST_OBJECT_LOCK (agg);
2031   res = agg->priv->latency;
2032   GST_OBJECT_UNLOCK (agg);
2033
2034   return res;
2035 }
2036
2037 static void
2038 gst_aggregator_set_property (GObject * object, guint prop_id,
2039     const GValue * value, GParamSpec * pspec)
2040 {
2041   GstAggregator *agg = GST_AGGREGATOR (object);
2042
2043   switch (prop_id) {
2044     case PROP_LATENCY:
2045       gst_aggregator_set_latency_property (agg, g_value_get_int64 (value));
2046       break;
2047     case PROP_START_TIME_SELECTION:
2048       agg->priv->start_time_selection = g_value_get_enum (value);
2049       break;
2050     case PROP_START_TIME:
2051       agg->priv->start_time = g_value_get_uint64 (value);
2052       break;
2053     default:
2054       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2055       break;
2056   }
2057 }
2058
2059 static void
2060 gst_aggregator_get_property (GObject * object, guint prop_id,
2061     GValue * value, GParamSpec * pspec)
2062 {
2063   GstAggregator *agg = GST_AGGREGATOR (object);
2064
2065   switch (prop_id) {
2066     case PROP_LATENCY:
2067       g_value_set_int64 (value, gst_aggregator_get_latency_property (agg));
2068       break;
2069     case PROP_START_TIME_SELECTION:
2070       g_value_set_enum (value, agg->priv->start_time_selection);
2071       break;
2072     case PROP_START_TIME:
2073       g_value_set_uint64 (value, agg->priv->start_time);
2074       break;
2075     default:
2076       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2077       break;
2078   }
2079 }
2080
2081 /* GObject vmethods implementations */
2082 static void
2083 gst_aggregator_class_init (GstAggregatorClass * klass)
2084 {
2085   GObjectClass *gobject_class = (GObjectClass *) klass;
2086   GstElementClass *gstelement_class = (GstElementClass *) klass;
2087
2088   aggregator_parent_class = g_type_class_peek_parent (klass);
2089   g_type_class_add_private (klass, sizeof (GstAggregatorPrivate));
2090
2091   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
2092       GST_DEBUG_FG_MAGENTA, "GstAggregator");
2093
2094   klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD;
2095
2096   klass->sink_event = gst_aggregator_default_sink_event;
2097   klass->sink_query = gst_aggregator_default_sink_query;
2098
2099   klass->src_event = gst_aggregator_default_src_event;
2100   klass->src_query = gst_aggregator_default_src_query;
2101
2102   klass->create_new_pad = gst_aggregator_default_create_new_pad;
2103   klass->update_src_caps = gst_aggregator_default_update_src_caps;
2104   klass->fixate_src_caps = gst_aggregator_default_fixate_src_caps;
2105   klass->negotiated_src_caps = gst_aggregator_default_negotiated_src_caps;
2106
2107   gstelement_class->request_new_pad =
2108       GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
2109   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
2110   gstelement_class->release_pad =
2111       GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
2112   gstelement_class->change_state =
2113       GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
2114
2115   gobject_class->set_property = gst_aggregator_set_property;
2116   gobject_class->get_property = gst_aggregator_get_property;
2117   gobject_class->finalize = gst_aggregator_finalize;
2118
2119   g_object_class_install_property (gobject_class, PROP_LATENCY,
2120       g_param_spec_int64 ("latency", "Buffer latency",
2121           "Additional latency in live mode to allow upstream "
2122           "to take longer to produce buffers for the current "
2123           "position (in nanoseconds)", 0,
2124           (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1),
2125           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2126
2127   g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION,
2128       g_param_spec_enum ("start-time-selection", "Start Time Selection",
2129           "Decides which start time is output",
2130           gst_aggregator_start_time_selection_get_type (),
2131           DEFAULT_START_TIME_SELECTION,
2132           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2133
2134   g_object_class_install_property (gobject_class, PROP_START_TIME,
2135       g_param_spec_uint64 ("start-time", "Start Time",
2136           "Start time to use if start-time-selection=set", 0,
2137           G_MAXUINT64,
2138           DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2139
2140   GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad);
2141 }
2142
2143 static void
2144 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
2145 {
2146   GstPadTemplate *pad_template;
2147   GstAggregatorPrivate *priv;
2148
2149   g_return_if_fail (klass->aggregate != NULL);
2150
2151   self->priv =
2152       G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR,
2153       GstAggregatorPrivate);
2154
2155   priv = self->priv;
2156
2157   pad_template =
2158       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
2159   g_return_if_fail (pad_template != NULL);
2160
2161   priv->max_padserial = -1;
2162   priv->tags_changed = FALSE;
2163
2164   self->priv->peer_latency_live = FALSE;
2165   self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
2166   self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
2167   self->priv->has_peer_latency = FALSE;
2168   gst_aggregator_reset_flow_values (self);
2169
2170   self->srcpad = gst_pad_new_from_template (pad_template, "src");
2171
2172   gst_pad_set_event_function (self->srcpad,
2173       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
2174   gst_pad_set_query_function (self->srcpad,
2175       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func));
2176   gst_pad_set_activatemode_function (self->srcpad,
2177       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func));
2178
2179   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
2180
2181   self->priv->latency = DEFAULT_LATENCY;
2182   self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION;
2183   self->priv->start_time = DEFAULT_START_TIME;
2184
2185   g_mutex_init (&self->priv->src_lock);
2186   g_cond_init (&self->priv->src_cond);
2187 }
2188
2189 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
2190  * method to get to the padtemplates */
2191 GType
2192 gst_aggregator_get_type (void)
2193 {
2194   static volatile gsize type = 0;
2195
2196   if (g_once_init_enter (&type)) {
2197     GType _type;
2198     static const GTypeInfo info = {
2199       sizeof (GstAggregatorClass),
2200       NULL,
2201       NULL,
2202       (GClassInitFunc) gst_aggregator_class_init,
2203       NULL,
2204       NULL,
2205       sizeof (GstAggregator),
2206       0,
2207       (GInstanceInitFunc) gst_aggregator_init,
2208     };
2209
2210     _type = g_type_register_static (GST_TYPE_ELEMENT,
2211         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
2212     g_once_init_leave (&type, _type);
2213   }
2214   return type;
2215 }
2216
2217 /* Must be called with SRC lock and PAD lock held */
2218 static gboolean
2219 gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
2220 {
2221   /* Empty queue always has space */
2222   if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL)
2223     return TRUE;
2224
2225   /* We also want at least two buffers, one is being processed and one is ready
2226    * for the next iteration when we operate in live mode. */
2227   if (self->priv->peer_latency_live && aggpad->priv->num_buffers < 2)
2228     return TRUE;
2229
2230   /* zero latency, if there is a buffer, it's full */
2231   if (self->priv->latency == 0)
2232     return FALSE;
2233
2234   /* Allow no more buffers than the latency */
2235   return (aggpad->priv->time_level <= self->priv->latency);
2236 }
2237
2238 /* Must be called with the PAD_LOCK held */
2239 static void
2240 apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
2241 {
2242   GstClockTime timestamp;
2243
2244   if (GST_BUFFER_DTS_IS_VALID (buffer))
2245     timestamp = GST_BUFFER_DTS (buffer);
2246   else
2247     timestamp = GST_BUFFER_PTS (buffer);
2248
2249   if (timestamp == GST_CLOCK_TIME_NONE) {
2250     if (head)
2251       timestamp = aggpad->priv->head_position;
2252     else
2253       timestamp = aggpad->priv->tail_position;
2254   }
2255
2256   /* add duration */
2257   if (GST_BUFFER_DURATION_IS_VALID (buffer))
2258     timestamp += GST_BUFFER_DURATION (buffer);
2259
2260   if (head)
2261     aggpad->priv->head_position = timestamp;
2262   else
2263     aggpad->priv->tail_position = timestamp;
2264
2265   update_time_level (aggpad, head);
2266 }
2267
2268 static GstFlowReturn
2269 gst_aggregator_pad_chain_internal (GstAggregator * self,
2270     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
2271 {
2272   GstFlowReturn flow_return;
2273   GstClockTime buf_pts;
2274
2275   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
2276
2277   PAD_FLUSH_LOCK (aggpad);
2278
2279   PAD_LOCK (aggpad);
2280   flow_return = aggpad->priv->flow_return;
2281   if (flow_return != GST_FLOW_OK)
2282     goto flushing;
2283
2284   if (aggpad->priv->pending_eos == TRUE)
2285     goto eos;
2286
2287   PAD_UNLOCK (aggpad);
2288
2289   buf_pts = GST_BUFFER_PTS (buffer);
2290
2291   for (;;) {
2292     SRC_LOCK (self);
2293     GST_OBJECT_LOCK (self);
2294     PAD_LOCK (aggpad);
2295
2296     if (aggpad->priv->first_buffer) {
2297       self->priv->has_peer_latency = FALSE;
2298       aggpad->priv->first_buffer = FALSE;
2299     }
2300
2301     if (gst_aggregator_pad_has_space (self, aggpad)
2302         && aggpad->priv->flow_return == GST_FLOW_OK) {
2303       if (head)
2304         g_queue_push_head (&aggpad->priv->buffers, buffer);
2305       else
2306         g_queue_push_tail (&aggpad->priv->buffers, buffer);
2307       apply_buffer (aggpad, buffer, head);
2308       aggpad->priv->num_buffers++;
2309       buffer = NULL;
2310       SRC_BROADCAST (self);
2311       break;
2312     }
2313
2314     flow_return = aggpad->priv->flow_return;
2315     if (flow_return != GST_FLOW_OK) {
2316       GST_OBJECT_UNLOCK (self);
2317       SRC_UNLOCK (self);
2318       goto flushing;
2319     }
2320     GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2321     GST_OBJECT_UNLOCK (self);
2322     SRC_UNLOCK (self);
2323     PAD_WAIT_EVENT (aggpad);
2324
2325     PAD_UNLOCK (aggpad);
2326   }
2327
2328   if (self->priv->first_buffer) {
2329     GstClockTime start_time;
2330
2331     switch (self->priv->start_time_selection) {
2332       case GST_AGGREGATOR_START_TIME_SELECTION_ZERO:
2333       default:
2334         start_time = 0;
2335         break;
2336       case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
2337         GST_OBJECT_LOCK (aggpad);
2338         if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) {
2339           start_time = buf_pts;
2340           if (start_time != -1) {
2341             start_time = MAX (start_time, aggpad->priv->head_segment.start);
2342             start_time =
2343                 gst_segment_to_running_time (&aggpad->priv->head_segment,
2344                 GST_FORMAT_TIME, start_time);
2345           }
2346         } else {
2347           start_time = 0;
2348           GST_WARNING_OBJECT (aggpad,
2349               "Ignoring request of selecting the first start time "
2350               "as the segment is a %s segment instead of a time segment",
2351               gst_format_get_name (aggpad->segment.format));
2352         }
2353         GST_OBJECT_UNLOCK (aggpad);
2354         break;
2355       case GST_AGGREGATOR_START_TIME_SELECTION_SET:
2356         start_time = self->priv->start_time;
2357         if (start_time == -1)
2358           start_time = 0;
2359         break;
2360     }
2361
2362     if (start_time != -1) {
2363       if (self->segment.position == -1)
2364         self->segment.position = start_time;
2365       else
2366         self->segment.position = MIN (start_time, self->segment.position);
2367
2368       GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT,
2369           GST_TIME_ARGS (start_time));
2370     }
2371   }
2372
2373   PAD_UNLOCK (aggpad);
2374   GST_OBJECT_UNLOCK (self);
2375   SRC_UNLOCK (self);
2376
2377   PAD_FLUSH_UNLOCK (aggpad);
2378
2379   GST_DEBUG_OBJECT (aggpad, "Done chaining");
2380
2381   return flow_return;
2382
2383 flushing:
2384   PAD_UNLOCK (aggpad);
2385   PAD_FLUSH_UNLOCK (aggpad);
2386
2387   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
2388       gst_flow_get_name (flow_return));
2389   if (buffer)
2390     gst_buffer_unref (buffer);
2391
2392   return flow_return;
2393
2394 eos:
2395   PAD_UNLOCK (aggpad);
2396   PAD_FLUSH_UNLOCK (aggpad);
2397
2398   gst_buffer_unref (buffer);
2399   GST_DEBUG_OBJECT (aggpad, "We are EOS already...");
2400
2401   return GST_FLOW_EOS;
2402 }
2403
2404 static GstFlowReturn
2405 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
2406 {
2407   return gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
2408       GST_AGGREGATOR_PAD_CAST (pad), buffer, TRUE);
2409 }
2410
2411 static gboolean
2412 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
2413     GstQuery * query)
2414 {
2415   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2416   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2417
2418   if (GST_QUERY_IS_SERIALIZED (query)) {
2419     PAD_LOCK (aggpad);
2420
2421     while (!gst_aggregator_pad_queue_is_empty (aggpad)
2422         && aggpad->priv->flow_return == GST_FLOW_OK) {
2423       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2424       PAD_WAIT_EVENT (aggpad);
2425     }
2426
2427     if (aggpad->priv->flow_return != GST_FLOW_OK)
2428       goto flushing;
2429
2430     PAD_UNLOCK (aggpad);
2431   }
2432
2433   return klass->sink_query (GST_AGGREGATOR (parent),
2434       GST_AGGREGATOR_PAD (pad), query);
2435
2436 flushing:
2437   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
2438       gst_flow_get_name (aggpad->priv->flow_return));
2439   PAD_UNLOCK (aggpad);
2440   return FALSE;
2441 }
2442
2443 static GstFlowReturn
2444 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
2445     GstEvent * event)
2446 {
2447   GstFlowReturn ret = GST_FLOW_OK;
2448   GstAggregator *self = GST_AGGREGATOR (parent);
2449   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2450   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2451
2452   if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS
2453       /* && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE */ ) {
2454     SRC_LOCK (self);
2455     PAD_LOCK (aggpad);
2456
2457     if (aggpad->priv->flow_return != GST_FLOW_OK
2458         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
2459       ret = aggpad->priv->flow_return;
2460       goto flushing;
2461     }
2462
2463     if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
2464       GST_OBJECT_LOCK (aggpad);
2465       gst_event_copy_segment (event, &aggpad->priv->head_segment);
2466       aggpad->priv->head_position = aggpad->priv->head_segment.position;
2467       update_time_level (aggpad, TRUE);
2468       GST_OBJECT_UNLOCK (aggpad);
2469     }
2470
2471     if (GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
2472       GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT,
2473           event);
2474       g_queue_push_head (&aggpad->priv->buffers, event);
2475       event = NULL;
2476       SRC_BROADCAST (self);
2477     }
2478     PAD_UNLOCK (aggpad);
2479     SRC_UNLOCK (self);
2480   }
2481
2482   if (event) {
2483     gboolean is_caps = (GST_EVENT_TYPE (event) == GST_EVENT_CAPS);
2484
2485     if (!klass->sink_event (self, aggpad, event)) {
2486       /* Copied from GstPad to convert boolean to a GstFlowReturn in
2487        * the event handling func */
2488       ret = is_caps ? GST_FLOW_NOT_NEGOTIATED : GST_FLOW_ERROR;
2489     }
2490   }
2491
2492   return ret;
2493
2494 flushing:
2495   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
2496       gst_flow_get_name (aggpad->priv->flow_return));
2497   PAD_UNLOCK (aggpad);
2498   SRC_UNLOCK (self);
2499   if (GST_EVENT_IS_STICKY (event))
2500     gst_pad_store_sticky_event (pad, event);
2501   gst_event_unref (event);
2502
2503   return ret;
2504 }
2505
2506 static gboolean
2507 gst_aggregator_pad_activate_mode_func (GstPad * pad,
2508     GstObject * parent, GstPadMode mode, gboolean active)
2509 {
2510   GstAggregator *self = GST_AGGREGATOR (parent);
2511   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2512
2513   if (active == FALSE) {
2514     SRC_LOCK (self);
2515     gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
2516     SRC_BROADCAST (self);
2517     SRC_UNLOCK (self);
2518   } else {
2519     PAD_LOCK (aggpad);
2520     aggpad->priv->flow_return = GST_FLOW_OK;
2521     PAD_BROADCAST_EVENT (aggpad);
2522     PAD_UNLOCK (aggpad);
2523   }
2524
2525   return TRUE;
2526 }
2527
2528 /***********************************
2529  * GstAggregatorPad implementation  *
2530  ************************************/
2531 G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
2532
2533 static void
2534 gst_aggregator_pad_constructed (GObject * object)
2535 {
2536   GstPad *pad = GST_PAD (object);
2537
2538   gst_pad_set_chain_function (pad,
2539       GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
2540   gst_pad_set_event_full_function_full (pad,
2541       GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
2542   gst_pad_set_query_function (pad,
2543       GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
2544   gst_pad_set_activatemode_function (pad,
2545       GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
2546 }
2547
2548 static void
2549 gst_aggregator_pad_finalize (GObject * object)
2550 {
2551   GstAggregatorPad *pad = (GstAggregatorPad *) object;
2552
2553   g_cond_clear (&pad->priv->event_cond);
2554   g_mutex_clear (&pad->priv->flush_lock);
2555   g_mutex_clear (&pad->priv->lock);
2556
2557   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object);
2558 }
2559
2560 static void
2561 gst_aggregator_pad_dispose (GObject * object)
2562 {
2563   GstAggregatorPad *pad = (GstAggregatorPad *) object;
2564
2565   gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE);
2566
2567   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
2568 }
2569
2570 static void
2571 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
2572 {
2573   GObjectClass *gobject_class = (GObjectClass *) klass;
2574
2575   g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate));
2576
2577   gobject_class->constructed = gst_aggregator_pad_constructed;
2578   gobject_class->finalize = gst_aggregator_pad_finalize;
2579   gobject_class->dispose = gst_aggregator_pad_dispose;
2580 }
2581
2582 static void
2583 gst_aggregator_pad_init (GstAggregatorPad * pad)
2584 {
2585   pad->priv =
2586       G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
2587       GstAggregatorPadPrivate);
2588
2589   g_queue_init (&pad->priv->buffers);
2590   g_cond_init (&pad->priv->event_cond);
2591
2592   g_mutex_init (&pad->priv->flush_lock);
2593   g_mutex_init (&pad->priv->lock);
2594
2595   gst_aggregator_pad_reset_unlocked (pad);
2596 }
2597
2598 /* Must be called with the PAD_LOCK held */
2599 static void
2600 gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad)
2601 {
2602   pad->priv->num_buffers--;
2603   GST_TRACE_OBJECT (pad, "Consuming buffer");
2604   if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) {
2605     pad->priv->pending_eos = FALSE;
2606     pad->priv->eos = TRUE;
2607   }
2608   PAD_BROADCAST_EVENT (pad);
2609 }
2610
2611 /* Must be called with the PAD_LOCK held */
2612 static void
2613 gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
2614 {
2615   GstAggregator *self = NULL;
2616   GstAggregatorClass *aggclass;
2617   GstBuffer *buffer = NULL;
2618
2619   while (pad->priv->clipped_buffer == NULL &&
2620       GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) {
2621     buffer = g_queue_pop_tail (&pad->priv->buffers);
2622
2623     apply_buffer (pad, buffer, FALSE);
2624
2625     /* We only take the parent here so that it's not taken if the buffer is
2626      * already clipped or if the queue is empty.
2627      */
2628     if (self == NULL) {
2629       self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
2630       if (self == NULL) {
2631         gst_buffer_unref (buffer);
2632         return;
2633       }
2634
2635       aggclass = GST_AGGREGATOR_GET_CLASS (self);
2636     }
2637
2638     if (aggclass->clip) {
2639       GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer);
2640
2641       buffer = aggclass->clip (self, pad, buffer);
2642
2643       if (buffer == NULL) {
2644         gst_aggregator_pad_buffer_consumed (pad);
2645         GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
2646       }
2647     }
2648
2649     pad->priv->clipped_buffer = buffer;
2650   }
2651
2652   if (self)
2653     gst_object_unref (self);
2654 }
2655
2656 /**
2657  * gst_aggregator_pad_steal_buffer:
2658  * @pad: the pad to get buffer from
2659  *
2660  * Steal the ref to the buffer currently queued in @pad.
2661  *
2662  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
2663  *   queued. You should unref the buffer after usage.
2664  */
2665 GstBuffer *
2666 gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
2667 {
2668   GstBuffer *buffer;
2669
2670   PAD_LOCK (pad);
2671
2672   gst_aggregator_pad_clip_buffer_unlocked (pad);
2673
2674   buffer = pad->priv->clipped_buffer;
2675   pad->priv->clipped_buffer = NULL;
2676
2677   if (buffer) {
2678     gst_aggregator_pad_buffer_consumed (pad);
2679     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
2680   }
2681
2682   PAD_UNLOCK (pad);
2683
2684   return buffer;
2685 }
2686
2687 /**
2688  * gst_aggregator_pad_drop_buffer:
2689  * @pad: the pad where to drop any pending buffer
2690  *
2691  * Drop the buffer currently queued in @pad.
2692  *
2693  * Returns: TRUE if there was a buffer queued in @pad, or FALSE if not.
2694  */
2695 gboolean
2696 gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
2697 {
2698   GstBuffer *buf;
2699
2700   buf = gst_aggregator_pad_steal_buffer (pad);
2701
2702   if (buf == NULL)
2703     return FALSE;
2704
2705   gst_buffer_unref (buf);
2706   return TRUE;
2707 }
2708
2709 /**
2710  * gst_aggregator_pad_get_buffer:
2711  * @pad: the pad to get buffer from
2712  *
2713  * Returns: (transfer full): A reference to the buffer in @pad or
2714  * NULL if no buffer was queued. You should unref the buffer after
2715  * usage.
2716  */
2717 GstBuffer *
2718 gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
2719 {
2720   GstBuffer *buffer;
2721
2722   PAD_LOCK (pad);
2723
2724   gst_aggregator_pad_clip_buffer_unlocked (pad);
2725
2726   if (pad->priv->clipped_buffer) {
2727     buffer = gst_buffer_ref (pad->priv->clipped_buffer);
2728   } else {
2729     buffer = NULL;
2730   }
2731   PAD_UNLOCK (pad);
2732
2733   return buffer;
2734 }
2735
2736 gboolean
2737 gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
2738 {
2739   gboolean is_eos;
2740
2741   PAD_LOCK (pad);
2742   is_eos = pad->priv->eos;
2743   PAD_UNLOCK (pad);
2744
2745   return is_eos;
2746 }
2747
2748 /**
2749  * gst_aggregator_merge_tags:
2750  * @self: a #GstAggregator
2751  * @tags: a #GstTagList to merge
2752  * @mode: the #GstTagMergeMode to use
2753  *
2754  * Adds tags to so-called pending tags, which will be processed
2755  * before pushing out data downstream.
2756  *
2757  * Note that this is provided for convenience, and the subclass is
2758  * not required to use this and can still do tag handling on its own.
2759  *
2760  * MT safe.
2761  */
2762 void
2763 gst_aggregator_merge_tags (GstAggregator * self,
2764     const GstTagList * tags, GstTagMergeMode mode)
2765 {
2766   GstTagList *otags;
2767
2768   g_return_if_fail (GST_IS_AGGREGATOR (self));
2769   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
2770
2771   /* FIXME Check if we can use OBJECT lock here! */
2772   GST_OBJECT_LOCK (self);
2773   if (tags)
2774     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
2775   otags = self->priv->tags;
2776   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
2777   if (otags)
2778     gst_tag_list_unref (otags);
2779   self->priv->tags_changed = TRUE;
2780   GST_OBJECT_UNLOCK (self);
2781 }
2782
2783 /**
2784  * gst_aggregator_set_latency:
2785  * @self: a #GstAggregator
2786  * @min_latency: minimum latency
2787  * @max_latency: maximum latency
2788  *
2789  * Lets #GstAggregator sub-classes tell the baseclass what their internal
2790  * latency is. Will also post a LATENCY message on the bus so the pipeline
2791  * can reconfigure its global latency.
2792  */
2793 void
2794 gst_aggregator_set_latency (GstAggregator * self,
2795     GstClockTime min_latency, GstClockTime max_latency)
2796 {
2797   gboolean changed = FALSE;
2798
2799   g_return_if_fail (GST_IS_AGGREGATOR (self));
2800   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
2801   g_return_if_fail (max_latency >= min_latency);
2802
2803   SRC_LOCK (self);
2804   if (self->priv->sub_latency_min != min_latency) {
2805     self->priv->sub_latency_min = min_latency;
2806     changed = TRUE;
2807   }
2808   if (self->priv->sub_latency_max != max_latency) {
2809     self->priv->sub_latency_max = max_latency;
2810     changed = TRUE;
2811   }
2812
2813   if (changed)
2814     SRC_BROADCAST (self);
2815   SRC_UNLOCK (self);
2816
2817   if (changed) {
2818     gst_element_post_message (GST_ELEMENT_CAST (self),
2819         gst_message_new_latency (GST_OBJECT_CAST (self)));
2820   }
2821 }