4 * Copyright 2012 Collabora Ltd
5 * Copyright 2008 Nokia Corporation
6 * @author: Olivier Crete <olivier.crete@collabora.co.uk>
8 * With parts copied from the adder plugin which is
9 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
10 * 2001 Thomas <thomas@apestaart.org>
11 * 2005,2006 Wim Taymans <wim@fluendo.com>
13 * This library is free software; you can redistribute it and/or
14 * modify it under the terms of the GNU Lesser General Public
15 * License as published by the Free Software Foundation; either
16 * version 2.1 of the License, or (at your option) any later version.
18 * This library is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21 * Lesser General Public License for more details.
23 * You should have received a copy of the GNU Lesser General Public
24 * License along with this library; if not, write to the Free Software
25 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
29 * SECTION:element-liveadder
32 * The live adder mixes several streams into one by adding the data.
33 * Mixed data is clamped to the min/max values of the data format.
35 * Unlike the adder, the liveadder mixes the streams according to their
36 * timestamps and waits for some milliseconds before doing the mixing.
38 * Last reviewed on 2008-02-10 (0.10.11)
45 #include "liveadder.h"
47 #include <gst/audio/audio.h>
51 #define DEFAULT_LATENCY_MS 60
53 GST_DEBUG_CATEGORY_STATIC (live_adder_debug);
54 #define GST_CAT_DEFAULT (live_adder_debug)
/* Pad template for the sink pads, named "sink_%u". The caps string lists the
 * accepted raw audio sample formats: S8 and U8 plus the GST_AUDIO_NE()
 * variants of S16, U16, S32, U32, F32 and F64. */
