1 /* GStreamer concat element
3 * Copyright (c) 2014 Sebastian Dröge <sebastian@centricular.com>
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
22 * SECTION:element-concat
23 * @see_also: #GstFunnel
25 * Concatenates streams together into one continuous stream.
27 * All streams but the current one are blocked until the current one
28 * finished with %GST_EVENT_EOS. Then the next stream is enabled, while
29 * keeping the running time continuous for %GST_FORMAT_TIME segments or
30 * keeping the segment continuous for %GST_FORMAT_BYTES segments.
32 * Streams are switched in the order in which the sinkpads were requested.
35 * <title>Example launch line</title>
37 * gst-launch-1.0 concat name=c ! xvimagesink videotestsrc num-buffers=100 ! c. videotestsrc num-buffers=100 pattern=ball ! c.
38 * ]| Plays two video streams one after another.
46 #include "gstconcat.h"
/* Debug category used by this file; GST_CAT_DEFAULT makes it the default
 * category. */
48 GST_DEBUG_CATEGORY_STATIC (gst_concat_debug);
49 #define GST_CAT_DEFAULT gst_concat_debug
/* GType accessor and the GObject helper macros for the GstConcatPad pad
 * subclass. GST_CONCAT_PAD_CAST is a plain C cast; the other macros go
 * through the G_TYPE_CHECK_* helpers. */
51 G_GNUC_INTERNAL GType gst_concat_pad_get_type (void);
53 #define GST_TYPE_CONCAT_PAD (gst_concat_pad_get_type())
54 #define GST_CONCAT_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_CONCAT_PAD, GstConcatPad))
55 #define GST_CONCAT_PAD_CAST(obj) ((GstConcatPad *)(obj))
56 #define GST_CONCAT_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_CONCAT_PAD, GstConcatPadClass))
57 #define GST_IS_CONCAT_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_CONCAT_PAD))
58 #define GST_IS_CONCAT_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_CONCAT_PAD))
/* Forward typedefs for the pad instance and class structures. */
60 typedef struct _GstConcatPad GstConcatPad;
61 typedef struct _GstConcatPadClass GstConcatPadClass;
/* NOTE(review): the numbering of this listing jumps here, so the body of
 * struct _GstConcatPad and the braces of the definitions below are not
 * visible. The comment on the next line presumably annotates a field of
 * that struct; confirm against the full file. */
69 /* Protected by the concat lock */
73 struct _GstConcatPadClass
/* Defines the GstConcatPad type with GST_TYPE_PAD as its parent type. */
78 G_DEFINE_TYPE (GstConcatPad, gst_concat_pad, GST_TYPE_PAD);
/* Class initializer for GstConcatPad (its body is not visible here). */
81 gst_concat_pad_class_init (GstConcatPadClass * klass)
/* Instance initializer: every new pad starts with a segment of undefined
 * format and with its flushing flag cleared. */
86 gst_concat_pad_init (GstConcatPad * self)
88 gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
89 self->flushing = FALSE;
/* Static template for the sink pads, using the "sink_%u" name pattern.
 * NOTE(review): the direction, presence and caps lines of this template
 * are not visible in this listing. */
