gst/adder/gstadder.c: Correctly resync the iterator if gst_iterator_next() returns
[platform/upstream/gstreamer.git] / gst / adder / gstadder.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2001 Thomas <thomas@apestaart.org>
4  *               2005,2006 Wim Taymans <wim@fluendo.com>
5  *
6  * adder.c: Adder element, N in, one out, samples are added
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23 /**
24  * SECTION:element-adder
25  *
26  * <refsect2>
27  * <para>
28  * The Adder allows to mix several streams into one by adding the data.
29  * Mixed data is clamped to the min/max values of the data format.
30  * </para>
31  * <title>Example launch line</title>
32  * <para>
33  * <programlisting>
34  * gst-launch audiotestsrc freq=100 ! adder name=mix ! audioconvert ! alsasink audiotestsrc freq=500 ! mix.
35  * </programlisting>
36  * This pipeline produces two sine waves mixed together.
37  * </para>
38  * <para>
39  * The Adder currently mixes all data received on the sinkpads as soon as possible
40  * without trying to synchronize the streams.
41  * </para>
42  * </refsect2>
43  *
44  * Last reviewed on 2006-05-09 (0.10.7)
45  */
46 /* Element-Checklist-Version: 5 */
47
48 #ifdef HAVE_CONFIG_H
49 #include "config.h"
50 #endif
51 #include "gstadder.h"
52 #include <gst/audio/audio.h>
53 #include <string.h>             /* strcmp */
54
55 /* highest positive/lowest negative x-bit value we can use for clamping */
56 #define MAX_INT_32  ((gint32) (0x7fffffff))
57 #define MAX_INT_16  ((gint16) (0x7fff))
58 #define MAX_INT_8   ((gint8)  (0x7f))
59 #define MAX_UINT_32 ((guint32)(0xffffffff))
60 #define MAX_UINT_16 ((guint16)(0xffff))
61 #define MAX_UINT_8  ((guint8) (0xff))
62
63 #define MIN_INT_32  ((gint32) (0x80000000))
64 #define MIN_INT_16  ((gint16) (0x8000))
65 #define MIN_INT_8   ((gint8)  (0x80))
66 #define MIN_UINT_32 ((guint32)(0x00000000))
67 #define MIN_UINT_16 ((guint16)(0x0000))
68 #define MIN_UINT_8  ((guint8) (0x00))
69
70 #define GST_CAT_DEFAULT gst_adder_debug
71 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
72
73 /* elementfactory information */
74
75 #define CAPS \
76   "audio/x-raw-int, " \
77   "rate = (int) [ 1, MAX ], " \
78   "channels = (int) [ 1, MAX ], " \
79   "endianness = (int) BYTE_ORDER, " \
80   "width = (int) 32, " \
81   "depth = (int) 32, " \
82   "signed = (boolean) { true, false } ;" \
83   "audio/x-raw-int, " \
84   "rate = (int) [ 1, MAX ], " \
85   "channels = (int) [ 1, MAX ], " \
86   "endianness = (int) BYTE_ORDER, " \
87   "width = (int) 16, " \
88   "depth = (int) 16, " \
89   "signed = (boolean) { true, false } ;" \
90   "audio/x-raw-int, " \
91   "rate = (int) [ 1, MAX ], " \
92   "channels = (int) [ 1, MAX ], " \
93   "endianness = (int) BYTE_ORDER, " \
94   "width = (int) 8, " \
95   "depth = (int) 8, " \
96   "signed = (boolean) { true, false } ;" \
97   "audio/x-raw-float, " \
98   "rate = (int) [ 1, MAX ], " \
99   "channels = (int) [ 1, MAX ], " \
100   "endianness = (int) BYTE_ORDER, " \
101   "width = (int) { 32, 64 }"
102
103 static GstStaticPadTemplate gst_adder_src_template =
104 GST_STATIC_PAD_TEMPLATE ("src",
105     GST_PAD_SRC,
106     GST_PAD_ALWAYS,
107     GST_STATIC_CAPS (CAPS)
108     );
109
110 static GstStaticPadTemplate gst_adder_sink_template =
111 GST_STATIC_PAD_TEMPLATE ("sink%d",
112     GST_PAD_SINK,
113     GST_PAD_REQUEST,
114     GST_STATIC_CAPS (CAPS)
115     );
116
117 static void gst_adder_class_init (GstAdderClass * klass);
118 static void gst_adder_init (GstAdder * adder);
119 static void gst_adder_finalize (GObject * object);
120
121 static gboolean gst_adder_setcaps (GstPad * pad, GstCaps * caps);
122 static gboolean gst_adder_query (GstPad * pad, GstQuery * query);
123 static gboolean gst_adder_src_event (GstPad * pad, GstEvent * event);
124 static gboolean gst_adder_sink_event (GstPad * pad, GstEvent * event);
125
126 static GstPad *gst_adder_request_new_pad (GstElement * element,
127     GstPadTemplate * temp, const gchar * unused);
128 static void gst_adder_release_pad (GstElement * element, GstPad * pad);
129
130 static GstStateChangeReturn gst_adder_change_state (GstElement * element,
131     GstStateChange transition);
132
133 static GstFlowReturn gst_adder_collected (GstCollectPads * pads,
134     gpointer user_data);
135
136 static GstElementClass *parent_class = NULL;
137
138 GType
139 gst_adder_get_type (void)
140 {
141   static GType adder_type = 0;
142
143   if (G_UNLIKELY (adder_type == 0)) {
144     static const GTypeInfo adder_info = {
145       sizeof (GstAdderClass), NULL, NULL,
146       (GClassInitFunc) gst_adder_class_init, NULL, NULL,
147       sizeof (GstAdder), 0,
148       (GInstanceInitFunc) gst_adder_init,
149     };
150
151     adder_type = g_type_register_static (GST_TYPE_ELEMENT, "GstAdder",
152         &adder_info, 0);
153     GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "adder", 0,
154         "audio channel mixing element");
155   }
156   return adder_type;
157 }
158
159 /* clipping versions */
160 #define MAKE_FUNC(name,type,ttype,min,max)                      \
161 static void name (type *out, type *in, gint bytes) {            \
162   gint i;                                                       \
163   for (i = 0; i < bytes / sizeof (type); i++)                   \
164     out[i] = CLAMP ((ttype)out[i] + (ttype)in[i], min, max);    \
165 }
166
167 /* non-clipping versions (for float) */
168 #define MAKE_FUNC_NC(name,type,ttype)                           \
169 static void name (type *out, type *in, gint bytes) {            \
170   gint i;                                                       \
171   for (i = 0; i < bytes / sizeof (type); i++)                   \
172     out[i] = (ttype)out[i] + (ttype)in[i];                      \
173 }
174
175 /* *INDENT-OFF* */
176 MAKE_FUNC (add_int32, gint32, gint64, MIN_INT_32, MAX_INT_32)
177 MAKE_FUNC (add_int16, gint16, gint32, MIN_INT_16, MAX_INT_16)
178 MAKE_FUNC (add_int8, gint8, gint16, MIN_INT_8, MAX_INT_8)
179 MAKE_FUNC (add_uint32, guint32, guint64, MIN_UINT_32, MAX_UINT_32)
180 MAKE_FUNC (add_uint16, guint16, guint32, MIN_UINT_16, MAX_UINT_16)
181 MAKE_FUNC (add_uint8, guint8, guint16, MIN_UINT_8, MAX_UINT_8)
182 MAKE_FUNC_NC (add_float64, gdouble, gdouble)
183 MAKE_FUNC_NC (add_float32, gfloat, gfloat)
184 /* *INDENT-ON* */
185
186 /* we can only accept caps that we and downstream can handle. */
187 static GstCaps *
188 gst_adder_sink_getcaps (GstPad * pad)
189 {
190   GstAdder *adder;
191   GstCaps *result, *peercaps, *sinkcaps;
192
193   adder = GST_ADDER (GST_PAD_PARENT (pad));
194
195   GST_OBJECT_LOCK (adder);
196   /* get the downstream possible caps */
197   peercaps = gst_pad_peer_get_caps (adder->srcpad);
198   /* get the allowed caps on this sinkpad, we use the fixed caps function so
199    * that it does not call recursively in this function. */
200   sinkcaps = gst_pad_get_fixed_caps_func (pad);
201   if (peercaps) {
202     /* if the peer has caps, intersect */
203     GST_DEBUG_OBJECT (adder, "intersecting peer and template caps");
204     result = gst_caps_intersect (peercaps, sinkcaps);
205     gst_caps_unref (peercaps);
206     gst_caps_unref (sinkcaps);
207   } else {
208     /* the peer has no caps (or there is no peer), just use the allowed caps
209      * of this sinkpad. */
210     GST_DEBUG_OBJECT (adder, "no peer caps, using sinkcaps");
211     result = sinkcaps;
212   }
213   GST_OBJECT_UNLOCK (adder);
214
215   return result;
216 }
217
218 /* the first caps we receive on any of the sinkpads will define the caps for all
219  * the other sinkpads because we can only mix streams with the same caps.
220  * */
221 static gboolean
222 gst_adder_setcaps (GstPad * pad, GstCaps * caps)
223 {
224   GstAdder *adder;
225   GList *pads;
226   GstStructure *structure;
227   const char *media_type;
228
229   adder = GST_ADDER (GST_PAD_PARENT (pad));
230
231   GST_LOG_OBJECT (adder, "setting caps on pad %p,%s to %" GST_PTR_FORMAT, pad,
232       GST_PAD_NAME (pad), caps);
233
234   /* FIXME, see if the other pads can accept the format. Also lock the
235    * format on the other pads to this new format. */
236   GST_OBJECT_LOCK (adder);
237   pads = GST_ELEMENT (adder)->pads;
238   while (pads) {
239     GstPad *otherpad = GST_PAD (pads->data);
240
241     if (otherpad != pad) {
242       gst_caps_replace (&GST_PAD_CAPS (otherpad), caps);
243     }
244     pads = g_list_next (pads);
245   }
246   GST_OBJECT_UNLOCK (adder);
247
248   /* parse caps now */
249   structure = gst_caps_get_structure (caps, 0);
250   media_type = gst_structure_get_name (structure);
251   if (strcmp (media_type, "audio/x-raw-int") == 0) {
252     GST_DEBUG_OBJECT (adder, "parse_caps sets adder to format int");
253     adder->format = GST_ADDER_FORMAT_INT;
254     gst_structure_get_int (structure, "width", &adder->width);
255     gst_structure_get_int (structure, "depth", &adder->depth);
256     gst_structure_get_int (structure, "endianness", &adder->endianness);
257     gst_structure_get_boolean (structure, "signed", &adder->is_signed);
258
259     if (adder->endianness != G_BYTE_ORDER)
260       goto not_supported;
261
262     switch (adder->width) {
263       case 8:
264         adder->func = (adder->is_signed ?
265             (GstAdderFunction) add_int8 : (GstAdderFunction) add_uint8);
266         break;
267       case 16:
268         adder->func = (adder->is_signed ?
269             (GstAdderFunction) add_int16 : (GstAdderFunction) add_uint16);
270         break;
271       case 32:
272         adder->func = (adder->is_signed ?
273             (GstAdderFunction) add_int32 : (GstAdderFunction) add_uint32);
274         break;
275       default:
276         goto not_supported;
277     }
278   } else if (strcmp (media_type, "audio/x-raw-float") == 0) {
279     GST_DEBUG_OBJECT (adder, "parse_caps sets adder to format float");
280     adder->format = GST_ADDER_FORMAT_FLOAT;
281     gst_structure_get_int (structure, "width", &adder->width);
282     gst_structure_get_int (structure, "endianness", &adder->endianness);
283
284     if (adder->endianness != G_BYTE_ORDER)
285       goto not_supported;
286
287     switch (adder->width) {
288       case 32:
289         adder->func = (GstAdderFunction) add_float32;
290         break;
291       case 64:
292         adder->func = (GstAdderFunction) add_float64;
293         break;
294       default:
295         goto not_supported;
296     }
297   } else {
298     goto not_supported;
299   }
300
301   gst_structure_get_int (structure, "channels", &adder->channels);
302   gst_structure_get_int (structure, "rate", &adder->rate);
303   /* precalc bps */
304   adder->bps = (adder->width / 8) * adder->channels;
305
306   return TRUE;
307
308   /* ERRORS */
309 not_supported:
310   {
311     GST_DEBUG_OBJECT (adder, "unsupported format set as caps");
312     return FALSE;
313   }
314 }
315
316 /* FIXME, the duration query should reflect how long you will produce
317  * data, that is the amount of stream time until you will emit EOS.
318  *
319  * For synchronized mixing this is always the max of all the durations
320  * of upstream since we emit EOS when all of them finished.
321  *
322  * We don't do synchronized mixing so this really depends on where the
323  * streams where punched in and what their relative offsets are against
324  * eachother which we can get from the first timestamps we see.
325  *
326  * When we add a new stream (or remove a stream) the duration might
327  * also become invalid again and we need to post a new DURATION
328  * message to notify this fact to the parent.
329  * For now we take the max of all the upstream elements so the simple
330  * cases work at least somewhat.
331  */
332 static gboolean
333 gst_adder_query_duration (GstAdder * adder, GstQuery * query)
334 {
335   gint64 max;
336   gboolean res;
337   GstFormat format;
338   GstIterator *it;
339   gboolean done;
340
341   /* parse format */
342   gst_query_parse_duration (query, &format, NULL);
343
344   max = -1;
345   res = TRUE;
346   done = FALSE;
347
348   it = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (adder));
349   while (!done) {
350     GstIteratorResult ires;
351     gpointer item;
352
353     ires = gst_iterator_next (it, &item);
354     switch (ires) {
355       case GST_ITERATOR_DONE:
356         done = TRUE;
357         break;
358       case GST_ITERATOR_OK:
359       {
360         GstPad *pad = GST_PAD_CAST (item);
361         gint64 duration;
362
363         /* ask sink peer for duration */
364         res &= gst_pad_query_peer_duration (pad, &format, &duration);
365         /* take max from all valid return values */
366         if (res) {
367           /* valid unknown length, stop searching */
368           if (duration == -1) {
369             max = duration;
370             done = TRUE;
371           }
372           /* else see if bigger than current max */
373           else if (duration > max)
374             max = duration;
375         }
376         break;
377       }
378       case GST_ITERATOR_RESYNC:
379         max = -1;
380         res = TRUE;
381         gst_iterator_resync (it);
382         break;
383       default:
384         res = FALSE;
385         done = TRUE;
386         break;
387     }
388   }
389   gst_iterator_free (it);
390
391   if (res) {
392     /* and store the max */
393     gst_query_set_duration (query, format, max);
394   }
395
396   return res;
397 }
398
399 static gboolean
400 gst_adder_query (GstPad * pad, GstQuery * query)
401 {
402   GstAdder *adder = GST_ADDER (gst_pad_get_parent (pad));
403   gboolean res = FALSE;
404
405   switch (GST_QUERY_TYPE (query)) {
406     case GST_QUERY_POSITION:
407     {
408       GstFormat format;
409
410       gst_query_parse_position (query, &format, NULL);
411
412       switch (format) {
413         case GST_FORMAT_TIME:
414           /* FIXME, bring to stream time, might be tricky */
415           gst_query_set_position (query, format, adder->timestamp);
416           res = TRUE;
417           break;
418         case GST_FORMAT_DEFAULT:
419           gst_query_set_position (query, format, adder->offset);
420           res = TRUE;
421           break;
422         default:
423           break;
424       }
425       break;
426     }
427     case GST_QUERY_DURATION:
428       res = gst_adder_query_duration (adder, query);
429       break;
430     default:
431       /* FIXME, needs a custom query handler because we have multiple
432        * sinkpads */
433       res = gst_pad_query_default (pad, query);
434       break;
435   }
436
437   gst_object_unref (adder);
438   return res;
439 }
440
441 static gboolean
442 forward_event_func (GstPad * pad, GValue * ret, GstEvent * event)
443 {
444   gst_event_ref (event);
445   GST_LOG_OBJECT (pad, "About to send event %s", GST_EVENT_TYPE_NAME (event));
446   if (!gst_pad_push_event (pad, event)) {
447     g_value_set_boolean (ret, FALSE);
448     GST_WARNING_OBJECT (pad, "Sending event  %p (%s) failed.",
449         event, GST_EVENT_TYPE_NAME (event));
450   } else {
451     GST_LOG_OBJECT (pad, "Sent event  %p (%s).",
452         event, GST_EVENT_TYPE_NAME (event));
453   }
454   gst_object_unref (pad);
455   return TRUE;
456 }
457
458 /* forwards the event to all sinkpads, takes ownership of the
459  * event
460  *
461  * Returns: TRUE if the event could be forwarded on all
462  * sinkpads.
463  */
464 static gboolean
465 forward_event (GstAdder * adder, GstEvent * event)
466 {
467   gboolean ret;
468   GstIterator *it;
469   GValue vret = { 0 };
470
471   GST_LOG_OBJECT (adder, "Forwarding event %p (%s)", event,
472       GST_EVENT_TYPE_NAME (event));
473
474   ret = TRUE;
475
476   g_value_init (&vret, G_TYPE_BOOLEAN);
477   g_value_set_boolean (&vret, TRUE);
478   it = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (adder));
479   gst_iterator_fold (it, (GstIteratorFoldFunction) forward_event_func, &vret,
480       event);
481   gst_iterator_free (it);
482   gst_event_unref (event);
483
484   ret = g_value_get_boolean (&vret);
485
486   return ret;
487 }
488
489 static gboolean
490 gst_adder_src_event (GstPad * pad, GstEvent * event)
491 {
492   GstAdder *adder;
493   gboolean result;
494
495   adder = GST_ADDER (gst_pad_get_parent (pad));
496
497   switch (GST_EVENT_TYPE (event)) {
498     case GST_EVENT_QOS:
499       /* QoS might be tricky */
500       result = FALSE;
501       break;
502     case GST_EVENT_SEEK:
503     {
504       GstSeekFlags flags;
505       GstSeekType curtype;
506       gint64 cur;
507
508       /* parse the seek parameters */
509       gst_event_parse_seek (event, &adder->segment_rate, NULL, &flags, &curtype,
510           &cur, NULL, NULL);
511
512       /* check if we are flushing */
513       if (flags & GST_SEEK_FLAG_FLUSH) {
514         /* make sure we accept nothing anymore and return WRONG_STATE */
515         gst_collect_pads_set_flushing (adder->collect, TRUE);
516
517         /* flushing seek, start flush downstream, the flush will be done
518          * when all pads received a FLUSH_STOP. */
519         gst_pad_push_event (adder->srcpad, gst_event_new_flush_start ());
520       }
521
522       /* now wait for the collected to be finished and mark a new
523        * segment */
524       GST_OBJECT_LOCK (adder->collect);
525       if (curtype == GST_SEEK_TYPE_SET)
526         adder->segment_position = cur;
527       else
528         adder->segment_position = 0;
529       adder->segment_pending = TRUE;
530       GST_OBJECT_UNLOCK (adder->collect);
531
532       result = forward_event (adder, event);
533       break;
534     }
535     case GST_EVENT_NAVIGATION:
536       /* navigation is rather pointless. */
537       result = FALSE;
538       break;
539     default:
540       /* just forward the rest for now */
541       result = forward_event (adder, event);
542       break;
543   }
544   gst_object_unref (adder);
545
546   return result;
547 }
548
549 static gboolean
550 gst_adder_sink_event (GstPad * pad, GstEvent * event)
551 {
552   GstAdder *adder;
553   gboolean ret;
554
555   adder = GST_ADDER (gst_pad_get_parent (pad));
556
557   GST_DEBUG ("Got %s event on pad %s:%s", GST_EVENT_TYPE_NAME (event),
558       GST_DEBUG_PAD_NAME (pad));
559
560   switch (GST_EVENT_TYPE (event)) {
561     case GST_EVENT_FLUSH_STOP:
562       /* mark a pending new segment. This event is synchronized
563        * with the streaming thread so we can safely update the
564        * variable without races. It's somewhat weird because we
565        * assume the collectpads forwarded the FLUSH_STOP past us
566        * and downstream (using our source pad, the bastard!).
567        */
568       adder->segment_pending = TRUE;
569       break;
570     default:
571       break;
572   }
573
574   /* now GstCollectPads can take care of the rest, e.g. EOS */
575   ret = adder->collect_event (pad, event);
576
577   gst_object_unref (adder);
578   return ret;
579 }
580
581 static void
582 gst_adder_class_init (GstAdderClass * klass)
583 {
584   GObjectClass *gobject_class;
585   GstElementClass *gstelement_class;
586
587   gobject_class = (GObjectClass *) klass;
588
589   gobject_class->finalize = gst_adder_finalize;
590
591   gstelement_class = (GstElementClass *) klass;
592
593   gst_element_class_add_pad_template (gstelement_class,
594       gst_static_pad_template_get (&gst_adder_src_template));
595   gst_element_class_add_pad_template (gstelement_class,
596       gst_static_pad_template_get (&gst_adder_sink_template));
597   gst_element_class_set_details_simple (gstelement_class, "Adder",
598       "Generic/Audio",
599       "Add N audio channels together", "Thomas <thomas@apestaart.org>");
600
601   parent_class = g_type_class_peek_parent (klass);
602
603   gstelement_class->request_new_pad = gst_adder_request_new_pad;
604   gstelement_class->release_pad = gst_adder_release_pad;
605   gstelement_class->change_state = gst_adder_change_state;
606 }
607
608 static void
609 gst_adder_init (GstAdder * adder)
610 {
611   GstPadTemplate *template;
612
613   template = gst_static_pad_template_get (&gst_adder_src_template);
614   adder->srcpad = gst_pad_new_from_template (template, "src");
615   gst_object_unref (template);
616   gst_pad_set_getcaps_function (adder->srcpad,
617       GST_DEBUG_FUNCPTR (gst_pad_proxy_getcaps));
618   gst_pad_set_setcaps_function (adder->srcpad,
619       GST_DEBUG_FUNCPTR (gst_adder_setcaps));
620   gst_pad_set_query_function (adder->srcpad,
621       GST_DEBUG_FUNCPTR (gst_adder_query));
622   gst_pad_set_event_function (adder->srcpad,
623       GST_DEBUG_FUNCPTR (gst_adder_src_event));
624   gst_element_add_pad (GST_ELEMENT (adder), adder->srcpad);
625
626   adder->format = GST_ADDER_FORMAT_UNSET;
627   adder->padcount = 0;
628   adder->func = NULL;
629
630   /* keep track of the sinkpads requested */
631   adder->collect = gst_collect_pads_new ();
632   gst_collect_pads_set_function (adder->collect,
633       GST_DEBUG_FUNCPTR (gst_adder_collected), adder);
634 }
635
636 static void
637 gst_adder_finalize (GObject * object)
638 {
639   GstAdder *adder = GST_ADDER (object);
640
641   gst_object_unref (adder->collect);
642   adder->collect = NULL;
643
644   G_OBJECT_CLASS (parent_class)->finalize (object);
645 }
646
647 static GstPad *
648 gst_adder_request_new_pad (GstElement * element, GstPadTemplate * templ,
649     const gchar * unused)
650 {
651   gchar *name;
652   GstAdder *adder;
653   GstPad *newpad;
654   gint padcount;
655
656   if (templ->direction != GST_PAD_SINK)
657     goto not_sink;
658
659   adder = GST_ADDER (element);
660
661   /* increment pad counter */
662   padcount = g_atomic_int_exchange_and_add (&adder->padcount, 1);
663
664   name = g_strdup_printf ("sink%d", padcount);
665   newpad = gst_pad_new_from_template (templ, name);
666   GST_DEBUG_OBJECT (adder, "request new pad %s", name);
667   g_free (name);
668
669   gst_pad_set_getcaps_function (newpad,
670       GST_DEBUG_FUNCPTR (gst_adder_sink_getcaps));
671   gst_pad_set_setcaps_function (newpad, GST_DEBUG_FUNCPTR (gst_adder_setcaps));
672   gst_collect_pads_add_pad (adder->collect, newpad, sizeof (GstCollectData));
673
674   /* FIXME: hacked way to override/extend the event function of
675    * GstCollectPads; because it sets its own event function giving the
676    * element no access to events */
677   adder->collect_event = (GstPadEventFunction) GST_PAD_EVENTFUNC (newpad);
678   gst_pad_set_event_function (newpad, GST_DEBUG_FUNCPTR (gst_adder_sink_event));
679
680   /* takes ownership of the pad */
681   if (!gst_element_add_pad (GST_ELEMENT (adder), newpad))
682     goto could_not_add;
683
684   return newpad;
685
686   /* errors */
687 not_sink:
688   {
689     g_warning ("gstadder: request new pad that is not a SINK pad\n");
690     return NULL;
691   }
692 could_not_add:
693   {
694     GST_DEBUG_OBJECT (adder, "could not add pad");
695     gst_collect_pads_remove_pad (adder->collect, newpad);
696     gst_object_unref (newpad);
697     return NULL;
698   }
699 }
700
701 static void
702 gst_adder_release_pad (GstElement * element, GstPad * pad)
703 {
704   GstAdder *adder;
705
706   adder = GST_ADDER (element);
707
708   GST_DEBUG_OBJECT (adder, "release pad %s:%s", GST_DEBUG_PAD_NAME (pad));
709
710   gst_collect_pads_remove_pad (adder->collect, pad);
711   gst_element_remove_pad (element, pad);
712 }
713
714 static GstFlowReturn
715 gst_adder_collected (GstCollectPads * pads, gpointer user_data)
716 {
717   /*
718    * combine channels by adding sample values
719    * basic algorithm :
720    * - this function is called when all pads have a buffer
721    * - get available bytes on all pads.
722    * - repeat for each input pad :
723    *   - read available bytes, copy or add to target buffer
724    *   - if there's an EOS event, remove the input channel
725    * - push out the output buffer
726    */
727   GstAdder *adder;
728   guint size;
729   GSList *collected;
730   GstBuffer *outbuf;
731   GstFlowReturn ret;
732   gpointer outbytes;
733   gboolean empty = TRUE;
734
735   adder = GST_ADDER (user_data);
736
737   /* this is fatal */
738   if (G_UNLIKELY (adder->func == NULL))
739     goto not_negotiated;
740
741   /* get available bytes for reading, this can be 0 which could mean
742    * empty buffers or EOS, which we will catch when we loop over the
743    * pads. */
744   size = gst_collect_pads_available (pads);
745
746   GST_LOG_OBJECT (adder,
747       "starting to cycle through channels, %d bytes available (bps = %d)", size,
748       adder->bps);
749
750   outbuf = NULL;
751   outbytes = NULL;
752
753   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
754     GstCollectData *data;
755     guint8 *bytes;
756     guint len;
757     GstBuffer *inbuf;
758
759     data = (GstCollectData *) collected->data;
760
761     /* get a subbuffer of size bytes */
762     inbuf = gst_collect_pads_take_buffer (pads, data, size);
763     /* NULL means EOS or an empty buffer so we still need to flush in
764      * case of an empty buffer. */
765     if (inbuf == NULL) {
766       GST_LOG_OBJECT (adder, "channel %p: no bytes available", data);
767       goto next;
768     }
769
770     bytes = GST_BUFFER_DATA (inbuf);
771     len = GST_BUFFER_SIZE (inbuf);
772
773     if (outbuf == NULL) {
774       GST_LOG_OBJECT (adder, "channel %p: making output buffer of %d bytes",
775           data, size);
776
777       /* first buffer, alloc size bytes. FIXME, we can easily subbuffer
778        * and _make_writable. */
779       outbuf = gst_buffer_new_and_alloc (size);
780       outbytes = GST_BUFFER_DATA (outbuf);
781       gst_buffer_set_caps (outbuf, GST_PAD_CAPS (adder->srcpad));
782
783       if (!GST_BUFFER_FLAG_IS_SET (inbuf, GST_BUFFER_FLAG_GAP)) {
784         /* clear if we are only going to fill a partial buffer */
785         if (G_UNLIKELY (size > len))
786           memset (outbytes, 0, size);
787
788         GST_LOG_OBJECT (adder, "channel %p: copying %d bytes from data %p",
789             data, len, bytes);
790
791         /* and copy the data into it */
792         memcpy (outbytes, bytes, len);
793         empty = FALSE;
794       } else {
795         GST_LOG_OBJECT (adder, "channel %p: zeroing %d bytes from data %p",
796             data, len, bytes);
797         memset (outbytes, 0, size);
798       }
799     } else {
800       if (!GST_BUFFER_FLAG_IS_SET (inbuf, GST_BUFFER_FLAG_GAP)) {
801         GST_LOG_OBJECT (adder, "channel %p: mixing %d bytes from data %p",
802             data, len, bytes);
803         /* other buffers, need to add them */
804         adder->func ((gpointer) outbytes, (gpointer) bytes, len);
805         empty = FALSE;
806       } else {
807         GST_LOG_OBJECT (adder, "channel %p: skipping %d bytes from data %p",
808             data, len, bytes);
809       }
810     }
811   next:
812     if (inbuf)
813       gst_buffer_unref (inbuf);
814   }
815
816   /* can only happen when no pads to collect or all EOS */
817   if (outbuf == NULL)
818     goto eos;
819
820   /* our timestamping is very simple, just an ever incrementing
821    * counter, the new segment time will take care of their respective
822    * stream time. */
823   if (adder->segment_pending) {
824     GstEvent *event;
825
826     /* FIXME, use rate/applied_rate as set on all sinkpads.
827      * - currently we just set rate as received from last seek-event
828      * We could potentially figure out the duration as well using
829      * the current segment positions and the stated stop positions.
830      * Also we just start from stream time 0 which is rather
831      * weird. For non-synchronized mixing, the time should be
832      * the min of the stream times of all received segments,
833      * rationale being that the duration is at least going to
834      * be as long as the earliest stream we start mixing. This
835      * would also be correct for synchronized mixing but then
836      * the later streams would be delayed until the stream times
837      * match.
838      */
839     event = gst_event_new_new_segment_full (FALSE, adder->segment_rate,
840         1.0, GST_FORMAT_TIME, adder->timestamp, -1, adder->segment_position);
841
842     gst_pad_push_event (adder->srcpad, event);
843     adder->segment_pending = FALSE;
844     adder->segment_position = 0;
845   }
846
847   /* set timestamps on the output buffer */
848   GST_BUFFER_TIMESTAMP (outbuf) = adder->timestamp;
849   GST_BUFFER_OFFSET (outbuf) = adder->offset;
850
851   /* for the next timestamp, use the sample counter, which will
852    * never accumulate rounding errors */
853   adder->offset += size / adder->bps;
854   adder->timestamp = gst_util_uint64_scale_int (adder->offset,
855       GST_SECOND, adder->rate);
856
857   /* now we can set the duration of the buffer */
858   GST_BUFFER_DURATION (outbuf) = adder->timestamp -
859       GST_BUFFER_TIMESTAMP (outbuf);
860
861   /* if we only processed silence, mark output again as silence */
862   if (empty)
863     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_GAP);
864
865   /* send it out */
866   GST_LOG_OBJECT (adder, "pushing outbuf, timestamp %" GST_TIME_FORMAT,
867       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)));
868   ret = gst_pad_push (adder->srcpad, outbuf);
869
870   return ret;
871
872   /* ERRORS */
873 not_negotiated:
874   {
875     GST_ELEMENT_ERROR (adder, STREAM, FORMAT, (NULL),
876         ("Unknown data received, not negotiated"));
877     return GST_FLOW_NOT_NEGOTIATED;
878   }
879 eos:
880   {
881     GST_DEBUG_OBJECT (adder, "no data available, must be EOS");
882     gst_pad_push_event (adder->srcpad, gst_event_new_eos ());
883     return GST_FLOW_UNEXPECTED;
884   }
885 }
886
887 static GstStateChangeReturn
888 gst_adder_change_state (GstElement * element, GstStateChange transition)
889 {
890   GstAdder *adder;
891   GstStateChangeReturn ret;
892
893   adder = GST_ADDER (element);
894
895   switch (transition) {
896     case GST_STATE_CHANGE_NULL_TO_READY:
897       break;
898     case GST_STATE_CHANGE_READY_TO_PAUSED:
899       adder->timestamp = 0;
900       adder->offset = 0;
901       adder->segment_pending = TRUE;
902       adder->segment_position = 0;
903       adder->segment_rate = 1.0;
904       gst_segment_init (&adder->segment, GST_FORMAT_UNDEFINED);
905       gst_collect_pads_start (adder->collect);
906       break;
907     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
908       break;
909     case GST_STATE_CHANGE_PAUSED_TO_READY:
910       /* need to unblock the collectpads before calling the
911        * parent change_state so that streaming can finish */
912       gst_collect_pads_stop (adder->collect);
913       break;
914     default:
915       break;
916   }
917
918   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
919
920   switch (transition) {
921     default:
922       break;
923   }
924
925   return ret;
926 }
927
928
929 static gboolean
930 plugin_init (GstPlugin * plugin)
931 {
932   if (!gst_element_register (plugin, "adder", GST_RANK_NONE, GST_TYPE_ADDER)) {
933     return FALSE;
934   }
935
936   return TRUE;
937 }
938
939 GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
940     GST_VERSION_MINOR,
941     "adder",
942     "Adds multiple streams",
943     plugin_init, VERSION, "LGPL", GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)