56 static GstStaticPadTemplate gst_live_adder_sink_template =
57 GST_STATIC_PAD_TEMPLATE ("sink_%u",
60 GST_STATIC_CAPS (GST_AUDIO_CAPS_MAKE ("{ S8, U8, "
61 GST_AUDIO_NE (S16) "," GST_AUDIO_NE (U16) ","
62 GST_AUDIO_NE (S32) "," GST_AUDIO_NE (U32) ","
63 GST_AUDIO_NE (F32) "," GST_AUDIO_NE (F64) "}"))
/* Pad template for the single "src" pad. It advertises the same list of raw
 * audio sample formats as the sink pad template. */
66 static GstStaticPadTemplate gst_live_adder_src_template =
67 GST_STATIC_PAD_TEMPLATE ("src",
70 GST_STATIC_CAPS (GST_AUDIO_CAPS_MAKE ("{ S8, U8, "
71 GST_AUDIO_NE (S16) "," GST_AUDIO_NE (U16) ","
72 GST_AUDIO_NE (S32) "," GST_AUDIO_NE (U32) ","
73 GST_AUDIO_NE (F32) "," GST_AUDIO_NE (F64) "}"))
76 /* LiveAdder signals and args */
/* Per-sink-pad private state, retrieved in this file through
 * gst_pad_get_element_private(). */
89 typedef struct _GstLiveAdderPadPrivate
/* Timestamp the next buffer on this pad is expected to carry (timestamp +
 * duration of the previous buffer), or GST_CLOCK_TIME_NONE when unknown. */
94 GstClockTime expected_timestamp;
96 } GstLiveAdderPadPrivate;
98 G_DEFINE_TYPE (GstLiveAdder, gst_live_adder, GST_TYPE_ELEMENT);
100 static void gst_live_adder_finalize (GObject * object);
102 gst_live_adder_set_property (GObject * object,
103 guint prop_id, const GValue * value, GParamSpec * pspec);
105 gst_live_adder_get_property (GObject * object,
106 guint prop_id, GValue * value, GParamSpec * pspec);
108 static GstPad *gst_live_adder_request_new_pad (GstElement * element,
109 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
110 static void gst_live_adder_release_pad (GstElement * element, GstPad * pad);
111 static GstStateChangeReturn
112 gst_live_adder_change_state (GstElement * element, GstStateChange transition);
114 static gboolean gst_live_adder_setcaps (GstLiveAdder * adder, GstPad * pad,
116 static GstCaps *gst_live_adder_sink_getcaps (GstLiveAdder * adder, GstPad * pad,
118 static gboolean gst_live_adder_src_activate_mode (GstPad * pad,
119 GstObject * parent, GstPadMode mode, gboolean active);
120 static gboolean gst_live_adder_src_event (GstPad * pad, GstObject * parent,
123 static void gst_live_adder_loop (gpointer data);
124 static gboolean gst_live_adder_src_query (GstPad * pad, GstObject * parent,
126 static gboolean gst_live_adder_sink_query (GstPad * pad, GstObject * parent,
128 static gboolean gst_live_adder_sink_event (GstPad * pad, GstObject * parent,
132 static void reset_pad_private (GstPad * pad);
134 /* clipping versions */
135 #define MAKE_FUNC(name,type,ttype,min,max) \
136 static void name (type *out, type *in, gint bytes) { \
138 for (i = 0; i < bytes / sizeof (type); i++) \
139 out[i] = CLAMP ((ttype)out[i] + (ttype)in[i], min, max); \
142 /* non-clipping versions (for float) */
143 #define MAKE_FUNC_NC(name,type,ttype) \
144 static void name (type *out, type *in, gint bytes) { \
146 for (i = 0; i < bytes / sizeof (type); i++) \
147 out[i] = (ttype)out[i] + (ttype)in[i]; \
151 MAKE_FUNC (add_int32, gint32, gint64, G_MININT32, G_MAXINT32)
152 MAKE_FUNC (add_int16, gint16, gint32, G_MININT16, G_MAXINT16)
153 MAKE_FUNC (add_int8, gint8, gint16, G_MININT8, G_MAXINT8)
154 MAKE_FUNC (add_uint32, guint32, guint64, 0, G_MAXUINT32)
155 MAKE_FUNC (add_uint16, guint16, guint32, 0, G_MAXUINT16)
156 MAKE_FUNC (add_uint8, guint8, guint16, 0, G_MAXUINT8)
157 MAKE_FUNC_NC (add_float64, gdouble, gdouble)
158 MAKE_FUNC_NC (add_float32, gfloat, gfloat)
/* Class initializer: sets up the debug category, registers the src and sink
 * pad templates and the element metadata, hooks up the GObject and GstElement
 * virtual methods, and installs the "latency" property. */
163 gst_live_adder_class_init (GstLiveAdderClass * klass)
165 GObjectClass *gobject_class = (GObjectClass *) klass;
166 GstElementClass *gstelement_class = (GstElementClass *) klass;
168 GST_DEBUG_CATEGORY_INIT (live_adder_debug, "liveadder", 0, "Live Adder");
170 gst_element_class_add_pad_template (gstelement_class,
171 gst_static_pad_template_get (&gst_live_adder_src_template));
172 gst_element_class_add_pad_template (gstelement_class,
173 gst_static_pad_template_get (&gst_live_adder_sink_template));
174 gst_element_class_set_static_metadata (gstelement_class, "Live Adder element",
176 "Mixes live/discontinuous audio streams",
177 "Olivier Crete <olivier.crete@collabora.co.uk>");
/* GObject virtual methods */
179 gobject_class->finalize = gst_live_adder_finalize;
180 gobject_class->set_property = gst_live_adder_set_property;
181 gobject_class->get_property = gst_live_adder_get_property;
/* GstElement virtual methods */
183 gstelement_class->request_new_pad = gst_live_adder_request_new_pad;
184 gstelement_class->release_pad = gst_live_adder_release_pad;
185 gstelement_class->change_state = gst_live_adder_change_state;
/* "latency" is an unsigned integer expressed in milliseconds; it accepts the
 * full guint range and defaults to DEFAULT_LATENCY_MS. */
187 g_object_class_install_property (gobject_class, PROP_LATENCY,
188 g_param_spec_uint ("latency", "Buffering latency",
189 "Amount of data to buffer (in milliseconds)",
190 0, G_MAXUINT, DEFAULT_LATENCY_MS,
191 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* Instance initializer: creates the "src" pad from its static template,
 * attaches the query, event and activate-mode handlers to it and adds it to
 * the element, then sets up the initial state shown below. */
195 gst_live_adder_init (GstLiveAdder * adder)
198 gst_pad_new_from_static_template (&gst_live_adder_src_template, "src");
199 gst_pad_set_query_function (adder->srcpad,
200 GST_DEBUG_FUNCPTR (gst_live_adder_src_query));
201 gst_pad_set_event_function (adder->srcpad,
202 GST_DEBUG_FUNCPTR (gst_live_adder_src_event));
203 gst_pad_set_activatemode_function (adder->srcpad,
204 GST_DEBUG_FUNCPTR (gst_live_adder_src_activate_mode));
205 gst_element_add_pad (GST_ELEMENT (adder), adder->srcpad);
/* Condition variable that this file waits on and broadcasts with the object
 * lock as its mutex. */
209 g_cond_init (&adder->not_empty_cond);
/* No next output timestamp is known yet. */
211 adder->next_timestamp = GST_CLOCK_TIME_NONE;
213 adder->latency_ms = DEFAULT_LATENCY_MS;
/* Queue of pending GstBuffers, filled by the chain function. */
215 adder->buffers = g_queue_new ();
220 gst_live_adder_finalize (GObject * object)
222 GstLiveAdder *adder = GST_LIVE_ADDER (object);
224 g_cond_clear (&adder->not_empty_cond);
226 g_queue_foreach (adder->buffers, (GFunc) gst_mini_object_unref, NULL);
227 while (g_queue_pop_head (adder->buffers)) {
229 g_queue_free (adder->buffers);
231 g_list_free (adder->sinkpads);
233 G_OBJECT_CLASS (gst_live_adder_parent_class)->finalize (object);
238 gst_live_adder_set_property (GObject * object,
239 guint prop_id, const GValue * value, GParamSpec * pspec)
241 GstLiveAdder *adder = GST_LIVE_ADDER (object);
246 guint64 new_latency, old_latency;
248 new_latency = g_value_get_uint (value);
250 GST_OBJECT_LOCK (adder);
251 old_latency = adder->latency_ms;
252 adder->latency_ms = new_latency;
253 GST_OBJECT_UNLOCK (adder);
255 /* post message if latency changed, this will inform the parent pipeline
256 * that a latency reconfiguration is possible/needed. */
257 if (new_latency != old_latency) {
258 GST_DEBUG_OBJECT (adder, "latency changed to: %" GST_TIME_FORMAT,
259 GST_TIME_ARGS (new_latency));
261 gst_element_post_message (GST_ELEMENT_CAST (adder),
262 gst_message_new_latency (GST_OBJECT_CAST (adder)));
267 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
274 gst_live_adder_get_property (GObject * object,
275 guint prop_id, GValue * value, GParamSpec * pspec)
277 GstLiveAdder *adder = GST_LIVE_ADDER (object);
281 GST_OBJECT_LOCK (adder);
282 g_value_set_uint (value, adder->latency_ms);
283 GST_OBJECT_UNLOCK (adder);
286 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
292 /* we can only accept caps that we and downstream can handle. */
294 gst_live_adder_sink_getcaps (GstLiveAdder * adder, GstPad * pad,
297 GstCaps *result, *peercaps, *sinkcaps;
299 /* get the downstream possible caps */
300 peercaps = gst_pad_peer_query_caps (adder->srcpad, filter);
301 /* get the allowed caps on this sinkpad, we use the fixed caps function so
302 * that it does not call recursively in this function. */
303 sinkcaps = gst_pad_get_current_caps (pad);
305 sinkcaps = gst_pad_get_pad_template_caps (pad);
307 /* if the peer has caps, intersect */
308 GST_DEBUG_OBJECT (adder, "intersecting peer and template caps");
309 result = gst_caps_intersect (peercaps, sinkcaps);
310 gst_caps_unref (sinkcaps);
311 gst_caps_unref (peercaps);
313 /* the peer has no caps (or there is no peer), just use the allowed caps
314 * of this sinkpad. */
315 GST_DEBUG_OBJECT (adder, "no peer caps, using sinkcaps");
/* Context handed to check_other_caps() and set_other_caps() through
 * gst_iterator_foreach(); those callbacks use its pad, caps and all_valid
 * members. */
322 struct SetCapsIterCtx
330 check_other_caps (const GValue * item, gpointer user_data)
332 GstPad *otherpad = GST_PAD (g_value_get_object (item));
333 struct SetCapsIterCtx *ctx = user_data;
335 if (otherpad == ctx->pad)
338 if (!gst_pad_peer_query_accept_caps (otherpad, ctx->caps))
339 ctx->all_valid = FALSE;
343 set_other_caps (const GValue * item, gpointer user_data)
345 GstPad *otherpad = GST_PAD (g_value_get_object (item));
346 struct SetCapsIterCtx *ctx = user_data;
348 if (otherpad == ctx->pad)
351 if (!gst_pad_set_caps (otherpad, ctx->caps))
352 ctx->all_valid = FALSE;
355 /* the first caps we receive on any of the sinkpads will define the caps for all
356 * the other sinkpads because we can only mix streams with the same caps.
/* Applies @caps, received on sink pad @pad, to the whole element: checks that
 * the peers of the other sink pads accept them, sets them on the other sink
 * pads and on the src pad, then parses them into adder->info and selects the
 * matching mixing routine (adder->func) under the object lock. */
359 gst_live_adder_setcaps (GstLiveAdder * adder, GstPad * pad, GstCaps * caps)
362 struct SetCapsIterCtx ctx;
364 GST_LOG_OBJECT (adder, "setting caps on pad %p,%s to %" GST_PTR_FORMAT, pad,
365 GST_PAD_NAME (pad), caps);
367 /* FIXME, see if the other pads can accept the format. Also lock the
368 * format on the other pads to this new format. */
369 iter = gst_element_iterate_sink_pads (GST_ELEMENT (adder));
/* First pass: ask the peers of all other sink pads whether they accept the
 * caps. The pass is restarted from scratch whenever the iterator reports
 * RESYNC. */
372 ctx.all_valid = TRUE;
373 while (gst_iterator_foreach (iter, check_other_caps, &ctx) ==
374 GST_ITERATOR_RESYNC) {
375 ctx.all_valid = TRUE;
376 gst_iterator_resync (iter);
378 if (!ctx.all_valid) {
379 GST_WARNING_OBJECT (adder, "Caps are not acceptable by other sinkpads");
380 gst_iterator_free (iter);
/* Second pass: actually set the caps on the other sink pads, again
 * restarting on RESYNC. */
384 while (gst_iterator_foreach (iter, set_other_caps, &ctx) ==
385 GST_ITERATOR_RESYNC) {
386 ctx.all_valid = TRUE;
387 gst_iterator_resync (iter);
389 gst_iterator_free (iter);
391 if (!ctx.all_valid) {
392 GST_WARNING_OBJECT (adder, "Could not set caps on the other sink pads");
396 if (!gst_pad_set_caps (adder->srcpad, caps)) {
397 GST_WARNING_OBJECT (adder, "Could not set caps downstream");
401 GST_OBJECT_LOCK (adder);
/* Parse the caps into adder->info; the format and width found there decide
 * which add_* routine is stored in adder->func. */
403 if (!gst_audio_info_from_caps (&adder->info, caps))
406 if (GST_AUDIO_INFO_IS_INTEGER (&adder->info)) {
407 switch (GST_AUDIO_INFO_WIDTH (&adder->info)) {
409 adder->func = GST_AUDIO_INFO_IS_SIGNED (&adder->info) ?
410 (GstLiveAdderFunction) add_int8 : (GstLiveAdderFunction) add_uint8;
413 adder->func = GST_AUDIO_INFO_IS_SIGNED (&adder->info) ?
414 (GstLiveAdderFunction) add_int16 : (GstLiveAdderFunction)
418 adder->func = GST_AUDIO_INFO_IS_SIGNED (&adder->info) ?
419 (GstLiveAdderFunction) add_int32 : (GstLiveAdderFunction)
425 } else if (GST_AUDIO_INFO_IS_FLOAT (&adder->info)) {
426 switch (GST_AUDIO_INFO_WIDTH (&adder->info)) {
428 adder->func = (GstLiveAdderFunction) add_float32;
431 adder->func = (GstLiveAdderFunction) add_float64;
440 GST_OBJECT_UNLOCK (adder);
/* Failure path for caps this element cannot mix: the lock is released before
 * logging. */
446 GST_OBJECT_UNLOCK (adder);
447 GST_DEBUG_OBJECT (adder, "unsupported format set as caps");
453 gst_live_adder_flush_start (GstLiveAdder * adder)
455 GST_DEBUG_OBJECT (adder, "Disabling pop on queue");
457 GST_OBJECT_LOCK (adder);
458 /* mark ourselves as flushing */
459 adder->srcresult = GST_FLOW_FLUSHING;
461 /* Empty the queue */
462 g_queue_foreach (adder->buffers, (GFunc) gst_mini_object_unref, NULL);
463 while (g_queue_pop_head (adder->buffers));
465 /* unlock clock, we just unschedule, the entry will be released by the
466 * locking streaming thread. */
468 gst_clock_id_unschedule (adder->clock_id);
470 g_cond_broadcast (&adder->not_empty_cond);
471 GST_OBJECT_UNLOCK (adder);
475 gst_live_adder_src_activate_mode (GstPad * pad, GstObject * parent,
476 GstPadMode mode, gboolean active)
478 GstLiveAdder *adder = GST_LIVE_ADDER (parent);
479 gboolean result = TRUE;
481 if (mode == GST_PAD_MODE_PULL)
485 /* Mark as non flushing */
486 GST_OBJECT_LOCK (adder);
487 adder->srcresult = GST_FLOW_OK;
488 GST_OBJECT_UNLOCK (adder);
490 /* start pushing out buffers */
491 GST_DEBUG_OBJECT (adder, "Starting task on srcpad");
492 gst_pad_start_task (adder->srcpad,
493 (GstTaskFunction) gst_live_adder_loop, adder, NULL);
495 /* make sure all data processing stops ASAP */
496 gst_live_adder_flush_start (adder);
498 /* NOTE this will hardlock if the state change is called from the src pad
499 * task thread because we will _join() the thread. */
500 GST_DEBUG_OBJECT (adder, "Stopping task on srcpad");
501 result = gst_pad_stop_task (pad);
/* Event handler of the sink pads. CAPS, SEGMENT, FLUSH_START, FLUSH_STOP and
 * EOS are handled as described at each case below; any other event is pushed
 * on the src pad. */
508 gst_live_adder_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
510 GstLiveAdder *adder = GST_LIVE_ADDER (parent);
511 GstLiveAdderPadPrivate *padprivate = NULL;
514 padprivate = gst_pad_get_element_private (pad);
519 GST_LOG_OBJECT (adder, "received %s", GST_EVENT_TYPE_NAME (event));
521 switch (GST_EVENT_TYPE (event)) {
/* Caps: configure the whole element through gst_live_adder_setcaps(); the
 * event is consumed here. */
526 gst_event_parse_caps (event, &caps);
527 ret = gst_live_adder_setcaps (adder, pad, caps);
528 gst_event_unref (event);
/* Segment: only TIME segments are accepted. The segment is stored per pad,
 * and a freshly initialised TIME segment is pushed downstream instead of the
 * received one. */
531 case GST_EVENT_SEGMENT:
533 const GstSegment *segment;
534 GstSegment livesegment;
536 gst_event_parse_segment (event, &segment);
538 /* we need time for now */
539 if (segment->format != GST_FORMAT_TIME)
540 goto newseg_wrong_format;
542 /* now configure the values, we need these to time the release of the
543 * buffers on the srcpad. */
544 GST_OBJECT_LOCK (adder);
545 gst_segment_copy_into (segment, &padprivate->segment);
546 GST_OBJECT_UNLOCK (adder);
547 gst_event_unref (event);
549 gst_segment_init (&livesegment, GST_FORMAT_TIME);
550 gst_pad_push_event (adder->srcpad, gst_event_new_segment (&livesegment));
/* Flush start: flush the element, then forward the event. */
553 case GST_EVENT_FLUSH_START:
554 gst_live_adder_flush_start (adder);
555 ret = gst_pad_push_event (adder->srcpad, event);
/* Flush stop: forget the next output timestamp and the pad's private state,
 * forward the event and re-activate the src pad in push mode.
 * NOTE(review): the result of gst_pad_push_event() is overwritten by the
 * assignment that follows it, so a failed push is not reported. */
557 case GST_EVENT_FLUSH_STOP:
558 GST_OBJECT_LOCK (adder);
559 adder->next_timestamp = GST_CLOCK_TIME_NONE;
560 reset_pad_private (pad);
561 GST_OBJECT_UNLOCK (adder);
562 ret = gst_pad_push_event (adder->srcpad, event);
563 ret = gst_live_adder_src_activate_mode (adder->srcpad, GST_OBJECT (adder),
564 GST_PAD_MODE_PUSH, TRUE);
/* EOS: only recorded on the pad's private data (and signalled on the
 * not-empty condition) when the src result is OK and the pad is not already
 * EOS; the event itself is dropped in every case. */
568 GST_OBJECT_LOCK (adder);
570 ret = adder->srcresult == GST_FLOW_OK;
571 if (ret && !padprivate->eos) {
572 GST_DEBUG_OBJECT (adder, "queuing EOS");
573 padprivate->eos = TRUE;
574 g_cond_broadcast (&adder->not_empty_cond);
575 } else if (padprivate->eos) {
576 GST_DEBUG_OBJECT (adder, "dropping EOS, we are already EOS");
578 GST_DEBUG_OBJECT (adder, "dropping EOS, reason %s",
579 gst_flow_get_name (adder->srcresult));
582 GST_OBJECT_UNLOCK (adder);
584 gst_event_unref (event);
/* Everything else is forwarded downstream unchanged. */
588 ret = gst_pad_push_event (adder->srcpad, event);
/* Error path reached from the SEGMENT case for non-TIME segments. */
599 GST_DEBUG_OBJECT (adder, "received non TIME segment");
/* Shared helper for position and duration queries: iterates over all sink
 * pads, asks each pad's peer for its position (when @position is TRUE) or
 * its duration (otherwise) in @format, and tracks the maximum value seen.
 * @outvalue is the output parameter for the result. */
606 gst_live_adder_query_pos_dur (GstLiveAdder * adder, GstFormat format,
607 gboolean position, gint64 * outvalue)
/* Start below every possible value so that any answer replaces it. */
609 gint64 max = G_MININT64;
612 gboolean done = FALSE;
615 it = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (adder));
617 GstIteratorResult ires;
620 ires = gst_iterator_next (it, &item);
622 case GST_ITERATOR_DONE:
625 case GST_ITERATOR_OK:
627 GstPad *pad = GST_PAD_CAST (g_value_get_object (&item));
631 /* ask sink peer for duration */
633 curres = gst_pad_peer_query_position (pad, format, &value);
635 curres = gst_pad_peer_query_duration (pad, format, &value);
637 /* take max from all valid return values */
638 /* Only if the format is the one we requested, otherwise ignore it ?
644 /* valid unknown length, stop searching */
648 } else if (value > max) {
654 case GST_ITERATOR_RESYNC:
664 gst_iterator_free (it);
674 * When we add a new stream (or remove a stream) the duration might
675 * also become invalid again and we need to post a new DURATION
676 * message to notify this fact to the parent.
677 * For now we take the max of all the upstream elements so the simple
678 * cases work at least somewhat.
681 gst_live_adder_query_duration (GstLiveAdder * adder, GstQuery * query)
688 gst_query_parse_duration (query, &format, NULL);
690 res = gst_live_adder_query_pos_dur (adder, format, FALSE, &max);
693 /* and store the max */
694 gst_query_set_duration (query, format, max);
701 gst_live_adder_query_position (GstLiveAdder * adder, GstQuery * query)
708 gst_query_parse_position (query, &format, NULL);
710 res = gst_live_adder_query_pos_dur (adder, format, TRUE, &max);
713 /* and store the max */
714 gst_query_set_position (query, format, max);
/* Query handler of the src pad. LATENCY is computed from the upstream peers
 * plus this element's own latency; DURATION and POSITION are delegated to
 * the dedicated helpers; everything else goes to gst_pad_query_default(). */
723 gst_live_adder_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
725 GstLiveAdder *adder = GST_LIVE_ADDER (parent);
726 gboolean res = FALSE;
728 switch (GST_QUERY_TYPE (query)) {
729 case GST_QUERY_LATENCY:
731 /* We need to send the query upstream and add the returned latency to our
733 GstClockTime min_latency = 0, max_latency = G_MAXUINT64;
735 GstIterator *iter = NULL;
736 gboolean done = FALSE;
738 iter = gst_element_iterate_sink_pads (GST_ELEMENT (adder));
741 switch (gst_iterator_next (iter, &item)) {
742 case GST_ITERATOR_OK:
744 GstPad *sinkpad = GST_PAD (g_value_get_object (&item));
745 GstClockTime pad_min_latency, pad_max_latency;
746 gboolean pad_us_live;
/* Each sink pad's peer answers the same query object; its values are read
 * back and folded into the running minimum/maximum below. */
748 if (gst_pad_peer_query (sinkpad, query)) {
749 gst_query_parse_latency (query, &pad_us_live, &pad_min_latency,
754 GST_DEBUG_OBJECT (adder, "Peer latency for pad %s: min %"
755 GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
756 GST_PAD_NAME (sinkpad),
757 GST_TIME_ARGS (pad_min_latency),
758 GST_TIME_ARGS (pad_max_latency));
/* The overall minimum latency is the largest peer minimum; the overall
 * maximum is the smallest peer maximum. */
760 min_latency = MAX (pad_min_latency, min_latency);
761 max_latency = MIN (pad_max_latency, max_latency);
765 case GST_ITERATOR_RESYNC:
767 max_latency = G_MAXUINT64;
769 gst_iterator_resync (iter);
771 case GST_ITERATOR_ERROR:
772 GST_ERROR_OBJECT (adder, "Error looping sink pads");
775 case GST_ITERATOR_DONE:
780 gst_iterator_free (iter);
/* NOTE(review): latency_ms is read here before the object lock is taken,
 * while the property setter writes it under that lock — confirm this is
 * intended. */
783 GstClockTime my_latency = adder->latency_ms * GST_MSECOND;
/* Remember the upstream minimum as peer_latency, then report it with our own
 * latency added on top. */
784 GST_OBJECT_LOCK (adder);
785 adder->peer_latency = min_latency;
786 min_latency += my_latency;
787 GST_OBJECT_UNLOCK (adder);
789 /* Make sure we don't risk an overflow */
790 if (max_latency < G_MAXUINT64 - my_latency)
791 max_latency += my_latency;
793 max_latency = G_MAXUINT64;
/* The reply always reports the element as live (second argument TRUE). */
794 gst_query_set_latency (query, TRUE, min_latency, max_latency);
795 GST_DEBUG_OBJECT (adder, "Calculated total latency : min %"
796 GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
797 GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
801 case GST_QUERY_DURATION:
802 res = gst_live_adder_query_duration (adder, query);
804 case GST_QUERY_POSITION:
805 res = gst_live_adder_query_position (adder, query);
808 res = gst_pad_query_default (pad, parent, query);
816 gst_live_adder_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
818 GstLiveAdder *adder = GST_LIVE_ADDER (parent);
821 switch (GST_QUERY_TYPE (query)) {
827 gst_query_parse_caps (query, &filter);
828 result = gst_live_adder_sink_getcaps (adder, pad, filter);
829 gst_query_set_caps_result (query, result);
834 res = gst_pad_query_default (pad, parent, query);
842 forward_event_func (const GValue * item, GValue * ret, gpointer user_data)
844 GstPad *pad = GST_PAD (g_value_get_object (item));
845 GstEvent *event = user_data;
847 gst_event_ref (event);
848 GST_LOG_OBJECT (pad, "About to send event %s", GST_EVENT_TYPE_NAME (event));
849 if (!gst_pad_push_event (pad, event)) {
850 g_value_set_boolean (ret, FALSE);
851 GST_WARNING_OBJECT (pad, "Sending event %p (%s) failed.",
852 event, GST_EVENT_TYPE_NAME (event));
854 GST_LOG_OBJECT (pad, "Sent event %p (%s).",
855 event, GST_EVENT_TYPE_NAME (event));
860 /* forwards the event to all sinkpads, takes ownership of the
863 * Returns: TRUE if the event could be forwarded on all
867 forward_event (GstLiveAdder * adder, GstEvent * event)
873 GST_LOG_OBJECT (adder, "Forwarding event %p (%s)", event,
874 GST_EVENT_TYPE_NAME (event));
878 g_value_init (&vret, G_TYPE_BOOLEAN);
879 g_value_set_boolean (&vret, TRUE);
880 it = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (adder));
881 gst_iterator_fold (it, forward_event_func, &vret, event);
882 gst_iterator_free (it);
884 ret = g_value_get_boolean (&vret);
891 gst_live_adder_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
893 GstLiveAdder *adder = GST_LIVE_ADDER (parent);
896 switch (GST_EVENT_TYPE (event)) {
898 /* TODO : QoS might be tricky */
901 case GST_EVENT_NAVIGATION:
902 /* TODO : navigation is rather pointless. */
906 /* just forward the rest for now */
907 result = forward_event (adder, event);
911 gst_event_unref (event);
/* Converts a duration (GstClockTime) into a byte length for the currently
 * configured audio format: the number of whole frames covered by the
 * duration, multiplied by the bytes per frame. */
917 gst_live_adder_length_from_duration (GstLiveAdder * adder,
918 GstClockTime duration)
/* The integer division truncates, so the result is always a whole number of
 * frames.
 * NOTE(review): duration * rate is evaluated before the division and could
 * overflow 64 bits for very large durations — confirm callers only pass
 * short spans. */
920 guint64 ret = GST_AUDIO_INFO_BPF (&adder->info) *
921 (duration * GST_AUDIO_INFO_RATE (&adder->info) / GST_SECOND);
/* Chain function: takes one incoming buffer, validates and normalises its
 * timestamp and duration, clips it to the pad's segment, converts it to
 * running time, and merges it into adder->buffers. Parts that overlap in
 * time with an already queued buffer are mixed into that buffer with
 * adder->func; the remaining parts are inserted as buffers of their own.
 * All of this runs with the object lock held. */
927 gst_live_live_adder_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
929 GstLiveAdder *adder = GST_LIVE_ADDER (parent);
930 GstLiveAdderPadPrivate *padprivate = NULL;
931 GstFlowReturn ret = GST_FLOW_OK;
/* skip: amount of time at the start of the new buffer that has already been
 * dealt with (too late, already inserted, or already mixed). */
933 GstClockTime skip = 0;
934 gint64 drift = 0; /* Positive if new buffer after old buffer */
936 GST_OBJECT_LOCK (adder);
/* A non-OK result stored by the src side is handed back to upstream and the
 * buffer is dropped. */
938 ret = adder->srcresult;
940 GST_DEBUG ("Incoming buffer time:%" GST_TIME_FORMAT " duration:%"
941 GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
942 GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
944 if (ret != GST_FLOW_OK) {
945 GST_DEBUG_OBJECT (adder, "Passing non-ok result from src: %s",
946 gst_flow_get_name (ret));
947 gst_buffer_unref (buffer);
951 padprivate = gst_pad_get_element_private (pad);
954 ret = GST_FLOW_NOT_LINKED;
955 gst_buffer_unref (buffer);
959 if (padprivate->eos) {
960 GST_DEBUG_OBJECT (adder, "Received buffer after EOS");
962 gst_buffer_unref (buffer);
/* Buffers without a timestamp cannot be placed in the queue. */
966 if (!GST_BUFFER_TIMESTAMP_IS_VALID (buffer))
967 goto invalid_timestamp;
969 if (padprivate->segment.format == GST_FORMAT_UNDEFINED) {
970 GST_WARNING_OBJECT (adder, "No new-segment received,"
971 " initializing segment with time 0..-1");
972 gst_segment_init (&padprivate->segment, GST_FORMAT_TIME);
/* Timestamp and duration may be rewritten below, so the buffer must be
 * writable. */
975 buffer = gst_buffer_make_writable (buffer);
/* NOTE(review): drift is computed even when expected_timestamp is
 * GST_CLOCK_TIME_NONE; the visible uses of it are guarded by the validity
 * check that follows. */
977 drift = GST_BUFFER_TIMESTAMP (buffer) - padprivate->expected_timestamp;
979 /* Just see if we receive invalid timestamp/durations */
980 if (GST_CLOCK_TIME_IS_VALID (padprivate->expected_timestamp) &&
981 !GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT) &&
983 GST_LOG_OBJECT (adder,
984 "Timestamp discontinuity without the DISCONT flag set"
985 " (expected %" GST_TIME_FORMAT ", got %" GST_TIME_FORMAT
986 " drift:%" G_GINT64_FORMAT "ms)",
987 GST_TIME_ARGS (padprivate->expected_timestamp),
988 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), drift / GST_MSECOND);
990 /* We accept drifts of 10ms */
991 if (ABS (drift) < (10 * GST_MSECOND)) {
992 GST_DEBUG ("Correcting minor drift");
993 GST_BUFFER_TIMESTAMP (buffer) = padprivate->expected_timestamp;
998 /* If there is no duration, lets set one */
/* The duration is derived from the buffer size, the bytes per frame and the
 * sample rate of the configured format.
 * NOTE(review): this divides by BPF * rate taken from adder->info; confirm
 * caps are always configured before the first buffer arrives. */
999 if (!GST_BUFFER_DURATION_IS_VALID (buffer)) {
1000 GST_BUFFER_DURATION (buffer) = (gst_buffer_get_size (buffer) * GST_SECOND) /
1001 (GST_AUDIO_INFO_BPF (&adder->info) *
1002 GST_AUDIO_INFO_RATE (&adder->info));
1003 padprivate->expected_timestamp = GST_CLOCK_TIME_NONE;
1005 padprivate->expected_timestamp = GST_BUFFER_TIMESTAMP (buffer) +
1006 GST_BUFFER_DURATION (buffer);
1011 * Lets clip the buffer to the segment (so we don't have to worry about
1012 * cliping afterwards).
1013 * This should also guarantee us that we'll have valid timestamps and
1014 * durations afterwards
1017 buffer = gst_audio_buffer_clip (buffer, &padprivate->segment,
1018 GST_AUDIO_INFO_RATE (&adder->info), GST_AUDIO_INFO_BPF (&adder->info));
1020 /* buffer can be NULL if it's completely outside of the segment */
1022 GST_DEBUG ("Buffer completely outside of configured segment, dropping it");
1027 * Make sure all incoming buffers share the same timestamping
1029 GST_BUFFER_TIMESTAMP (buffer) =
1030 gst_segment_to_running_time (&padprivate->segment,
1031 padprivate->segment.format, GST_BUFFER_TIMESTAMP (buffer));
/* Late data: a buffer that ends before next_timestamp is dropped entirely;
 * one that only starts before it has its late head skipped. */
1034 if (GST_CLOCK_TIME_IS_VALID (adder->next_timestamp) &&
1035 GST_BUFFER_TIMESTAMP (buffer) < adder->next_timestamp) {
1036 if (GST_BUFFER_TIMESTAMP (buffer) + GST_BUFFER_DURATION (buffer) <
1037 adder->next_timestamp) {
1038 GST_DEBUG_OBJECT (adder, "Buffer is late, dropping (ts: %" GST_TIME_FORMAT
1039 " duration: %" GST_TIME_FORMAT ")",
1040 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
1041 GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
1042 gst_buffer_unref (buffer);
1045 skip = adder->next_timestamp - GST_BUFFER_TIMESTAMP (buffer);
1046 GST_DEBUG_OBJECT (adder, "Buffer is partially late, skipping %"
1047 GST_TIME_FORMAT, GST_TIME_ARGS (skip));
1051 /* If our new buffer's head is higher than the queue's head, lets wake up,
1052 * we may not have to wait for as long
1054 if (adder->clock_id &&
1055 g_queue_peek_head (adder->buffers) != NULL &&
1056 GST_BUFFER_TIMESTAMP (buffer) + skip <
1057 GST_BUFFER_TIMESTAMP (g_queue_peek_head (adder->buffers)))
1058 gst_clock_id_unschedule (adder->clock_id);
/* Walk the queue from its head, comparing the not-yet-handled part of the
 * new buffer with each queued buffer in turn. */
1060 for (item = g_queue_peek_head_link (adder->buffers);
1061 item; item = g_list_next (item)) {
1062 GstBuffer *oldbuffer = item->data;
1063 GstClockTime old_skip = 0;
1064 GstClockTime mix_duration = 0;
1065 GstClockTime mix_start = 0;
1066 GstClockTime mix_end = 0;
1067 GstMapInfo oldmap, map;
1069 /* We haven't reached our place yet */
1070 if (GST_BUFFER_TIMESTAMP (buffer) + skip >=
1071 GST_BUFFER_TIMESTAMP (oldbuffer) + GST_BUFFER_DURATION (oldbuffer))
1074 /* We're past our place, lets insert ourselves here */
1075 if (GST_BUFFER_TIMESTAMP (buffer) + GST_BUFFER_DURATION (buffer) <=
1076 GST_BUFFER_TIMESTAMP (oldbuffer))
1079 /* if we reach this spot, we have overlap, so we must mix */
1081 /* First make a subbuffer with the non-overlapping part */
1082 if (GST_BUFFER_TIMESTAMP (buffer) + skip < GST_BUFFER_TIMESTAMP (oldbuffer)) {
1083 GstBuffer *subbuffer = NULL;
1084 GstClockTime subbuffer_duration = GST_BUFFER_TIMESTAMP (oldbuffer) -
1085 (GST_BUFFER_TIMESTAMP (buffer) + skip);
1087 subbuffer = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL,
1088 gst_live_adder_length_from_duration (adder, skip),
1089 gst_live_adder_length_from_duration (adder, subbuffer_duration));
1091 GST_BUFFER_TIMESTAMP (subbuffer) = GST_BUFFER_TIMESTAMP (buffer) + skip;
1092 GST_BUFFER_DURATION (subbuffer) = subbuffer_duration;
1094 skip += subbuffer_duration;
1096 g_queue_insert_before (adder->buffers, item, subbuffer);
1099 /* Now we are on the overlapping part */
/* The queued buffer is modified in place, so it has to be writable; the
 * list node is updated in case a copy was made. */
1100 oldbuffer = gst_buffer_make_writable (oldbuffer);
1101 item->data = oldbuffer;
/* old_skip: offset into the queued buffer at which the overlap starts. */
1103 old_skip = GST_BUFFER_TIMESTAMP (buffer) + skip -
1104 GST_BUFFER_TIMESTAMP (oldbuffer);
1106 mix_start = GST_BUFFER_TIMESTAMP (oldbuffer) + old_skip;
/* The overlap ends at whichever of the two buffers ends first. */
1108 if (GST_BUFFER_TIMESTAMP (buffer) + GST_BUFFER_DURATION (buffer) <
1109 GST_BUFFER_TIMESTAMP (oldbuffer) + GST_BUFFER_DURATION (oldbuffer))
1110 mix_end = GST_BUFFER_TIMESTAMP (buffer) + GST_BUFFER_DURATION (buffer);
1112 mix_end = GST_BUFFER_TIMESTAMP (oldbuffer) +
1113 GST_BUFFER_DURATION (oldbuffer);
1115 mix_duration = mix_end - mix_start;
/* Mix the overlapping region of the new buffer into the queued one; the
 * byte offsets and length are derived from the time offsets above. */
1117 gst_buffer_map (oldbuffer, &oldmap, GST_MAP_WRITE);
1118 gst_buffer_map (buffer, &map, GST_MAP_READ);
1119 adder->func (oldmap.data +
1120 gst_live_adder_length_from_duration (adder, old_skip),
1122 gst_live_adder_length_from_duration (adder, skip),
1123 gst_live_adder_length_from_duration (adder, mix_duration));
1124 gst_buffer_unmap (oldbuffer, &oldmap);
1125 gst_buffer_unmap (buffer, &map);
1126 skip += mix_duration;
1129 g_cond_broadcast (&adder->not_empty_cond);
/* If the whole buffer has been consumed nothing is left to insert;
 * otherwise the unhandled tail is copied into a new buffer with a matching
 * timestamp and duration and added to the queue. */
1131 if (skip == GST_BUFFER_DURATION (buffer)) {
1132 gst_buffer_unref (buffer);
1135 GstClockTime subbuffer_duration = GST_BUFFER_DURATION (buffer) - skip;
1136 GstClockTime subbuffer_ts = GST_BUFFER_TIMESTAMP (buffer) + skip;
1137 GstBuffer *new_buffer = gst_buffer_copy_region (buffer,
1138 GST_BUFFER_COPY_ALL,
1139 gst_live_adder_length_from_duration (adder, skip),
1140 gst_live_adder_length_from_duration (adder, subbuffer_duration));
1141 gst_buffer_unref (buffer);
1142 buffer = new_buffer;
1143 GST_BUFFER_PTS (buffer) = subbuffer_ts;
1144 GST_BUFFER_DURATION (buffer) = subbuffer_duration;
1148 g_queue_insert_before (adder->buffers, item, buffer);
1150 g_queue_push_tail (adder->buffers, buffer);
1155 GST_OBJECT_UNLOCK (adder);
/* Error path for buffers without a valid timestamp: the lock is released,
 * the buffer is dropped, an element error is posted and GST_FLOW_ERROR is
 * returned. */
1161 GST_OBJECT_UNLOCK (adder);
1162 gst_buffer_unref (buffer);
1163 GST_ELEMENT_ERROR (adder, STREAM, FAILED,
1164 ("Buffer without a valid timestamp received"),
1165 ("Invalid timestamp received on buffer"));
1167 return GST_FLOW_ERROR;
1171 * This only works because the GstObject lock is taken
1173 * It checks if all sink pads are EOS
1176 check_eos_locked (GstLiveAdder * adder)
1180 /* We can't be EOS if we have no sinkpads */
1181 if (adder->sinkpads == NULL)
1184 for (item = adder->sinkpads; item; item = g_list_next (item)) {
1185 GstPad *pad = item->data;
1186 GstLiveAdderPadPrivate *padprivate = gst_pad_get_element_private (pad);
1188 if (padprivate && padprivate->eos != TRUE)
1196 gst_live_adder_loop (gpointer data)
1198 GstLiveAdder *adder = GST_LIVE_ADDER (data);
1199 GstClockTime buffer_timestamp = 0;
1200 GstClockTime sync_time = 0;
1201 GstClock *clock = NULL;
1202 GstClockID id = NULL;
1204 GstBuffer *buffer = NULL;
1205 GstFlowReturn result;
1206 GstEvent *newseg_event = NULL;
1208 GST_OBJECT_LOCK (adder);
1213 if (adder->srcresult != GST_FLOW_OK)
1215 if (!g_queue_is_empty (adder->buffers))
1217 if (check_eos_locked (adder))
1219 g_cond_wait (&adder->not_empty_cond, GST_OBJECT_GET_LOCK (adder));
1222 buffer_timestamp = GST_BUFFER_TIMESTAMP (g_queue_peek_head (adder->buffers));
1224 clock = GST_ELEMENT_CLOCK (adder);
1226 /* If we have no clock, then we can't do anything.. error */
1234 GST_DEBUG_OBJECT (adder, "sync to timestamp %" GST_TIME_FORMAT,
1235 GST_TIME_ARGS (buffer_timestamp));
1237 sync_time = buffer_timestamp + GST_ELEMENT_CAST (adder)->base_time;
1238 /* add latency, this includes our own latency and the peer latency. */
1239 sync_time += adder->latency_ms * GST_MSECOND;
1240 sync_time += adder->peer_latency;
1242 /* create an entry for the clock */
1243 id = adder->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
1244 GST_OBJECT_UNLOCK (adder);
1246 ret = gst_clock_id_wait (id, NULL);
1248 GST_OBJECT_LOCK (adder);
1250 /* and free the entry */
1251 gst_clock_id_unref (id);
1252 adder->clock_id = NULL;
1254 /* at this point, the clock could have been unlocked by a timeout, a new
1255 * head element was added to the queue or because we are shutting down. Check
1256 * for shutdown first. */
1258 if (adder->srcresult != GST_FLOW_OK)
1261 if (ret == GST_CLOCK_UNSCHEDULED) {
1262 GST_DEBUG_OBJECT (adder,
1263 "Wait got unscheduled, will retry to push with new buffer");
1267 if (ret != GST_CLOCK_OK && ret != GST_CLOCK_EARLY)
1272 buffer = g_queue_pop_head (adder->buffers);
1278 * We make sure the timestamps are exactly contiguous
1279 * If its only small skew (due to rounding errors), we correct it
1280 * silently. Otherwise we put the discont flag
1282 if (GST_CLOCK_TIME_IS_VALID (adder->next_timestamp) &&
1283 GST_BUFFER_TIMESTAMP (buffer) != adder->next_timestamp) {
1284 GstClockTimeDiff diff = GST_CLOCK_DIFF (GST_BUFFER_TIMESTAMP (buffer),
1285 adder->next_timestamp);
1289 if (diff < GST_SECOND / GST_AUDIO_INFO_RATE (&adder->info)) {
1290 GST_BUFFER_TIMESTAMP (buffer) = adder->next_timestamp;
1291 GST_DEBUG_OBJECT (adder, "Correcting slight skew");
1292 GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
1294 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1295 GST_DEBUG_OBJECT (adder, "Expected buffer at %" GST_TIME_FORMAT
1296 ", but is at %" GST_TIME_FORMAT ", setting discont",
1297 GST_TIME_ARGS (adder->next_timestamp),
1298 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
1301 GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
1304 GST_BUFFER_OFFSET (buffer) = GST_BUFFER_OFFSET_NONE;
1305 GST_BUFFER_OFFSET_END (buffer) = GST_BUFFER_OFFSET_NONE;
1307 if (GST_BUFFER_DURATION_IS_VALID (buffer))
1308 adder->next_timestamp = GST_BUFFER_TIMESTAMP (buffer) +
1309 GST_BUFFER_DURATION (buffer);
1311 adder->next_timestamp = GST_CLOCK_TIME_NONE;
1312 GST_OBJECT_UNLOCK (adder);
1315 gst_pad_push_event (adder->srcpad, newseg_event);
1317 GST_LOG_OBJECT (adder, "About to push buffer time:%" GST_TIME_FORMAT
1318 " duration:%" GST_TIME_FORMAT,
1319 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
1320 GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
1322 result = gst_pad_push (adder->srcpad, buffer);
1323 if (result != GST_FLOW_OK)
1330 GST_DEBUG_OBJECT (adder, "we are flushing");
1331 gst_pad_pause_task (adder->srcpad);
1332 GST_OBJECT_UNLOCK (adder);
1338 gst_pad_pause_task (adder->srcpad);
1339 GST_OBJECT_UNLOCK (adder);
1340 GST_ELEMENT_ERROR (adder, STREAM, MUX, ("Error with the clock"),
1341 ("Error with the clock: %d", ret));
1342 GST_ERROR_OBJECT (adder, "Error with the clock: %d", ret);
1348 gst_pad_pause_task (adder->srcpad);
1349 GST_OBJECT_UNLOCK (adder);
1350 GST_ELEMENT_ERROR (adder, STREAM, MUX, ("No available clock"),
1351 ("No available clock"));
1352 GST_ERROR_OBJECT (adder, "No available clock");
1358 GST_DEBUG_OBJECT (adder, "pausing task, reason %s",
1359 gst_flow_get_name (result));
1361 GST_OBJECT_LOCK (adder);
1364 adder->srcresult = result;
1365 /* we don't post errors or anything because upstream will do that for us
1366 * when we pass the return value upstream. */
1367 gst_pad_pause_task (adder->srcpad);
1368 GST_OBJECT_UNLOCK (adder);
1374 /* store result, we are flushing now */
1375 GST_DEBUG_OBJECT (adder, "We are EOS, pushing EOS downstream");
1376 adder->srcresult = GST_FLOW_EOS;
1377 gst_pad_pause_task (adder->srcpad);
1378 GST_OBJECT_UNLOCK (adder);
1379 gst_pad_push_event (adder->srcpad, gst_event_new_eos ());
/*
 * Request-pad handler: builds a new "sink_%u" pad from @templ, installs its
 * event, query and chain functions, attaches freshly allocated per-pad state,
 * activates the pad, adds it to the element and records it in
 * adder->sinkpads.
 *
 * @element: element the pad is requested on (cast to GstLiveAdder below).
 * @templ: template for the new pad; its direction is compared against
 *         GST_PAD_SINK.
 * @ignored_name: not referenced in the lines shown here.
 * @caps: not referenced in the lines shown here.
 *
 * NOTE(review): the return statements and the labels of the error paths are
 * not visible in this excerpt -- confirm what is returned on each path.
 */
1385 gst_live_adder_request_new_pad (GstElement * element, GstPadTemplate * templ,
1386 const gchar * ignored_name, const GstCaps * caps)
1389 GstLiveAdder *adder;
1392 GstLiveAdderPadPrivate *padprivate = NULL;
/* Check that the template describes a sink pad. */
1394 if (templ->direction != GST_PAD_SINK)
1397 adder = GST_LIVE_ADDER (element);
1399 /* increment pad counter */
/* GLIB_CHECK_VERSION selects the atomic-add call; padcount receives its
 * result and is used below to build the pad name. */
1400 #if GLIB_CHECK_VERSION(2,29,5)
1401 padcount = g_atomic_int_add (&adder->padcount, 1);
1403 padcount = g_atomic_int_exchange_and_add (&adder->padcount, 1);
/* Name the pad after the counter value and create it from the template. */
1406 name = g_strdup_printf ("sink_%u", padcount);
1407 newpad = gst_pad_new_from_template (templ, name);
1408 GST_DEBUG_OBJECT (adder, "request new pad %s", name);
/* Install the event and query handlers on the new pad. */
1411 gst_pad_set_event_function (newpad,
1412 GST_DEBUG_FUNCPTR (gst_live_adder_sink_event));
1413 gst_pad_set_query_function (newpad,
1414 GST_DEBUG_FUNCPTR (gst_live_adder_sink_query));
/* Per-pad state: undefined-format segment, not EOS, no expected timestamp.
 * It is stored on the pad as its element-private data. */
1416 padprivate = g_new0 (GstLiveAdderPadPrivate, 1);
1418 gst_segment_init (&padprivate->segment, GST_FORMAT_UNDEFINED);
1419 padprivate->eos = FALSE;
1420 padprivate->expected_timestamp = GST_CLOCK_TIME_NONE;
1422 gst_pad_set_element_private (newpad, padprivate);
1424 gst_pad_set_chain_function (newpad, gst_live_live_adder_chain);
/* The pad is activated before it is added to the element. */
1427 if (!gst_pad_set_active (newpad, TRUE))
1428 goto could_not_activate;
1430 /* takes ownership of the pad */
1431 if (!gst_element_add_pad (GST_ELEMENT (adder), newpad))
/* The list of sink pads is updated while holding the object lock. */
1434 GST_OBJECT_LOCK (adder);
1435 adder->sinkpads = g_list_prepend (adder->sinkpads, newpad);
1436 GST_OBJECT_UNLOCK (adder);
/* Warning for a request whose template is not a sink template (presumably
 * reached from the direction check at the top; the jump is not visible here).
 * NOTE(review): the message prefix reads "gstadder" although this is the live
 * adder -- confirm whether that is intentional. */
1443 g_warning ("gstadder: request new pad that is not a SINK pad\n");
/* Failure to add the pad: free the per-pad state and drop the pad reference. */
1448 GST_DEBUG_OBJECT (adder, "could not add pad");
1449 g_free (padprivate);
1450 gst_object_unref (newpad);
/* Failure to activate the pad: same cleanup as above. */
1455 GST_DEBUG_OBJECT (adder, "could not activate new pad");
1456 g_free (padprivate);
1457 gst_object_unref (newpad);
/*
 * Release-pad handler: detaches and frees the pad's private state, drops the
 * pad from adder->sinkpads and removes it from the element.
 *
 * @element: element owning the pad (cast to GstLiveAdder below).
 * @pad: the pad being released.
 */
1463 gst_live_adder_release_pad (GstElement * element, GstPad * pad)
1465 GstLiveAdder *adder;
1466 GstLiveAdderPadPrivate *padprivate;
1468 adder = GST_LIVE_ADDER (element);
1470 GST_DEBUG_OBJECT (adder, "release pad %s:%s", GST_DEBUG_PAD_NAME (pad));
/* Under the object lock: take the private pointer off the pad, clear it on
 * the pad, and remove every occurrence of the pad from the sink-pad list. */
1472 GST_OBJECT_LOCK (element);
1473 padprivate = gst_pad_get_element_private (pad);
1474 gst_pad_set_element_private (pad, NULL);
1475 adder->sinkpads = g_list_remove_all (adder->sinkpads, pad);
1476 GST_OBJECT_UNLOCK (element);
/* The private state is freed only after the lock has been released. */
1478 g_free (padprivate);
1480 gst_element_remove_pad (element, pad);
/*
 * Resets the private state attached to @pad: the segment is re-initialised
 * with GST_FORMAT_UNDEFINED, the expected timestamp becomes
 * GST_CLOCK_TIME_NONE and the EOS flag is cleared.
 *
 * NOTE(review): padprivate is dereferenced below; whether a NULL check
 * precedes the dereference is not visible in this excerpt -- confirm.
 */
1484 reset_pad_private (GstPad * pad)
1486 GstLiveAdderPadPrivate *padprivate;
1488 padprivate = gst_pad_get_element_private (pad);
1493 gst_segment_init (&padprivate->segment, GST_FORMAT_UNDEFINED);
1495 padprivate->expected_timestamp = GST_CLOCK_TIME_NONE;
1496 padprivate->eos = FALSE;
/*
 * State-change handler for the live adder.
 *
 * Before calling the parent class:
 *   - READY_TO_PAUSED marks a segment as pending, zeroes the peer latency,
 *     invalidates next_timestamp and resets the private state of every pad
 *     in adder->sinkpads;
 *   - PLAYING_TO_PAUSED clears adder->playing.
 * After calling the parent class:
 *   - PAUSED_TO_PLAYING sets adder->playing.
 * Each of these updates is done while holding the object lock.
 *
 * @element: the element changing state (cast to GstLiveAdder below).
 * @transition: the state transition being performed.
 *
 * NOTE(review): ret receives the parent class' result; the return statement
 * itself is not visible in this excerpt -- confirm.
 */
1499 static GstStateChangeReturn
1500 gst_live_adder_change_state (GstElement * element, GstStateChange transition)
1502 GstLiveAdder *adder;
1503 GstStateChangeReturn ret;
1505 adder = GST_LIVE_ADDER (element);
1507 switch (transition) {
/* Start of streaming: reset the element-wide and per-pad state. */
1508 case GST_STATE_CHANGE_READY_TO_PAUSED:
1509 GST_OBJECT_LOCK (adder);
1510 adder->segment_pending = TRUE;
1511 adder->peer_latency = 0;
1512 adder->next_timestamp = GST_CLOCK_TIME_NONE;
1513 g_list_foreach (adder->sinkpads, (GFunc) reset_pad_private, NULL);
1514 GST_OBJECT_UNLOCK (adder);
/* Leaving PLAYING: the flag is cleared before the parent class is called. */
1516 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1517 GST_OBJECT_LOCK (adder);
1518 adder->playing = FALSE;
1519 GST_OBJECT_UNLOCK (adder);
/* Chain up to the parent class' change_state implementation. */
1525 ret = GST_ELEMENT_CLASS (gst_live_adder_parent_class)->change_state (element,
1528 switch (transition) {
/* Entering PLAYING: the flag is set after the parent class has been called. */
1529 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1530 GST_OBJECT_LOCK (adder);
1531 adder->playing = TRUE;
1532 GST_OBJECT_UNLOCK (adder);
/*
 * Plugin entry point: registers the "liveadder" element type
 * (GST_TYPE_LIVE_ADDER) with rank GST_RANK_NONE on @plugin.
 *
 * NOTE(review): the values returned on success and on registration failure
 * are not visible in this excerpt -- confirm.
 */
1543 plugin_init (GstPlugin * plugin)
1545 if (!gst_element_register (plugin, "liveadder", GST_RANK_NONE,
1546 GST_TYPE_LIVE_ADDER)) {
/* Plugin definition: passes the description string, plugin_init as the
 * init function, and the version, license, package and origin macros. */
1553 GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
1556 "Adds multiple live discontinuous streams",
1557 plugin_init, VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)