92 static GstStaticPadTemplate concat_sink_template =
93 GST_STATIC_PAD_TEMPLATE ("sink_%u",
/* Static template for the "src" pad, declared with GST_STATIC_CAPS_ANY. */
98 static GstStaticPadTemplate concat_src_template =
99 GST_STATIC_PAD_TEMPLATE ("src",
102 GST_STATIC_CAPS_ANY);
105 GST_DEBUG_CATEGORY_INIT (gst_concat_debug, "concat", 0, "concat element");
/* The line above initializes the "concat" debug category. _do_init is
 * handed to G_DEFINE_TYPE_WITH_CODE below as the extra type-registration
 * code. NOTE(review): the "#define _do_init" line itself is not visible in
 * this listing; presumably it directly precedes the category init. */
106 #define gst_concat_parent_class parent_class
107 G_DEFINE_TYPE_WITH_CODE (GstConcat, gst_concat, GST_TYPE_ELEMENT, _do_init);
/* GObject teardown handlers. */
109 static void gst_concat_dispose (GObject * object);
110 static void gst_concat_finalize (GObject * object);
/* GstElement virtual methods: state changes and request-pad management. */
112 static GstStateChangeReturn gst_concat_change_state (GstElement * element,
113 GstStateChange transition);
114 static GstPad *gst_concat_request_new_pad (GstElement * element,
115 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
116 static void gst_concat_release_pad (GstElement * element, GstPad * pad);
/* Chain, event and query functions installed on each requested sink pad.
 * The continuation lines of these prototypes are missing from the listing. */
118 static GstFlowReturn gst_concat_sink_chain (GstPad * pad, GstObject * parent,
120 static gboolean gst_concat_sink_event (GstPad * pad, GstObject * parent,
122 static gboolean gst_concat_sink_query (GstPad * pad, GstObject * parent,
/* Event and query functions installed on the src pad. */
125 static gboolean gst_concat_src_event (GstPad * pad, GstObject * parent,
127 static gboolean gst_concat_src_query (GstPad * pad, GstObject * parent,
/* Moves current_sinkpad on to another entry of the sinkpads list. */
130 static gboolean gst_concat_switch_pad (GstConcat * self);
/* Class initializer for GstConcat: installs the GObject dispose/finalize
 * handlers, sets the element metadata, registers the sink and src pad
 * templates and hooks up the request-pad and state-change vfuncs. */
133 gst_concat_class_init (GstConcatClass * klass)
135 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
136 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
/* Object teardown hooks. */
138 gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_concat_dispose);
139 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_concat_finalize);
/* Element metadata: long name, classification, description, author. */
141 gst_element_class_set_static_metadata (gstelement_class,
142 "Concat", "Generic", "Concatenate multiple streams",
143 "Sebastian Dröge <sebastian@centricular.com>");
/* Pad templates exposed by this element class. */
145 gst_element_class_add_pad_template (gstelement_class,
146 gst_static_pad_template_get (&concat_sink_template));
147 gst_element_class_add_pad_template (gstelement_class,
148 gst_static_pad_template_get (&concat_src_template));
/* Request-pad creation/release and state changes are handled in this file. */
150 gstelement_class->request_new_pad =
151 GST_DEBUG_FUNCPTR (gst_concat_request_new_pad);
152 gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_concat_release_pad);
153 gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_concat_change_state);
/* Instance initializer: sets up the lock and condition variable, then
 * creates the "src" pad from its static template, installs its event and
 * query handlers, marks it as using fixed caps and adds it to the element. */
157 gst_concat_init (GstConcat * self)
/* self->lock guards the shared element state; self->cond is what
 * gst_concat_pad_wait() sleeps on. Both are cleared again in finalize. */
159 g_mutex_init (&self->lock);
160 g_cond_init (&self->cond);
162 self->srcpad = gst_pad_new_from_static_template (&concat_src_template, "src");
163 gst_pad_set_event_function (self->srcpad,
164 GST_DEBUG_FUNCPTR (gst_concat_src_event));
165 gst_pad_set_query_function (self->srcpad,
166 GST_DEBUG_FUNCPTR (gst_concat_src_query));
167 gst_pad_use_fixed_caps (self->srcpad);
169 gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
/* GObject dispose handler: clears current_sinkpad, releases the sink pads
 * of the element and chains up to the parent class. */
173 gst_concat_dispose (GObject * object)
175 GstConcat *self = GST_CONCAT (object);
/* Replace the current sinkpad pointer with NULL. */
178 gst_object_replace ((GstObject **) & self->current_sinkpad, NULL);
/* Release every sink pad found in the pad list of the element.
 * NOTE(review): releasing a pad presumably modifies the list being walked
 * (gst_concat_release_pad() removes the pad from the element), and the
 * numbering shows lines missing around this loop (178 to 181, 185 to 190),
 * so how the iteration recovers is not visible here; confirm in the full
 * file. */
181 for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
182 GstPad *pad = GST_PAD (item->data);
184 if (GST_PAD_IS_SINK (pad)) {
185 gst_element_release_request_pad (GST_ELEMENT (object), pad);
190 G_OBJECT_CLASS (parent_class)->dispose (object);
/* GObject finalize handler: clears the mutex and condition variable set up
 * in gst_concat_init(), then chains up to the parent class. */
194 gst_concat_finalize (GObject * object)
196 GstConcat *self = GST_CONCAT (object);
198 g_mutex_clear (&self->lock);
199 g_cond_clear (&self->cond);
201 G_OBJECT_CLASS (parent_class)->finalize (object);
/* request_new_pad vfunc: creates a new GstConcatPad sink pad named
 * "sink_<pad_count>", installs the chain/event/query functions, activates
 * it, records it in self->sinkpads and adds it to the element.
 * The name and caps arguments are not referenced in the visible lines. */
205 gst_concat_request_new_pad (GstElement * element, GstPadTemplate * templ,
206 const gchar * name, const GstCaps * caps)
208 GstConcat *self = GST_CONCAT (element);
212 GST_DEBUG_OBJECT (element, "requesting pad");
/* The pad name is built from the pad counter while holding the lock.
 * NOTE(review): the numbering skips 216 between these lines; presumably
 * that is where the counter is incremented; confirm. */
214 g_mutex_lock (&self->lock);
215 pad_name = g_strdup_printf ("sink_%u", self->pad_count);
217 g_mutex_unlock (&self->lock);
/* Instantiate the pad subclass with the direction and template taken from
 * templ. */
219 sinkpad = GST_PAD_CAST (g_object_new (GST_TYPE_CONCAT_PAD,
220 "name", pad_name, "direction", templ->direction, "template", templ,
/* Per-pad handlers; the PROXY_CAPS and PROXY_ALLOCATION flags are set on
 * the pad as well. */
224 gst_pad_set_chain_function (sinkpad,
225 GST_DEBUG_FUNCPTR (gst_concat_sink_chain));
226 gst_pad_set_event_function (sinkpad,
227 GST_DEBUG_FUNCPTR (gst_concat_sink_event));
228 gst_pad_set_query_function (sinkpad,
229 GST_DEBUG_FUNCPTR (gst_concat_sink_query));
230 GST_OBJECT_FLAG_SET (sinkpad, GST_PAD_FLAG_PROXY_CAPS);
231 GST_OBJECT_FLAG_SET (sinkpad, GST_PAD_FLAG_PROXY_ALLOCATION);
233 gst_pad_set_active (sinkpad, TRUE);
/* Under the lock: keep one reference in the sinkpads list (the newest pad
 * goes to the head of the list) and, if no pad is current yet, make this
 * one the current sinkpad with a reference of its own. */
235 g_mutex_lock (&self->lock);
236 self->sinkpads = g_list_prepend (self->sinkpads, gst_object_ref (sinkpad));
237 if (!self->current_sinkpad)
238 self->current_sinkpad = gst_object_ref (sinkpad);
239 g_mutex_unlock (&self->lock);
241 gst_element_add_pad (element, sinkpad);
/* release_pad vfunc: unblocks and deactivates the pad, switches away from
 * it if it was the current sinkpad, drops it from self->sinkpads, removes
 * it from the element and, when the element is above READY, notifies about
 * the change. */
247 gst_concat_release_pad (GstElement * element, GstPad * pad)
249 GstConcat *self = GST_CONCAT (element);
250 GstConcatPad *spad = GST_CONCAT_PAD_CAST (pad);
252 gboolean current_pad_removed = FALSE;
253 gboolean eos = FALSE;
255 GST_DEBUG_OBJECT (self, "releasing pad");
/* Mark the pad as flushing and wake every waiter so that a thread blocked
 * in gst_concat_pad_wait() for this pad can observe the flag. */
257 g_mutex_lock (&self->lock);
258 spad->flushing = TRUE;
259 g_cond_broadcast (&self->cond);
260 g_mutex_unlock (&self->lock);
262 gst_pad_set_active (pad, FALSE);
264 /* Now the pad is definitely not running anymore */
/* If the released pad is the active one, move on to another pad.
 * NOTE(review): gst_concat_switch_pad() is documented in this file as
 * returning FALSE when there is no further pad, so the double negation
 * leaves eos TRUE when a further pad exists. That looks inverted for a
 * flag named eos; confirm whether a single negation was intended. */
266 g_mutex_lock (&self->lock);
267 if (self->current_sinkpad == GST_PAD_CAST (spad)) {
268 eos = ! !gst_concat_switch_pad (self);
269 current_pad_removed = TRUE;
/* Drop the list entry for this pad together with the reference the list
 * held on it. */
272 for (l = self->sinkpads; l; l = l->next) {
273 if ((gpointer) spad == l->data) {
274 gst_object_unref (spad);
275 self->sinkpads = g_list_delete_link (self->sinkpads, l);
279 g_mutex_unlock (&self->lock);
281 gst_element_remove_pad (GST_ELEMENT_CAST (self), pad);
/* Notifications are only sent while the element is above READY.
 * NOTE(review): the condition guarding the EOS push below is not visible
 * in this listing (the numbering jumps from 288 to 290). */
283 if (GST_STATE (self) > GST_STATE_READY) {
284 if (current_pad_removed && !eos)
285 gst_element_post_message (GST_ELEMENT_CAST (self),
286 gst_message_new_duration_changed (GST_OBJECT_CAST (self)));
288 /* FIXME: Sending EOS from application thread */
290 gst_pad_push_event (self->srcpad, gst_event_new_eos ());
294 /* Returns FALSE if flushing
295 * Must be called from the pad's streaming thread */
298 gst_concat_pad_wait (GstConcatPad * spad, GstConcat * self)
/* Bail out early when the pad is already flushing. The return statement
 * of this branch is not visible in this listing. */
300 g_mutex_lock (&self->lock);
301 if (spad->flushing) {
302 g_mutex_unlock (&self->lock);
303 GST_DEBUG_OBJECT (spad, "Flushing");
/* Sleep on the condition variable of the element until this pad is the
 * current sinkpad. After every wakeup the flushing flag is checked again,
 * so a flush or pad release ends the wait. */
307 while (spad != GST_CONCAT_PAD_CAST (self->current_sinkpad)) {
308 GST_TRACE_OBJECT (spad, "Not the current sinkpad - waiting");
309 g_cond_wait (&self->cond, &self->lock);
310 if (spad->flushing) {
311 g_mutex_unlock (&self->lock);
312 GST_DEBUG_OBJECT (spad, "Flushing");
316 /* This pad can only become not the current sinkpad from
317 * a) This streaming thread (we hold the stream lock)
318 * b) Releasing the pad (takes the stream lock, see above)
320 * Unlocking here is thus safe and we can safely push
321 * serialized data to our srcpad
323 GST_DEBUG_OBJECT (spad, "Now the current sinkpad");
324 g_mutex_unlock (&self->lock);
/* Chain function of the sink pads: waits until this pad is the current
 * one, tracks the furthest position seen in self->last_stop and pushes the
 * buffer out on the src pad. No lock is taken around the last_stop updates
 * in the visible lines. */
330 gst_concat_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
333 GstConcat *self = GST_CONCAT (parent);
334 GstConcatPad *spad = GST_CONCAT_PAD (pad);
336 GST_LOG_OBJECT (pad, "received buffer %p", buffer);
/* Blocks until this pad is current; a FALSE result is reported upstream as
 * GST_FLOW_FLUSHING. */
338 if (!gst_concat_pad_wait (spad, self))
339 return GST_FLOW_FLUSHING;
/* last_stop is unset: seed it with the segment start of this pad. */
341 if (self->last_stop == GST_CLOCK_TIME_NONE)
342 self->last_stop = spad->segment.start;
/* TIME format: the end of the buffer is its timestamp plus its duration
 * (when valid), and last_stop is only ever moved forward.
 * NOTE(review): when the timestamp is GST_CLOCK_TIME_NONE but the duration
 * is valid, the duration is added to GST_CLOCK_TIME_NONE. Presumably that
 * wraps around to a small bogus end_time which then passes the check
 * below; confirm and guard if so. */
344 if (self->format == GST_FORMAT_TIME) {
345 GstClockTime start_time = GST_BUFFER_TIMESTAMP (buffer);
346 GstClockTime end_time = GST_CLOCK_TIME_NONE;
348 if (start_time != GST_CLOCK_TIME_NONE)
349 end_time = start_time;
350 if (GST_BUFFER_DURATION_IS_VALID (buffer))
351 end_time += GST_BUFFER_DURATION (buffer);
353 if (end_time != GST_CLOCK_TIME_NONE && end_time > self->last_stop)
354 self->last_stop = end_time;
/* NOTE(review): presumably the branch for non-TIME formats (the "else"
 * line is not visible in this listing): last_stop advances by the size of
 * the buffer. */
356 self->last_stop += gst_buffer_get_size (buffer);
359 ret = gst_pad_push (self->srcpad, buffer);
361 GST_LOG_OBJECT (pad, "handled buffer %s", gst_flow_get_name (ret));
366 /* Returns FALSE if no further pad, must be called with concat lock */
368 gst_concat_switch_pad (GstConcat * self)
/* Work on a copy of the segment stored on the current pad. */
375 segment = GST_CONCAT_PAD (self->current_sinkpad)->segment;
/* Pick the position the finished stream ended at: the tracked last_stop,
 * else the segment stop, else the segment start. */
377 last_stop = self->last_stop;
378 if (last_stop == GST_CLOCK_TIME_NONE)
379 last_stop = segment.stop;
380 if (last_stop == GST_CLOCK_TIME_NONE)
381 last_stop = segment.start;
382 g_assert (last_stop != GST_CLOCK_TIME_NONE);
/* Never account for more than the segment stop. */
384 if (last_stop > segment.stop)
385 last_stop = segment.stop;
/* Turn the end position into the amount by which later streams are
 * shifted. NOTE(review): lines 388 and 390 of the original numbering are
 * missing here, so the assignment target of the running-time conversion
 * and the condition for the "+= segment.start" line are not visible.
 * Presumably TIME uses the running time and other formats add the segment
 * start; confirm. */
387 if (segment.format == GST_FORMAT_TIME)
389 gst_segment_to_running_time (&segment, segment.format, last_stop);
391 last_stop += segment.start;
/* Accumulate into the offset that the SEGMENT handler adds to outgoing
 * segments and the QOS handler subtracts from timestamps. */
393 self->current_start_offset += last_stop;
/* Locate the current pad in the sinkpads list and replace current_sinkpad
 * with a reference to another entry (or NULL), then wake up all threads
 * waiting in gst_concat_pad_wait().
 * NOTE(review): the statement that moves "l" to the neighbouring entry
 * (line 397 of the original numbering) is not visible, so the direction of
 * the step cannot be confirmed from this listing. */
395 for (l = self->sinkpads; l; l = l->next) {
396 if ((gpointer) self->current_sinkpad == l->data) {
398 GST_DEBUG_OBJECT (self,
399 "Switching from pad %" GST_PTR_FORMAT " to %" GST_PTR_FORMAT,
400 self->current_sinkpad, l ? l->data : NULL);
401 gst_object_unref (self->current_sinkpad);
402 self->current_sinkpad = l ? gst_object_ref (l->data) : NULL;
403 g_cond_broadcast (&self->cond);
/* TRUE when there is a pad to continue with. */
408 next = self->current_sinkpad != NULL;
/* Position tracking starts over for the next stream. */
410 self->last_stop = GST_CLOCK_TIME_NONE;
/* Event function of the sink pads. Serialized events wait until the pad is
 * the current one. SEGMENT events are stored per pad and replaced by a
 * segment shifted by current_start_offset. EOS switches to another pad.
 * Flush events update the flushing flag of the pad.
 * NOTE(review): many lines (braces, break/else lines, local declarations)
 * are missing from this listing, so the exact branch structure should be
 * confirmed against the full file. */
416 gst_concat_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
418 GstConcat *self = GST_CONCAT (parent);
419 GstConcatPad *spad = GST_CONCAT_PAD_CAST (pad);
422 GST_LOG_OBJECT (pad, "received event %" GST_PTR_FORMAT, event);
424 switch (GST_EVENT_TYPE (event)) {
/* STREAM_START: wait for our turn; the event is dropped when the wait
 * fails and otherwise handled by the default event handler. */
425 case GST_EVENT_STREAM_START:{
426 if (!gst_concat_pad_wait (spad, self)) {
428 gst_event_unref (event);
430 ret = gst_pad_event_default (pad, parent, event);
434 case GST_EVENT_SEGMENT:{
435 /* Drop segment event, we create our own one */
436 gst_event_copy_segment (event, &spad->segment);
437 gst_event_unref (event);
/* The first segment seen fixes the operating format of the element (only
 * TIME or BYTES are accepted); later segments must use the same format,
 * otherwise an element error is posted. Every path releases the lock. */
439 g_mutex_lock (&self->lock);
440 if (self->format == GST_FORMAT_UNDEFINED) {
441 if (spad->segment.format != GST_FORMAT_TIME
442 && spad->segment.format != GST_FORMAT_BYTES) {
443 g_mutex_unlock (&self->lock);
444 GST_ELEMENT_ERROR (self, CORE, FAILED, (NULL),
445 ("Can only operate in TIME or BYTES format"));
449 self->format = spad->segment.format;
450 GST_DEBUG_OBJECT (self, "Operating in %s format",
451 gst_format_get_name (self->format));
452 g_mutex_unlock (&self->lock);
453 } else if (self->format != spad->segment.format) {
454 g_mutex_unlock (&self->lock);
455 GST_ELEMENT_ERROR (self, CORE, FAILED, (NULL),
456 ("Operating in %s format but new pad has %s",
457 gst_format_get_name (self->format),
458 gst_format_get_name (spad->segment.format)));
461 g_mutex_unlock (&self->lock);
/* Once this pad is current, push a copy of its segment downstream that is
 * shifted by the accumulated start offset. */
464 if (!gst_concat_pad_wait (spad, self)) {
467 GstSegment segment = spad->segment;
469 /* We know no duration */
470 segment.duration = -1;
472 /* Update segment values to be continuous with last stream */
473 if (self->format == GST_FORMAT_TIME) {
474 segment.base += self->current_start_offset;
476 /* Shift start/stop byte position */
477 segment.start += self->current_start_offset;
478 if (segment.stop != -1)
479 segment.stop += self->current_start_offset;
481 gst_pad_push_event (self->srcpad, gst_event_new_segment (&segment));
/* NOTE(review): the "case" line for the following section is not visible
 * (numbering jumps from 481 to 486); since it calls
 * gst_concat_switch_pad() and may push EOS, it is presumably the EOS
 * handler. The incoming event is dropped, and after the switch either an
 * EOS event is pushed or a duration-changed message is posted; the
 * condition selecting between the two is not visible either. */
486 gst_event_unref (event);
488 if (!gst_concat_pad_wait (spad, self)) {
493 g_mutex_lock (&self->lock);
494 next = gst_concat_switch_pad (self);
495 g_mutex_unlock (&self->lock);
499 gst_pad_push_event (self->srcpad, gst_event_new_eos ());
501 gst_element_post_message (GST_ELEMENT_CAST (self),
502 gst_message_new_duration_changed (GST_OBJECT_CAST (self)));
/* FLUSH_START: mark the pad as flushing and wake any waiter. "forward"
 * records whether this pad is the current one; the visible lines then
 * either run the default handler or drop the event. */
507 case GST_EVENT_FLUSH_START:{
510 g_mutex_lock (&self->lock);
511 spad->flushing = TRUE;
512 g_cond_broadcast (&self->cond);
513 forward = (self->current_sinkpad == GST_PAD_CAST (spad));
514 g_mutex_unlock (&self->lock);
517 ret = gst_pad_event_default (pad, parent, event);
519 gst_event_unref (event);
/* FLUSH_STOP: reset the stored segment and clear the flushing flag.
 * NOTE(review): unlike in FLUSH_START, the flag is written here before the
 * lock is taken; confirm whether that is intended. */
522 case GST_EVENT_FLUSH_STOP:{
525 gst_segment_init (&spad->segment, GST_FORMAT_UNDEFINED);
526 spad->flushing = FALSE;
528 g_mutex_lock (&self->lock);
529 forward = (self->current_sinkpad == GST_PAD_CAST (spad));
530 g_mutex_unlock (&self->lock);
/* The start offset is reset to 0 here; per the debug message this happens
 * when the flush-stop carries reset_time = TRUE (the guarding condition is
 * not visible in this listing). */
535 gst_event_parse_flush_stop (event, &reset_time);
537 GST_DEBUG_OBJECT (self,
538 "resetting start offset to 0 after flushing with reset_time = TRUE");
539 self->current_start_offset = 0;
541 ret = gst_pad_event_default (pad, parent, event);
543 gst_event_unref (event);
548 /* Wait for other serialized events before forwarding */
549 if (GST_EVENT_IS_SERIALIZED (event) && !gst_concat_pad_wait (spad, self)) {
550 gst_event_unref (event);
553 ret = gst_pad_event_default (pad, parent, event);
/* Query function of the sink pads: serialized queries first wait until the
 * pad is the current one, then the query goes to the default handler.
 * NOTE(review): the case labels and the handling of a failed wait are not
 * visible in this listing. */
563 gst_concat_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
565 GstConcat *self = GST_CONCAT (parent);
566 GstConcatPad *spad = GST_CONCAT_PAD_CAST (pad);
569 GST_LOG_OBJECT (pad, "received query %" GST_PTR_FORMAT, query);
571 switch (GST_QUERY_TYPE (query)) {
573 /* Wait for other serialized queries before forwarding */
574 if (GST_QUERY_IS_SERIALIZED (query) && !gst_concat_pad_wait (spad, self)) {
577 ret = gst_pad_query_default (pad, parent, query);
/* Event function of the src pad: SEEK is sent to the current sinkpad, QOS
 * timestamps are shifted back by current_start_offset before being sent to
 * the current sinkpad, FLUSH_STOP may reset the start offset, and the
 * remaining events go through the default handler. */
586 gst_concat_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
588 GstConcat *self = GST_CONCAT (parent);
591 GST_LOG_OBJECT (pad, "received event %" GST_PTR_FORMAT, event);
593 switch (GST_EVENT_TYPE (event)) {
594 case GST_EVENT_SEEK:{
595 GstPad *sinkpad = NULL;
/* Take a reference on the current sinkpad under the lock so that it can be
 * used after unlocking. NOTE(review): the lines selecting between the push
 * and the unref below are not visible (numbering skips 601 and 604);
 * presumably the event is only pushed when a sinkpad exists. */
597 g_mutex_lock (&self->lock);
598 if ((sinkpad = self->current_sinkpad))
599 gst_object_ref (sinkpad);
600 g_mutex_unlock (&self->lock);
602 ret = gst_pad_push_event (sinkpad, event);
603 gst_object_unref (sinkpad);
605 gst_event_unref (event);
612 GstClockTimeDiff diff;
613 GstClockTime timestamp;
/* NOTE(review): the last argument of the call below reads as a mis-encoded
 * "&timestamp" (the multiplication sign looks like an HTML entity
 * artefact); restore it from the full file. */
616 gst_event_parse_qos (event, &type, &proportion, &diff, ×tamp);
617 gst_event_unref (event);
/* Only timestamps beyond the accumulated start offset are translated and
 * sent on. NOTE(review): current_sinkpad is used here without the lock or
 * an extra reference, unlike in the SEEK case; confirm that it cannot be
 * NULL or change concurrently. */
619 if (timestamp != GST_CLOCK_TIME_NONE
620 && timestamp > self->current_start_offset) {
621 timestamp -= self->current_start_offset;
622 event = gst_event_new_qos (type, proportion, diff, timestamp);
623 ret = gst_pad_push_event (self->current_sinkpad, event);
/* FLUSH_STOP: the start offset is reset to 0; per the debug message this
 * is for reset_time = TRUE (the guarding condition is not visible). */
629 case GST_EVENT_FLUSH_STOP:{
632 gst_event_parse_flush_stop (event, &reset_time);
634 GST_DEBUG_OBJECT (self,
635 "resetting start offset to 0 after flushing with reset_time = TRUE");
636 self->current_start_offset = 0;
639 ret = gst_pad_event_default (pad, parent, event);
643 ret = gst_pad_event_default (pad, parent, event);
/* Query function of the src pad. The only handling visible in this listing
 * is the call to the default query handler; the case labels of the switch
 * are not visible. */
651 gst_concat_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
655 GST_LOG_OBJECT (pad, "received query %" GST_PTR_FORMAT, query);
657 switch (GST_QUERY_TYPE (query)) {
659 ret = gst_pad_query_default (pad, parent, query);
/* Iterator callback (used with gst_iterator_foreach() in the state-change
 * handler): resets the stored segment of the pad to undefined format and
 * clears its flushing flag. user_data is not used. */
667 reset_pad (const GValue * data, gpointer user_data)
669 GstPad *pad = g_value_get_object (data);
670 GstConcatPad *spad = GST_CONCAT_PAD_CAST (pad);
672 gst_segment_init (&spad->segment, GST_FORMAT_UNDEFINED);
673 spad->flushing = FALSE;
/* Iterator callback (used with gst_iterator_foreach() in the state-change
 * handler, which holds the concat lock around the iteration and then
 * broadcasts on the condition variable): marks the pad as flushing.
 * user_data is not used. */
677 unblock_pad (const GValue * data, gpointer user_data)
679 GstPad *pad = g_value_get_object (data);
680 GstConcatPad *spad = GST_CONCAT_PAD_CAST (pad);
682 spad->flushing = TRUE;
685 static GstStateChangeReturn
686 gst_concat_change_state (GstElement * element, GstStateChange transition)
688 GstConcat *self = GST_CONCAT (element);
689 GstStateChangeReturn ret;
691 switch (transition) {
692 case GST_STATE_CHANGE_READY_TO_PAUSED:{
693 self->format = GST_FORMAT_UNDEFINED;
694 self->current_start_offset = 0;
695 self->last_stop = GST_CLOCK_TIME_NONE;
698 case GST_STATE_CHANGE_PAUSED_TO_READY:{
699 GstIterator *iter = gst_element_iterate_sink_pads (element);
700 GstIteratorResult res;
702 g_mutex_lock (&self->lock);
704 res = gst_iterator_foreach (iter, unblock_pad, NULL);
705 } while (res == GST_ITERATOR_RESYNC);
707 gst_iterator_free (iter);
708 g_cond_broadcast (&self->cond);
709 g_mutex_unlock (&self->lock);
711 if (res == GST_ITERATOR_ERROR)
712 return GST_STATE_CHANGE_FAILURE;
720 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
721 if (ret == GST_STATE_CHANGE_FAILURE)
724 switch (transition) {
725 case GST_STATE_CHANGE_PAUSED_TO_READY:{
726 GstIterator *iter = gst_element_iterate_sink_pads (element);
727 GstIteratorResult res;
730 res = gst_iterator_foreach (iter, reset_pad, NULL);
731 } while (res == GST_ITERATOR_RESYNC);
733 gst_iterator_free (iter);
735 if (res == GST_ITERATOR_ERROR)
736 return GST_STATE_CHANGE_FAILURE;