liveadder: Document that the latency is in milliseconds
[platform/upstream/gstreamer.git] / gst / liveadder / liveadder.c
1 /*
2  * GStreamer
3  *
4  *  Copyright 2012 Collabora Ltd
5  *  Copyright 2008 Nokia Corporation
6  *   @author: Olivier Crete <olivier.crete@collabora.co.uk>
7  *
8  * With parts copied from the adder plugin which is
9  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
10  *                    2001 Thomas <thomas@apestaart.org>
11  *               2005,2006 Wim Taymans <wim@fluendo.com>
12  *
13  * This library is free software; you can redistribute it and/or
14  * modify it under the terms of the GNU Lesser General Public
15  * License as published by the Free Software Foundation; either
16  * version 2.1 of the License, or (at your option) any later version.
17  *
18  * This library is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21  * Lesser General Public License for more details.
22  *
23  * You should have received a copy of the GNU Lesser General Public
24  * License along with this library; if not, write to the Free Software
25  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
26  *
27  */
28 /**
29  * SECTION:element-liveadder
30  * @see_also: adder
31  *
32  * The live adder allows to mix several streams into one by adding the data.
33  * Mixed data is clamped to the min/max values of the data format.
34  *
35  * Unlike the adder, the liveadder mixes the streams according the their
36  * timestamps and waits for some milli-seconds before trying doing the mixing.
37  *
38  * Last reviewed on 2008-02-10 (0.10.11)
39  */
40
41 #ifdef HAVE_CONFIG_H
42 #include "config.h"
43 #endif
44
45 #include "liveadder.h"
46
47 #include <gst/audio/audio.h>
48
49 #include <string.h>
50
51 #define DEFAULT_LATENCY_MS 60
52
53 GST_DEBUG_CATEGORY_STATIC (live_adder_debug);
54 #define GST_CAT_DEFAULT (live_adder_debug)
55
56 static GstStaticPadTemplate gst_live_adder_sink_template =
57 GST_STATIC_PAD_TEMPLATE ("sink_%u",
58     GST_PAD_SINK,
59     GST_PAD_REQUEST,
60     GST_STATIC_CAPS (GST_AUDIO_CAPS_MAKE ("{ S8, U8, "
61             GST_AUDIO_NE (S16) "," GST_AUDIO_NE (U16) ","
62             GST_AUDIO_NE (S32) "," GST_AUDIO_NE (U32) ","
63             GST_AUDIO_NE (F32) "," GST_AUDIO_NE (F64) "}"))
64     );
65
66 static GstStaticPadTemplate gst_live_adder_src_template =
67 GST_STATIC_PAD_TEMPLATE ("src",
68     GST_PAD_SRC,
69     GST_PAD_ALWAYS,
70     GST_STATIC_CAPS (GST_AUDIO_CAPS_MAKE ("{ S8, U8, "
71             GST_AUDIO_NE (S16) "," GST_AUDIO_NE (U16) ","
72             GST_AUDIO_NE (S32) "," GST_AUDIO_NE (U32) ","
73             GST_AUDIO_NE (F32) "," GST_AUDIO_NE (F64) "}"))
74     );
75
76 /* Valve signals and args */
77 enum
78 {
79   /* FILL ME */
80   LAST_SIGNAL
81 };
82
83 enum
84 {
85   PROP_0,
86   PROP_LATENCY,
87 };
88
89 typedef struct _GstLiveAdderPadPrivate
90 {
91   GstSegment segment;
92   gboolean eos;
93
94   GstClockTime expected_timestamp;
95
96 } GstLiveAdderPadPrivate;
97
98 G_DEFINE_TYPE (GstLiveAdder, gst_live_adder, GST_TYPE_ELEMENT);
99
100 static void gst_live_adder_finalize (GObject * object);
101 static void
102 gst_live_adder_set_property (GObject * object,
103     guint prop_id, const GValue * value, GParamSpec * pspec);
104 static void
105 gst_live_adder_get_property (GObject * object,
106     guint prop_id, GValue * value, GParamSpec * pspec);
107
108 static GstPad *gst_live_adder_request_new_pad (GstElement * element,
109     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
110 static void gst_live_adder_release_pad (GstElement * element, GstPad * pad);
111 static GstStateChangeReturn
112 gst_live_adder_change_state (GstElement * element, GstStateChange transition);
113
114 static gboolean gst_live_adder_setcaps (GstLiveAdder * adder, GstPad * pad,
115     GstCaps * caps);
116 static GstCaps *gst_live_adder_sink_getcaps (GstLiveAdder * adder, GstPad * pad,
117     GstCaps * filter);
118 static gboolean gst_live_adder_src_activate_mode (GstPad * pad,
119     GstObject * parent, GstPadMode mode, gboolean active);
120 static gboolean gst_live_adder_src_event (GstPad * pad, GstObject * parent,
121     GstEvent * event);
122
123 static void gst_live_adder_loop (gpointer data);
124 static gboolean gst_live_adder_src_query (GstPad * pad, GstObject * parent,
125     GstQuery * query);
126 static gboolean gst_live_adder_sink_query (GstPad * pad, GstObject * parent,
127     GstQuery * query);
128 static gboolean gst_live_adder_sink_event (GstPad * pad, GstObject * parent,
129     GstEvent * event);
130
131
132 static void reset_pad_private (GstPad * pad);
133
134 /* clipping versions */
135 #define MAKE_FUNC(name,type,ttype,min,max)                      \
136 static void name (type *out, type *in, gint bytes) {            \
137   gint i;                                                       \
138   for (i = 0; i < bytes / sizeof (type); i++)                   \
139     out[i] = CLAMP ((ttype)out[i] + (ttype)in[i], min, max);    \
140 }
141
142 /* non-clipping versions (for float) */
143 #define MAKE_FUNC_NC(name,type,ttype)                           \
144 static void name (type *out, type *in, gint bytes) {            \
145   gint i;                                                       \
146   for (i = 0; i < bytes / sizeof (type); i++)                   \
147     out[i] = (ttype)out[i] + (ttype)in[i];                      \
148 }
149
150 /* *INDENT-OFF* */
151 MAKE_FUNC (add_int32, gint32, gint64, G_MININT32, G_MAXINT32)
152 MAKE_FUNC (add_int16, gint16, gint32, G_MININT16, G_MAXINT16)
153 MAKE_FUNC (add_int8, gint8, gint16, G_MININT8, G_MAXINT8)
154 MAKE_FUNC (add_uint32, guint32, guint64, 0, G_MAXUINT32)
155 MAKE_FUNC (add_uint16, guint16, guint32, 0, G_MAXUINT16)
156 MAKE_FUNC (add_uint8, guint8, guint16, 0, G_MAXUINT8)
157 MAKE_FUNC_NC (add_float64, gdouble, gdouble)
158 MAKE_FUNC_NC (add_float32, gfloat, gfloat)
159 /* *INDENT-ON* */
160
161
162 static void
163 gst_live_adder_class_init (GstLiveAdderClass * klass)
164 {
165   GObjectClass *gobject_class = (GObjectClass *) klass;
166   GstElementClass *gstelement_class = (GstElementClass *) klass;
167
168   GST_DEBUG_CATEGORY_INIT (live_adder_debug, "liveadder", 0, "Live Adder");
169
170   gst_element_class_add_pad_template (gstelement_class,
171       gst_static_pad_template_get (&gst_live_adder_src_template));
172   gst_element_class_add_pad_template (gstelement_class,
173       gst_static_pad_template_get (&gst_live_adder_sink_template));
174   gst_element_class_set_static_metadata (gstelement_class, "Live Adder element",
175       "Generic/Audio",
176       "Mixes live/discontinuous audio streams",
177       "Olivier Crete <olivier.crete@collabora.co.uk>");
178
179   gobject_class->finalize = gst_live_adder_finalize;
180   gobject_class->set_property = gst_live_adder_set_property;
181   gobject_class->get_property = gst_live_adder_get_property;
182
183   gstelement_class->request_new_pad = gst_live_adder_request_new_pad;
184   gstelement_class->release_pad = gst_live_adder_release_pad;
185   gstelement_class->change_state = gst_live_adder_change_state;
186
187   g_object_class_install_property (gobject_class, PROP_LATENCY,
188       g_param_spec_uint ("latency", "Buffering latency",
189           "Amount of data to buffer (in milliseconds)",
190           0, G_MAXUINT, DEFAULT_LATENCY_MS,
191           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
192 }
193
194 static void
195 gst_live_adder_init (GstLiveAdder * adder)
196 {
197   adder->srcpad =
198       gst_pad_new_from_static_template (&gst_live_adder_src_template, "src");
199   gst_pad_set_query_function (adder->srcpad,
200       GST_DEBUG_FUNCPTR (gst_live_adder_src_query));
201   gst_pad_set_event_function (adder->srcpad,
202       GST_DEBUG_FUNCPTR (gst_live_adder_src_event));
203   gst_pad_set_activatemode_function (adder->srcpad,
204       GST_DEBUG_FUNCPTR (gst_live_adder_src_activate_mode));
205   gst_element_add_pad (GST_ELEMENT (adder), adder->srcpad);
206
207   adder->padcount = 0;
208   adder->func = NULL;
209   g_cond_init (&adder->not_empty_cond);
210
211   adder->next_timestamp = GST_CLOCK_TIME_NONE;
212
213   adder->latency_ms = DEFAULT_LATENCY_MS;
214
215   adder->buffers = g_queue_new ();
216 }
217
218
219 static void
220 gst_live_adder_finalize (GObject * object)
221 {
222   GstLiveAdder *adder = GST_LIVE_ADDER (object);
223
224   g_cond_clear (&adder->not_empty_cond);
225
226   g_queue_foreach (adder->buffers, (GFunc) gst_mini_object_unref, NULL);
227   while (g_queue_pop_head (adder->buffers)) {
228   }
229   g_queue_free (adder->buffers);
230
231   g_list_free (adder->sinkpads);
232
233   G_OBJECT_CLASS (gst_live_adder_parent_class)->finalize (object);
234 }
235
236
237 static void
238 gst_live_adder_set_property (GObject * object,
239     guint prop_id, const GValue * value, GParamSpec * pspec)
240 {
241   GstLiveAdder *adder = GST_LIVE_ADDER (object);
242
243   switch (prop_id) {
244     case PROP_LATENCY:
245     {
246       guint64 new_latency, old_latency;
247
248       new_latency = g_value_get_uint (value);
249
250       GST_OBJECT_LOCK (adder);
251       old_latency = adder->latency_ms;
252       adder->latency_ms = new_latency;
253       GST_OBJECT_UNLOCK (adder);
254
255       /* post message if latency changed, this will inform the parent pipeline
256        * that a latency reconfiguration is possible/needed. */
257       if (new_latency != old_latency) {
258         GST_DEBUG_OBJECT (adder, "latency changed to: %" GST_TIME_FORMAT,
259             GST_TIME_ARGS (new_latency));
260
261         gst_element_post_message (GST_ELEMENT_CAST (adder),
262             gst_message_new_latency (GST_OBJECT_CAST (adder)));
263       }
264       break;
265     }
266     default:
267       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
268       break;
269   }
270 }
271
272
273 static void
274 gst_live_adder_get_property (GObject * object,
275     guint prop_id, GValue * value, GParamSpec * pspec)
276 {
277   GstLiveAdder *adder = GST_LIVE_ADDER (object);
278
279   switch (prop_id) {
280     case PROP_LATENCY:
281       GST_OBJECT_LOCK (adder);
282       g_value_set_uint (value, adder->latency_ms);
283       GST_OBJECT_UNLOCK (adder);
284       break;
285     default:
286       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
287       break;
288   }
289 }
290
291
292 /* we can only accept caps that we and downstream can handle. */
293 static GstCaps *
294 gst_live_adder_sink_getcaps (GstLiveAdder * adder, GstPad * pad,
295     GstCaps * filter)
296 {
297   GstCaps *result, *peercaps, *sinkcaps;
298
299   /* get the downstream possible caps */
300   peercaps = gst_pad_peer_query_caps (adder->srcpad, filter);
301   /* get the allowed caps on this sinkpad, we use the fixed caps function so
302    * that it does not call recursively in this function. */
303   sinkcaps = gst_pad_get_current_caps (pad);
304   if (!sinkcaps)
305     sinkcaps = gst_pad_get_pad_template_caps (pad);
306   if (peercaps) {
307     /* if the peer has caps, intersect */
308     GST_DEBUG_OBJECT (adder, "intersecting peer and template caps");
309     result = gst_caps_intersect (peercaps, sinkcaps);
310     gst_caps_unref (sinkcaps);
311     gst_caps_unref (peercaps);
312   } else {
313     /* the peer has no caps (or there is no peer), just use the allowed caps
314      * of this sinkpad. */
315     GST_DEBUG_OBJECT (adder, "no peer caps, using sinkcaps");
316     result = sinkcaps;
317   }
318
319   return result;
320 }
321
322 struct SetCapsIterCtx
323 {
324   GstPad *pad;
325   GstCaps *caps;
326   gboolean all_valid;
327 };
328
329 static void
330 check_other_caps (const GValue * item, gpointer user_data)
331 {
332   GstPad *otherpad = GST_PAD (g_value_get_object (item));
333   struct SetCapsIterCtx *ctx = user_data;
334
335   if (otherpad == ctx->pad)
336     return;
337
338   if (!gst_pad_peer_query_accept_caps (otherpad, ctx->caps))
339     ctx->all_valid = FALSE;
340 }
341
342 static void
343 set_other_caps (const GValue * item, gpointer user_data)
344 {
345   GstPad *otherpad = GST_PAD (g_value_get_object (item));
346   struct SetCapsIterCtx *ctx = user_data;
347
348   if (otherpad == ctx->pad)
349     return;
350
351   if (!gst_pad_set_caps (otherpad, ctx->caps))
352     ctx->all_valid = FALSE;
353 }
354
355 /* the first caps we receive on any of the sinkpads will define the caps for all
356  * the other sinkpads because we can only mix streams with the same caps.
357  * */
358 static gboolean
359 gst_live_adder_setcaps (GstLiveAdder * adder, GstPad * pad, GstCaps * caps)
360 {
361   GstIterator *iter;
362   struct SetCapsIterCtx ctx;
363
364   GST_LOG_OBJECT (adder, "setting caps on pad %p,%s to %" GST_PTR_FORMAT, pad,
365       GST_PAD_NAME (pad), caps);
366
367   /* FIXME, see if the other pads can accept the format. Also lock the
368    * format on the other pads to this new format. */
369   iter = gst_element_iterate_sink_pads (GST_ELEMENT (adder));
370   ctx.pad = pad;
371   ctx.caps = caps;
372   ctx.all_valid = TRUE;
373   while (gst_iterator_foreach (iter, check_other_caps, &ctx) ==
374       GST_ITERATOR_RESYNC) {
375     ctx.all_valid = TRUE;
376     gst_iterator_resync (iter);
377   }
378   if (!ctx.all_valid) {
379     GST_WARNING_OBJECT (adder, "Caps are not acceptable by other sinkpads");
380     gst_iterator_free (iter);
381     return FALSE;
382   }
383
384   while (gst_iterator_foreach (iter, set_other_caps, &ctx) ==
385       GST_ITERATOR_RESYNC) {
386     ctx.all_valid = TRUE;
387     gst_iterator_resync (iter);
388   }
389   gst_iterator_free (iter);
390
391   if (!ctx.all_valid) {
392     GST_WARNING_OBJECT (adder, "Could not set caps on the other sink pads");
393     return FALSE;
394   }
395
396   if (!gst_pad_set_caps (adder->srcpad, caps)) {
397     GST_WARNING_OBJECT (adder, "Could not set caps downstream");
398     return FALSE;
399   }
400
401   GST_OBJECT_LOCK (adder);
402   /* parse caps now */
403   if (!gst_audio_info_from_caps (&adder->info, caps))
404     goto not_supported;
405
406   if (GST_AUDIO_INFO_IS_INTEGER (&adder->info)) {
407     switch (GST_AUDIO_INFO_WIDTH (&adder->info)) {
408       case 8:
409         adder->func = GST_AUDIO_INFO_IS_SIGNED (&adder->info) ?
410             (GstLiveAdderFunction) add_int8 : (GstLiveAdderFunction) add_uint8;
411         break;
412       case 16:
413         adder->func = GST_AUDIO_INFO_IS_SIGNED (&adder->info) ?
414             (GstLiveAdderFunction) add_int16 : (GstLiveAdderFunction)
415             add_uint16;
416         break;
417       case 32:
418         adder->func = GST_AUDIO_INFO_IS_SIGNED (&adder->info) ?
419             (GstLiveAdderFunction) add_int32 : (GstLiveAdderFunction)
420             add_uint32;
421         break;
422       default:
423         goto not_supported;
424     }
425   } else if (GST_AUDIO_INFO_IS_FLOAT (&adder->info)) {
426     switch (GST_AUDIO_INFO_WIDTH (&adder->info)) {
427       case 32:
428         adder->func = (GstLiveAdderFunction) add_float32;
429         break;
430       case 64:
431         adder->func = (GstLiveAdderFunction) add_float64;
432         break;
433       default:
434         goto not_supported;
435     }
436   } else {
437     goto not_supported;
438   }
439
440   GST_OBJECT_UNLOCK (adder);
441   return TRUE;
442
443   /* ERRORS */
444 not_supported:
445   {
446     GST_OBJECT_UNLOCK (adder);
447     GST_DEBUG_OBJECT (adder, "unsupported format set as caps");
448     return FALSE;
449   }
450 }
451
452 static void
453 gst_live_adder_flush_start (GstLiveAdder * adder)
454 {
455   GST_DEBUG_OBJECT (adder, "Disabling pop on queue");
456
457   GST_OBJECT_LOCK (adder);
458   /* mark ourselves as flushing */
459   adder->srcresult = GST_FLOW_FLUSHING;
460
461   /* Empty the queue */
462   g_queue_foreach (adder->buffers, (GFunc) gst_mini_object_unref, NULL);
463   while (g_queue_pop_head (adder->buffers));
464
465   /* unlock clock, we just unschedule, the entry will be released by the
466    * locking streaming thread. */
467   if (adder->clock_id)
468     gst_clock_id_unschedule (adder->clock_id);
469
470   g_cond_broadcast (&adder->not_empty_cond);
471   GST_OBJECT_UNLOCK (adder);
472 }
473
474 static gboolean
475 gst_live_adder_src_activate_mode (GstPad * pad, GstObject * parent,
476     GstPadMode mode, gboolean active)
477 {
478   GstLiveAdder *adder = GST_LIVE_ADDER (parent);
479   gboolean result = TRUE;
480
481   if (mode == GST_PAD_MODE_PULL)
482     return FALSE;
483
484   if (active) {
485     /* Mark as non flushing */
486     GST_OBJECT_LOCK (adder);
487     adder->srcresult = GST_FLOW_OK;
488     GST_OBJECT_UNLOCK (adder);
489
490     /* start pushing out buffers */
491     GST_DEBUG_OBJECT (adder, "Starting task on srcpad");
492     gst_pad_start_task (adder->srcpad,
493         (GstTaskFunction) gst_live_adder_loop, adder, NULL);
494   } else {
495     /* make sure all data processing stops ASAP */
496     gst_live_adder_flush_start (adder);
497
498     /* NOTE this will hardlock if the state change is called from the src pad
499      * task thread because we will _join() the thread. */
500     GST_DEBUG_OBJECT (adder, "Stopping task on srcpad");
501     result = gst_pad_stop_task (pad);
502   }
503
504   return result;
505 }
506
507 static gboolean
508 gst_live_adder_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
509 {
510   GstLiveAdder *adder = GST_LIVE_ADDER (parent);
511   GstLiveAdderPadPrivate *padprivate = NULL;
512   gboolean ret = TRUE;
513
514   padprivate = gst_pad_get_element_private (pad);
515
516   if (!padprivate)
517     return FALSE;
518
519   GST_LOG_OBJECT (adder, "received %s", GST_EVENT_TYPE_NAME (event));
520
521   switch (GST_EVENT_TYPE (event)) {
522     case GST_EVENT_CAPS:
523     {
524       GstCaps *caps;
525
526       gst_event_parse_caps (event, &caps);
527       ret = gst_live_adder_setcaps (adder, pad, caps);
528       gst_event_unref (event);
529       break;
530     }
531     case GST_EVENT_SEGMENT:
532     {
533       const GstSegment *segment;
534       GstSegment livesegment;
535
536       gst_event_parse_segment (event, &segment);
537
538       /* we need time for now */
539       if (segment->format != GST_FORMAT_TIME)
540         goto newseg_wrong_format;
541
542       /* now configure the values, we need these to time the release of the
543        * buffers on the srcpad. */
544       GST_OBJECT_LOCK (adder);
545       gst_segment_copy_into (segment, &padprivate->segment);
546       GST_OBJECT_UNLOCK (adder);
547       gst_event_unref (event);
548
549       gst_segment_init (&livesegment, GST_FORMAT_TIME);
550       gst_pad_push_event (adder->srcpad, gst_event_new_segment (&livesegment));
551       break;
552     }
553     case GST_EVENT_FLUSH_START:
554       gst_live_adder_flush_start (adder);
555       ret = gst_pad_push_event (adder->srcpad, event);
556       break;
557     case GST_EVENT_FLUSH_STOP:
558       GST_OBJECT_LOCK (adder);
559       adder->next_timestamp = GST_CLOCK_TIME_NONE;
560       reset_pad_private (pad);
561       GST_OBJECT_UNLOCK (adder);
562       ret = gst_pad_push_event (adder->srcpad, event);
563       ret = gst_live_adder_src_activate_mode (adder->srcpad, GST_OBJECT (adder),
564           GST_PAD_MODE_PUSH, TRUE);
565       break;
566     case GST_EVENT_EOS:
567     {
568       GST_OBJECT_LOCK (adder);
569
570       ret = adder->srcresult == GST_FLOW_OK;
571       if (ret && !padprivate->eos) {
572         GST_DEBUG_OBJECT (adder, "queuing EOS");
573         padprivate->eos = TRUE;
574         g_cond_broadcast (&adder->not_empty_cond);
575       } else if (padprivate->eos) {
576         GST_DEBUG_OBJECT (adder, "dropping EOS, we are already EOS");
577       } else {
578         GST_DEBUG_OBJECT (adder, "dropping EOS, reason %s",
579             gst_flow_get_name (adder->srcresult));
580       }
581
582       GST_OBJECT_UNLOCK (adder);
583
584       gst_event_unref (event);
585       break;
586     }
587     default:
588       ret = gst_pad_push_event (adder->srcpad, event);
589       break;
590   }
591
592 done:
593
594   return ret;
595
596   /* ERRORS */
597 newseg_wrong_format:
598   {
599     GST_DEBUG_OBJECT (adder, "received non TIME segment");
600     ret = FALSE;
601     goto done;
602   }
603 }
604
605 static gboolean
606 gst_live_adder_query_pos_dur (GstLiveAdder * adder, GstFormat format,
607     gboolean position, gint64 * outvalue)
608 {
609   gint64 max = G_MININT64;
610   gboolean res = TRUE;
611   GstIterator *it;
612   gboolean done = FALSE;
613
614
615   it = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (adder));
616   while (!done) {
617     GstIteratorResult ires;
618     GValue item = { 0 };
619
620     ires = gst_iterator_next (it, &item);
621     switch (ires) {
622       case GST_ITERATOR_DONE:
623         done = TRUE;
624         break;
625       case GST_ITERATOR_OK:
626       {
627         GstPad *pad = GST_PAD_CAST (g_value_get_object (&item));
628         gint64 value;
629         gboolean curres;
630
631         /* ask sink peer for duration */
632         if (position)
633           curres = gst_pad_peer_query_position (pad, format, &value);
634         else
635           curres = gst_pad_peer_query_duration (pad, format, &value);
636
637         /* take max from all valid return values */
638         /* Only if the format is the one we requested, otherwise ignore it ?
639          */
640
641         if (curres) {
642           res &= curres;
643
644           /* valid unknown length, stop searching */
645           if (value == -1) {
646             max = value;
647             done = TRUE;
648           } else if (value > max) {
649             max = value;
650           }
651         }
652         break;
653       }
654       case GST_ITERATOR_RESYNC:
655         max = -1;
656         res = TRUE;
657         break;
658       default:
659         res = FALSE;
660         done = TRUE;
661         break;
662     }
663   }
664   gst_iterator_free (it);
665
666   if (res)
667     *outvalue = max;
668
669   return res;
670 }
671
672 /* FIXME:
673  *
674  * When we add a new stream (or remove a stream) the duration might
675  * also become invalid again and we need to post a new DURATION
676  * message to notify this fact to the parent.
677  * For now we take the max of all the upstream elements so the simple
678  * cases work at least somewhat.
679  */
680 static gboolean
681 gst_live_adder_query_duration (GstLiveAdder * adder, GstQuery * query)
682 {
683   GstFormat format;
684   gint64 max;
685   gboolean res;
686
687   /* parse format */
688   gst_query_parse_duration (query, &format, NULL);
689
690   res = gst_live_adder_query_pos_dur (adder, format, FALSE, &max);
691
692   if (res) {
693     /* and store the max */
694     gst_query_set_duration (query, format, max);
695   }
696
697   return res;
698 }
699
700 static gboolean
701 gst_live_adder_query_position (GstLiveAdder * adder, GstQuery * query)
702 {
703   GstFormat format;
704   gint64 max;
705   gboolean res;
706
707   /* parse format */
708   gst_query_parse_position (query, &format, NULL);
709
710   res = gst_live_adder_query_pos_dur (adder, format, TRUE, &max);
711
712   if (res) {
713     /* and store the max */
714     gst_query_set_position (query, format, max);
715   }
716
717   return res;
718 }
719
720
721
722 static gboolean
723 gst_live_adder_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
724 {
725   GstLiveAdder *adder = GST_LIVE_ADDER (parent);
726   gboolean res = FALSE;
727
728   switch (GST_QUERY_TYPE (query)) {
729     case GST_QUERY_LATENCY:
730     {
731       /* We need to send the query upstream and add the returned latency to our
732        * own */
733       GstClockTime min_latency = 0, max_latency = G_MAXUINT64;
734       GValue item = { 0 };
735       GstIterator *iter = NULL;
736       gboolean done = FALSE;
737
738       iter = gst_element_iterate_sink_pads (GST_ELEMENT (adder));
739
740       while (!done) {
741         switch (gst_iterator_next (iter, &item)) {
742           case GST_ITERATOR_OK:
743           {
744             GstPad *sinkpad = GST_PAD (g_value_get_object (&item));
745             GstClockTime pad_min_latency, pad_max_latency;
746             gboolean pad_us_live;
747
748             if (gst_pad_peer_query (sinkpad, query)) {
749               gst_query_parse_latency (query, &pad_us_live, &pad_min_latency,
750                   &pad_max_latency);
751
752               res = TRUE;
753
754               GST_DEBUG_OBJECT (adder, "Peer latency for pad %s: min %"
755                   GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
756                   GST_PAD_NAME (sinkpad),
757                   GST_TIME_ARGS (pad_min_latency),
758                   GST_TIME_ARGS (pad_max_latency));
759
760               min_latency = MAX (pad_min_latency, min_latency);
761               max_latency = MIN (pad_max_latency, max_latency);
762             }
763           }
764             break;
765           case GST_ITERATOR_RESYNC:
766             min_latency = 0;
767             max_latency = G_MAXUINT64;
768
769             gst_iterator_resync (iter);
770             break;
771           case GST_ITERATOR_ERROR:
772             GST_ERROR_OBJECT (adder, "Error looping sink pads");
773             done = TRUE;
774             break;
775           case GST_ITERATOR_DONE:
776             done = TRUE;
777             break;
778         }
779       }
780       gst_iterator_free (iter);
781
782       if (res) {
783         GstClockTime my_latency = adder->latency_ms * GST_MSECOND;
784         GST_OBJECT_LOCK (adder);
785         adder->peer_latency = min_latency;
786         min_latency += my_latency;
787         GST_OBJECT_UNLOCK (adder);
788
789         /* Make sure we don't risk an overflow */
790         if (max_latency < G_MAXUINT64 - my_latency)
791           max_latency += my_latency;
792         else
793           max_latency = G_MAXUINT64;
794         gst_query_set_latency (query, TRUE, min_latency, max_latency);
795         GST_DEBUG_OBJECT (adder, "Calculated total latency : min %"
796             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
797             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
798       }
799       break;
800     }
801     case GST_QUERY_DURATION:
802       res = gst_live_adder_query_duration (adder, query);
803       break;
804     case GST_QUERY_POSITION:
805       res = gst_live_adder_query_position (adder, query);
806       break;
807     default:
808       res = gst_pad_query_default (pad, parent, query);
809       break;
810   }
811
812   return res;
813 }
814
815 static gboolean
816 gst_live_adder_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
817 {
818   GstLiveAdder *adder = GST_LIVE_ADDER (parent);
819   gboolean res;
820
821   switch (GST_QUERY_TYPE (query)) {
822     case GST_QUERY_CAPS:
823     {
824       GstCaps *filter;
825       GstCaps *result;
826
827       gst_query_parse_caps (query, &filter);
828       result = gst_live_adder_sink_getcaps (adder, pad, filter);
829       gst_query_set_caps_result (query, result);
830       res = TRUE;
831       break;
832     }
833     default:
834       res = gst_pad_query_default (pad, parent, query);
835       break;
836   }
837
838   return res;
839 }
840
841 static gboolean
842 forward_event_func (const GValue * item, GValue * ret, gpointer user_data)
843 {
844   GstPad *pad = GST_PAD (g_value_get_object (item));
845   GstEvent *event = user_data;
846
847   gst_event_ref (event);
848   GST_LOG_OBJECT (pad, "About to send event %s", GST_EVENT_TYPE_NAME (event));
849   if (!gst_pad_push_event (pad, event)) {
850     g_value_set_boolean (ret, FALSE);
851     GST_WARNING_OBJECT (pad, "Sending event  %p (%s) failed.",
852         event, GST_EVENT_TYPE_NAME (event));
853   } else {
854     GST_LOG_OBJECT (pad, "Sent event  %p (%s).",
855         event, GST_EVENT_TYPE_NAME (event));
856   }
857   return TRUE;
858 }
859
860 /* forwards the event to all sinkpads, takes ownership of the
861  * event
862  *
863  * Returns: TRUE if the event could be forwarded on all
864  * sinkpads.
865  */
866 static gboolean
867 forward_event (GstLiveAdder * adder, GstEvent * event)
868 {
869   gboolean ret;
870   GstIterator *it;
871   GValue vret = { 0 };
872
873   GST_LOG_OBJECT (adder, "Forwarding event %p (%s)", event,
874       GST_EVENT_TYPE_NAME (event));
875
876   ret = TRUE;
877
878   g_value_init (&vret, G_TYPE_BOOLEAN);
879   g_value_set_boolean (&vret, TRUE);
880   it = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (adder));
881   gst_iterator_fold (it, forward_event_func, &vret, event);
882   gst_iterator_free (it);
883
884   ret = g_value_get_boolean (&vret);
885
886   return ret;
887 }
888
889
890 static gboolean
891 gst_live_adder_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
892 {
893   GstLiveAdder *adder = GST_LIVE_ADDER (parent);
894   gboolean result;
895
896   switch (GST_EVENT_TYPE (event)) {
897     case GST_EVENT_QOS:
898       /* TODO : QoS might be tricky */
899       result = FALSE;
900       break;
901     case GST_EVENT_NAVIGATION:
902       /* TODO : navigation is rather pointless. */
903       result = FALSE;
904       break;
905     default:
906       /* just forward the rest for now */
907       result = forward_event (adder, event);
908       break;
909   }
910
911   gst_event_unref (event);
912
913   return result;
914 }
915
916 static guint
917 gst_live_adder_length_from_duration (GstLiveAdder * adder,
918     GstClockTime duration)
919 {
920   guint64 ret = GST_AUDIO_INFO_BPF (&adder->info) *
921       (duration * GST_AUDIO_INFO_RATE (&adder->info) / GST_SECOND);
922
923   return (guint) ret;
924 }
925
926 static GstFlowReturn
927 gst_live_live_adder_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
928 {
929   GstLiveAdder *adder = GST_LIVE_ADDER (parent);
930   GstLiveAdderPadPrivate *padprivate = NULL;
931   GstFlowReturn ret = GST_FLOW_OK;
932   GList *item = NULL;
933   GstClockTime skip = 0;
934   gint64 drift = 0;             /* Positive if new buffer after old buffer */
935
936   GST_OBJECT_LOCK (adder);
937
938   ret = adder->srcresult;
939
940   GST_DEBUG ("Incoming buffer time:%" GST_TIME_FORMAT " duration:%"
941       GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
942       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
943
944   if (ret != GST_FLOW_OK) {
945     GST_DEBUG_OBJECT (adder, "Passing non-ok result from src: %s",
946         gst_flow_get_name (ret));
947     gst_buffer_unref (buffer);
948     goto out;
949   }
950
951   padprivate = gst_pad_get_element_private (pad);
952
953   if (!padprivate) {
954     ret = GST_FLOW_NOT_LINKED;
955     gst_buffer_unref (buffer);
956     goto out;
957   }
958
959   if (padprivate->eos) {
960     GST_DEBUG_OBJECT (adder, "Received buffer after EOS");
961     ret = GST_FLOW_EOS;
962     gst_buffer_unref (buffer);
963     goto out;
964   }
965
966   if (!GST_BUFFER_TIMESTAMP_IS_VALID (buffer))
967     goto invalid_timestamp;
968
969   if (padprivate->segment.format == GST_FORMAT_UNDEFINED) {
970     GST_WARNING_OBJECT (adder, "No new-segment received,"
971         " initializing segment with time 0..-1");
972     gst_segment_init (&padprivate->segment, GST_FORMAT_TIME);
973   }
974
975   buffer = gst_buffer_make_writable (buffer);
976
977   drift = GST_BUFFER_TIMESTAMP (buffer) - padprivate->expected_timestamp;
978
979   /* Just see if we receive invalid timestamp/durations */
980   if (GST_CLOCK_TIME_IS_VALID (padprivate->expected_timestamp) &&
981       !GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT) &&
982       (drift != 0)) {
983     GST_LOG_OBJECT (adder,
984         "Timestamp discontinuity without the DISCONT flag set"
985         " (expected %" GST_TIME_FORMAT ", got %" GST_TIME_FORMAT
986         " drift:%" G_GINT64_FORMAT "ms)",
987         GST_TIME_ARGS (padprivate->expected_timestamp),
988         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), drift / GST_MSECOND);
989
990     /* We accept drifts of 10ms */
991     if (ABS (drift) < (10 * GST_MSECOND)) {
992       GST_DEBUG ("Correcting minor drift");
993       GST_BUFFER_TIMESTAMP (buffer) = padprivate->expected_timestamp;
994     }
995   }
996
997
998   /* If there is no duration, lets set one */
999   if (!GST_BUFFER_DURATION_IS_VALID (buffer)) {
1000     GST_BUFFER_DURATION (buffer) = (gst_buffer_get_size (buffer) * GST_SECOND) /
1001         (GST_AUDIO_INFO_BPF (&adder->info) *
1002         GST_AUDIO_INFO_RATE (&adder->info));
1003     padprivate->expected_timestamp = GST_CLOCK_TIME_NONE;
1004   } else {
1005     padprivate->expected_timestamp = GST_BUFFER_TIMESTAMP (buffer) +
1006         GST_BUFFER_DURATION (buffer);
1007   }
1008
1009
1010   /*
1011    * Lets clip the buffer to the segment (so we don't have to worry about
1012    * cliping afterwards).
1013    * This should also guarantee us that we'll have valid timestamps and
1014    * durations afterwards
1015    */
1016
1017   buffer = gst_audio_buffer_clip (buffer, &padprivate->segment,
1018       GST_AUDIO_INFO_RATE (&adder->info), GST_AUDIO_INFO_BPF (&adder->info));
1019
1020   /* buffer can be NULL if it's completely outside of the segment */
1021   if (!buffer) {
1022     GST_DEBUG ("Buffer completely outside of configured segment, dropping it");
1023     goto out;
1024   }
1025
1026   /*
1027    * Make sure all incoming buffers share the same timestamping
1028    */
1029   GST_BUFFER_TIMESTAMP (buffer) =
1030       gst_segment_to_running_time (&padprivate->segment,
1031       padprivate->segment.format, GST_BUFFER_TIMESTAMP (buffer));
1032
1033
1034   if (GST_CLOCK_TIME_IS_VALID (adder->next_timestamp) &&
1035       GST_BUFFER_TIMESTAMP (buffer) < adder->next_timestamp) {
1036     if (GST_BUFFER_TIMESTAMP (buffer) + GST_BUFFER_DURATION (buffer) <
1037         adder->next_timestamp) {
1038       GST_DEBUG_OBJECT (adder, "Buffer is late, dropping (ts: %" GST_TIME_FORMAT
1039           " duration: %" GST_TIME_FORMAT ")",
1040           GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
1041           GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
1042       gst_buffer_unref (buffer);
1043       goto out;
1044     } else {
1045       skip = adder->next_timestamp - GST_BUFFER_TIMESTAMP (buffer);
1046       GST_DEBUG_OBJECT (adder, "Buffer is partially late, skipping %"
1047           GST_TIME_FORMAT, GST_TIME_ARGS (skip));
1048     }
1049   }
1050
1051   /* If our new buffer's head is higher than the queue's head, lets wake up,
1052    * we may not have to wait for as long
1053    */
1054   if (adder->clock_id &&
1055       g_queue_peek_head (adder->buffers) != NULL &&
1056       GST_BUFFER_TIMESTAMP (buffer) + skip <
1057       GST_BUFFER_TIMESTAMP (g_queue_peek_head (adder->buffers)))
1058     gst_clock_id_unschedule (adder->clock_id);
1059
1060   for (item = g_queue_peek_head_link (adder->buffers);
1061       item; item = g_list_next (item)) {
1062     GstBuffer *oldbuffer = item->data;
1063     GstClockTime old_skip = 0;
1064     GstClockTime mix_duration = 0;
1065     GstClockTime mix_start = 0;
1066     GstClockTime mix_end = 0;
1067     GstMapInfo oldmap, map;
1068
1069     /* We haven't reached our place yet */
1070     if (GST_BUFFER_TIMESTAMP (buffer) + skip >=
1071         GST_BUFFER_TIMESTAMP (oldbuffer) + GST_BUFFER_DURATION (oldbuffer))
1072       continue;
1073
1074     /* We're past our place, lets insert ouselves here */
1075     if (GST_BUFFER_TIMESTAMP (buffer) + GST_BUFFER_DURATION (buffer) <=
1076         GST_BUFFER_TIMESTAMP (oldbuffer))
1077       break;
1078
1079     /* if we reach this spot, we have overlap, so we must mix */
1080
1081     /* First make a subbuffer with the non-overlapping part */
1082     if (GST_BUFFER_TIMESTAMP (buffer) + skip < GST_BUFFER_TIMESTAMP (oldbuffer)) {
1083       GstBuffer *subbuffer = NULL;
1084       GstClockTime subbuffer_duration = GST_BUFFER_TIMESTAMP (oldbuffer) -
1085           (GST_BUFFER_TIMESTAMP (buffer) + skip);
1086
1087       subbuffer = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL,
1088           gst_live_adder_length_from_duration (adder, skip),
1089           gst_live_adder_length_from_duration (adder, subbuffer_duration));
1090
1091       GST_BUFFER_TIMESTAMP (subbuffer) = GST_BUFFER_TIMESTAMP (buffer) + skip;
1092       GST_BUFFER_DURATION (subbuffer) = subbuffer_duration;
1093
1094       skip += subbuffer_duration;
1095
1096       g_queue_insert_before (adder->buffers, item, subbuffer);
1097     }
1098
1099     /* Now we are on the overlapping part */
1100     oldbuffer = gst_buffer_make_writable (oldbuffer);
1101     item->data = oldbuffer;
1102
1103     old_skip = GST_BUFFER_TIMESTAMP (buffer) + skip -
1104         GST_BUFFER_TIMESTAMP (oldbuffer);
1105
1106     mix_start = GST_BUFFER_TIMESTAMP (oldbuffer) + old_skip;
1107
1108     if (GST_BUFFER_TIMESTAMP (buffer) + GST_BUFFER_DURATION (buffer) <
1109         GST_BUFFER_TIMESTAMP (oldbuffer) + GST_BUFFER_DURATION (oldbuffer))
1110       mix_end = GST_BUFFER_TIMESTAMP (buffer) + GST_BUFFER_DURATION (buffer);
1111     else
1112       mix_end = GST_BUFFER_TIMESTAMP (oldbuffer) +
1113           GST_BUFFER_DURATION (oldbuffer);
1114
1115     mix_duration = mix_end - mix_start;
1116
1117     gst_buffer_map (oldbuffer, &oldmap, GST_MAP_WRITE);
1118     gst_buffer_map (buffer, &map, GST_MAP_READ);
1119     adder->func (oldmap.data +
1120         gst_live_adder_length_from_duration (adder, old_skip),
1121         map.data +
1122         gst_live_adder_length_from_duration (adder, skip),
1123         gst_live_adder_length_from_duration (adder, mix_duration));
1124     gst_buffer_unmap (oldbuffer, &oldmap);
1125     gst_buffer_unmap (buffer, &map);
1126     skip += mix_duration;
1127   }
1128
1129   g_cond_broadcast (&adder->not_empty_cond);
1130
1131   if (skip == GST_BUFFER_DURATION (buffer)) {
1132     gst_buffer_unref (buffer);
1133   } else {
1134     if (skip) {
1135       GstClockTime subbuffer_duration = GST_BUFFER_DURATION (buffer) - skip;
1136       GstClockTime subbuffer_ts = GST_BUFFER_TIMESTAMP (buffer) + skip;
1137       GstBuffer *new_buffer = gst_buffer_copy_region (buffer,
1138           GST_BUFFER_COPY_ALL,
1139           gst_live_adder_length_from_duration (adder, skip),
1140           gst_live_adder_length_from_duration (adder, subbuffer_duration));
1141       gst_buffer_unref (buffer);
1142       buffer = new_buffer;
1143       GST_BUFFER_PTS (buffer) = subbuffer_ts;
1144       GST_BUFFER_DURATION (buffer) = subbuffer_duration;
1145     }
1146
1147     if (item)
1148       g_queue_insert_before (adder->buffers, item, buffer);
1149     else
1150       g_queue_push_tail (adder->buffers, buffer);
1151   }
1152
1153 out:
1154
1155   GST_OBJECT_UNLOCK (adder);
1156
1157   return ret;
1158
1159 invalid_timestamp:
1160
1161   GST_OBJECT_UNLOCK (adder);
1162   gst_buffer_unref (buffer);
1163   GST_ELEMENT_ERROR (adder, STREAM, FAILED,
1164       ("Buffer without a valid timestamp received"),
1165       ("Invalid timestamp received on buffer"));
1166
1167   return GST_FLOW_ERROR;
1168 }
1169
1170 /*
1171  * This only works because the GstObject lock is taken
1172  *
1173  * It checks if all sink pads are EOS
1174  */
1175 static gboolean
1176 check_eos_locked (GstLiveAdder * adder)
1177 {
1178   GList *item;
1179
1180   /* We can't be EOS if we have no sinkpads */
1181   if (adder->sinkpads == NULL)
1182     return FALSE;
1183
1184   for (item = adder->sinkpads; item; item = g_list_next (item)) {
1185     GstPad *pad = item->data;
1186     GstLiveAdderPadPrivate *padprivate = gst_pad_get_element_private (pad);
1187
1188     if (padprivate && padprivate->eos != TRUE)
1189       return FALSE;
1190   }
1191
1192   return TRUE;
1193 }
1194
1195 static void
1196 gst_live_adder_loop (gpointer data)
1197 {
1198   GstLiveAdder *adder = GST_LIVE_ADDER (data);
1199   GstClockTime buffer_timestamp = 0;
1200   GstClockTime sync_time = 0;
1201   GstClock *clock = NULL;
1202   GstClockID id = NULL;
1203   GstClockReturn ret;
1204   GstBuffer *buffer = NULL;
1205   GstFlowReturn result;
1206   GstEvent *newseg_event = NULL;
1207
1208   GST_OBJECT_LOCK (adder);
1209
1210 again:
1211
1212   for (;;) {
1213     if (adder->srcresult != GST_FLOW_OK)
1214       goto flushing;
1215     if (!g_queue_is_empty (adder->buffers))
1216       break;
1217     if (check_eos_locked (adder))
1218       goto eos;
1219     g_cond_wait (&adder->not_empty_cond, GST_OBJECT_GET_LOCK (adder));
1220   }
1221
1222   buffer_timestamp = GST_BUFFER_TIMESTAMP (g_queue_peek_head (adder->buffers));
1223
1224   clock = GST_ELEMENT_CLOCK (adder);
1225
1226   /* If we have no clock, then we can't do anything.. error */
1227   if (!clock) {
1228     if (adder->playing)
1229       goto no_clock;
1230     else
1231       goto push_buffer;
1232   }
1233
1234   GST_DEBUG_OBJECT (adder, "sync to timestamp %" GST_TIME_FORMAT,
1235       GST_TIME_ARGS (buffer_timestamp));
1236
1237   sync_time = buffer_timestamp + GST_ELEMENT_CAST (adder)->base_time;
1238   /* add latency, this includes our own latency and the peer latency. */
1239   sync_time += adder->latency_ms * GST_MSECOND;
1240   sync_time += adder->peer_latency;
1241
1242   /* create an entry for the clock */
1243   id = adder->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
1244   GST_OBJECT_UNLOCK (adder);
1245
1246   ret = gst_clock_id_wait (id, NULL);
1247
1248   GST_OBJECT_LOCK (adder);
1249
1250   /* and free the entry */
1251   gst_clock_id_unref (id);
1252   adder->clock_id = NULL;
1253
1254   /* at this point, the clock could have been unlocked by a timeout, a new
1255    * head element was added to the queue or because we are shutting down. Check
1256    * for shutdown first. */
1257
1258   if (adder->srcresult != GST_FLOW_OK)
1259     goto flushing;
1260
1261   if (ret == GST_CLOCK_UNSCHEDULED) {
1262     GST_DEBUG_OBJECT (adder,
1263         "Wait got unscheduled, will retry to push with new buffer");
1264     goto again;
1265   }
1266
1267   if (ret != GST_CLOCK_OK && ret != GST_CLOCK_EARLY)
1268     goto clock_error;
1269
1270 push_buffer:
1271
1272   buffer = g_queue_pop_head (adder->buffers);
1273
1274   if (!buffer)
1275     goto again;
1276
1277   /*
1278    * We make sure the timestamps are exactly contiguous
1279    * If its only small skew (due to rounding errors), we correct it
1280    * silently. Otherwise we put the discont flag
1281    */
1282   if (GST_CLOCK_TIME_IS_VALID (adder->next_timestamp) &&
1283       GST_BUFFER_TIMESTAMP (buffer) != adder->next_timestamp) {
1284     GstClockTimeDiff diff = GST_CLOCK_DIFF (GST_BUFFER_TIMESTAMP (buffer),
1285         adder->next_timestamp);
1286     if (diff < 0)
1287       diff = -diff;
1288
1289     if (diff < GST_SECOND / GST_AUDIO_INFO_RATE (&adder->info)) {
1290       GST_BUFFER_TIMESTAMP (buffer) = adder->next_timestamp;
1291       GST_DEBUG_OBJECT (adder, "Correcting slight skew");
1292       GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
1293     } else {
1294       GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1295       GST_DEBUG_OBJECT (adder, "Expected buffer at %" GST_TIME_FORMAT
1296           ", but is at %" GST_TIME_FORMAT ", setting discont",
1297           GST_TIME_ARGS (adder->next_timestamp),
1298           GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
1299     }
1300   } else {
1301     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
1302   }
1303
1304   GST_BUFFER_OFFSET (buffer) = GST_BUFFER_OFFSET_NONE;
1305   GST_BUFFER_OFFSET_END (buffer) = GST_BUFFER_OFFSET_NONE;
1306
1307   if (GST_BUFFER_DURATION_IS_VALID (buffer))
1308     adder->next_timestamp = GST_BUFFER_TIMESTAMP (buffer) +
1309         GST_BUFFER_DURATION (buffer);
1310   else
1311     adder->next_timestamp = GST_CLOCK_TIME_NONE;
1312   GST_OBJECT_UNLOCK (adder);
1313
1314   if (newseg_event)
1315     gst_pad_push_event (adder->srcpad, newseg_event);
1316
1317   GST_LOG_OBJECT (adder, "About to push buffer time:%" GST_TIME_FORMAT
1318       " duration:%" GST_TIME_FORMAT,
1319       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
1320       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
1321
1322   result = gst_pad_push (adder->srcpad, buffer);
1323   if (result != GST_FLOW_OK)
1324     goto pause;
1325
1326   return;
1327
1328 flushing:
1329   {
1330     GST_DEBUG_OBJECT (adder, "we are flushing");
1331     gst_pad_pause_task (adder->srcpad);
1332     GST_OBJECT_UNLOCK (adder);
1333     return;
1334   }
1335
1336 clock_error:
1337   {
1338     gst_pad_pause_task (adder->srcpad);
1339     GST_OBJECT_UNLOCK (adder);
1340     GST_ELEMENT_ERROR (adder, STREAM, MUX, ("Error with the clock"),
1341         ("Error with the clock: %d", ret));
1342     GST_ERROR_OBJECT (adder, "Error with the clock: %d", ret);
1343     return;
1344   }
1345
1346 no_clock:
1347   {
1348     gst_pad_pause_task (adder->srcpad);
1349     GST_OBJECT_UNLOCK (adder);
1350     GST_ELEMENT_ERROR (adder, STREAM, MUX, ("No available clock"),
1351         ("No available clock"));
1352     GST_ERROR_OBJECT (adder, "No available clock");
1353     return;
1354   }
1355
1356 pause:
1357   {
1358     GST_DEBUG_OBJECT (adder, "pausing task, reason %s",
1359         gst_flow_get_name (result));
1360
1361     GST_OBJECT_LOCK (adder);
1362
1363     /* store result */
1364     adder->srcresult = result;
1365     /* we don't post errors or anything because upstream will do that for us
1366      * when we pass the return value upstream. */
1367     gst_pad_pause_task (adder->srcpad);
1368     GST_OBJECT_UNLOCK (adder);
1369     return;
1370   }
1371
1372 eos:
1373   {
1374     /* store result, we are flushing now */
1375     GST_DEBUG_OBJECT (adder, "We are EOS, pushing EOS downstream");
1376     adder->srcresult = GST_FLOW_EOS;
1377     gst_pad_pause_task (adder->srcpad);
1378     GST_OBJECT_UNLOCK (adder);
1379     gst_pad_push_event (adder->srcpad, gst_event_new_eos ());
1380     return;
1381   }
1382 }
1383
1384 static GstPad *
1385 gst_live_adder_request_new_pad (GstElement * element, GstPadTemplate * templ,
1386     const gchar * ignored_name, const GstCaps * caps)
1387 {
1388   gchar *name;
1389   GstLiveAdder *adder;
1390   GstPad *newpad;
1391   gint padcount;
1392   GstLiveAdderPadPrivate *padprivate = NULL;
1393
1394   if (templ->direction != GST_PAD_SINK)
1395     goto not_sink;
1396
1397   adder = GST_LIVE_ADDER (element);
1398
1399   /* increment pad counter */
1400 #if GLIB_CHECK_VERSION(2,29,5)
1401   padcount = g_atomic_int_add (&adder->padcount, 1);
1402 #else
1403   padcount = g_atomic_int_exchange_and_add (&adder->padcount, 1);
1404 #endif
1405
1406   name = g_strdup_printf ("sink_%u", padcount);
1407   newpad = gst_pad_new_from_template (templ, name);
1408   GST_DEBUG_OBJECT (adder, "request new pad %s", name);
1409   g_free (name);
1410
1411   gst_pad_set_event_function (newpad,
1412       GST_DEBUG_FUNCPTR (gst_live_adder_sink_event));
1413   gst_pad_set_query_function (newpad,
1414       GST_DEBUG_FUNCPTR (gst_live_adder_sink_query));
1415
1416   padprivate = g_new0 (GstLiveAdderPadPrivate, 1);
1417
1418   gst_segment_init (&padprivate->segment, GST_FORMAT_UNDEFINED);
1419   padprivate->eos = FALSE;
1420   padprivate->expected_timestamp = GST_CLOCK_TIME_NONE;
1421
1422   gst_pad_set_element_private (newpad, padprivate);
1423
1424   gst_pad_set_chain_function (newpad, gst_live_live_adder_chain);
1425
1426
1427   if (!gst_pad_set_active (newpad, TRUE))
1428     goto could_not_activate;
1429
1430   /* takes ownership of the pad */
1431   if (!gst_element_add_pad (GST_ELEMENT (adder), newpad))
1432     goto could_not_add;
1433
1434   GST_OBJECT_LOCK (adder);
1435   adder->sinkpads = g_list_prepend (adder->sinkpads, newpad);
1436   GST_OBJECT_UNLOCK (adder);
1437
1438   return newpad;
1439
1440   /* errors */
1441 not_sink:
1442   {
1443     g_warning ("gstadder: request new pad that is not a SINK pad\n");
1444     return NULL;
1445   }
1446 could_not_add:
1447   {
1448     GST_DEBUG_OBJECT (adder, "could not add pad");
1449     g_free (padprivate);
1450     gst_object_unref (newpad);
1451     return NULL;
1452   }
1453 could_not_activate:
1454   {
1455     GST_DEBUG_OBJECT (adder, "could not activate new pad");
1456     g_free (padprivate);
1457     gst_object_unref (newpad);
1458     return NULL;
1459   }
1460 }
1461
1462 static void
1463 gst_live_adder_release_pad (GstElement * element, GstPad * pad)
1464 {
1465   GstLiveAdder *adder;
1466   GstLiveAdderPadPrivate *padprivate;
1467
1468   adder = GST_LIVE_ADDER (element);
1469
1470   GST_DEBUG_OBJECT (adder, "release pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1471
1472   GST_OBJECT_LOCK (element);
1473   padprivate = gst_pad_get_element_private (pad);
1474   gst_pad_set_element_private (pad, NULL);
1475   adder->sinkpads = g_list_remove_all (adder->sinkpads, pad);
1476   GST_OBJECT_UNLOCK (element);
1477
1478   g_free (padprivate);
1479
1480   gst_element_remove_pad (element, pad);
1481 }
1482
1483 static void
1484 reset_pad_private (GstPad * pad)
1485 {
1486   GstLiveAdderPadPrivate *padprivate;
1487
1488   padprivate = gst_pad_get_element_private (pad);
1489
1490   if (!padprivate)
1491     return;
1492
1493   gst_segment_init (&padprivate->segment, GST_FORMAT_UNDEFINED);
1494
1495   padprivate->expected_timestamp = GST_CLOCK_TIME_NONE;
1496   padprivate->eos = FALSE;
1497 }
1498
1499 static GstStateChangeReturn
1500 gst_live_adder_change_state (GstElement * element, GstStateChange transition)
1501 {
1502   GstLiveAdder *adder;
1503   GstStateChangeReturn ret;
1504
1505   adder = GST_LIVE_ADDER (element);
1506
1507   switch (transition) {
1508     case GST_STATE_CHANGE_READY_TO_PAUSED:
1509       GST_OBJECT_LOCK (adder);
1510       adder->segment_pending = TRUE;
1511       adder->peer_latency = 0;
1512       adder->next_timestamp = GST_CLOCK_TIME_NONE;
1513       g_list_foreach (adder->sinkpads, (GFunc) reset_pad_private, NULL);
1514       GST_OBJECT_UNLOCK (adder);
1515       break;
1516     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1517       GST_OBJECT_LOCK (adder);
1518       adder->playing = FALSE;
1519       GST_OBJECT_UNLOCK (adder);
1520       break;
1521     default:
1522       break;
1523   }
1524
1525   ret = GST_ELEMENT_CLASS (gst_live_adder_parent_class)->change_state (element,
1526       transition);
1527
1528   switch (transition) {
1529     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1530       GST_OBJECT_LOCK (adder);
1531       adder->playing = TRUE;
1532       GST_OBJECT_UNLOCK (adder);
1533       break;
1534     default:
1535       break;
1536   }
1537
1538   return ret;
1539 }
1540
1541
1542 static gboolean
1543 plugin_init (GstPlugin * plugin)
1544 {
1545   if (!gst_element_register (plugin, "liveadder", GST_RANK_NONE,
1546           GST_TYPE_LIVE_ADDER)) {
1547     return FALSE;
1548   }
1549
1550   return TRUE;
1551 }
1552
1553 GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
1554     GST_VERSION_MINOR,
1555     liveadder,
1556     "Adds multiple live discontinuous streams",
1557     plugin_init, VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)