audiomixer: remove, moved to -base
[platform/upstream/gstreamer.git] / gst-libs / gst / audio / gstaudioaggregator.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2001 Thomas <thomas@apestaart.org>
4  *               2005,2006 Wim Taymans <wim@fluendo.com>
5  *                    2013 Sebastian Dröge <sebastian@centricular.com>
6  *                    2014 Collabora
7  *                             Olivier Crete <olivier.crete@collabora.com>
8  *
9  * gstaudioaggregator.c:
10  *
11  * This library is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU Library General Public
13  * License as published by the Free Software Foundation; either
14  * version 2 of the License, or (at your option) any later version.
15  *
16  * This library is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19  * Library General Public License for more details.
20  *
21  * You should have received a copy of the GNU Library General Public
22  * License along with this library; if not, write to the
23  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
24  * Boston, MA 02110-1301, USA.
25  */
26 /**
27  * SECTION: gstaudioaggregator
28  * @short_description: manages a set of pads with the purpose of
29  * aggregating their buffers for raw audio
30  * @see_also: #GstAggregator
31  *
32  * #GstAudioAggregator will perform conversion on the data arriving
33  * on its sink pads, based on the format expected downstream.
34  *
35  * Subclasses can opt out of the conversion behaviour by setting
36  * #GstAudioAggregator.convert_buffer() to %NULL.
37  *
38  * Subclasses that wish to use the default conversion implementation
39  * should use a (subclass of) #GstAudioAggregatorConvertPad as their
40  * #GstAggregatorClass.sinkpads_type, as it will cache the created
41  * #GstAudioConverter and install a property allowing to configure it,
42  * #GstAudioAggregatorPadClass:converter-config.
43  *
44  * Subclasses that wish to perform custom conversion should override
45  * #GstAudioAggregator.convert_buffer().
46  *
47  * When conversion is enabled, #GstAudioAggregator will accept
48  * any type of raw audio caps and perform conversion
49  * on the data arriving on its sink pads, with whatever downstream
50  * expects as the target format.
51  *
52  * In case downstream caps are not fully fixated, it will use
53  * the first configured sink pad to finish fixating its source pad
54  * caps.
55  *
56  * Additionally, handling audio conversion directly in the element
57  * means that this base class supports safely reconfiguring its
58  * source pad.
59  *
60  * A notable exception for now is the sample rate, sink pads must
61  * have the same sample rate as either the downstream requirement,
62  * or the first configured pad, or a combination of both (when
63  * downstream specifies a range or a set of acceptable rates).
64  */
65
66
67 #ifdef HAVE_CONFIG_H
68 #  include "config.h"
69 #endif
70
71 #include "gstaudioaggregator.h"
72
73 #include <string.h>
74
75 GST_DEBUG_CATEGORY_STATIC (audio_aggregator_debug);
76 #define GST_CAT_DEFAULT audio_aggregator_debug
77
78 struct _GstAudioAggregatorPadPrivate
79 {
80   /* All members are protected by the pad object lock */
81
82   GstBuffer *buffer;            /* current buffer we're mixing, for
83                                    comparison with a new input buffer from
84                                    aggregator to see if we need to update our
85                                    cached values. */
86
87   guint position, size;         /* position in the input buffer and size of the
88                                    input buffer in number of samples */
89
90   GstBuffer *input_buffer;
91
92   guint64 output_offset;        /* Sample offset in output segment relative to
93                                    pad.segment.start that position refers to
94                                    in the current buffer. */
95
96   guint64 next_offset;          /* Next expected sample offset relative to
97                                    pad.segment.start */
98
99   /* Last time we noticed a discont */
100   GstClockTime discont_time;
101
102   /* A new unhandled segment event has been received */
103   gboolean new_segment;
104 };
105
106
107 /*****************************************
108  * GstAudioAggregatorPad implementation  *
109  *****************************************/
110 G_DEFINE_TYPE (GstAudioAggregatorPad, gst_audio_aggregator_pad,
111     GST_TYPE_AGGREGATOR_PAD);
112
113 enum
114 {
115   PROP_PAD_0,
116   PROP_PAD_CONVERTER_CONFIG,
117 };
118
119 static GstFlowReturn
120 gst_audio_aggregator_pad_flush_pad (GstAggregatorPad * aggpad,
121     GstAggregator * aggregator);
122
123 static void
124 gst_audio_aggregator_pad_finalize (GObject * object)
125 {
126   GstAudioAggregatorPad *pad = (GstAudioAggregatorPad *) object;
127
128   gst_buffer_replace (&pad->priv->buffer, NULL);
129   gst_buffer_replace (&pad->priv->input_buffer, NULL);
130
131   G_OBJECT_CLASS (gst_audio_aggregator_pad_parent_class)->finalize (object);
132 }
133
134 static void
135 gst_audio_aggregator_pad_class_init (GstAudioAggregatorPadClass * klass)
136 {
137   GObjectClass *gobject_class = (GObjectClass *) klass;
138   GstAggregatorPadClass *aggpadclass = (GstAggregatorPadClass *) klass;
139
140   g_type_class_add_private (klass, sizeof (GstAudioAggregatorPadPrivate));
141
142   gobject_class->finalize = gst_audio_aggregator_pad_finalize;
143   aggpadclass->flush = GST_DEBUG_FUNCPTR (gst_audio_aggregator_pad_flush_pad);
144 }
145
146 static void
147 gst_audio_aggregator_pad_init (GstAudioAggregatorPad * pad)
148 {
149   pad->priv =
150       G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AUDIO_AGGREGATOR_PAD,
151       GstAudioAggregatorPadPrivate);
152
153   gst_audio_info_init (&pad->info);
154
155   pad->priv->buffer = NULL;
156   pad->priv->input_buffer = NULL;
157   pad->priv->position = 0;
158   pad->priv->size = 0;
159   pad->priv->output_offset = -1;
160   pad->priv->next_offset = -1;
161   pad->priv->discont_time = GST_CLOCK_TIME_NONE;
162 }
163
164
165 static GstFlowReturn
166 gst_audio_aggregator_pad_flush_pad (GstAggregatorPad * aggpad,
167     GstAggregator * aggregator)
168 {
169   GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (aggpad);
170
171   GST_OBJECT_LOCK (aggpad);
172   pad->priv->position = pad->priv->size = 0;
173   pad->priv->output_offset = pad->priv->next_offset = -1;
174   pad->priv->discont_time = GST_CLOCK_TIME_NONE;
175   gst_buffer_replace (&pad->priv->buffer, NULL);
176   gst_buffer_replace (&pad->priv->input_buffer, NULL);
177   GST_OBJECT_UNLOCK (aggpad);
178
179   return GST_FLOW_OK;
180 }
181
182 struct _GstAudioAggregatorConvertPadPrivate
183 {
184   /* All members are protected by the pad object lock */
185   GstAudioConverter *converter;
186   GstStructure *converter_config;
187   gboolean converter_config_changed;
188 };
189
190
191 G_DEFINE_TYPE (GstAudioAggregatorConvertPad, gst_audio_aggregator_convert_pad,
192     GST_TYPE_AUDIO_AGGREGATOR_PAD);
193
194 static void
195 gst_audio_aggregator_convert_pad_update_converter (GstAudioAggregatorConvertPad
196     * aaggcpad, GstAudioInfo * in_info, GstAudioInfo * out_info)
197 {
198   if (!aaggcpad->priv->converter_config_changed)
199     return;
200
201   if (aaggcpad->priv->converter) {
202     gst_audio_converter_free (aaggcpad->priv->converter);
203     aaggcpad->priv->converter = NULL;
204   }
205
206   if (gst_audio_info_is_equal (in_info, out_info) ||
207       in_info->finfo->format == GST_AUDIO_FORMAT_UNKNOWN) {
208     if (aaggcpad->priv->converter) {
209       gst_audio_converter_free (aaggcpad->priv->converter);
210       aaggcpad->priv->converter = NULL;
211     }
212   } else {
213     /* If we haven't received caps yet, this pad should not have
214      * a buffer to convert anyway */
215     aaggcpad->priv->converter =
216         gst_audio_converter_new (GST_AUDIO_CONVERTER_FLAG_NONE,
217         in_info, out_info,
218         aaggcpad->priv->converter_config ? gst_structure_copy (aaggcpad->
219             priv->converter_config) : NULL);
220   }
221
222   aaggcpad->priv->converter_config_changed = FALSE;
223 }
224
225 static GstBuffer *
226 gst_audio_aggregator_convert_pad_convert_buffer (GstAudioAggregatorConvertPad *
227     aaggcpad, GstAudioInfo * in_info, GstAudioInfo * out_info,
228     GstBuffer * input_buffer)
229 {
230   GstBuffer *res;
231
232   gst_audio_aggregator_convert_pad_update_converter (aaggcpad, in_info,
233       out_info);
234
235   if (aaggcpad->priv->converter) {
236     gint insize = gst_buffer_get_size (input_buffer);
237     gsize insamples = insize / in_info->bpf;
238     gsize outsamples =
239         gst_audio_converter_get_out_frames (aaggcpad->priv->converter,
240         insamples);
241     gint outsize = outsamples * out_info->bpf;
242     GstMapInfo inmap, outmap;
243
244     res = gst_buffer_new_allocate (NULL, outsize, NULL);
245
246     /* We create a perfectly similar buffer, except obviously for
247      * its converted contents */
248     gst_buffer_copy_into (res, input_buffer,
249         GST_BUFFER_COPY_FLAGS | GST_BUFFER_COPY_TIMESTAMPS |
250         GST_BUFFER_COPY_META, 0, -1);
251
252     gst_buffer_map (input_buffer, &inmap, GST_MAP_READ);
253     gst_buffer_map (res, &outmap, GST_MAP_WRITE);
254
255     gst_audio_converter_samples (aaggcpad->priv->converter,
256         GST_AUDIO_CONVERTER_FLAG_NONE,
257         (gpointer *) & inmap.data, insamples,
258         (gpointer *) & outmap.data, outsamples);
259
260     gst_buffer_unmap (input_buffer, &inmap);
261     gst_buffer_unmap (res, &outmap);
262   } else {
263     res = gst_buffer_ref (input_buffer);
264   }
265
266   return res;
267 }
268
269 static void
270 gst_audio_aggregator_convert_pad_finalize (GObject * object)
271 {
272   GstAudioAggregatorConvertPad *pad = (GstAudioAggregatorConvertPad *) object;
273
274   if (pad->priv->converter)
275     gst_audio_converter_free (pad->priv->converter);
276
277   if (pad->priv->converter_config)
278     gst_structure_free (pad->priv->converter_config);
279
280   G_OBJECT_CLASS (gst_audio_aggregator_convert_pad_parent_class)->finalize
281       (object);
282 }
283
284 static void
285 gst_audio_aggregator_convert_pad_get_property (GObject * object, guint prop_id,
286     GValue * value, GParamSpec * pspec)
287 {
288   GstAudioAggregatorConvertPad *pad = GST_AUDIO_AGGREGATOR_CONVERT_PAD (object);
289
290   switch (prop_id) {
291     case PROP_PAD_CONVERTER_CONFIG:
292       GST_OBJECT_LOCK (pad);
293       if (pad->priv->converter_config)
294         g_value_set_boxed (value, pad->priv->converter_config);
295       GST_OBJECT_UNLOCK (pad);
296       break;
297     default:
298       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
299       break;
300   }
301 }
302
303 static void
304 gst_audio_aggregator_convert_pad_set_property (GObject * object, guint prop_id,
305     const GValue * value, GParamSpec * pspec)
306 {
307   GstAudioAggregatorConvertPad *pad = GST_AUDIO_AGGREGATOR_CONVERT_PAD (object);
308
309   switch (prop_id) {
310     case PROP_PAD_CONVERTER_CONFIG:
311       GST_OBJECT_LOCK (pad);
312       if (pad->priv->converter_config)
313         gst_structure_free (pad->priv->converter_config);
314       pad->priv->converter_config = g_value_dup_boxed (value);
315       pad->priv->converter_config_changed = TRUE;
316       GST_OBJECT_UNLOCK (pad);
317       break;
318     default:
319       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
320       break;
321   }
322 }
323
324 static void
325 gst_audio_aggregator_convert_pad_class_init (GstAudioAggregatorConvertPadClass *
326     klass)
327 {
328   GObjectClass *gobject_class = (GObjectClass *) klass;
329   g_type_class_add_private (klass,
330       sizeof (GstAudioAggregatorConvertPadPrivate));
331
332   gobject_class->set_property = gst_audio_aggregator_convert_pad_set_property;
333   gobject_class->get_property = gst_audio_aggregator_convert_pad_get_property;
334
335   g_object_class_install_property (gobject_class, PROP_PAD_CONVERTER_CONFIG,
336       g_param_spec_boxed ("converter-config", "Converter configuration",
337           "A GstStructure describing the configuration that should be used "
338           "when converting this pad's audio buffers",
339           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
340
341   gobject_class->finalize = gst_audio_aggregator_convert_pad_finalize;
342 }
343
344 static void
345 gst_audio_aggregator_convert_pad_init (GstAudioAggregatorConvertPad * pad)
346 {
347   pad->priv =
348       G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AUDIO_AGGREGATOR_CONVERT_PAD,
349       GstAudioAggregatorConvertPadPrivate);
350 }
351
352 /**************************************
353  * GstAudioAggregator implementation  *
354  **************************************/
355
356 struct _GstAudioAggregatorPrivate
357 {
358   GMutex mutex;
359
360   /* All three properties are unprotected, can't be modified while streaming */
361   /* Size in frames that is output per buffer */
362   GstClockTime output_buffer_duration;
363   GstClockTime alignment_threshold;
364   GstClockTime discont_wait;
365
366   /* Protected by srcpad stream clock */
367   /* Output buffer starting at offset containing blocksize frames (calculated
368    * from output_buffer_duration) */
369   GstBuffer *current_buffer;
370
371   /* counters to keep track of timestamps */
372   /* Readable with object lock, writable with both aag lock and object lock */
373
374   /* Sample offset starting from 0 at aggregator.segment.start */
375   gint64 offset;
376 };
377
378 #define GST_AUDIO_AGGREGATOR_LOCK(self)   g_mutex_lock (&(self)->priv->mutex);
379 #define GST_AUDIO_AGGREGATOR_UNLOCK(self) g_mutex_unlock (&(self)->priv->mutex);
380
381 static void gst_audio_aggregator_set_property (GObject * object, guint prop_id,
382     const GValue * value, GParamSpec * pspec);
383 static void gst_audio_aggregator_get_property (GObject * object, guint prop_id,
384     GValue * value, GParamSpec * pspec);
385 static void gst_audio_aggregator_dispose (GObject * object);
386
387 static gboolean gst_audio_aggregator_src_event (GstAggregator * agg,
388     GstEvent * event);
389 static gboolean gst_audio_aggregator_sink_event (GstAggregator * agg,
390     GstAggregatorPad * aggpad, GstEvent * event);
391 static gboolean gst_audio_aggregator_src_query (GstAggregator * agg,
392     GstQuery * query);
393 static gboolean
394 gst_audio_aggregator_sink_query (GstAggregator * agg, GstAggregatorPad * aggpad,
395     GstQuery * query);
396 static gboolean gst_audio_aggregator_start (GstAggregator * agg);
397 static gboolean gst_audio_aggregator_stop (GstAggregator * agg);
398 static GstFlowReturn gst_audio_aggregator_flush (GstAggregator * agg);
399
400 static GstBuffer *gst_audio_aggregator_create_output_buffer (GstAudioAggregator
401     * aagg, guint num_frames);
402 static GstBuffer *gst_audio_aggregator_do_clip (GstAggregator * agg,
403     GstAggregatorPad * bpad, GstBuffer * buffer);
404 static GstFlowReturn gst_audio_aggregator_aggregate (GstAggregator * agg,
405     gboolean timeout);
406 static gboolean sync_pad_values (GstElement * aagg, GstPad * pad, gpointer ud);
407 static gboolean gst_audio_aggregator_negotiated_src_caps (GstAggregator * agg,
408     GstCaps * caps);
409 static GstFlowReturn
410 gst_audio_aggregator_update_src_caps (GstAggregator * agg,
411     GstCaps * caps, GstCaps ** ret);
412 static GstCaps *gst_audio_aggregator_fixate_src_caps (GstAggregator * agg,
413     GstCaps * caps);
414
415 #define DEFAULT_OUTPUT_BUFFER_DURATION (10 * GST_MSECOND)
416 #define DEFAULT_ALIGNMENT_THRESHOLD   (40 * GST_MSECOND)
417 #define DEFAULT_DISCONT_WAIT (1 * GST_SECOND)
418
419 enum
420 {
421   PROP_0,
422   PROP_OUTPUT_BUFFER_DURATION,
423   PROP_ALIGNMENT_THRESHOLD,
424   PROP_DISCONT_WAIT,
425 };
426
427 G_DEFINE_ABSTRACT_TYPE (GstAudioAggregator, gst_audio_aggregator,
428     GST_TYPE_AGGREGATOR);
429
430 static GstClockTime
431 gst_audio_aggregator_get_next_time (GstAggregator * agg)
432 {
433   GstClockTime next_time;
434
435   GST_OBJECT_LOCK (agg);
436   if (agg->segment.position == -1 || agg->segment.position < agg->segment.start)
437     next_time = agg->segment.start;
438   else
439     next_time = agg->segment.position;
440
441   if (agg->segment.stop != -1 && next_time > agg->segment.stop)
442     next_time = agg->segment.stop;
443
444   next_time =
445       gst_segment_to_running_time (&agg->segment, GST_FORMAT_TIME, next_time);
446   GST_OBJECT_UNLOCK (agg);
447
448   return next_time;
449 }
450
451 static GstBuffer *
452 gst_audio_aggregator_convert_once (GstAudioAggregator * aagg, GstPad * pad,
453     GstAudioInfo * in_info, GstAudioInfo * out_info, GstBuffer * buffer)
454 {
455   GstAudioConverter *converter =
456       gst_audio_converter_new (GST_AUDIO_CONVERTER_FLAG_NONE,
457       in_info, out_info, NULL);
458   gint insize = gst_buffer_get_size (buffer);
459   gsize insamples = insize / in_info->bpf;
460   gsize outsamples = gst_audio_converter_get_out_frames (converter,
461       insamples);
462   gint outsize = outsamples * out_info->bpf;
463   GstMapInfo inmap, outmap;
464   GstBuffer *converted = gst_buffer_new_allocate (NULL, outsize, NULL);
465
466   gst_buffer_copy_into (converted, buffer,
467       GST_BUFFER_COPY_FLAGS | GST_BUFFER_COPY_TIMESTAMPS |
468       GST_BUFFER_COPY_META, 0, -1);
469
470   gst_buffer_map (buffer, &inmap, GST_MAP_READ);
471   gst_buffer_map (converted, &outmap, GST_MAP_WRITE);
472
473   gst_audio_converter_samples (converter,
474       GST_AUDIO_CONVERTER_FLAG_NONE,
475       (gpointer *) & inmap.data, insamples,
476       (gpointer *) & outmap.data, outsamples);
477
478   gst_buffer_unmap (buffer, &inmap);
479   gst_buffer_unmap (converted, &outmap);
480   gst_audio_converter_free (converter);
481
482   return converted;
483 }
484
485 static GstBuffer *
486 gst_audio_aggregator_default_convert_buffer (GstAudioAggregator * aagg,
487     GstPad * pad, GstAudioInfo * in_info, GstAudioInfo * out_info,
488     GstBuffer * buffer)
489 {
490   if (GST_IS_AUDIO_AGGREGATOR_CONVERT_PAD (pad))
491     return
492         gst_audio_aggregator_convert_pad_convert_buffer
493         (GST_AUDIO_AGGREGATOR_CONVERT_PAD (pad),
494         &GST_AUDIO_AGGREGATOR_PAD (pad)->info, out_info, buffer);
495   else
496     return gst_audio_aggregator_convert_once (aagg, pad, in_info, out_info,
497         buffer);
498 }
499
500 static GstBuffer *
501 gst_audio_aggregator_convert_buffer (GstAudioAggregator * aagg, GstPad * pad,
502     GstAudioInfo * in_info, GstAudioInfo * out_info, GstBuffer * buffer)
503 {
504   GstAudioAggregatorClass *klass = GST_AUDIO_AGGREGATOR_GET_CLASS (aagg);
505
506   g_assert (klass->convert_buffer);
507
508   return klass->convert_buffer (aagg, pad, in_info, out_info, buffer);
509 }
510
511 static void
512 gst_audio_aggregator_class_init (GstAudioAggregatorClass * klass)
513 {
514   GObjectClass *gobject_class = (GObjectClass *) klass;
515   GstAggregatorClass *gstaggregator_class = (GstAggregatorClass *) klass;
516
517   g_type_class_add_private (klass, sizeof (GstAudioAggregatorPrivate));
518
519   gobject_class->set_property = gst_audio_aggregator_set_property;
520   gobject_class->get_property = gst_audio_aggregator_get_property;
521   gobject_class->dispose = gst_audio_aggregator_dispose;
522
523   gstaggregator_class->src_event =
524       GST_DEBUG_FUNCPTR (gst_audio_aggregator_src_event);
525   gstaggregator_class->sink_event =
526       GST_DEBUG_FUNCPTR (gst_audio_aggregator_sink_event);
527   gstaggregator_class->src_query =
528       GST_DEBUG_FUNCPTR (gst_audio_aggregator_src_query);
529   gstaggregator_class->sink_query = gst_audio_aggregator_sink_query;
530   gstaggregator_class->start = gst_audio_aggregator_start;
531   gstaggregator_class->stop = gst_audio_aggregator_stop;
532   gstaggregator_class->flush = gst_audio_aggregator_flush;
533   gstaggregator_class->aggregate =
534       GST_DEBUG_FUNCPTR (gst_audio_aggregator_aggregate);
535   gstaggregator_class->clip = GST_DEBUG_FUNCPTR (gst_audio_aggregator_do_clip);
536   gstaggregator_class->get_next_time = gst_audio_aggregator_get_next_time;
537   gstaggregator_class->update_src_caps =
538       GST_DEBUG_FUNCPTR (gst_audio_aggregator_update_src_caps);
539   gstaggregator_class->fixate_src_caps = gst_audio_aggregator_fixate_src_caps;
540   gstaggregator_class->negotiated_src_caps =
541       gst_audio_aggregator_negotiated_src_caps;
542
543   klass->create_output_buffer = gst_audio_aggregator_create_output_buffer;
544   klass->convert_buffer = gst_audio_aggregator_default_convert_buffer;
545
546   GST_DEBUG_CATEGORY_INIT (audio_aggregator_debug, "audioaggregator",
547       GST_DEBUG_FG_MAGENTA, "GstAudioAggregator");
548
549   g_object_class_install_property (gobject_class, PROP_OUTPUT_BUFFER_DURATION,
550       g_param_spec_uint64 ("output-buffer-duration", "Output Buffer Duration",
551           "Output block size in nanoseconds", 1,
552           G_MAXUINT64, DEFAULT_OUTPUT_BUFFER_DURATION,
553           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
554
555   g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
556       g_param_spec_uint64 ("alignment-threshold", "Alignment Threshold",
557           "Timestamp alignment threshold in nanoseconds", 0,
558           G_MAXUINT64 - 1, DEFAULT_ALIGNMENT_THRESHOLD,
559           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
560
561   g_object_class_install_property (gobject_class, PROP_DISCONT_WAIT,
562       g_param_spec_uint64 ("discont-wait", "Discont Wait",
563           "Window of time in nanoseconds to wait before "
564           "creating a discontinuity", 0,
565           G_MAXUINT64 - 1, DEFAULT_DISCONT_WAIT,
566           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
567 }
568
569 static void
570 gst_audio_aggregator_init (GstAudioAggregator * aagg)
571 {
572   aagg->priv =
573       G_TYPE_INSTANCE_GET_PRIVATE (aagg, GST_TYPE_AUDIO_AGGREGATOR,
574       GstAudioAggregatorPrivate);
575
576   g_mutex_init (&aagg->priv->mutex);
577
578   aagg->priv->output_buffer_duration = DEFAULT_OUTPUT_BUFFER_DURATION;
579   aagg->priv->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
580   aagg->priv->discont_wait = DEFAULT_DISCONT_WAIT;
581
582   aagg->current_caps = NULL;
583   gst_audio_info_init (&aagg->info);
584
585   gst_aggregator_set_latency (GST_AGGREGATOR (aagg),
586       aagg->priv->output_buffer_duration, aagg->priv->output_buffer_duration);
587 }
588
589 static void
590 gst_audio_aggregator_dispose (GObject * object)
591 {
592   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (object);
593
594   gst_caps_replace (&aagg->current_caps, NULL);
595
596   g_mutex_clear (&aagg->priv->mutex);
597
598   G_OBJECT_CLASS (gst_audio_aggregator_parent_class)->dispose (object);
599 }
600
601 static void
602 gst_audio_aggregator_set_property (GObject * object, guint prop_id,
603     const GValue * value, GParamSpec * pspec)
604 {
605   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (object);
606
607   switch (prop_id) {
608     case PROP_OUTPUT_BUFFER_DURATION:
609       aagg->priv->output_buffer_duration = g_value_get_uint64 (value);
610       gst_aggregator_set_latency (GST_AGGREGATOR (aagg),
611           aagg->priv->output_buffer_duration,
612           aagg->priv->output_buffer_duration);
613       break;
614     case PROP_ALIGNMENT_THRESHOLD:
615       aagg->priv->alignment_threshold = g_value_get_uint64 (value);
616       break;
617     case PROP_DISCONT_WAIT:
618       aagg->priv->discont_wait = g_value_get_uint64 (value);
619       break;
620     default:
621       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
622       break;
623   }
624 }
625
626 static void
627 gst_audio_aggregator_get_property (GObject * object, guint prop_id,
628     GValue * value, GParamSpec * pspec)
629 {
630   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (object);
631
632   switch (prop_id) {
633     case PROP_OUTPUT_BUFFER_DURATION:
634       g_value_set_uint64 (value, aagg->priv->output_buffer_duration);
635       break;
636     case PROP_ALIGNMENT_THRESHOLD:
637       g_value_set_uint64 (value, aagg->priv->alignment_threshold);
638       break;
639     case PROP_DISCONT_WAIT:
640       g_value_set_uint64 (value, aagg->priv->discont_wait);
641       break;
642     default:
643       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
644       break;
645   }
646 }
647
648 /* Caps negotiation */
649
650 /* Unref after usage */
651 static GstAudioAggregatorPad *
652 gst_audio_aggregator_get_first_configured_pad (GstAggregator * agg)
653 {
654   GstAudioAggregatorPad *res = NULL;
655   GList *l;
656
657   GST_OBJECT_LOCK (agg);
658   for (l = GST_ELEMENT (agg)->sinkpads; l; l = l->next) {
659     GstAudioAggregatorPad *aaggpad = l->data;
660
661     if (GST_AUDIO_INFO_FORMAT (&aaggpad->info) != GST_AUDIO_FORMAT_UNKNOWN) {
662       res = gst_object_ref (aaggpad);
663       break;
664     }
665   }
666   GST_OBJECT_UNLOCK (agg);
667
668   return res;
669 }
670
671 static GstCaps *
672 gst_audio_aggregator_sink_getcaps (GstPad * pad, GstAggregator * agg,
673     GstCaps * filter)
674 {
675   GstAudioAggregatorPad *first_configured_pad =
676       gst_audio_aggregator_get_first_configured_pad (agg);
677   GstCaps *sink_template_caps = gst_pad_get_pad_template_caps (pad);
678   GstCaps *downstream_caps = gst_pad_get_allowed_caps (agg->srcpad);
679   GstCaps *sink_caps;
680   GstStructure *s, *s2;
681   gint downstream_rate;
682
683   sink_template_caps = gst_caps_make_writable (sink_template_caps);
684   s = gst_caps_get_structure (sink_template_caps, 0);
685
686   if (downstream_caps && !gst_caps_is_empty (downstream_caps))
687     s2 = gst_caps_get_structure (downstream_caps, 0);
688   else
689     s2 = NULL;
690
691   if (s2 && gst_structure_get_int (s2, "rate", &downstream_rate)) {
692     gst_structure_fixate_field_nearest_int (s, "rate", downstream_rate);
693   } else if (first_configured_pad) {
694     gst_structure_fixate_field_nearest_int (s, "rate",
695         first_configured_pad->info.rate);
696   }
697
698   if (first_configured_pad)
699     gst_object_unref (first_configured_pad);
700
701   sink_caps = filter ? gst_caps_intersect (sink_template_caps,
702       filter) : gst_caps_ref (sink_template_caps);
703
704   GST_INFO_OBJECT (pad, "Getting caps with filter %" GST_PTR_FORMAT, filter);
705   GST_DEBUG_OBJECT (pad, "sink template caps : %" GST_PTR_FORMAT,
706       sink_template_caps);
707   GST_DEBUG_OBJECT (pad, "downstream caps %" GST_PTR_FORMAT, downstream_caps);
708   GST_INFO_OBJECT (pad, "returned sink caps : %" GST_PTR_FORMAT, sink_caps);
709
710   gst_caps_unref (sink_template_caps);
711
712   if (downstream_caps)
713     gst_caps_unref (downstream_caps);
714
715   return sink_caps;
716 }
717
718 static gboolean
719 gst_audio_aggregator_sink_setcaps (GstAudioAggregatorPad * aaggpad,
720     GstAggregator * agg, GstCaps * caps)
721 {
722   GstAudioAggregatorPad *first_configured_pad =
723       gst_audio_aggregator_get_first_configured_pad (agg);
724   GstCaps *downstream_caps = gst_pad_get_allowed_caps (agg->srcpad);
725   GstAudioInfo info;
726   gboolean ret = TRUE;
727   gint downstream_rate;
728   GstStructure *s;
729
730   if (!downstream_caps || gst_caps_is_empty (downstream_caps)) {
731     ret = FALSE;
732     goto done;
733   }
734
735   gst_audio_info_from_caps (&info, caps);
736   s = gst_caps_get_structure (downstream_caps, 0);
737
738   /* TODO: handle different rates on sinkpads, a bit complex
739    * because offsets will have to be updated, and audio resampling
740    * has a latency to take into account
741    */
742   if ((gst_structure_get_int (s, "rate", &downstream_rate)
743           && info.rate != downstream_rate) || (first_configured_pad
744           && info.rate != first_configured_pad->info.rate)) {
745     gst_pad_push_event (GST_PAD (aaggpad), gst_event_new_reconfigure ());
746     ret = FALSE;
747   } else {
748     GST_OBJECT_LOCK (aaggpad);
749     gst_audio_info_from_caps (&aaggpad->info, caps);
750     if (GST_IS_AUDIO_AGGREGATOR_CONVERT_PAD (aaggpad))
751       GST_AUDIO_AGGREGATOR_CONVERT_PAD (aaggpad)->
752           priv->converter_config_changed = TRUE;
753     GST_OBJECT_UNLOCK (aaggpad);
754   }
755
756 done:
757   if (first_configured_pad)
758     gst_object_unref (first_configured_pad);
759
760   if (downstream_caps)
761     gst_caps_unref (downstream_caps);
762
763   return ret;
764 }
765
766 static GstFlowReturn
767 gst_audio_aggregator_update_src_caps (GstAggregator * agg,
768     GstCaps * caps, GstCaps ** ret)
769 {
770   GstCaps *src_template_caps = gst_pad_get_pad_template_caps (agg->srcpad);
771   GstCaps *downstream_caps =
772       gst_pad_peer_query_caps (agg->srcpad, src_template_caps);
773
774   gst_caps_unref (src_template_caps);
775
776   *ret = gst_caps_intersect (caps, downstream_caps);
777
778   GST_INFO ("Updated src caps to %" GST_PTR_FORMAT, *ret);
779
780   if (downstream_caps)
781     gst_caps_unref (downstream_caps);
782
783   return GST_FLOW_OK;
784 }
785
786 /* At that point if the caps are not fixed, this means downstream
787  * didn't have fully specified requirements, we'll just go ahead
788  * and fixate raw audio fields using our first configured pad, we don't for
789  * now need a more complicated heuristic
790  */
791 static GstCaps *
792 gst_audio_aggregator_fixate_src_caps (GstAggregator * agg, GstCaps * caps)
793 {
794   GstAudioAggregatorClass *aaggclass = GST_AUDIO_AGGREGATOR_GET_CLASS (agg);
795   GstAudioAggregatorPad *first_configured_pad;
796
797   if (!aaggclass->convert_buffer)
798     return
799         GST_AGGREGATOR_CLASS
800         (gst_audio_aggregator_parent_class)->fixate_src_caps (agg, caps);
801
802   first_configured_pad = gst_audio_aggregator_get_first_configured_pad (agg);
803
804   if (first_configured_pad) {
805     GstStructure *s, *s2;
806     GstCaps *first_configured_caps =
807         gst_audio_info_to_caps (&first_configured_pad->info);
808     gint first_configured_rate, first_configured_channels;
809
810     caps = gst_caps_make_writable (caps);
811     s = gst_caps_get_structure (caps, 0);
812     s2 = gst_caps_get_structure (first_configured_caps, 0);
813
814     gst_structure_get_int (s2, "rate", &first_configured_rate);
815     gst_structure_get_int (s2, "channels", &first_configured_channels);
816
817     gst_structure_fixate_field_string (s, "format",
818         gst_structure_get_string (s2, "format"));
819     gst_structure_fixate_field_string (s, "layout",
820         gst_structure_get_string (s2, "layout"));
821     gst_structure_fixate_field_nearest_int (s, "rate", first_configured_rate);
822     gst_structure_fixate_field_nearest_int (s, "channels",
823         first_configured_channels);
824
825     gst_caps_unref (first_configured_caps);
826     gst_object_unref (first_configured_pad);
827   }
828
829   if (!gst_caps_is_fixed (caps))
830     caps = gst_caps_fixate (caps);
831
832   GST_INFO_OBJECT (agg, "Fixated src caps to %" GST_PTR_FORMAT, caps);
833
834   return caps;
835 }
836
837 /* Must be called with OBJECT_LOCK taken */
838 static void
839 gst_audio_aggregator_update_converters (GstAudioAggregator * aagg,
840     GstAudioInfo * new_info)
841 {
842   GList *l;
843
844   for (l = GST_ELEMENT (aagg)->sinkpads; l; l = l->next) {
845     GstAudioAggregatorPad *aaggpad = l->data;
846
847     if (GST_IS_AUDIO_AGGREGATOR_CONVERT_PAD (aaggpad))
848       GST_AUDIO_AGGREGATOR_CONVERT_PAD (aaggpad)->
849           priv->converter_config_changed = TRUE;
850
851     /* If we currently were mixing a buffer, we need to convert it to the new
852      * format */
853     if (aaggpad->priv->buffer) {
854       GstBuffer *new_converted_buffer =
855           gst_audio_aggregator_convert_buffer (aagg, GST_PAD (aaggpad),
856           &aaggpad->info, new_info, aaggpad->priv->input_buffer);
857       gst_buffer_replace (&aaggpad->priv->buffer, new_converted_buffer);
858     }
859   }
860 }
861
862 /* We now have our final output caps, we can create the required converters */
863 static gboolean
864 gst_audio_aggregator_negotiated_src_caps (GstAggregator * agg, GstCaps * caps)
865 {
866   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
867   GstAudioAggregatorClass *aaggclass = GST_AUDIO_AGGREGATOR_GET_CLASS (agg);
868   GstAudioInfo info;
869
870   GST_INFO_OBJECT (agg, "src caps negotiated %" GST_PTR_FORMAT, caps);
871
872   if (!gst_audio_info_from_caps (&info, caps)) {
873     GST_WARNING_OBJECT (aagg, "Rejecting invalid caps: %" GST_PTR_FORMAT, caps);
874     return FALSE;
875   }
876
877   GST_AUDIO_AGGREGATOR_LOCK (aagg);
878   GST_OBJECT_LOCK (aagg);
879
880   if (aaggclass->convert_buffer) {
881     gst_audio_aggregator_update_converters (aagg, &info);
882
883     if (aagg->priv->current_buffer
884         && !gst_audio_info_is_equal (&aagg->info, &info)) {
885       GstBuffer *converted =
886           gst_audio_aggregator_convert_buffer (aagg, agg->srcpad, &aagg->info,
887           &info, aagg->priv->current_buffer);
888       gst_buffer_unref (aagg->priv->current_buffer);
889       aagg->priv->current_buffer = converted;
890     }
891   }
892
893   if (!gst_audio_info_is_equal (&info, &aagg->info)) {
894     GST_INFO_OBJECT (aagg, "setting caps to %" GST_PTR_FORMAT, caps);
895     gst_caps_replace (&aagg->current_caps, caps);
896
897     memcpy (&aagg->info, &info, sizeof (info));
898   }
899
900   GST_OBJECT_UNLOCK (aagg);
901   GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
902
903   return
904       GST_AGGREGATOR_CLASS
905       (gst_audio_aggregator_parent_class)->negotiated_src_caps (agg, caps);
906 }
907
908 /* event handling */
909
910 static gboolean
911 gst_audio_aggregator_src_event (GstAggregator * agg, GstEvent * event)
912 {
913   gboolean result;
914
915   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
916   GST_DEBUG_OBJECT (agg->srcpad, "Got %s event on src pad",
917       GST_EVENT_TYPE_NAME (event));
918
919   switch (GST_EVENT_TYPE (event)) {
920     case GST_EVENT_QOS:
921       /* QoS might be tricky */
922       gst_event_unref (event);
923       return FALSE;
924     case GST_EVENT_NAVIGATION:
925       /* navigation is rather pointless. */
926       gst_event_unref (event);
927       return FALSE;
928       break;
929     case GST_EVENT_SEEK:
930     {
931       GstSeekFlags flags;
932       gdouble rate;
933       GstSeekType start_type, stop_type;
934       gint64 start, stop;
935       GstFormat seek_format, dest_format;
936
937       /* parse the seek parameters */
938       gst_event_parse_seek (event, &rate, &seek_format, &flags, &start_type,
939           &start, &stop_type, &stop);
940
941       /* Check the seeking parameters before linking up */
942       if ((start_type != GST_SEEK_TYPE_NONE)
943           && (start_type != GST_SEEK_TYPE_SET)) {
944         result = FALSE;
945         GST_DEBUG_OBJECT (aagg,
946             "seeking failed, unhandled seek type for start: %d", start_type);
947         goto done;
948       }
949       if ((stop_type != GST_SEEK_TYPE_NONE) && (stop_type != GST_SEEK_TYPE_SET)) {
950         result = FALSE;
951         GST_DEBUG_OBJECT (aagg,
952             "seeking failed, unhandled seek type for end: %d", stop_type);
953         goto done;
954       }
955
956       GST_OBJECT_LOCK (agg);
957       dest_format = agg->segment.format;
958       GST_OBJECT_UNLOCK (agg);
959       if (seek_format != dest_format) {
960         result = FALSE;
961         GST_DEBUG_OBJECT (aagg,
962             "seeking failed, unhandled seek format: %s",
963             gst_format_get_name (seek_format));
964         goto done;
965       }
966     }
967       break;
968     default:
969       break;
970   }
971
972   return
973       GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->src_event (agg,
974       event);
975
976 done:
977   return result;
978 }
979
980
981 static gboolean
982 gst_audio_aggregator_sink_event (GstAggregator * agg,
983     GstAggregatorPad * aggpad, GstEvent * event)
984 {
985   GstAudioAggregatorPad *aaggpad = GST_AUDIO_AGGREGATOR_PAD (aggpad);
986   gboolean res = TRUE;
987
988   GST_DEBUG_OBJECT (aggpad, "Got %s event on sink pad",
989       GST_EVENT_TYPE_NAME (event));
990
991   switch (GST_EVENT_TYPE (event)) {
992     case GST_EVENT_SEGMENT:
993     {
994       const GstSegment *segment;
995       gst_event_parse_segment (event, &segment);
996
997       if (segment->format != GST_FORMAT_TIME) {
998         GST_ERROR_OBJECT (agg, "Segment of type %s are not supported,"
999             " only TIME segments are supported",
1000             gst_format_get_name (segment->format));
1001         gst_event_unref (event);
1002         event = NULL;
1003         res = FALSE;
1004         break;
1005       }
1006
1007       GST_OBJECT_LOCK (agg);
1008       if (segment->rate != agg->segment.rate) {
1009         GST_ERROR_OBJECT (aggpad,
1010             "Got segment event with wrong rate %lf, expected %lf",
1011             segment->rate, agg->segment.rate);
1012         res = FALSE;
1013         gst_event_unref (event);
1014         event = NULL;
1015       } else if (segment->rate < 0.0) {
1016         GST_ERROR_OBJECT (aggpad, "Negative rates not supported yet");
1017         res = FALSE;
1018         gst_event_unref (event);
1019         event = NULL;
1020       } else {
1021         GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (aggpad);
1022
1023         GST_OBJECT_LOCK (pad);
1024         pad->priv->new_segment = TRUE;
1025         GST_OBJECT_UNLOCK (pad);
1026       }
1027       GST_OBJECT_UNLOCK (agg);
1028
1029       break;
1030     }
1031     case GST_EVENT_CAPS:
1032     {
1033       GstCaps *caps;
1034
1035       gst_event_parse_caps (event, &caps);
1036       GST_INFO_OBJECT (aggpad, "Got caps %" GST_PTR_FORMAT, caps);
1037       res = gst_audio_aggregator_sink_setcaps (aaggpad, agg, caps);
1038       gst_event_unref (event);
1039       event = NULL;
1040       break;
1041     }
1042     default:
1043       break;
1044   }
1045
1046   if (event != NULL)
1047     return
1048         GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->sink_event
1049         (agg, aggpad, event);
1050
1051   return res;
1052 }
1053
1054 static gboolean
1055 gst_audio_aggregator_sink_query (GstAggregator * agg, GstAggregatorPad * aggpad,
1056     GstQuery * query)
1057 {
1058   gboolean res = FALSE;
1059
1060   switch (GST_QUERY_TYPE (query)) {
1061     case GST_QUERY_CAPS:
1062     {
1063       GstCaps *filter, *caps;
1064
1065       gst_query_parse_caps (query, &filter);
1066       caps = gst_audio_aggregator_sink_getcaps (GST_PAD (aggpad), agg, filter);
1067       gst_query_set_caps_result (query, caps);
1068       gst_caps_unref (caps);
1069       res = TRUE;
1070       break;
1071     }
1072     default:
1073       res =
1074           GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->sink_query
1075           (agg, aggpad, query);
1076       break;
1077   }
1078
1079   return res;
1080 }
1081
1082
1083 /* FIXME, the duration query should reflect how long you will produce
1084  * data, that is the amount of stream time until you will emit EOS.
1085  *
1086  * For synchronized mixing this is always the max of all the durations
1087  * of upstream since we emit EOS when all of them finished.
1088  *
1089  * We don't do synchronized mixing so this really depends on where the
1090  * streams where punched in and what their relative offsets are against
1091  * eachother which we can get from the first timestamps we see.
1092  *
1093  * When we add a new stream (or remove a stream) the duration might
1094  * also become invalid again and we need to post a new DURATION
1095  * message to notify this fact to the parent.
1096  * For now we take the max of all the upstream elements so the simple
1097  * cases work at least somewhat.
1098  */
1099 static gboolean
1100 gst_audio_aggregator_query_duration (GstAudioAggregator * aagg,
1101     GstQuery * query)
1102 {
1103   gint64 max;
1104   gboolean res;
1105   GstFormat format;
1106   GstIterator *it;
1107   gboolean done;
1108   GValue item = { 0, };
1109
1110   /* parse format */
1111   gst_query_parse_duration (query, &format, NULL);
1112
1113   max = -1;
1114   res = TRUE;
1115   done = FALSE;
1116
1117   it = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (aagg));
1118   while (!done) {
1119     GstIteratorResult ires;
1120
1121     ires = gst_iterator_next (it, &item);
1122     switch (ires) {
1123       case GST_ITERATOR_DONE:
1124         done = TRUE;
1125         break;
1126       case GST_ITERATOR_OK:
1127       {
1128         GstPad *pad = g_value_get_object (&item);
1129         gint64 duration;
1130
1131         /* ask sink peer for duration */
1132         res &= gst_pad_peer_query_duration (pad, format, &duration);
1133         /* take max from all valid return values */
1134         if (res) {
1135           /* valid unknown length, stop searching */
1136           if (duration == -1) {
1137             max = duration;
1138             done = TRUE;
1139           }
1140           /* else see if bigger than current max */
1141           else if (duration > max)
1142             max = duration;
1143         }
1144         g_value_reset (&item);
1145         break;
1146       }
1147       case GST_ITERATOR_RESYNC:
1148         max = -1;
1149         res = TRUE;
1150         gst_iterator_resync (it);
1151         break;
1152       default:
1153         res = FALSE;
1154         done = TRUE;
1155         break;
1156     }
1157   }
1158   g_value_unset (&item);
1159   gst_iterator_free (it);
1160
1161   if (res) {
1162     /* and store the max */
1163     GST_DEBUG_OBJECT (aagg, "Total duration in format %s: %"
1164         GST_TIME_FORMAT, gst_format_get_name (format), GST_TIME_ARGS (max));
1165     gst_query_set_duration (query, format, max);
1166   }
1167
1168   return res;
1169 }
1170
1171
1172 static gboolean
1173 gst_audio_aggregator_src_query (GstAggregator * agg, GstQuery * query)
1174 {
1175   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1176   gboolean res = FALSE;
1177
1178   switch (GST_QUERY_TYPE (query)) {
1179     case GST_QUERY_DURATION:
1180       res = gst_audio_aggregator_query_duration (aagg, query);
1181       break;
1182     case GST_QUERY_POSITION:
1183     {
1184       GstFormat format;
1185
1186       gst_query_parse_position (query, &format, NULL);
1187
1188       GST_OBJECT_LOCK (aagg);
1189
1190       switch (format) {
1191         case GST_FORMAT_TIME:
1192           gst_query_set_position (query, format,
1193               gst_segment_to_stream_time (&agg->segment, GST_FORMAT_TIME,
1194                   agg->segment.position));
1195           res = TRUE;
1196           break;
1197         case GST_FORMAT_BYTES:
1198           if (GST_AUDIO_INFO_BPF (&aagg->info)) {
1199             gst_query_set_position (query, format, aagg->priv->offset *
1200                 GST_AUDIO_INFO_BPF (&aagg->info));
1201             res = TRUE;
1202           }
1203           break;
1204         case GST_FORMAT_DEFAULT:
1205           gst_query_set_position (query, format, aagg->priv->offset);
1206           res = TRUE;
1207           break;
1208         default:
1209           break;
1210       }
1211
1212       GST_OBJECT_UNLOCK (aagg);
1213
1214       break;
1215     }
1216     default:
1217       res =
1218           GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->src_query
1219           (agg, query);
1220       break;
1221   }
1222
1223   return res;
1224 }
1225
1226
1227 void
1228 gst_audio_aggregator_set_sink_caps (GstAudioAggregator * aagg,
1229     GstAudioAggregatorPad * pad, GstCaps * caps)
1230 {
1231 #ifndef G_DISABLE_ASSERT
1232   gboolean valid;
1233
1234   GST_OBJECT_LOCK (pad);
1235   valid = gst_audio_info_from_caps (&pad->info, caps);
1236   g_assert (valid);
1237   GST_OBJECT_UNLOCK (pad);
1238 #else
1239   GST_OBJECT_LOCK (pad);
1240   (void) gst_audio_info_from_caps (&pad->info, caps);
1241   GST_OBJECT_UNLOCK (pad);
1242 #endif
1243 }
1244
1245 /* Must hold object lock and aagg lock to call */
1246
1247 static void
1248 gst_audio_aggregator_reset (GstAudioAggregator * aagg)
1249 {
1250   GstAggregator *agg = GST_AGGREGATOR (aagg);
1251
1252   GST_AUDIO_AGGREGATOR_LOCK (aagg);
1253   GST_OBJECT_LOCK (aagg);
1254   agg->segment.position = -1;
1255   aagg->priv->offset = -1;
1256   gst_audio_info_init (&aagg->info);
1257   gst_caps_replace (&aagg->current_caps, NULL);
1258   gst_buffer_replace (&aagg->priv->current_buffer, NULL);
1259   GST_OBJECT_UNLOCK (aagg);
1260   GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1261 }
1262
1263 static gboolean
1264 gst_audio_aggregator_start (GstAggregator * agg)
1265 {
1266   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1267
1268   gst_audio_aggregator_reset (aagg);
1269
1270   return TRUE;
1271 }
1272
1273 static gboolean
1274 gst_audio_aggregator_stop (GstAggregator * agg)
1275 {
1276   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1277
1278   gst_audio_aggregator_reset (aagg);
1279
1280   return TRUE;
1281 }
1282
1283 static GstFlowReturn
1284 gst_audio_aggregator_flush (GstAggregator * agg)
1285 {
1286   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1287
1288   GST_AUDIO_AGGREGATOR_LOCK (aagg);
1289   GST_OBJECT_LOCK (aagg);
1290   agg->segment.position = -1;
1291   aagg->priv->offset = -1;
1292   gst_buffer_replace (&aagg->priv->current_buffer, NULL);
1293   GST_OBJECT_UNLOCK (aagg);
1294   GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1295
1296   return GST_FLOW_OK;
1297 }
1298
1299 static GstBuffer *
1300 gst_audio_aggregator_do_clip (GstAggregator * agg,
1301     GstAggregatorPad * bpad, GstBuffer * buffer)
1302 {
1303   GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (bpad);
1304   gint rate, bpf;
1305
1306   rate = GST_AUDIO_INFO_RATE (&pad->info);
1307   bpf = GST_AUDIO_INFO_BPF (&pad->info);
1308
1309   GST_OBJECT_LOCK (bpad);
1310   buffer = gst_audio_buffer_clip (buffer, &bpad->segment, rate, bpf);
1311   GST_OBJECT_UNLOCK (bpad);
1312
1313   return buffer;
1314 }
1315
1316 /* Called with the object lock for both the element and pad held,
1317  * as well as the aagg lock
1318  *
1319  * Replace the current buffer with input and update GstAudioAggregatorPadPrivate
1320  * values.
1321  */
1322 static gboolean
1323 gst_audio_aggregator_fill_buffer (GstAudioAggregator * aagg,
1324     GstAudioAggregatorPad * pad)
1325 {
1326   GstAudioAggregatorClass *aaggclass = GST_AUDIO_AGGREGATOR_GET_CLASS (aagg);
1327   GstClockTime start_time, end_time;
1328   gboolean discont = FALSE;
1329   guint64 start_offset, end_offset;
1330   gint rate, bpf;
1331
1332   GstAggregator *agg = GST_AGGREGATOR (aagg);
1333   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1334
1335   if (aaggclass->convert_buffer) {
1336     rate = GST_AUDIO_INFO_RATE (&aagg->info);
1337     bpf = GST_AUDIO_INFO_BPF (&aagg->info);
1338   } else {
1339     rate = GST_AUDIO_INFO_RATE (&pad->info);
1340     bpf = GST_AUDIO_INFO_BPF (&pad->info);
1341   }
1342
1343   pad->priv->position = 0;
1344   pad->priv->size = gst_buffer_get_size (pad->priv->buffer) / bpf;
1345
1346   if (pad->priv->size == 0) {
1347     if (!GST_BUFFER_DURATION_IS_VALID (pad->priv->buffer) ||
1348         !GST_BUFFER_FLAG_IS_SET (pad->priv->buffer, GST_BUFFER_FLAG_GAP)) {
1349       GST_WARNING_OBJECT (pad, "Dropping 0-sized buffer missing either a"
1350           " duration or a GAP flag: %" GST_PTR_FORMAT, pad->priv->buffer);
1351       return FALSE;
1352     }
1353
1354     pad->priv->size =
1355         gst_util_uint64_scale (GST_BUFFER_DURATION (pad->priv->buffer), rate,
1356         GST_SECOND);
1357   }
1358
1359   if (!GST_BUFFER_PTS_IS_VALID (pad->priv->buffer)) {
1360     if (pad->priv->output_offset == -1)
1361       pad->priv->output_offset = aagg->priv->offset;
1362     if (pad->priv->next_offset == -1)
1363       pad->priv->next_offset = pad->priv->size;
1364     else
1365       pad->priv->next_offset += pad->priv->size;
1366     goto done;
1367   }
1368
1369   start_time = GST_BUFFER_PTS (pad->priv->buffer);
1370   end_time =
1371       start_time + gst_util_uint64_scale_ceil (pad->priv->size, GST_SECOND,
1372       rate);
1373
1374   /* Clipping should've ensured this */
1375   g_assert (start_time >= aggpad->segment.start);
1376
1377   start_offset =
1378       gst_util_uint64_scale (start_time - aggpad->segment.start, rate,
1379       GST_SECOND);
1380   end_offset = start_offset + pad->priv->size;
1381
1382   if (GST_BUFFER_IS_DISCONT (pad->priv->buffer)
1383       || GST_BUFFER_FLAG_IS_SET (pad->priv->buffer, GST_BUFFER_FLAG_RESYNC)
1384       || pad->priv->new_segment || pad->priv->next_offset == -1) {
1385     discont = TRUE;
1386     pad->priv->new_segment = FALSE;
1387   } else {
1388     guint64 diff, max_sample_diff;
1389
1390     /* Check discont, based on audiobasesink */
1391     if (start_offset <= pad->priv->next_offset)
1392       diff = pad->priv->next_offset - start_offset;
1393     else
1394       diff = start_offset - pad->priv->next_offset;
1395
1396     max_sample_diff =
1397         gst_util_uint64_scale_int (aagg->priv->alignment_threshold, rate,
1398         GST_SECOND);
1399
1400     /* Discont! */
1401     if (G_UNLIKELY (diff >= max_sample_diff)) {
1402       if (aagg->priv->discont_wait > 0) {
1403         if (pad->priv->discont_time == GST_CLOCK_TIME_NONE) {
1404           pad->priv->discont_time = start_time;
1405         } else if (start_time - pad->priv->discont_time >=
1406             aagg->priv->discont_wait) {
1407           discont = TRUE;
1408           pad->priv->discont_time = GST_CLOCK_TIME_NONE;
1409         }
1410       } else {
1411         discont = TRUE;
1412       }
1413     } else if (G_UNLIKELY (pad->priv->discont_time != GST_CLOCK_TIME_NONE)) {
1414       /* we have had a discont, but are now back on track! */
1415       pad->priv->discont_time = GST_CLOCK_TIME_NONE;
1416     }
1417   }
1418
1419   if (discont) {
1420     /* Have discont, need resync */
1421     if (pad->priv->next_offset != -1)
1422       GST_DEBUG_OBJECT (pad, "Have discont. Expected %"
1423           G_GUINT64_FORMAT ", got %" G_GUINT64_FORMAT,
1424           pad->priv->next_offset, start_offset);
1425     pad->priv->output_offset = -1;
1426     pad->priv->next_offset = end_offset;
1427   } else {
1428     pad->priv->next_offset += pad->priv->size;
1429   }
1430
1431   if (pad->priv->output_offset == -1) {
1432     GstClockTime start_running_time;
1433     GstClockTime end_running_time;
1434     GstClockTime segment_pos;
1435     guint64 start_output_offset = -1;
1436     guint64 end_output_offset = -1;
1437
1438     start_running_time =
1439         gst_segment_to_running_time (&aggpad->segment,
1440         GST_FORMAT_TIME, start_time);
1441     end_running_time =
1442         gst_segment_to_running_time (&aggpad->segment,
1443         GST_FORMAT_TIME, end_time);
1444
1445     /* Convert to position in the output segment */
1446     segment_pos =
1447         gst_segment_position_from_running_time (&agg->segment, GST_FORMAT_TIME,
1448         start_running_time);
1449     if (GST_CLOCK_TIME_IS_VALID (segment_pos))
1450       start_output_offset =
1451           gst_util_uint64_scale (segment_pos - agg->segment.start, rate,
1452           GST_SECOND);
1453
1454     segment_pos =
1455         gst_segment_position_from_running_time (&agg->segment, GST_FORMAT_TIME,
1456         end_running_time);
1457     if (GST_CLOCK_TIME_IS_VALID (segment_pos))
1458       end_output_offset =
1459           gst_util_uint64_scale (segment_pos - agg->segment.start, rate,
1460           GST_SECOND);
1461
1462     if (start_output_offset == -1 && end_output_offset == -1) {
1463       /* Outside output segment, drop */
1464       pad->priv->position = 0;
1465       pad->priv->size = 0;
1466       pad->priv->output_offset = -1;
1467       GST_DEBUG_OBJECT (pad, "Buffer outside output segment");
1468       return FALSE;
1469     }
1470
1471     /* Calculate end_output_offset if it was outside the output segment */
1472     if (end_output_offset == -1)
1473       end_output_offset = start_output_offset + pad->priv->size;
1474
1475     if (end_output_offset < aagg->priv->offset) {
1476       pad->priv->position = 0;
1477       pad->priv->size = 0;
1478       pad->priv->output_offset = -1;
1479       GST_DEBUG_OBJECT (pad,
1480           "Buffer before segment or current position: %" G_GUINT64_FORMAT " < %"
1481           G_GINT64_FORMAT, end_output_offset, aagg->priv->offset);
1482       return FALSE;
1483     }
1484
1485     if (start_output_offset == -1 || start_output_offset < aagg->priv->offset) {
1486       guint diff;
1487
1488       if (start_output_offset == -1 && end_output_offset < pad->priv->size) {
1489         diff = pad->priv->size - end_output_offset + aagg->priv->offset;
1490       } else if (start_output_offset == -1) {
1491         start_output_offset = end_output_offset - pad->priv->size;
1492
1493         if (start_output_offset < aagg->priv->offset)
1494           diff = aagg->priv->offset - start_output_offset;
1495         else
1496           diff = 0;
1497       } else {
1498         diff = aagg->priv->offset - start_output_offset;
1499       }
1500
1501       pad->priv->position += diff;
1502       if (pad->priv->position >= pad->priv->size) {
1503         /* Empty buffer, drop */
1504         pad->priv->position = 0;
1505         pad->priv->size = 0;
1506         pad->priv->output_offset = -1;
1507         GST_DEBUG_OBJECT (pad,
1508             "Buffer before segment or current position: %" G_GUINT64_FORMAT
1509             " < %" G_GINT64_FORMAT, end_output_offset, aagg->priv->offset);
1510         return FALSE;
1511       }
1512     }
1513
1514     if (start_output_offset == -1 || start_output_offset < aagg->priv->offset)
1515       pad->priv->output_offset = aagg->priv->offset;
1516     else
1517       pad->priv->output_offset = start_output_offset;
1518
1519     GST_DEBUG_OBJECT (pad,
1520         "Buffer resynced: Pad offset %" G_GUINT64_FORMAT
1521         ", current audio aggregator offset %" G_GINT64_FORMAT,
1522         pad->priv->output_offset, aagg->priv->offset);
1523   }
1524
1525 done:
1526
1527   GST_LOG_OBJECT (pad,
1528       "Queued new buffer at offset %" G_GUINT64_FORMAT,
1529       pad->priv->output_offset);
1530
1531   return TRUE;
1532 }
1533
1534 /* Called with pad object lock held */
1535
1536 static gboolean
1537 gst_audio_aggregator_mix_buffer (GstAudioAggregator * aagg,
1538     GstAudioAggregatorPad * pad, GstBuffer * inbuf, GstBuffer * outbuf,
1539     guint blocksize)
1540 {
1541   guint overlap;
1542   guint out_start;
1543   gboolean filled;
1544   guint in_offset;
1545   gboolean pad_changed = FALSE;
1546
1547   /* Overlap => mix */
1548   if (aagg->priv->offset < pad->priv->output_offset)
1549     out_start = pad->priv->output_offset - aagg->priv->offset;
1550   else
1551     out_start = 0;
1552
1553   overlap = pad->priv->size - pad->priv->position;
1554   if (overlap > blocksize - out_start)
1555     overlap = blocksize - out_start;
1556
1557   if (GST_BUFFER_FLAG_IS_SET (inbuf, GST_BUFFER_FLAG_GAP)) {
1558     /* skip gap buffer */
1559     GST_LOG_OBJECT (pad, "skipping GAP buffer");
1560     pad->priv->output_offset += pad->priv->size - pad->priv->position;
1561     pad->priv->position = pad->priv->size;
1562
1563     gst_buffer_replace (&pad->priv->buffer, NULL);
1564     gst_buffer_replace (&pad->priv->input_buffer, NULL);
1565     return FALSE;
1566   }
1567
1568   gst_buffer_ref (inbuf);
1569   in_offset = pad->priv->position;
1570   GST_OBJECT_UNLOCK (pad);
1571   GST_OBJECT_UNLOCK (aagg);
1572
1573   filled = GST_AUDIO_AGGREGATOR_GET_CLASS (aagg)->aggregate_one_buffer (aagg,
1574       pad, inbuf, in_offset, outbuf, out_start, overlap);
1575
1576   GST_OBJECT_LOCK (aagg);
1577   GST_OBJECT_LOCK (pad);
1578
1579   pad_changed = (inbuf != pad->priv->buffer);
1580   gst_buffer_unref (inbuf);
1581
1582   if (filled)
1583     GST_BUFFER_FLAG_UNSET (outbuf, GST_BUFFER_FLAG_GAP);
1584
1585   if (pad_changed)
1586     return FALSE;
1587
1588   pad->priv->position += overlap;
1589   pad->priv->output_offset += overlap;
1590
1591   if (pad->priv->position == pad->priv->size) {
1592     /* Buffer done, drop it */
1593     gst_buffer_replace (&pad->priv->buffer, NULL);
1594     gst_buffer_replace (&pad->priv->input_buffer, NULL);
1595     GST_LOG_OBJECT (pad, "Finished mixing buffer, waiting for next");
1596     return FALSE;
1597   }
1598
1599   return TRUE;
1600 }
1601
1602 static GstBuffer *
1603 gst_audio_aggregator_create_output_buffer (GstAudioAggregator * aagg,
1604     guint num_frames)
1605 {
1606   GstAllocator *allocator;
1607   GstAllocationParams params;
1608   GstBuffer *outbuf;
1609   GstMapInfo outmap;
1610
1611   gst_aggregator_get_allocator (GST_AGGREGATOR (aagg), &allocator, &params);
1612
1613   GST_DEBUG ("Creating output buffer with size %d",
1614       num_frames * GST_AUDIO_INFO_BPF (&aagg->info));
1615
1616   outbuf = gst_buffer_new_allocate (allocator, num_frames *
1617       GST_AUDIO_INFO_BPF (&aagg->info), &params);
1618
1619   if (allocator)
1620     gst_object_unref (allocator);
1621
1622   gst_buffer_map (outbuf, &outmap, GST_MAP_WRITE);
1623   gst_audio_format_fill_silence (aagg->info.finfo, outmap.data, outmap.size);
1624   gst_buffer_unmap (outbuf, &outmap);
1625
1626   return outbuf;
1627 }
1628
1629 static gboolean
1630 sync_pad_values (GstElement * aagg, GstPad * pad, gpointer user_data)
1631 {
1632   GstAudioAggregatorPad *aapad = GST_AUDIO_AGGREGATOR_PAD (pad);
1633   GstAggregatorPad *bpad = GST_AGGREGATOR_PAD_CAST (pad);
1634   GstClockTime timestamp, stream_time;
1635
1636   if (aapad->priv->buffer == NULL)
1637     return TRUE;
1638
1639   timestamp = GST_BUFFER_PTS (aapad->priv->buffer);
1640   GST_OBJECT_LOCK (bpad);
1641   stream_time = gst_segment_to_stream_time (&bpad->segment, GST_FORMAT_TIME,
1642       timestamp);
1643   GST_OBJECT_UNLOCK (bpad);
1644
1645   /* sync object properties on stream time */
1646   /* TODO: Ideally we would want to do that on every sample */
1647   if (GST_CLOCK_TIME_IS_VALID (stream_time))
1648     gst_object_sync_values (GST_OBJECT_CAST (pad), stream_time);
1649
1650   return TRUE;
1651 }
1652
1653 static GstFlowReturn
1654 gst_audio_aggregator_aggregate (GstAggregator * agg, gboolean timeout)
1655 {
1656   /* Calculate the current output offset/timestamp and offset_end/timestamp_end.
1657    * Allocate a silence buffer for this and store it.
1658    *
1659    * For all pads:
1660    * 1) Once per input buffer (cached)
1661    *   1) Check discont (flag and timestamp with tolerance)
1662    *   2) If discont or new, resync. That means:
1663    *     1) Drop all start data of the buffer that comes before
1664    *        the current position/offset.
1665    *     2) Calculate the offset (output segment!) that the first
1666    *        frame of the input buffer corresponds to. Base this on
1667    *        the running time.
1668    *
1669    * 2) If the current pad's offset/offset_end overlaps with the output
1670    *    offset/offset_end, mix it at the appropiate position in the output
1671    *    buffer and advance the pad's position. Remember if this pad needs
1672    *    a new buffer to advance behind the output offset_end.
1673    *
1674    * If we had no pad with a buffer, go EOS.
1675    *
1676    * If we had at least one pad that did not advance behind output
1677    * offset_end, let aggregate be called again for the current
1678    * output offset/offset_end.
1679    */
1680   GstElement *element;
1681   GstAudioAggregator *aagg;
1682   GList *iter;
1683   GstFlowReturn ret;
1684   GstBuffer *outbuf = NULL;
1685   gint64 next_offset;
1686   gint64 next_timestamp;
1687   gint rate, bpf;
1688   gboolean dropped = FALSE;
1689   gboolean is_eos = TRUE;
1690   gboolean is_done = TRUE;
1691   guint blocksize;
1692
1693   element = GST_ELEMENT (agg);
1694   aagg = GST_AUDIO_AGGREGATOR (agg);
1695
1696   /* Sync pad properties to the stream time */
1697   gst_element_foreach_sink_pad (element, sync_pad_values, NULL);
1698
1699   GST_AUDIO_AGGREGATOR_LOCK (aagg);
1700   GST_OBJECT_LOCK (agg);
1701
1702   /* Update position from the segment start/stop if needed */
1703   if (agg->segment.position == -1) {
1704     if (agg->segment.rate > 0.0)
1705       agg->segment.position = agg->segment.start;
1706     else
1707       agg->segment.position = agg->segment.stop;
1708   }
1709
1710   if (G_UNLIKELY (aagg->info.finfo->format == GST_AUDIO_FORMAT_UNKNOWN)) {
1711     if (timeout) {
1712       GST_DEBUG_OBJECT (aagg,
1713           "Got timeout before receiving any caps, don't output anything");
1714
1715       /* Advance position */
1716       if (agg->segment.rate > 0.0)
1717         agg->segment.position += aagg->priv->output_buffer_duration;
1718       else if (agg->segment.position > aagg->priv->output_buffer_duration)
1719         agg->segment.position -= aagg->priv->output_buffer_duration;
1720       else
1721         agg->segment.position = 0;
1722
1723       GST_OBJECT_UNLOCK (agg);
1724       GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1725       return GST_AGGREGATOR_FLOW_NEED_DATA;
1726     } else {
1727       GST_OBJECT_UNLOCK (agg);
1728       goto not_negotiated;
1729     }
1730   }
1731
1732   rate = GST_AUDIO_INFO_RATE (&aagg->info);
1733   bpf = GST_AUDIO_INFO_BPF (&aagg->info);
1734
1735   if (aagg->priv->offset == -1) {
1736     aagg->priv->offset =
1737         gst_util_uint64_scale (agg->segment.position - agg->segment.start, rate,
1738         GST_SECOND);
1739     GST_DEBUG_OBJECT (aagg, "Starting at offset %" G_GINT64_FORMAT,
1740         aagg->priv->offset);
1741   }
1742
1743   blocksize = gst_util_uint64_scale (aagg->priv->output_buffer_duration,
1744       rate, GST_SECOND);
1745   blocksize = MAX (1, blocksize);
1746
1747   /* FIXME: Reverse mixing does not work at all yet */
1748   if (agg->segment.rate > 0.0) {
1749     next_offset = aagg->priv->offset + blocksize;
1750   } else {
1751     next_offset = aagg->priv->offset - blocksize;
1752   }
1753
1754   /* Use the sample counter, which will never accumulate rounding errors */
1755   next_timestamp =
1756       agg->segment.start + gst_util_uint64_scale (next_offset, GST_SECOND,
1757       rate);
1758
1759   if (aagg->priv->current_buffer == NULL) {
1760     GST_OBJECT_UNLOCK (agg);
1761     aagg->priv->current_buffer =
1762         GST_AUDIO_AGGREGATOR_GET_CLASS (aagg)->create_output_buffer (aagg,
1763         blocksize);
1764     /* Be careful, some things could have changed ? */
1765     GST_OBJECT_LOCK (agg);
1766     GST_BUFFER_FLAG_SET (aagg->priv->current_buffer, GST_BUFFER_FLAG_GAP);
1767   }
1768   outbuf = aagg->priv->current_buffer;
1769
1770   GST_LOG_OBJECT (agg,
1771       "Starting to mix %u samples for offset %" G_GINT64_FORMAT
1772       " with timestamp %" GST_TIME_FORMAT, blocksize,
1773       aagg->priv->offset, GST_TIME_ARGS (agg->segment.position));
1774
1775   for (iter = element->sinkpads; iter; iter = iter->next) {
1776     GstAudioAggregatorPad *pad = (GstAudioAggregatorPad *) iter->data;
1777     GstAggregatorPad *aggpad = (GstAggregatorPad *) iter->data;
1778     gboolean pad_eos = gst_aggregator_pad_is_eos (aggpad);
1779
1780     if (!pad_eos)
1781       is_eos = FALSE;
1782
1783     pad->priv->input_buffer = gst_aggregator_pad_peek_buffer (aggpad);
1784
1785     GST_OBJECT_LOCK (pad);
1786     if (!pad->priv->input_buffer) {
1787       if (timeout) {
1788         if (pad->priv->output_offset < next_offset) {
1789           gint64 diff = next_offset - pad->priv->output_offset;
1790           GST_DEBUG_OBJECT (pad, "Timeout, missing %" G_GINT64_FORMAT
1791               " frames (%" GST_TIME_FORMAT ")", diff,
1792               GST_TIME_ARGS (gst_util_uint64_scale (diff, GST_SECOND,
1793                       GST_AUDIO_INFO_RATE (&aagg->info))));
1794         }
1795       } else if (!pad_eos) {
1796         is_done = FALSE;
1797       }
1798       GST_OBJECT_UNLOCK (pad);
1799       continue;
1800     }
1801
1802     /* New buffer? */
1803     if (!pad->priv->buffer) {
1804       if (GST_IS_AUDIO_AGGREGATOR_CONVERT_PAD (pad))
1805         pad->priv->buffer =
1806             gst_audio_aggregator_convert_buffer
1807             (aagg, GST_PAD (pad), &pad->info, &aagg->info,
1808             pad->priv->input_buffer);
1809       else
1810         pad->priv->buffer = gst_buffer_ref (pad->priv->input_buffer);
1811
1812       if (!gst_audio_aggregator_fill_buffer (aagg, pad)) {
1813         gst_buffer_replace (&pad->priv->buffer, NULL);
1814         gst_buffer_replace (&pad->priv->input_buffer, NULL);
1815         pad->priv->buffer = NULL;
1816         dropped = TRUE;
1817         GST_OBJECT_UNLOCK (pad);
1818
1819         gst_aggregator_pad_drop_buffer (aggpad);
1820         continue;
1821       }
1822     } else {
1823       gst_buffer_unref (pad->priv->input_buffer);
1824     }
1825
1826     if (!pad->priv->buffer && !dropped && pad_eos) {
1827       GST_DEBUG_OBJECT (aggpad, "Pad is in EOS state");
1828       GST_OBJECT_UNLOCK (pad);
1829       continue;
1830     }
1831
1832     g_assert (pad->priv->buffer);
1833
1834     /* This pad is lagging behind, we need to update the offset
1835      * and maybe drop the current buffer */
1836     if (pad->priv->output_offset < aagg->priv->offset) {
1837       gint64 diff = aagg->priv->offset - pad->priv->output_offset;
1838       gint64 odiff = diff;
1839
1840       if (pad->priv->position + diff > pad->priv->size)
1841         diff = pad->priv->size - pad->priv->position;
1842       pad->priv->position += diff;
1843       pad->priv->output_offset += diff;
1844
1845       if (pad->priv->position == pad->priv->size) {
1846         GST_DEBUG_OBJECT (pad, "Buffer was late by %" GST_TIME_FORMAT
1847             ", dropping %" GST_PTR_FORMAT,
1848             GST_TIME_ARGS (gst_util_uint64_scale (odiff, GST_SECOND,
1849                     GST_AUDIO_INFO_RATE (&aagg->info))), pad->priv->buffer);
1850         /* Buffer done, drop it */
1851         gst_buffer_replace (&pad->priv->buffer, NULL);
1852         gst_buffer_replace (&pad->priv->input_buffer, NULL);
1853         dropped = TRUE;
1854         GST_OBJECT_UNLOCK (pad);
1855         gst_aggregator_pad_drop_buffer (aggpad);
1856         continue;
1857       }
1858     }
1859
1860     g_assert (pad->priv->buffer);
1861
1862     if (pad->priv->output_offset >= aagg->priv->offset
1863         && pad->priv->output_offset < aagg->priv->offset + blocksize) {
1864       gboolean drop_buf;
1865
1866       GST_LOG_OBJECT (aggpad, "Mixing buffer for current offset");
1867       drop_buf = !gst_audio_aggregator_mix_buffer (aagg, pad, pad->priv->buffer,
1868           outbuf, blocksize);
1869       if (pad->priv->output_offset >= next_offset) {
1870         GST_LOG_OBJECT (pad,
1871             "Pad is at or after current offset: %" G_GUINT64_FORMAT " >= %"
1872             G_GINT64_FORMAT, pad->priv->output_offset, next_offset);
1873       } else {
1874         is_done = FALSE;
1875       }
1876       if (drop_buf) {
1877         GST_OBJECT_UNLOCK (pad);
1878         gst_aggregator_pad_drop_buffer (aggpad);
1879         continue;
1880       }
1881     }
1882
1883     GST_OBJECT_UNLOCK (pad);
1884   }
1885   GST_OBJECT_UNLOCK (agg);
1886
1887   if (dropped) {
1888     /* We dropped a buffer, retry */
1889     GST_LOG_OBJECT (aagg, "A pad dropped a buffer, wait for the next one");
1890     GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1891     return GST_AGGREGATOR_FLOW_NEED_DATA;
1892   }
1893
1894   if (!is_done && !is_eos) {
1895     /* Get more buffers */
1896     GST_LOG_OBJECT (aagg,
1897         "We're not done yet for the current offset, waiting for more data");
1898     GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1899     return GST_AGGREGATOR_FLOW_NEED_DATA;
1900   }
1901
1902   if (is_eos) {
1903     gint64 max_offset = 0;
1904
1905     GST_DEBUG_OBJECT (aagg, "We're EOS");
1906
1907     GST_OBJECT_LOCK (agg);
1908     for (iter = GST_ELEMENT (agg)->sinkpads; iter; iter = iter->next) {
1909       GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (iter->data);
1910
1911       max_offset = MAX ((gint64) max_offset, (gint64) pad->priv->output_offset);
1912     }
1913     GST_OBJECT_UNLOCK (agg);
1914
1915     /* This means EOS or nothing mixed in at all */
1916     if (aagg->priv->offset == max_offset) {
1917       gst_buffer_replace (&aagg->priv->current_buffer, NULL);
1918       GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1919       return GST_FLOW_EOS;
1920     }
1921
1922     if (max_offset <= next_offset) {
1923       GST_DEBUG_OBJECT (aagg,
1924           "Last buffer is incomplete: %" G_GUINT64_FORMAT " <= %"
1925           G_GINT64_FORMAT, max_offset, next_offset);
1926       next_offset = max_offset;
1927       next_timestamp =
1928           agg->segment.start + gst_util_uint64_scale (next_offset, GST_SECOND,
1929           rate);
1930
1931       if (next_offset > aagg->priv->offset)
1932         gst_buffer_resize (outbuf, 0, (next_offset - aagg->priv->offset) * bpf);
1933     }
1934   }
1935
1936   /* set timestamps on the output buffer */
1937   GST_OBJECT_LOCK (agg);
1938   if (agg->segment.rate > 0.0) {
1939     GST_BUFFER_PTS (outbuf) = agg->segment.position;
1940     GST_BUFFER_OFFSET (outbuf) = aagg->priv->offset;
1941     GST_BUFFER_OFFSET_END (outbuf) = next_offset;
1942     GST_BUFFER_DURATION (outbuf) = next_timestamp - agg->segment.position;
1943   } else {
1944     GST_BUFFER_PTS (outbuf) = next_timestamp;
1945     GST_BUFFER_OFFSET (outbuf) = next_offset;
1946     GST_BUFFER_OFFSET_END (outbuf) = aagg->priv->offset;
1947     GST_BUFFER_DURATION (outbuf) = agg->segment.position - next_timestamp;
1948   }
1949
1950   GST_OBJECT_UNLOCK (agg);
1951
1952   /* send it out */
1953   GST_LOG_OBJECT (aagg,
1954       "pushing outbuf %p, timestamp %" GST_TIME_FORMAT " offset %"
1955       G_GINT64_FORMAT, outbuf, GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)),
1956       GST_BUFFER_OFFSET (outbuf));
1957
1958   GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1959
1960   ret = gst_aggregator_finish_buffer (agg, outbuf);
1961   aagg->priv->current_buffer = NULL;
1962
1963   GST_LOG_OBJECT (aagg, "pushed outbuf, result = %s", gst_flow_get_name (ret));
1964
1965   GST_AUDIO_AGGREGATOR_LOCK (aagg);
1966   GST_OBJECT_LOCK (agg);
1967   aagg->priv->offset = next_offset;
1968   agg->segment.position = next_timestamp;
1969
1970   /* If there was a timeout and there was a gap in data in out of the streams,
1971    * then it's a very good time to for a resync with the timestamps.
1972    */
1973   if (timeout) {
1974     for (iter = element->sinkpads; iter; iter = iter->next) {
1975       GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (iter->data);
1976
1977       GST_OBJECT_LOCK (pad);
1978       if (pad->priv->output_offset < aagg->priv->offset)
1979         pad->priv->output_offset = -1;
1980       GST_OBJECT_UNLOCK (pad);
1981     }
1982   }
1983   GST_OBJECT_UNLOCK (agg);
1984   GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1985
1986   return ret;
1987   /* ERRORS */
1988 not_negotiated:
1989   {
1990     GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1991     GST_ELEMENT_ERROR (aagg, STREAM, FORMAT, (NULL),
1992         ("Unknown data received, not negotiated"));
1993     return GST_FLOW_NOT_NEGOTIATED;
1994   }
1995 }