audioaggregator: remove GstAudioAggregator->info
[platform/upstream/gstreamer.git] / gst-libs / gst / audio / gstaudioaggregator.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2001 Thomas <thomas@apestaart.org>
4  *               2005,2006 Wim Taymans <wim@fluendo.com>
5  *                    2013 Sebastian Dröge <sebastian@centricular.com>
6  *                    2014 Collabora
7  *                             Olivier Crete <olivier.crete@collabora.com>
8  *
9  * gstaudioaggregator.c:
10  *
11  * This library is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU Library General Public
13  * License as published by the Free Software Foundation; either
14  * version 2 of the License, or (at your option) any later version.
15  *
16  * This library is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19  * Library General Public License for more details.
20  *
21  * You should have received a copy of the GNU Library General Public
22  * License along with this library; if not, write to the
23  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
24  * Boston, MA 02110-1301, USA.
25  */
26 /**
27  * SECTION: gstaudioaggregator
28  * @title: GstAudioAggregator
29  * @short_description: Base class that manages a set of audio input pads
30  * with the purpose of aggregating or mixing their raw audio input buffers
31  * @see_also: #GstAggregator, #GstAudioMixer
32  *
33  * Subclasses must use (a subclass of) #GstAudioAggregatorPad for both
34  * their source and sink pads,
35  * gst_element_class_add_static_pad_template_with_gtype() is a convenient
36  * helper.
37  *
38  * #GstAudioAggregator can perform conversion on the data arriving
39  * on its sink pads, based on the format expected downstream: in order
40  * to enable that behaviour, the GType of the sink pads must either be
41  * a (subclass of) #GstAudioAggregatorConvertPad to use the default
42  * #GstAudioConverter implementation, or a subclass of #GstAudioAggregatorPad
43  * implementing #GstAudioAggregatorPad.convert_buffer.
44  *
45  * To allow for the output caps to change, the mechanism is the same as
46  * above, with the GType of the source pad.
47  *
48  * See #GstAudioMixer for an example.
49  *
50  * When conversion is enabled, #GstAudioAggregator will accept
51  * any type of raw audio caps and perform conversion
52  * on the data arriving on its sink pads, with whatever downstream
53  * expects as the target format.
54  *
55  * In case downstream caps are not fully fixated, it will use
56  * the first configured sink pad to finish fixating its source pad
57  * caps.
58  *
59  * A notable exception for now is the sample rate, sink pads must
60  * have the same sample rate as either the downstream requirement,
61  * or the first configured pad, or a combination of both (when
62  * downstream specifies a range or a set of acceptable rates).
63  */
64
65
66 #ifdef HAVE_CONFIG_H
67 #  include "config.h"
68 #endif
69
70 #include "gstaudioaggregator.h"
71
72 #include <string.h>
73
74 GST_DEBUG_CATEGORY_STATIC (audio_aggregator_debug);
75 #define GST_CAT_DEFAULT audio_aggregator_debug
76
77 struct _GstAudioAggregatorPadPrivate
78 {
79   /* All members are protected by the pad object lock */
80
81   GstBuffer *buffer;            /* current buffer we're mixing, for
82                                    comparison with a new input buffer from
83                                    aggregator to see if we need to update our
84                                    cached values. */
85
86   guint position, size;         /* position in the input buffer and size of the
87                                    input buffer in number of samples */
88
89   GstBuffer *input_buffer;
90
91   guint64 output_offset;        /* Sample offset in output segment relative to
92                                    pad.segment.start that position refers to
93                                    in the current buffer. */
94
95   guint64 next_offset;          /* Next expected sample offset relative to
96                                    pad.segment.start */
97
98   /* Last time we noticed a discont */
99   GstClockTime discont_time;
100
101   /* A new unhandled segment event has been received */
102   gboolean new_segment;
103 };
104
105
106 /*****************************************
107  * GstAudioAggregatorPad implementation  *
108  *****************************************/
109 G_DEFINE_TYPE (GstAudioAggregatorPad, gst_audio_aggregator_pad,
110     GST_TYPE_AGGREGATOR_PAD);
111
112 enum
113 {
114   PROP_PAD_0,
115   PROP_PAD_CONVERTER_CONFIG,
116 };
117
118 static GstFlowReturn
119 gst_audio_aggregator_pad_flush_pad (GstAggregatorPad * aggpad,
120     GstAggregator * aggregator);
121
122 static void
123 gst_audio_aggregator_pad_finalize (GObject * object)
124 {
125   GstAudioAggregatorPad *pad = (GstAudioAggregatorPad *) object;
126
127   gst_buffer_replace (&pad->priv->buffer, NULL);
128   gst_buffer_replace (&pad->priv->input_buffer, NULL);
129
130   G_OBJECT_CLASS (gst_audio_aggregator_pad_parent_class)->finalize (object);
131 }
132
133 static void
134 gst_audio_aggregator_pad_class_init (GstAudioAggregatorPadClass * klass)
135 {
136   GObjectClass *gobject_class = (GObjectClass *) klass;
137   GstAggregatorPadClass *aggpadclass = (GstAggregatorPadClass *) klass;
138
139   g_type_class_add_private (klass, sizeof (GstAudioAggregatorPadPrivate));
140
141   gobject_class->finalize = gst_audio_aggregator_pad_finalize;
142   aggpadclass->flush = GST_DEBUG_FUNCPTR (gst_audio_aggregator_pad_flush_pad);
143 }
144
145 static void
146 gst_audio_aggregator_pad_init (GstAudioAggregatorPad * pad)
147 {
148   pad->priv =
149       G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AUDIO_AGGREGATOR_PAD,
150       GstAudioAggregatorPadPrivate);
151
152   gst_audio_info_init (&pad->info);
153
154   pad->priv->buffer = NULL;
155   pad->priv->input_buffer = NULL;
156   pad->priv->position = 0;
157   pad->priv->size = 0;
158   pad->priv->output_offset = -1;
159   pad->priv->next_offset = -1;
160   pad->priv->discont_time = GST_CLOCK_TIME_NONE;
161 }
162
163
164 static GstFlowReturn
165 gst_audio_aggregator_pad_flush_pad (GstAggregatorPad * aggpad,
166     GstAggregator * aggregator)
167 {
168   GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (aggpad);
169
170   GST_OBJECT_LOCK (aggpad);
171   pad->priv->position = pad->priv->size = 0;
172   pad->priv->output_offset = pad->priv->next_offset = -1;
173   pad->priv->discont_time = GST_CLOCK_TIME_NONE;
174   gst_buffer_replace (&pad->priv->buffer, NULL);
175   gst_buffer_replace (&pad->priv->input_buffer, NULL);
176   GST_OBJECT_UNLOCK (aggpad);
177
178   return GST_FLOW_OK;
179 }
180
181 struct _GstAudioAggregatorConvertPadPrivate
182 {
183   /* All members are protected by the pad object lock */
184   GstAudioConverter *converter;
185   GstStructure *converter_config;
186   gboolean converter_config_changed;
187 };
188
189
190 G_DEFINE_TYPE (GstAudioAggregatorConvertPad, gst_audio_aggregator_convert_pad,
191     GST_TYPE_AUDIO_AGGREGATOR_PAD);
192
193 static void
194 gst_audio_aggregator_convert_pad_update_converter (GstAudioAggregatorConvertPad
195     * aaggcpad, GstAudioInfo * in_info, GstAudioInfo * out_info)
196 {
197   if (!aaggcpad->priv->converter_config_changed)
198     return;
199
200   if (aaggcpad->priv->converter) {
201     gst_audio_converter_free (aaggcpad->priv->converter);
202     aaggcpad->priv->converter = NULL;
203   }
204
205   if (gst_audio_info_is_equal (in_info, out_info) ||
206       in_info->finfo->format == GST_AUDIO_FORMAT_UNKNOWN) {
207     if (aaggcpad->priv->converter) {
208       gst_audio_converter_free (aaggcpad->priv->converter);
209       aaggcpad->priv->converter = NULL;
210     }
211   } else {
212     /* If we haven't received caps yet, this pad should not have
213      * a buffer to convert anyway */
214     aaggcpad->priv->converter =
215         gst_audio_converter_new (GST_AUDIO_CONVERTER_FLAG_NONE,
216         in_info, out_info,
217         aaggcpad->priv->converter_config ? gst_structure_copy (aaggcpad->
218             priv->converter_config) : NULL);
219   }
220
221   aaggcpad->priv->converter_config_changed = FALSE;
222 }
223
224 static void
225 gst_audio_aggregator_pad_update_conversion_info (GstAudioAggregatorPad *
226     aaggpad)
227 {
228   GST_AUDIO_AGGREGATOR_CONVERT_PAD (aaggpad)->priv->converter_config_changed =
229       TRUE;
230 }
231
232 static GstBuffer *
233 gst_audio_aggregator_convert_pad_convert_buffer (GstAudioAggregatorPad *
234     aaggpad, GstAudioInfo * in_info, GstAudioInfo * out_info,
235     GstBuffer * input_buffer)
236 {
237   GstBuffer *res;
238   GstAudioAggregatorConvertPad *aaggcpad =
239       GST_AUDIO_AGGREGATOR_CONVERT_PAD (aaggpad);
240
241   gst_audio_aggregator_convert_pad_update_converter (aaggcpad, in_info,
242       out_info);
243
244   if (aaggcpad->priv->converter) {
245     gint insize = gst_buffer_get_size (input_buffer);
246     gsize insamples = insize / in_info->bpf;
247     gsize outsamples =
248         gst_audio_converter_get_out_frames (aaggcpad->priv->converter,
249         insamples);
250     gint outsize = outsamples * out_info->bpf;
251     GstMapInfo inmap, outmap;
252
253     res = gst_buffer_new_allocate (NULL, outsize, NULL);
254
255     /* We create a perfectly similar buffer, except obviously for
256      * its converted contents */
257     gst_buffer_copy_into (res, input_buffer,
258         GST_BUFFER_COPY_FLAGS | GST_BUFFER_COPY_TIMESTAMPS |
259         GST_BUFFER_COPY_META, 0, -1);
260
261     gst_buffer_map (input_buffer, &inmap, GST_MAP_READ);
262     gst_buffer_map (res, &outmap, GST_MAP_WRITE);
263
264     gst_audio_converter_samples (aaggcpad->priv->converter,
265         GST_AUDIO_CONVERTER_FLAG_NONE,
266         (gpointer *) & inmap.data, insamples,
267         (gpointer *) & outmap.data, outsamples);
268
269     gst_buffer_unmap (input_buffer, &inmap);
270     gst_buffer_unmap (res, &outmap);
271   } else {
272     res = gst_buffer_ref (input_buffer);
273   }
274
275   return res;
276 }
277
278 static void
279 gst_audio_aggregator_convert_pad_finalize (GObject * object)
280 {
281   GstAudioAggregatorConvertPad *pad = (GstAudioAggregatorConvertPad *) object;
282
283   if (pad->priv->converter)
284     gst_audio_converter_free (pad->priv->converter);
285
286   if (pad->priv->converter_config)
287     gst_structure_free (pad->priv->converter_config);
288
289   G_OBJECT_CLASS (gst_audio_aggregator_convert_pad_parent_class)->finalize
290       (object);
291 }
292
293 static void
294 gst_audio_aggregator_convert_pad_get_property (GObject * object, guint prop_id,
295     GValue * value, GParamSpec * pspec)
296 {
297   GstAudioAggregatorConvertPad *pad = GST_AUDIO_AGGREGATOR_CONVERT_PAD (object);
298
299   switch (prop_id) {
300     case PROP_PAD_CONVERTER_CONFIG:
301       GST_OBJECT_LOCK (pad);
302       if (pad->priv->converter_config)
303         g_value_set_boxed (value, pad->priv->converter_config);
304       GST_OBJECT_UNLOCK (pad);
305       break;
306     default:
307       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
308       break;
309   }
310 }
311
312 static void
313 gst_audio_aggregator_convert_pad_set_property (GObject * object, guint prop_id,
314     const GValue * value, GParamSpec * pspec)
315 {
316   GstAudioAggregatorConvertPad *pad = GST_AUDIO_AGGREGATOR_CONVERT_PAD (object);
317
318   switch (prop_id) {
319     case PROP_PAD_CONVERTER_CONFIG:
320       GST_OBJECT_LOCK (pad);
321       if (pad->priv->converter_config)
322         gst_structure_free (pad->priv->converter_config);
323       pad->priv->converter_config = g_value_dup_boxed (value);
324       pad->priv->converter_config_changed = TRUE;
325       GST_OBJECT_UNLOCK (pad);
326       break;
327     default:
328       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
329       break;
330   }
331 }
332
333 static void
334 gst_audio_aggregator_convert_pad_class_init (GstAudioAggregatorConvertPadClass *
335     klass)
336 {
337   GObjectClass *gobject_class = (GObjectClass *) klass;
338   GstAudioAggregatorPadClass *aaggpad_class =
339       (GstAudioAggregatorPadClass *) klass;
340   g_type_class_add_private (klass,
341       sizeof (GstAudioAggregatorConvertPadPrivate));
342
343   gobject_class->set_property = gst_audio_aggregator_convert_pad_set_property;
344   gobject_class->get_property = gst_audio_aggregator_convert_pad_get_property;
345
346   g_object_class_install_property (gobject_class, PROP_PAD_CONVERTER_CONFIG,
347       g_param_spec_boxed ("converter-config", "Converter configuration",
348           "A GstStructure describing the configuration that should be used "
349           "when converting this pad's audio buffers",
350           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
351
352   aaggpad_class->convert_buffer =
353       gst_audio_aggregator_convert_pad_convert_buffer;
354
355   aaggpad_class->update_conversion_info =
356       gst_audio_aggregator_pad_update_conversion_info;
357
358   gobject_class->finalize = gst_audio_aggregator_convert_pad_finalize;
359 }
360
361 static void
362 gst_audio_aggregator_convert_pad_init (GstAudioAggregatorConvertPad * pad)
363 {
364   pad->priv =
365       G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AUDIO_AGGREGATOR_CONVERT_PAD,
366       GstAudioAggregatorConvertPadPrivate);
367 }
368
369 /**************************************
370  * GstAudioAggregator implementation  *
371  **************************************/
372
373 struct _GstAudioAggregatorPrivate
374 {
375   GMutex mutex;
376
377   /* All three properties are unprotected, can't be modified while streaming */
378   /* Size in frames that is output per buffer */
379   GstClockTime output_buffer_duration;
380   GstClockTime alignment_threshold;
381   GstClockTime discont_wait;
382
383   /* Protected by srcpad stream clock */
384   /* Output buffer starting at offset containing blocksize frames (calculated
385    * from output_buffer_duration) */
386   GstBuffer *current_buffer;
387
388   /* counters to keep track of timestamps */
389   /* Readable with object lock, writable with both aag lock and object lock */
390
391   /* Sample offset starting from 0 at aggregator.segment.start */
392   gint64 offset;
393 };
394
395 #define GST_AUDIO_AGGREGATOR_LOCK(self)   g_mutex_lock (&(self)->priv->mutex);
396 #define GST_AUDIO_AGGREGATOR_UNLOCK(self) g_mutex_unlock (&(self)->priv->mutex);
397
398 static void gst_audio_aggregator_set_property (GObject * object, guint prop_id,
399     const GValue * value, GParamSpec * pspec);
400 static void gst_audio_aggregator_get_property (GObject * object, guint prop_id,
401     GValue * value, GParamSpec * pspec);
402 static void gst_audio_aggregator_dispose (GObject * object);
403
404 static gboolean gst_audio_aggregator_src_event (GstAggregator * agg,
405     GstEvent * event);
406 static gboolean gst_audio_aggregator_sink_event (GstAggregator * agg,
407     GstAggregatorPad * aggpad, GstEvent * event);
408 static gboolean gst_audio_aggregator_src_query (GstAggregator * agg,
409     GstQuery * query);
410 static gboolean
411 gst_audio_aggregator_sink_query (GstAggregator * agg, GstAggregatorPad * aggpad,
412     GstQuery * query);
413 static gboolean gst_audio_aggregator_start (GstAggregator * agg);
414 static gboolean gst_audio_aggregator_stop (GstAggregator * agg);
415 static GstFlowReturn gst_audio_aggregator_flush (GstAggregator * agg);
416
417 static GstBuffer *gst_audio_aggregator_create_output_buffer (GstAudioAggregator
418     * aagg, guint num_frames);
419 static GstBuffer *gst_audio_aggregator_do_clip (GstAggregator * agg,
420     GstAggregatorPad * bpad, GstBuffer * buffer);
421 static GstFlowReturn gst_audio_aggregator_aggregate (GstAggregator * agg,
422     gboolean timeout);
423 static gboolean sync_pad_values (GstElement * aagg, GstPad * pad, gpointer ud);
424 static gboolean gst_audio_aggregator_negotiated_src_caps (GstAggregator * agg,
425     GstCaps * caps);
426 static GstFlowReturn
427 gst_audio_aggregator_update_src_caps (GstAggregator * agg,
428     GstCaps * caps, GstCaps ** ret);
429 static GstCaps *gst_audio_aggregator_fixate_src_caps (GstAggregator * agg,
430     GstCaps * caps);
431
432 #define DEFAULT_OUTPUT_BUFFER_DURATION (10 * GST_MSECOND)
433 #define DEFAULT_ALIGNMENT_THRESHOLD   (40 * GST_MSECOND)
434 #define DEFAULT_DISCONT_WAIT (1 * GST_SECOND)
435
436 enum
437 {
438   PROP_0,
439   PROP_OUTPUT_BUFFER_DURATION,
440   PROP_ALIGNMENT_THRESHOLD,
441   PROP_DISCONT_WAIT,
442 };
443
444 G_DEFINE_ABSTRACT_TYPE (GstAudioAggregator, gst_audio_aggregator,
445     GST_TYPE_AGGREGATOR);
446
447 static GstClockTime
448 gst_audio_aggregator_get_next_time (GstAggregator * agg)
449 {
450   GstClockTime next_time;
451
452   GST_OBJECT_LOCK (agg);
453   if (agg->segment.position == -1 || agg->segment.position < agg->segment.start)
454     next_time = agg->segment.start;
455   else
456     next_time = agg->segment.position;
457
458   if (agg->segment.stop != -1 && next_time > agg->segment.stop)
459     next_time = agg->segment.stop;
460
461   next_time =
462       gst_segment_to_running_time (&agg->segment, GST_FORMAT_TIME, next_time);
463   GST_OBJECT_UNLOCK (agg);
464
465   return next_time;
466 }
467
468 static GstBuffer *
469 gst_audio_aggregator_convert_buffer (GstAudioAggregator * aagg, GstPad * pad,
470     GstAudioInfo * in_info, GstAudioInfo * out_info, GstBuffer * buffer)
471 {
472   GstAudioAggregatorPadClass *klass = GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (pad);
473   GstAudioAggregatorPad *aaggpad = GST_AUDIO_AGGREGATOR_PAD (pad);
474
475   g_assert (klass->convert_buffer);
476
477   return klass->convert_buffer (aaggpad, in_info, out_info, buffer);
478 }
479
480 static void
481 gst_audio_aggregator_class_init (GstAudioAggregatorClass * klass)
482 {
483   GObjectClass *gobject_class = (GObjectClass *) klass;
484   GstAggregatorClass *gstaggregator_class = (GstAggregatorClass *) klass;
485
486   g_type_class_add_private (klass, sizeof (GstAudioAggregatorPrivate));
487
488   gobject_class->set_property = gst_audio_aggregator_set_property;
489   gobject_class->get_property = gst_audio_aggregator_get_property;
490   gobject_class->dispose = gst_audio_aggregator_dispose;
491
492   gstaggregator_class->src_event =
493       GST_DEBUG_FUNCPTR (gst_audio_aggregator_src_event);
494   gstaggregator_class->sink_event =
495       GST_DEBUG_FUNCPTR (gst_audio_aggregator_sink_event);
496   gstaggregator_class->src_query =
497       GST_DEBUG_FUNCPTR (gst_audio_aggregator_src_query);
498   gstaggregator_class->sink_query = gst_audio_aggregator_sink_query;
499   gstaggregator_class->start = gst_audio_aggregator_start;
500   gstaggregator_class->stop = gst_audio_aggregator_stop;
501   gstaggregator_class->flush = gst_audio_aggregator_flush;
502   gstaggregator_class->aggregate =
503       GST_DEBUG_FUNCPTR (gst_audio_aggregator_aggregate);
504   gstaggregator_class->clip = GST_DEBUG_FUNCPTR (gst_audio_aggregator_do_clip);
505   gstaggregator_class->get_next_time = gst_audio_aggregator_get_next_time;
506   gstaggregator_class->update_src_caps =
507       GST_DEBUG_FUNCPTR (gst_audio_aggregator_update_src_caps);
508   gstaggregator_class->fixate_src_caps = gst_audio_aggregator_fixate_src_caps;
509   gstaggregator_class->negotiated_src_caps =
510       gst_audio_aggregator_negotiated_src_caps;
511
512   klass->create_output_buffer = gst_audio_aggregator_create_output_buffer;
513
514   GST_DEBUG_CATEGORY_INIT (audio_aggregator_debug, "audioaggregator",
515       GST_DEBUG_FG_MAGENTA, "GstAudioAggregator");
516
517   g_object_class_install_property (gobject_class, PROP_OUTPUT_BUFFER_DURATION,
518       g_param_spec_uint64 ("output-buffer-duration", "Output Buffer Duration",
519           "Output block size in nanoseconds", 1,
520           G_MAXUINT64, DEFAULT_OUTPUT_BUFFER_DURATION,
521           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
522
523   g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
524       g_param_spec_uint64 ("alignment-threshold", "Alignment Threshold",
525           "Timestamp alignment threshold in nanoseconds", 0,
526           G_MAXUINT64 - 1, DEFAULT_ALIGNMENT_THRESHOLD,
527           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
528
529   g_object_class_install_property (gobject_class, PROP_DISCONT_WAIT,
530       g_param_spec_uint64 ("discont-wait", "Discont Wait",
531           "Window of time in nanoseconds to wait before "
532           "creating a discontinuity", 0,
533           G_MAXUINT64 - 1, DEFAULT_DISCONT_WAIT,
534           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
535 }
536
537 static void
538 gst_audio_aggregator_init (GstAudioAggregator * aagg)
539 {
540   aagg->priv =
541       G_TYPE_INSTANCE_GET_PRIVATE (aagg, GST_TYPE_AUDIO_AGGREGATOR,
542       GstAudioAggregatorPrivate);
543
544   g_mutex_init (&aagg->priv->mutex);
545
546   aagg->priv->output_buffer_duration = DEFAULT_OUTPUT_BUFFER_DURATION;
547   aagg->priv->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
548   aagg->priv->discont_wait = DEFAULT_DISCONT_WAIT;
549
550   aagg->current_caps = NULL;
551
552   gst_aggregator_set_latency (GST_AGGREGATOR (aagg),
553       aagg->priv->output_buffer_duration, aagg->priv->output_buffer_duration);
554 }
555
556 static void
557 gst_audio_aggregator_dispose (GObject * object)
558 {
559   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (object);
560
561   gst_caps_replace (&aagg->current_caps, NULL);
562
563   g_mutex_clear (&aagg->priv->mutex);
564
565   G_OBJECT_CLASS (gst_audio_aggregator_parent_class)->dispose (object);
566 }
567
568 static void
569 gst_audio_aggregator_set_property (GObject * object, guint prop_id,
570     const GValue * value, GParamSpec * pspec)
571 {
572   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (object);
573
574   switch (prop_id) {
575     case PROP_OUTPUT_BUFFER_DURATION:
576       aagg->priv->output_buffer_duration = g_value_get_uint64 (value);
577       gst_aggregator_set_latency (GST_AGGREGATOR (aagg),
578           aagg->priv->output_buffer_duration,
579           aagg->priv->output_buffer_duration);
580       break;
581     case PROP_ALIGNMENT_THRESHOLD:
582       aagg->priv->alignment_threshold = g_value_get_uint64 (value);
583       break;
584     case PROP_DISCONT_WAIT:
585       aagg->priv->discont_wait = g_value_get_uint64 (value);
586       break;
587     default:
588       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
589       break;
590   }
591 }
592
593 static void
594 gst_audio_aggregator_get_property (GObject * object, guint prop_id,
595     GValue * value, GParamSpec * pspec)
596 {
597   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (object);
598
599   switch (prop_id) {
600     case PROP_OUTPUT_BUFFER_DURATION:
601       g_value_set_uint64 (value, aagg->priv->output_buffer_duration);
602       break;
603     case PROP_ALIGNMENT_THRESHOLD:
604       g_value_set_uint64 (value, aagg->priv->alignment_threshold);
605       break;
606     case PROP_DISCONT_WAIT:
607       g_value_set_uint64 (value, aagg->priv->discont_wait);
608       break;
609     default:
610       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
611       break;
612   }
613 }
614
615 /* Caps negotiation */
616
617 /* Unref after usage */
618 static GstAudioAggregatorPad *
619 gst_audio_aggregator_get_first_configured_pad (GstAggregator * agg)
620 {
621   GstAudioAggregatorPad *res = NULL;
622   GList *l;
623
624   GST_OBJECT_LOCK (agg);
625   for (l = GST_ELEMENT (agg)->sinkpads; l; l = l->next) {
626     GstAudioAggregatorPad *aaggpad = l->data;
627
628     if (GST_AUDIO_INFO_FORMAT (&aaggpad->info) != GST_AUDIO_FORMAT_UNKNOWN) {
629       res = gst_object_ref (aaggpad);
630       break;
631     }
632   }
633   GST_OBJECT_UNLOCK (agg);
634
635   return res;
636 }
637
638 static GstCaps *
639 gst_audio_aggregator_sink_getcaps (GstPad * pad, GstAggregator * agg,
640     GstCaps * filter)
641 {
642   GstAudioAggregatorPad *first_configured_pad =
643       gst_audio_aggregator_get_first_configured_pad (agg);
644   GstCaps *sink_template_caps = gst_pad_get_pad_template_caps (pad);
645   GstCaps *downstream_caps = gst_pad_get_allowed_caps (agg->srcpad);
646   GstCaps *sink_caps;
647   GstStructure *s, *s2;
648   gint downstream_rate;
649
650   sink_template_caps = gst_caps_make_writable (sink_template_caps);
651   s = gst_caps_get_structure (sink_template_caps, 0);
652
653   if (downstream_caps && !gst_caps_is_empty (downstream_caps))
654     s2 = gst_caps_get_structure (downstream_caps, 0);
655   else
656     s2 = NULL;
657
658   if (s2 && gst_structure_get_int (s2, "rate", &downstream_rate)) {
659     gst_structure_fixate_field_nearest_int (s, "rate", downstream_rate);
660   } else if (first_configured_pad) {
661     gst_structure_fixate_field_nearest_int (s, "rate",
662         first_configured_pad->info.rate);
663   }
664
665   if (first_configured_pad)
666     gst_object_unref (first_configured_pad);
667
668   sink_caps = filter ? gst_caps_intersect (sink_template_caps,
669       filter) : gst_caps_ref (sink_template_caps);
670
671   GST_INFO_OBJECT (pad, "Getting caps with filter %" GST_PTR_FORMAT, filter);
672   GST_DEBUG_OBJECT (pad, "sink template caps : %" GST_PTR_FORMAT,
673       sink_template_caps);
674   GST_DEBUG_OBJECT (pad, "downstream caps %" GST_PTR_FORMAT, downstream_caps);
675   GST_INFO_OBJECT (pad, "returned sink caps : %" GST_PTR_FORMAT, sink_caps);
676
677   gst_caps_unref (sink_template_caps);
678
679   if (downstream_caps)
680     gst_caps_unref (downstream_caps);
681
682   return sink_caps;
683 }
684
685 static gboolean
686 gst_audio_aggregator_sink_setcaps (GstAudioAggregatorPad * aaggpad,
687     GstAggregator * agg, GstCaps * caps)
688 {
689   GstAudioAggregatorPad *first_configured_pad =
690       gst_audio_aggregator_get_first_configured_pad (agg);
691   GstCaps *downstream_caps = gst_pad_get_allowed_caps (agg->srcpad);
692   GstAudioInfo info;
693   gboolean ret = TRUE;
694   gint downstream_rate;
695   GstStructure *s;
696
697   if (!downstream_caps || gst_caps_is_empty (downstream_caps)) {
698     ret = FALSE;
699     goto done;
700   }
701
702   gst_audio_info_from_caps (&info, caps);
703   s = gst_caps_get_structure (downstream_caps, 0);
704
705   /* TODO: handle different rates on sinkpads, a bit complex
706    * because offsets will have to be updated, and audio resampling
707    * has a latency to take into account
708    */
709   if ((gst_structure_get_int (s, "rate", &downstream_rate)
710           && info.rate != downstream_rate) || (first_configured_pad
711           && info.rate != first_configured_pad->info.rate)) {
712     gst_pad_push_event (GST_PAD (aaggpad), gst_event_new_reconfigure ());
713     ret = FALSE;
714   } else {
715     GstAudioAggregatorPadClass *klass =
716         GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (aaggpad);
717     GST_OBJECT_LOCK (aaggpad);
718     gst_audio_info_from_caps (&aaggpad->info, caps);
719     if (klass->update_conversion_info)
720       klass->update_conversion_info (aaggpad);
721     GST_OBJECT_UNLOCK (aaggpad);
722   }
723
724 done:
725   if (first_configured_pad)
726     gst_object_unref (first_configured_pad);
727
728   if (downstream_caps)
729     gst_caps_unref (downstream_caps);
730
731   return ret;
732 }
733
734 static GstFlowReturn
735 gst_audio_aggregator_update_src_caps (GstAggregator * agg,
736     GstCaps * caps, GstCaps ** ret)
737 {
738   GstCaps *src_template_caps = gst_pad_get_pad_template_caps (agg->srcpad);
739   GstCaps *downstream_caps =
740       gst_pad_peer_query_caps (agg->srcpad, src_template_caps);
741
742   gst_caps_unref (src_template_caps);
743
744   *ret = gst_caps_intersect (caps, downstream_caps);
745
746   GST_INFO ("Updated src caps to %" GST_PTR_FORMAT, *ret);
747
748   if (downstream_caps)
749     gst_caps_unref (downstream_caps);
750
751   return GST_FLOW_OK;
752 }
753
754 /* At that point if the caps are not fixed, this means downstream
755  * didn't have fully specified requirements, we'll just go ahead
756  * and fixate raw audio fields using our first configured pad, we don't for
757  * now need a more complicated heuristic
758  */
759 static GstCaps *
760 gst_audio_aggregator_fixate_src_caps (GstAggregator * agg, GstCaps * caps)
761 {
762   GstAudioAggregatorPad *first_configured_pad;
763
764   if (!GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (agg->srcpad)->convert_buffer)
765     return
766         GST_AGGREGATOR_CLASS
767         (gst_audio_aggregator_parent_class)->fixate_src_caps (agg, caps);
768
769   first_configured_pad = gst_audio_aggregator_get_first_configured_pad (agg);
770
771   if (first_configured_pad) {
772     GstStructure *s, *s2;
773     GstCaps *first_configured_caps =
774         gst_audio_info_to_caps (&first_configured_pad->info);
775     gint first_configured_rate, first_configured_channels;
776
777     caps = gst_caps_make_writable (caps);
778     s = gst_caps_get_structure (caps, 0);
779     s2 = gst_caps_get_structure (first_configured_caps, 0);
780
781     gst_structure_get_int (s2, "rate", &first_configured_rate);
782     gst_structure_get_int (s2, "channels", &first_configured_channels);
783
784     gst_structure_fixate_field_string (s, "format",
785         gst_structure_get_string (s2, "format"));
786     gst_structure_fixate_field_string (s, "layout",
787         gst_structure_get_string (s2, "layout"));
788     gst_structure_fixate_field_nearest_int (s, "rate", first_configured_rate);
789     gst_structure_fixate_field_nearest_int (s, "channels",
790         first_configured_channels);
791
792     gst_caps_unref (first_configured_caps);
793     gst_object_unref (first_configured_pad);
794   }
795
796   if (!gst_caps_is_fixed (caps))
797     caps = gst_caps_fixate (caps);
798
799   GST_INFO_OBJECT (agg, "Fixated src caps to %" GST_PTR_FORMAT, caps);
800
801   return caps;
802 }
803
804 /* Must be called with OBJECT_LOCK taken */
805 static void
806 gst_audio_aggregator_update_converters (GstAudioAggregator * aagg,
807     GstAudioInfo * new_info)
808 {
809   GList *l;
810
811   for (l = GST_ELEMENT (aagg)->sinkpads; l; l = l->next) {
812     GstAudioAggregatorPad *aaggpad = l->data;
813     GstAudioAggregatorPadClass *klass =
814         GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (aaggpad);
815
816     if (klass->update_conversion_info)
817       klass->update_conversion_info (aaggpad);
818
819     /* If we currently were mixing a buffer, we need to convert it to the new
820      * format */
821     if (aaggpad->priv->buffer) {
822       GstBuffer *new_converted_buffer =
823           gst_audio_aggregator_convert_buffer (aagg, GST_PAD (aaggpad),
824           &aaggpad->info, new_info, aaggpad->priv->input_buffer);
825       gst_buffer_replace (&aaggpad->priv->buffer, new_converted_buffer);
826     }
827   }
828 }
829
830 /* We now have our final output caps, we can create the required converters */
831 static gboolean
832 gst_audio_aggregator_negotiated_src_caps (GstAggregator * agg, GstCaps * caps)
833 {
834   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
835   GstAudioInfo info;
836   GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
837
838   GST_INFO_OBJECT (agg, "src caps negotiated %" GST_PTR_FORMAT, caps);
839
840   if (!gst_audio_info_from_caps (&info, caps)) {
841     GST_WARNING_OBJECT (aagg, "Rejecting invalid caps: %" GST_PTR_FORMAT, caps);
842     return FALSE;
843   }
844
845   GST_AUDIO_AGGREGATOR_LOCK (aagg);
846   GST_OBJECT_LOCK (aagg);
847
848   if (GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (agg->srcpad)->convert_buffer) {
849     gst_audio_aggregator_update_converters (aagg, &info);
850
851     if (aagg->priv->current_buffer
852         && !gst_audio_info_is_equal (&srcpad->info, &info)) {
853       GstBuffer *converted;
854       GstAudioAggregatorPadClass *klass =
855           GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (agg->srcpad);
856
857       if (klass->update_conversion_info)
858         klass->update_conversion_info (GST_AUDIO_AGGREGATOR_PAD (agg->srcpad));
859
860       converted =
861           gst_audio_aggregator_convert_buffer (aagg, agg->srcpad, &srcpad->info,
862           &info, aagg->priv->current_buffer);
863       gst_buffer_unref (aagg->priv->current_buffer);
864       aagg->priv->current_buffer = converted;
865     }
866   }
867
868   if (!gst_audio_info_is_equal (&info, &srcpad->info)) {
869     GST_INFO_OBJECT (aagg, "setting caps to %" GST_PTR_FORMAT, caps);
870     gst_caps_replace (&aagg->current_caps, caps);
871
872     memcpy (&srcpad->info, &info, sizeof (info));
873   }
874
875   GST_OBJECT_UNLOCK (aagg);
876   GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
877
878   return
879       GST_AGGREGATOR_CLASS
880       (gst_audio_aggregator_parent_class)->negotiated_src_caps (agg, caps);
881 }
882
883 /* event handling */
884
885 static gboolean
886 gst_audio_aggregator_src_event (GstAggregator * agg, GstEvent * event)
887 {
888   gboolean result;
889
890   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
891   GST_DEBUG_OBJECT (agg->srcpad, "Got %s event on src pad",
892       GST_EVENT_TYPE_NAME (event));
893
894   switch (GST_EVENT_TYPE (event)) {
895     case GST_EVENT_QOS:
896       /* QoS might be tricky */
897       gst_event_unref (event);
898       return FALSE;
899     case GST_EVENT_NAVIGATION:
900       /* navigation is rather pointless. */
901       gst_event_unref (event);
902       return FALSE;
903       break;
904     case GST_EVENT_SEEK:
905     {
906       GstSeekFlags flags;
907       gdouble rate;
908       GstSeekType start_type, stop_type;
909       gint64 start, stop;
910       GstFormat seek_format, dest_format;
911
912       /* parse the seek parameters */
913       gst_event_parse_seek (event, &rate, &seek_format, &flags, &start_type,
914           &start, &stop_type, &stop);
915
916       /* Check the seeking parameters before linking up */
917       if ((start_type != GST_SEEK_TYPE_NONE)
918           && (start_type != GST_SEEK_TYPE_SET)) {
919         result = FALSE;
920         GST_DEBUG_OBJECT (aagg,
921             "seeking failed, unhandled seek type for start: %d", start_type);
922         goto done;
923       }
924       if ((stop_type != GST_SEEK_TYPE_NONE) && (stop_type != GST_SEEK_TYPE_SET)) {
925         result = FALSE;
926         GST_DEBUG_OBJECT (aagg,
927             "seeking failed, unhandled seek type for end: %d", stop_type);
928         goto done;
929       }
930
931       GST_OBJECT_LOCK (agg);
932       dest_format = agg->segment.format;
933       GST_OBJECT_UNLOCK (agg);
934       if (seek_format != dest_format) {
935         result = FALSE;
936         GST_DEBUG_OBJECT (aagg,
937             "seeking failed, unhandled seek format: %s",
938             gst_format_get_name (seek_format));
939         goto done;
940       }
941     }
942       break;
943     default:
944       break;
945   }
946
947   return
948       GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->src_event (agg,
949       event);
950
951 done:
952   return result;
953 }
954
955
956 static gboolean
957 gst_audio_aggregator_sink_event (GstAggregator * agg,
958     GstAggregatorPad * aggpad, GstEvent * event)
959 {
960   GstAudioAggregatorPad *aaggpad = GST_AUDIO_AGGREGATOR_PAD (aggpad);
961   gboolean res = TRUE;
962
963   GST_DEBUG_OBJECT (aggpad, "Got %s event on sink pad",
964       GST_EVENT_TYPE_NAME (event));
965
966   switch (GST_EVENT_TYPE (event)) {
967     case GST_EVENT_SEGMENT:
968     {
969       const GstSegment *segment;
970       gst_event_parse_segment (event, &segment);
971
972       if (segment->format != GST_FORMAT_TIME) {
973         GST_ERROR_OBJECT (agg, "Segment of type %s are not supported,"
974             " only TIME segments are supported",
975             gst_format_get_name (segment->format));
976         gst_event_unref (event);
977         event = NULL;
978         res = FALSE;
979         break;
980       }
981
982       GST_OBJECT_LOCK (agg);
983       if (segment->rate != agg->segment.rate) {
984         GST_ERROR_OBJECT (aggpad,
985             "Got segment event with wrong rate %lf, expected %lf",
986             segment->rate, agg->segment.rate);
987         res = FALSE;
988         gst_event_unref (event);
989         event = NULL;
990       } else if (segment->rate < 0.0) {
991         GST_ERROR_OBJECT (aggpad, "Negative rates not supported yet");
992         res = FALSE;
993         gst_event_unref (event);
994         event = NULL;
995       } else {
996         GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (aggpad);
997
998         GST_OBJECT_LOCK (pad);
999         pad->priv->new_segment = TRUE;
1000         GST_OBJECT_UNLOCK (pad);
1001       }
1002       GST_OBJECT_UNLOCK (agg);
1003
1004       break;
1005     }
1006     case GST_EVENT_CAPS:
1007     {
1008       GstCaps *caps;
1009
1010       gst_event_parse_caps (event, &caps);
1011       GST_INFO_OBJECT (aggpad, "Got caps %" GST_PTR_FORMAT, caps);
1012       res = gst_audio_aggregator_sink_setcaps (aaggpad, agg, caps);
1013       gst_event_unref (event);
1014       event = NULL;
1015       break;
1016     }
1017     default:
1018       break;
1019   }
1020
1021   if (event != NULL)
1022     return
1023         GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->sink_event
1024         (agg, aggpad, event);
1025
1026   return res;
1027 }
1028
1029 static gboolean
1030 gst_audio_aggregator_sink_query (GstAggregator * agg, GstAggregatorPad * aggpad,
1031     GstQuery * query)
1032 {
1033   gboolean res = FALSE;
1034
1035   switch (GST_QUERY_TYPE (query)) {
1036     case GST_QUERY_CAPS:
1037     {
1038       GstCaps *filter, *caps;
1039
1040       gst_query_parse_caps (query, &filter);
1041       caps = gst_audio_aggregator_sink_getcaps (GST_PAD (aggpad), agg, filter);
1042       gst_query_set_caps_result (query, caps);
1043       gst_caps_unref (caps);
1044       res = TRUE;
1045       break;
1046     }
1047     default:
1048       res =
1049           GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->sink_query
1050           (agg, aggpad, query);
1051       break;
1052   }
1053
1054   return res;
1055 }
1056
1057
1058 /* FIXME, the duration query should reflect how long you will produce
1059  * data, that is the amount of stream time until you will emit EOS.
1060  *
1061  * For synchronized mixing this is always the max of all the durations
1062  * of upstream since we emit EOS when all of them finished.
1063  *
1064  * We don't do synchronized mixing so this really depends on where the
1065  * streams where punched in and what their relative offsets are against
1066  * eachother which we can get from the first timestamps we see.
1067  *
1068  * When we add a new stream (or remove a stream) the duration might
1069  * also become invalid again and we need to post a new DURATION
1070  * message to notify this fact to the parent.
1071  * For now we take the max of all the upstream elements so the simple
1072  * cases work at least somewhat.
1073  */
1074 static gboolean
1075 gst_audio_aggregator_query_duration (GstAudioAggregator * aagg,
1076     GstQuery * query)
1077 {
1078   gint64 max;
1079   gboolean res;
1080   GstFormat format;
1081   GstIterator *it;
1082   gboolean done;
1083   GValue item = { 0, };
1084
1085   /* parse format */
1086   gst_query_parse_duration (query, &format, NULL);
1087
1088   max = -1;
1089   res = TRUE;
1090   done = FALSE;
1091
1092   it = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (aagg));
1093   while (!done) {
1094     GstIteratorResult ires;
1095
1096     ires = gst_iterator_next (it, &item);
1097     switch (ires) {
1098       case GST_ITERATOR_DONE:
1099         done = TRUE;
1100         break;
1101       case GST_ITERATOR_OK:
1102       {
1103         GstPad *pad = g_value_get_object (&item);
1104         gint64 duration;
1105
1106         /* ask sink peer for duration */
1107         res &= gst_pad_peer_query_duration (pad, format, &duration);
1108         /* take max from all valid return values */
1109         if (res) {
1110           /* valid unknown length, stop searching */
1111           if (duration == -1) {
1112             max = duration;
1113             done = TRUE;
1114           }
1115           /* else see if bigger than current max */
1116           else if (duration > max)
1117             max = duration;
1118         }
1119         g_value_reset (&item);
1120         break;
1121       }
1122       case GST_ITERATOR_RESYNC:
1123         max = -1;
1124         res = TRUE;
1125         gst_iterator_resync (it);
1126         break;
1127       default:
1128         res = FALSE;
1129         done = TRUE;
1130         break;
1131     }
1132   }
1133   g_value_unset (&item);
1134   gst_iterator_free (it);
1135
1136   if (res) {
1137     /* and store the max */
1138     GST_DEBUG_OBJECT (aagg, "Total duration in format %s: %"
1139         GST_TIME_FORMAT, gst_format_get_name (format), GST_TIME_ARGS (max));
1140     gst_query_set_duration (query, format, max);
1141   }
1142
1143   return res;
1144 }
1145
1146
1147 static gboolean
1148 gst_audio_aggregator_src_query (GstAggregator * agg, GstQuery * query)
1149 {
1150   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1151   GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
1152   gboolean res = FALSE;
1153
1154   switch (GST_QUERY_TYPE (query)) {
1155     case GST_QUERY_DURATION:
1156       res = gst_audio_aggregator_query_duration (aagg, query);
1157       break;
1158     case GST_QUERY_POSITION:
1159     {
1160       GstFormat format;
1161
1162       gst_query_parse_position (query, &format, NULL);
1163
1164       GST_OBJECT_LOCK (aagg);
1165
1166       switch (format) {
1167         case GST_FORMAT_TIME:
1168           gst_query_set_position (query, format,
1169               gst_segment_to_stream_time (&agg->segment, GST_FORMAT_TIME,
1170                   agg->segment.position));
1171           res = TRUE;
1172           break;
1173         case GST_FORMAT_BYTES:
1174           if (GST_AUDIO_INFO_BPF (&srcpad->info)) {
1175             gst_query_set_position (query, format, aagg->priv->offset *
1176                 GST_AUDIO_INFO_BPF (&srcpad->info));
1177             res = TRUE;
1178           }
1179           break;
1180         case GST_FORMAT_DEFAULT:
1181           gst_query_set_position (query, format, aagg->priv->offset);
1182           res = TRUE;
1183           break;
1184         default:
1185           break;
1186       }
1187
1188       GST_OBJECT_UNLOCK (aagg);
1189
1190       break;
1191     }
1192     default:
1193       res =
1194           GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->src_query
1195           (agg, query);
1196       break;
1197   }
1198
1199   return res;
1200 }
1201
1202
1203 void
1204 gst_audio_aggregator_set_sink_caps (GstAudioAggregator * aagg,
1205     GstAudioAggregatorPad * pad, GstCaps * caps)
1206 {
1207 #ifndef G_DISABLE_ASSERT
1208   gboolean valid;
1209
1210   GST_OBJECT_LOCK (pad);
1211   valid = gst_audio_info_from_caps (&pad->info, caps);
1212   g_assert (valid);
1213   GST_OBJECT_UNLOCK (pad);
1214 #else
1215   GST_OBJECT_LOCK (pad);
1216   (void) gst_audio_info_from_caps (&pad->info, caps);
1217   GST_OBJECT_UNLOCK (pad);
1218 #endif
1219 }
1220
1221 /* Must hold object lock and aagg lock to call */
1222
1223 static void
1224 gst_audio_aggregator_reset (GstAudioAggregator * aagg)
1225 {
1226   GstAggregator *agg = GST_AGGREGATOR (aagg);
1227
1228   GST_AUDIO_AGGREGATOR_LOCK (aagg);
1229   GST_OBJECT_LOCK (aagg);
1230   agg->segment.position = -1;
1231   aagg->priv->offset = -1;
1232   gst_audio_info_init (&GST_AUDIO_AGGREGATOR_PAD (agg->srcpad)->info);
1233   gst_caps_replace (&aagg->current_caps, NULL);
1234   gst_buffer_replace (&aagg->priv->current_buffer, NULL);
1235   GST_OBJECT_UNLOCK (aagg);
1236   GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1237 }
1238
1239 static gboolean
1240 gst_audio_aggregator_start (GstAggregator * agg)
1241 {
1242   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1243
1244   gst_audio_aggregator_reset (aagg);
1245
1246   return TRUE;
1247 }
1248
1249 static gboolean
1250 gst_audio_aggregator_stop (GstAggregator * agg)
1251 {
1252   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1253
1254   gst_audio_aggregator_reset (aagg);
1255
1256   return TRUE;
1257 }
1258
1259 static GstFlowReturn
1260 gst_audio_aggregator_flush (GstAggregator * agg)
1261 {
1262   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1263
1264   GST_AUDIO_AGGREGATOR_LOCK (aagg);
1265   GST_OBJECT_LOCK (aagg);
1266   agg->segment.position = -1;
1267   aagg->priv->offset = -1;
1268   gst_buffer_replace (&aagg->priv->current_buffer, NULL);
1269   GST_OBJECT_UNLOCK (aagg);
1270   GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1271
1272   return GST_FLOW_OK;
1273 }
1274
1275 static GstBuffer *
1276 gst_audio_aggregator_do_clip (GstAggregator * agg,
1277     GstAggregatorPad * bpad, GstBuffer * buffer)
1278 {
1279   GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (bpad);
1280   gint rate, bpf;
1281
1282   rate = GST_AUDIO_INFO_RATE (&pad->info);
1283   bpf = GST_AUDIO_INFO_BPF (&pad->info);
1284
1285   GST_OBJECT_LOCK (bpad);
1286   buffer = gst_audio_buffer_clip (buffer, &bpad->segment, rate, bpf);
1287   GST_OBJECT_UNLOCK (bpad);
1288
1289   return buffer;
1290 }
1291
1292 /* Called with the object lock for both the element and pad held,
1293  * as well as the aagg lock
1294  *
1295  * Replace the current buffer with input and update GstAudioAggregatorPadPrivate
1296  * values.
1297  */
1298 static gboolean
1299 gst_audio_aggregator_fill_buffer (GstAudioAggregator * aagg,
1300     GstAudioAggregatorPad * pad)
1301 {
1302   GstClockTime start_time, end_time;
1303   gboolean discont = FALSE;
1304   guint64 start_offset, end_offset;
1305   gint rate, bpf;
1306
1307   GstAggregator *agg = GST_AGGREGATOR (aagg);
1308   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1309   GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
1310
1311   if (GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (pad)->convert_buffer) {
1312     rate = GST_AUDIO_INFO_RATE (&srcpad->info);
1313     bpf = GST_AUDIO_INFO_BPF (&srcpad->info);
1314   } else {
1315     rate = GST_AUDIO_INFO_RATE (&pad->info);
1316     bpf = GST_AUDIO_INFO_BPF (&pad->info);
1317   }
1318
1319   pad->priv->position = 0;
1320   pad->priv->size = gst_buffer_get_size (pad->priv->buffer) / bpf;
1321
1322   if (pad->priv->size == 0) {
1323     if (!GST_BUFFER_DURATION_IS_VALID (pad->priv->buffer) ||
1324         !GST_BUFFER_FLAG_IS_SET (pad->priv->buffer, GST_BUFFER_FLAG_GAP)) {
1325       GST_WARNING_OBJECT (pad, "Dropping 0-sized buffer missing either a"
1326           " duration or a GAP flag: %" GST_PTR_FORMAT, pad->priv->buffer);
1327       return FALSE;
1328     }
1329
1330     pad->priv->size =
1331         gst_util_uint64_scale (GST_BUFFER_DURATION (pad->priv->buffer), rate,
1332         GST_SECOND);
1333   }
1334
1335   if (!GST_BUFFER_PTS_IS_VALID (pad->priv->buffer)) {
1336     if (pad->priv->output_offset == -1)
1337       pad->priv->output_offset = aagg->priv->offset;
1338     if (pad->priv->next_offset == -1)
1339       pad->priv->next_offset = pad->priv->size;
1340     else
1341       pad->priv->next_offset += pad->priv->size;
1342     goto done;
1343   }
1344
1345   start_time = GST_BUFFER_PTS (pad->priv->buffer);
1346   end_time =
1347       start_time + gst_util_uint64_scale_ceil (pad->priv->size, GST_SECOND,
1348       rate);
1349
1350   /* Clipping should've ensured this */
1351   g_assert (start_time >= aggpad->segment.start);
1352
1353   start_offset =
1354       gst_util_uint64_scale (start_time - aggpad->segment.start, rate,
1355       GST_SECOND);
1356   end_offset = start_offset + pad->priv->size;
1357
1358   if (GST_BUFFER_IS_DISCONT (pad->priv->buffer)
1359       || GST_BUFFER_FLAG_IS_SET (pad->priv->buffer, GST_BUFFER_FLAG_RESYNC)
1360       || pad->priv->new_segment || pad->priv->next_offset == -1) {
1361     discont = TRUE;
1362     pad->priv->new_segment = FALSE;
1363   } else {
1364     guint64 diff, max_sample_diff;
1365
1366     /* Check discont, based on audiobasesink */
1367     if (start_offset <= pad->priv->next_offset)
1368       diff = pad->priv->next_offset - start_offset;
1369     else
1370       diff = start_offset - pad->priv->next_offset;
1371
1372     max_sample_diff =
1373         gst_util_uint64_scale_int (aagg->priv->alignment_threshold, rate,
1374         GST_SECOND);
1375
1376     /* Discont! */
1377     if (G_UNLIKELY (diff >= max_sample_diff)) {
1378       if (aagg->priv->discont_wait > 0) {
1379         if (pad->priv->discont_time == GST_CLOCK_TIME_NONE) {
1380           pad->priv->discont_time = start_time;
1381         } else if (start_time - pad->priv->discont_time >=
1382             aagg->priv->discont_wait) {
1383           discont = TRUE;
1384           pad->priv->discont_time = GST_CLOCK_TIME_NONE;
1385         }
1386       } else {
1387         discont = TRUE;
1388       }
1389     } else if (G_UNLIKELY (pad->priv->discont_time != GST_CLOCK_TIME_NONE)) {
1390       /* we have had a discont, but are now back on track! */
1391       pad->priv->discont_time = GST_CLOCK_TIME_NONE;
1392     }
1393   }
1394
1395   if (discont) {
1396     /* Have discont, need resync */
1397     if (pad->priv->next_offset != -1)
1398       GST_DEBUG_OBJECT (pad, "Have discont. Expected %"
1399           G_GUINT64_FORMAT ", got %" G_GUINT64_FORMAT,
1400           pad->priv->next_offset, start_offset);
1401     pad->priv->output_offset = -1;
1402     pad->priv->next_offset = end_offset;
1403   } else {
1404     pad->priv->next_offset += pad->priv->size;
1405   }
1406
1407   if (pad->priv->output_offset == -1) {
1408     GstClockTime start_running_time;
1409     GstClockTime end_running_time;
1410     GstClockTime segment_pos;
1411     guint64 start_output_offset = -1;
1412     guint64 end_output_offset = -1;
1413
1414     start_running_time =
1415         gst_segment_to_running_time (&aggpad->segment,
1416         GST_FORMAT_TIME, start_time);
1417     end_running_time =
1418         gst_segment_to_running_time (&aggpad->segment,
1419         GST_FORMAT_TIME, end_time);
1420
1421     /* Convert to position in the output segment */
1422     segment_pos =
1423         gst_segment_position_from_running_time (&agg->segment, GST_FORMAT_TIME,
1424         start_running_time);
1425     if (GST_CLOCK_TIME_IS_VALID (segment_pos))
1426       start_output_offset =
1427           gst_util_uint64_scale (segment_pos - agg->segment.start, rate,
1428           GST_SECOND);
1429
1430     segment_pos =
1431         gst_segment_position_from_running_time (&agg->segment, GST_FORMAT_TIME,
1432         end_running_time);
1433     if (GST_CLOCK_TIME_IS_VALID (segment_pos))
1434       end_output_offset =
1435           gst_util_uint64_scale (segment_pos - agg->segment.start, rate,
1436           GST_SECOND);
1437
1438     if (start_output_offset == -1 && end_output_offset == -1) {
1439       /* Outside output segment, drop */
1440       pad->priv->position = 0;
1441       pad->priv->size = 0;
1442       pad->priv->output_offset = -1;
1443       GST_DEBUG_OBJECT (pad, "Buffer outside output segment");
1444       return FALSE;
1445     }
1446
1447     /* Calculate end_output_offset if it was outside the output segment */
1448     if (end_output_offset == -1)
1449       end_output_offset = start_output_offset + pad->priv->size;
1450
1451     if (end_output_offset < aagg->priv->offset) {
1452       pad->priv->position = 0;
1453       pad->priv->size = 0;
1454       pad->priv->output_offset = -1;
1455       GST_DEBUG_OBJECT (pad,
1456           "Buffer before segment or current position: %" G_GUINT64_FORMAT " < %"
1457           G_GINT64_FORMAT, end_output_offset, aagg->priv->offset);
1458       return FALSE;
1459     }
1460
1461     if (start_output_offset == -1 || start_output_offset < aagg->priv->offset) {
1462       guint diff;
1463
1464       if (start_output_offset == -1 && end_output_offset < pad->priv->size) {
1465         diff = pad->priv->size - end_output_offset + aagg->priv->offset;
1466       } else if (start_output_offset == -1) {
1467         start_output_offset = end_output_offset - pad->priv->size;
1468
1469         if (start_output_offset < aagg->priv->offset)
1470           diff = aagg->priv->offset - start_output_offset;
1471         else
1472           diff = 0;
1473       } else {
1474         diff = aagg->priv->offset - start_output_offset;
1475       }
1476
1477       pad->priv->position += diff;
1478       if (pad->priv->position >= pad->priv->size) {
1479         /* Empty buffer, drop */
1480         pad->priv->position = 0;
1481         pad->priv->size = 0;
1482         pad->priv->output_offset = -1;
1483         GST_DEBUG_OBJECT (pad,
1484             "Buffer before segment or current position: %" G_GUINT64_FORMAT
1485             " < %" G_GINT64_FORMAT, end_output_offset, aagg->priv->offset);
1486         return FALSE;
1487       }
1488     }
1489
1490     if (start_output_offset == -1 || start_output_offset < aagg->priv->offset)
1491       pad->priv->output_offset = aagg->priv->offset;
1492     else
1493       pad->priv->output_offset = start_output_offset;
1494
1495     GST_DEBUG_OBJECT (pad,
1496         "Buffer resynced: Pad offset %" G_GUINT64_FORMAT
1497         ", current audio aggregator offset %" G_GINT64_FORMAT,
1498         pad->priv->output_offset, aagg->priv->offset);
1499   }
1500
1501 done:
1502
1503   GST_LOG_OBJECT (pad,
1504       "Queued new buffer at offset %" G_GUINT64_FORMAT,
1505       pad->priv->output_offset);
1506
1507   return TRUE;
1508 }
1509
1510 /* Called with pad object lock held */
1511
1512 static gboolean
1513 gst_audio_aggregator_mix_buffer (GstAudioAggregator * aagg,
1514     GstAudioAggregatorPad * pad, GstBuffer * inbuf, GstBuffer * outbuf,
1515     guint blocksize)
1516 {
1517   guint overlap;
1518   guint out_start;
1519   gboolean filled;
1520   guint in_offset;
1521   gboolean pad_changed = FALSE;
1522
1523   /* Overlap => mix */
1524   if (aagg->priv->offset < pad->priv->output_offset)
1525     out_start = pad->priv->output_offset - aagg->priv->offset;
1526   else
1527     out_start = 0;
1528
1529   overlap = pad->priv->size - pad->priv->position;
1530   if (overlap > blocksize - out_start)
1531     overlap = blocksize - out_start;
1532
1533   if (GST_BUFFER_FLAG_IS_SET (inbuf, GST_BUFFER_FLAG_GAP)) {
1534     /* skip gap buffer */
1535     GST_LOG_OBJECT (pad, "skipping GAP buffer");
1536     pad->priv->output_offset += pad->priv->size - pad->priv->position;
1537     pad->priv->position = pad->priv->size;
1538
1539     gst_buffer_replace (&pad->priv->buffer, NULL);
1540     gst_buffer_replace (&pad->priv->input_buffer, NULL);
1541     return FALSE;
1542   }
1543
1544   gst_buffer_ref (inbuf);
1545   in_offset = pad->priv->position;
1546   GST_OBJECT_UNLOCK (pad);
1547   GST_OBJECT_UNLOCK (aagg);
1548
1549   filled = GST_AUDIO_AGGREGATOR_GET_CLASS (aagg)->aggregate_one_buffer (aagg,
1550       pad, inbuf, in_offset, outbuf, out_start, overlap);
1551
1552   GST_OBJECT_LOCK (aagg);
1553   GST_OBJECT_LOCK (pad);
1554
1555   pad_changed = (inbuf != pad->priv->buffer);
1556   gst_buffer_unref (inbuf);
1557
1558   if (filled)
1559     GST_BUFFER_FLAG_UNSET (outbuf, GST_BUFFER_FLAG_GAP);
1560
1561   if (pad_changed)
1562     return FALSE;
1563
1564   pad->priv->position += overlap;
1565   pad->priv->output_offset += overlap;
1566
1567   if (pad->priv->position == pad->priv->size) {
1568     /* Buffer done, drop it */
1569     gst_buffer_replace (&pad->priv->buffer, NULL);
1570     gst_buffer_replace (&pad->priv->input_buffer, NULL);
1571     GST_LOG_OBJECT (pad, "Finished mixing buffer, waiting for next");
1572     return FALSE;
1573   }
1574
1575   return TRUE;
1576 }
1577
1578 static GstBuffer *
1579 gst_audio_aggregator_create_output_buffer (GstAudioAggregator * aagg,
1580     guint num_frames)
1581 {
1582   GstAllocator *allocator;
1583   GstAllocationParams params;
1584   GstBuffer *outbuf;
1585   GstMapInfo outmap;
1586   GstAggregator *agg = GST_AGGREGATOR (aagg);
1587   GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
1588
1589   gst_aggregator_get_allocator (GST_AGGREGATOR (aagg), &allocator, &params);
1590
1591   GST_DEBUG ("Creating output buffer with size %d",
1592       num_frames * GST_AUDIO_INFO_BPF (&srcpad->info));
1593
1594   outbuf = gst_buffer_new_allocate (allocator, num_frames *
1595       GST_AUDIO_INFO_BPF (&srcpad->info), &params);
1596
1597   if (allocator)
1598     gst_object_unref (allocator);
1599
1600   gst_buffer_map (outbuf, &outmap, GST_MAP_WRITE);
1601   gst_audio_format_fill_silence (srcpad->info.finfo, outmap.data, outmap.size);
1602   gst_buffer_unmap (outbuf, &outmap);
1603
1604   return outbuf;
1605 }
1606
1607 static gboolean
1608 sync_pad_values (GstElement * aagg, GstPad * pad, gpointer user_data)
1609 {
1610   GstAudioAggregatorPad *aapad = GST_AUDIO_AGGREGATOR_PAD (pad);
1611   GstAggregatorPad *bpad = GST_AGGREGATOR_PAD_CAST (pad);
1612   GstClockTime timestamp, stream_time;
1613
1614   if (aapad->priv->buffer == NULL)
1615     return TRUE;
1616
1617   timestamp = GST_BUFFER_PTS (aapad->priv->buffer);
1618   GST_OBJECT_LOCK (bpad);
1619   stream_time = gst_segment_to_stream_time (&bpad->segment, GST_FORMAT_TIME,
1620       timestamp);
1621   GST_OBJECT_UNLOCK (bpad);
1622
1623   /* sync object properties on stream time */
1624   /* TODO: Ideally we would want to do that on every sample */
1625   if (GST_CLOCK_TIME_IS_VALID (stream_time))
1626     gst_object_sync_values (GST_OBJECT_CAST (pad), stream_time);
1627
1628   return TRUE;
1629 }
1630
1631 static GstFlowReturn
1632 gst_audio_aggregator_aggregate (GstAggregator * agg, gboolean timeout)
1633 {
1634   /* Calculate the current output offset/timestamp and offset_end/timestamp_end.
1635    * Allocate a silence buffer for this and store it.
1636    *
1637    * For all pads:
1638    * 1) Once per input buffer (cached)
1639    *   1) Check discont (flag and timestamp with tolerance)
1640    *   2) If discont or new, resync. That means:
1641    *     1) Drop all start data of the buffer that comes before
1642    *        the current position/offset.
1643    *     2) Calculate the offset (output segment!) that the first
1644    *        frame of the input buffer corresponds to. Base this on
1645    *        the running time.
1646    *
1647    * 2) If the current pad's offset/offset_end overlaps with the output
1648    *    offset/offset_end, mix it at the appropiate position in the output
1649    *    buffer and advance the pad's position. Remember if this pad needs
1650    *    a new buffer to advance behind the output offset_end.
1651    *
1652    * If we had no pad with a buffer, go EOS.
1653    *
1654    * If we had at least one pad that did not advance behind output
1655    * offset_end, let aggregate be called again for the current
1656    * output offset/offset_end.
1657    */
1658   GstElement *element;
1659   GstAudioAggregator *aagg;
1660   GList *iter;
1661   GstFlowReturn ret;
1662   GstBuffer *outbuf = NULL;
1663   gint64 next_offset;
1664   gint64 next_timestamp;
1665   gint rate, bpf;
1666   gboolean dropped = FALSE;
1667   gboolean is_eos = TRUE;
1668   gboolean is_done = TRUE;
1669   guint blocksize;
1670   GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
1671
1672   element = GST_ELEMENT (agg);
1673   aagg = GST_AUDIO_AGGREGATOR (agg);
1674
1675   /* Sync pad properties to the stream time */
1676   gst_element_foreach_sink_pad (element, sync_pad_values, NULL);
1677
1678   GST_AUDIO_AGGREGATOR_LOCK (aagg);
1679   GST_OBJECT_LOCK (agg);
1680
1681   /* Update position from the segment start/stop if needed */
1682   if (agg->segment.position == -1) {
1683     if (agg->segment.rate > 0.0)
1684       agg->segment.position = agg->segment.start;
1685     else
1686       agg->segment.position = agg->segment.stop;
1687   }
1688
1689   if (G_UNLIKELY (srcpad->info.finfo->format == GST_AUDIO_FORMAT_UNKNOWN)) {
1690     if (timeout) {
1691       GST_DEBUG_OBJECT (aagg,
1692           "Got timeout before receiving any caps, don't output anything");
1693
1694       /* Advance position */
1695       if (agg->segment.rate > 0.0)
1696         agg->segment.position += aagg->priv->output_buffer_duration;
1697       else if (agg->segment.position > aagg->priv->output_buffer_duration)
1698         agg->segment.position -= aagg->priv->output_buffer_duration;
1699       else
1700         agg->segment.position = 0;
1701
1702       GST_OBJECT_UNLOCK (agg);
1703       GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1704       return GST_AGGREGATOR_FLOW_NEED_DATA;
1705     } else {
1706       GST_OBJECT_UNLOCK (agg);
1707       goto not_negotiated;
1708     }
1709   }
1710
1711   rate = GST_AUDIO_INFO_RATE (&srcpad->info);
1712   bpf = GST_AUDIO_INFO_BPF (&srcpad->info);
1713
1714   if (aagg->priv->offset == -1) {
1715     aagg->priv->offset =
1716         gst_util_uint64_scale (agg->segment.position - agg->segment.start, rate,
1717         GST_SECOND);
1718     GST_DEBUG_OBJECT (aagg, "Starting at offset %" G_GINT64_FORMAT,
1719         aagg->priv->offset);
1720   }
1721
1722   blocksize = gst_util_uint64_scale (aagg->priv->output_buffer_duration,
1723       rate, GST_SECOND);
1724   blocksize = MAX (1, blocksize);
1725
1726   /* FIXME: Reverse mixing does not work at all yet */
1727   if (agg->segment.rate > 0.0) {
1728     next_offset = aagg->priv->offset + blocksize;
1729   } else {
1730     next_offset = aagg->priv->offset - blocksize;
1731   }
1732
1733   /* Use the sample counter, which will never accumulate rounding errors */
1734   next_timestamp =
1735       agg->segment.start + gst_util_uint64_scale (next_offset, GST_SECOND,
1736       rate);
1737
1738   if (aagg->priv->current_buffer == NULL) {
1739     GST_OBJECT_UNLOCK (agg);
1740     aagg->priv->current_buffer =
1741         GST_AUDIO_AGGREGATOR_GET_CLASS (aagg)->create_output_buffer (aagg,
1742         blocksize);
1743     /* Be careful, some things could have changed ? */
1744     GST_OBJECT_LOCK (agg);
1745     GST_BUFFER_FLAG_SET (aagg->priv->current_buffer, GST_BUFFER_FLAG_GAP);
1746   }
1747   outbuf = aagg->priv->current_buffer;
1748
1749   GST_LOG_OBJECT (agg,
1750       "Starting to mix %u samples for offset %" G_GINT64_FORMAT
1751       " with timestamp %" GST_TIME_FORMAT, blocksize,
1752       aagg->priv->offset, GST_TIME_ARGS (agg->segment.position));
1753
1754   for (iter = element->sinkpads; iter; iter = iter->next) {
1755     GstAudioAggregatorPad *pad = (GstAudioAggregatorPad *) iter->data;
1756     GstAggregatorPad *aggpad = (GstAggregatorPad *) iter->data;
1757     gboolean pad_eos = gst_aggregator_pad_is_eos (aggpad);
1758
1759     if (!pad_eos)
1760       is_eos = FALSE;
1761
1762     pad->priv->input_buffer = gst_aggregator_pad_peek_buffer (aggpad);
1763
1764     GST_OBJECT_LOCK (pad);
1765     if (!pad->priv->input_buffer) {
1766       if (timeout) {
1767         if (pad->priv->output_offset < next_offset) {
1768           gint64 diff = next_offset - pad->priv->output_offset;
1769           GST_DEBUG_OBJECT (pad, "Timeout, missing %" G_GINT64_FORMAT
1770               " frames (%" GST_TIME_FORMAT ")", diff,
1771               GST_TIME_ARGS (gst_util_uint64_scale (diff, GST_SECOND,
1772                       GST_AUDIO_INFO_RATE (&srcpad->info))));
1773         }
1774       } else if (!pad_eos) {
1775         is_done = FALSE;
1776       }
1777       GST_OBJECT_UNLOCK (pad);
1778       continue;
1779     }
1780
1781     /* New buffer? */
1782     if (!pad->priv->buffer) {
1783       if (GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (pad)->convert_buffer)
1784         pad->priv->buffer =
1785             gst_audio_aggregator_convert_buffer
1786             (aagg, GST_PAD (pad), &pad->info, &srcpad->info,
1787             pad->priv->input_buffer);
1788       else
1789         pad->priv->buffer = gst_buffer_ref (pad->priv->input_buffer);
1790
1791       if (!gst_audio_aggregator_fill_buffer (aagg, pad)) {
1792         gst_buffer_replace (&pad->priv->buffer, NULL);
1793         gst_buffer_replace (&pad->priv->input_buffer, NULL);
1794         pad->priv->buffer = NULL;
1795         dropped = TRUE;
1796         GST_OBJECT_UNLOCK (pad);
1797
1798         gst_aggregator_pad_drop_buffer (aggpad);
1799         continue;
1800       }
1801     } else {
1802       gst_buffer_unref (pad->priv->input_buffer);
1803     }
1804
1805     if (!pad->priv->buffer && !dropped && pad_eos) {
1806       GST_DEBUG_OBJECT (aggpad, "Pad is in EOS state");
1807       GST_OBJECT_UNLOCK (pad);
1808       continue;
1809     }
1810
1811     g_assert (pad->priv->buffer);
1812
1813     /* This pad is lagging behind, we need to update the offset
1814      * and maybe drop the current buffer */
1815     if (pad->priv->output_offset < aagg->priv->offset) {
1816       gint64 diff = aagg->priv->offset - pad->priv->output_offset;
1817       gint64 odiff = diff;
1818
1819       if (pad->priv->position + diff > pad->priv->size)
1820         diff = pad->priv->size - pad->priv->position;
1821       pad->priv->position += diff;
1822       pad->priv->output_offset += diff;
1823
1824       if (pad->priv->position == pad->priv->size) {
1825         GST_DEBUG_OBJECT (pad, "Buffer was late by %" GST_TIME_FORMAT
1826             ", dropping %" GST_PTR_FORMAT,
1827             GST_TIME_ARGS (gst_util_uint64_scale (odiff, GST_SECOND,
1828                     GST_AUDIO_INFO_RATE (&srcpad->info))), pad->priv->buffer);
1829         /* Buffer done, drop it */
1830         gst_buffer_replace (&pad->priv->buffer, NULL);
1831         gst_buffer_replace (&pad->priv->input_buffer, NULL);
1832         dropped = TRUE;
1833         GST_OBJECT_UNLOCK (pad);
1834         gst_aggregator_pad_drop_buffer (aggpad);
1835         continue;
1836       }
1837     }
1838
1839     g_assert (pad->priv->buffer);
1840
1841     if (pad->priv->output_offset >= aagg->priv->offset
1842         && pad->priv->output_offset < aagg->priv->offset + blocksize) {
1843       gboolean drop_buf;
1844
1845       GST_LOG_OBJECT (aggpad, "Mixing buffer for current offset");
1846       drop_buf = !gst_audio_aggregator_mix_buffer (aagg, pad, pad->priv->buffer,
1847           outbuf, blocksize);
1848       if (pad->priv->output_offset >= next_offset) {
1849         GST_LOG_OBJECT (pad,
1850             "Pad is at or after current offset: %" G_GUINT64_FORMAT " >= %"
1851             G_GINT64_FORMAT, pad->priv->output_offset, next_offset);
1852       } else {
1853         is_done = FALSE;
1854       }
1855       if (drop_buf) {
1856         GST_OBJECT_UNLOCK (pad);
1857         gst_aggregator_pad_drop_buffer (aggpad);
1858         continue;
1859       }
1860     }
1861
1862     GST_OBJECT_UNLOCK (pad);
1863   }
1864   GST_OBJECT_UNLOCK (agg);
1865
1866   if (dropped) {
1867     /* We dropped a buffer, retry */
1868     GST_LOG_OBJECT (aagg, "A pad dropped a buffer, wait for the next one");
1869     GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1870     return GST_AGGREGATOR_FLOW_NEED_DATA;
1871   }
1872
1873   if (!is_done && !is_eos) {
1874     /* Get more buffers */
1875     GST_LOG_OBJECT (aagg,
1876         "We're not done yet for the current offset, waiting for more data");
1877     GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1878     return GST_AGGREGATOR_FLOW_NEED_DATA;
1879   }
1880
1881   if (is_eos) {
1882     gint64 max_offset = 0;
1883
1884     GST_DEBUG_OBJECT (aagg, "We're EOS");
1885
1886     GST_OBJECT_LOCK (agg);
1887     for (iter = GST_ELEMENT (agg)->sinkpads; iter; iter = iter->next) {
1888       GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (iter->data);
1889
1890       max_offset = MAX ((gint64) max_offset, (gint64) pad->priv->output_offset);
1891     }
1892     GST_OBJECT_UNLOCK (agg);
1893
1894     /* This means EOS or nothing mixed in at all */
1895     if (aagg->priv->offset == max_offset) {
1896       gst_buffer_replace (&aagg->priv->current_buffer, NULL);
1897       GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1898       return GST_FLOW_EOS;
1899     }
1900
1901     if (max_offset <= next_offset) {
1902       GST_DEBUG_OBJECT (aagg,
1903           "Last buffer is incomplete: %" G_GUINT64_FORMAT " <= %"
1904           G_GINT64_FORMAT, max_offset, next_offset);
1905       next_offset = max_offset;
1906       next_timestamp =
1907           agg->segment.start + gst_util_uint64_scale (next_offset, GST_SECOND,
1908           rate);
1909
1910       if (next_offset > aagg->priv->offset)
1911         gst_buffer_resize (outbuf, 0, (next_offset - aagg->priv->offset) * bpf);
1912     }
1913   }
1914
1915   /* set timestamps on the output buffer */
1916   GST_OBJECT_LOCK (agg);
1917   if (agg->segment.rate > 0.0) {
1918     GST_BUFFER_PTS (outbuf) = agg->segment.position;
1919     GST_BUFFER_OFFSET (outbuf) = aagg->priv->offset;
1920     GST_BUFFER_OFFSET_END (outbuf) = next_offset;
1921     GST_BUFFER_DURATION (outbuf) = next_timestamp - agg->segment.position;
1922   } else {
1923     GST_BUFFER_PTS (outbuf) = next_timestamp;
1924     GST_BUFFER_OFFSET (outbuf) = next_offset;
1925     GST_BUFFER_OFFSET_END (outbuf) = aagg->priv->offset;
1926     GST_BUFFER_DURATION (outbuf) = agg->segment.position - next_timestamp;
1927   }
1928
1929   GST_OBJECT_UNLOCK (agg);
1930
1931   /* send it out */
1932   GST_LOG_OBJECT (aagg,
1933       "pushing outbuf %p, timestamp %" GST_TIME_FORMAT " offset %"
1934       G_GINT64_FORMAT, outbuf, GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)),
1935       GST_BUFFER_OFFSET (outbuf));
1936
1937   GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1938
1939   ret = gst_aggregator_finish_buffer (agg, outbuf);
1940   aagg->priv->current_buffer = NULL;
1941
1942   GST_LOG_OBJECT (aagg, "pushed outbuf, result = %s", gst_flow_get_name (ret));
1943
1944   GST_AUDIO_AGGREGATOR_LOCK (aagg);
1945   GST_OBJECT_LOCK (agg);
1946   aagg->priv->offset = next_offset;
1947   agg->segment.position = next_timestamp;
1948
1949   /* If there was a timeout and there was a gap in data in out of the streams,
1950    * then it's a very good time to for a resync with the timestamps.
1951    */
1952   if (timeout) {
1953     for (iter = element->sinkpads; iter; iter = iter->next) {
1954       GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (iter->data);
1955
1956       GST_OBJECT_LOCK (pad);
1957       if (pad->priv->output_offset < aagg->priv->offset)
1958         pad->priv->output_offset = -1;
1959       GST_OBJECT_UNLOCK (pad);
1960     }
1961   }
1962   GST_OBJECT_UNLOCK (agg);
1963   GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1964
1965   return ret;
1966   /* ERRORS */
1967 not_negotiated:
1968   {
1969     GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1970     GST_ELEMENT_ERROR (aagg, STREAM, FORMAT, (NULL),
1971         ("Unknown data received, not negotiated"));
1972     return GST_FLOW_NOT_NEGOTIATED;
1973   }
1974 }