1 /* GStreamer concat element
3 * Copyright (c) 2014 Sebastian Dröge <sebastian@centricular.com>
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
22 * SECTION:element-concat
23 * @see_also: #GstFunnel
25 * Concatenates streams together to one continuous stream.
27 * All streams but the current one are blocked until the current one
28 * finished with %GST_EVENT_EOS. Then the next stream is enabled, while
29 * keeping the running time continuous for %GST_FORMAT_TIME segments or
30 * keeping the segment continuous for %GST_FORMAT_BYTES segments.
32 * Streams are switched in the order in which the sinkpads were requested.
35 * <title>Example launch line</title>
37 * gst-launch-1.0 concat name=c ! xvimagesink videotestsrc num-buffers=100 ! c. videotestsrc num-buffers=100 pattern=ball ! c.
38 * ]| Plays two video streams one after another.
46 #include "gstconcat.h"
48 GST_DEBUG_CATEGORY_STATIC (gst_concat_debug);
49 #define GST_CAT_DEFAULT gst_concat_debug
/* GObject boilerplate for the private GstConcatPad type: the get_type()
 * prototype, the checked and unchecked instance casts, the class cast, the
 * instance/class type checks, and forward typedefs for the instance and
 * class structures. */
51 G_GNUC_INTERNAL GType gst_concat_pad_get_type (void);
53 #define GST_TYPE_CONCAT_PAD (gst_concat_pad_get_type())
54 #define GST_CONCAT_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_CONCAT_PAD, GstConcatPad))
55 #define GST_CONCAT_PAD_CAST(obj) ((GstConcatPad *)(obj))
56 #define GST_CONCAT_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_CONCAT_PAD, GstConcatPadClass))
57 #define GST_IS_CONCAT_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_CONCAT_PAD))
58 #define GST_IS_CONCAT_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_CONCAT_PAD))
60 typedef struct _GstConcatPad GstConcatPad;
61 typedef struct _GstConcatPadClass GstConcatPadClass;
69 /* Protected by the concat lock */
73 struct _GstConcatPadClass
/* Registers GstConcatPad as a subclass of GstPad. */
78 G_DEFINE_TYPE (GstConcatPad, gst_concat_pad, GST_TYPE_PAD);
/* Class initializer for GstConcatPad. */
81 gst_concat_pad_class_init (GstConcatPadClass * klass)
/* Instance initializer: a new pad starts with a segment of undefined format
 * and is not flushing. */
86 gst_concat_pad_init (GstConcatPad * self)
88 gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
89 self->flushing = FALSE;
/* Static pad templates: sink pads are named "sink_%u", the single source
 * pad is named "src" and accepts ANY caps. */
92 static GstStaticPadTemplate concat_sink_template =
93 GST_STATIC_PAD_TEMPLATE ("sink_%u",
98 static GstStaticPadTemplate concat_src_template =
99 GST_STATIC_PAD_TEMPLATE ("src",
102 GST_STATIC_CAPS_ANY);
/* Type registration for GstConcat (a GstElement subclass). The "concat"
 * debug category is initialised here; presumably this line is the body of
 * the _do_init hook handed to G_DEFINE_TYPE_WITH_CODE below - confirm.
 * gst_concat_parent_class is aliased to parent_class for chaining up. */
105 GST_DEBUG_CATEGORY_INIT (gst_concat_debug, "concat", 0, "concat element");
106 #define gst_concat_parent_class parent_class
107 G_DEFINE_TYPE_WITH_CODE (GstConcat, gst_concat, GST_TYPE_ELEMENT, _do_init);
/* Forward declarations: GObject lifecycle handlers, GstElement vmethods
 * (state change, request/release pad), the chain/event/query functions
 * installed on the sink pads, the event/query functions installed on the
 * source pad, and the internal pad-switch helper. */
109 static void gst_concat_dispose (GObject * object);
110 static void gst_concat_finalize (GObject * object);
112 static GstStateChangeReturn gst_concat_change_state (GstElement * element,
113 GstStateChange transition);
114 static GstPad *gst_concat_request_new_pad (GstElement * element,
115 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
116 static void gst_concat_release_pad (GstElement * element, GstPad * pad);
118 static GstFlowReturn gst_concat_sink_chain (GstPad * pad, GstObject * parent,
120 static gboolean gst_concat_sink_event (GstPad * pad, GstObject * parent,
122 static gboolean gst_concat_sink_query (GstPad * pad, GstObject * parent,
125 static gboolean gst_concat_src_event (GstPad * pad, GstObject * parent,
127 static gboolean gst_concat_src_query (GstPad * pad, GstObject * parent,
130 static gboolean gst_concat_switch_pad (GstConcat * self);
/* Class initializer for GstConcat: installs the dispose/finalize handlers,
 * registers the element metadata and both pad templates, and hooks up the
 * request-pad, release-pad and state-change vmethods. */
133 gst_concat_class_init (GstConcatClass * klass)
135 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
136 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
/* GObject lifecycle */
138 gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_concat_dispose);
139 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_concat_finalize);
/* Element description shown by introspection tools */
141 gst_element_class_set_static_metadata (gstelement_class,
142 "Concat", "Generic", "Concatenate multiple streams",
143 "Sebastian Dröge <sebastian@centricular.com>");
/* Pad templates: sink_%u and src */
145 gst_element_class_add_pad_template (gstelement_class,
146 gst_static_pad_template_get (&concat_sink_template));
147 gst_element_class_add_pad_template (gstelement_class,
148 gst_static_pad_template_get (&concat_src_template));
/* GstElement vmethods */
150 gstelement_class->request_new_pad =
151 GST_DEBUG_FUNCPTR (gst_concat_request_new_pad);
152 gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_concat_release_pad);
153 gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_concat_change_state);
/* Instance initializer: sets up the element lock and condition variable,
 * then creates the always-present "src" pad, installs its event and query
 * functions, marks it as using fixed caps and adds it to the element. */
157 gst_concat_init (GstConcat * self)
159 g_mutex_init (&self->lock);
160 g_cond_init (&self->cond);
162 self->srcpad = gst_pad_new_from_static_template (&concat_src_template, "src");
163 gst_pad_set_event_function (self->srcpad,
164 GST_DEBUG_FUNCPTR (gst_concat_src_event));
165 gst_pad_set_query_function (self->srcpad,
166 GST_DEBUG_FUNCPTR (gst_concat_src_query));
167 gst_pad_use_fixed_caps (self->srcpad);
169 gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
/* GObject dispose handler: drops the reference held on the current sink
 * pad, releases every sink pad still attached to the element and then
 * chains up to the parent class. */
173 gst_concat_dispose (GObject * object)
175 GstConcat *self = GST_CONCAT (object);
/* Clears self->current_sinkpad and unrefs the pad it pointed to, if any */
178 gst_object_replace ((GstObject **) & self->current_sinkpad, NULL);
/* NOTE(review): releasing a pad modifies the list being walked here;
 * confirm the loop restarts after each release. */
181 for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
182 GstPad *pad = GST_PAD (item->data);
184 if (GST_PAD_IS_SINK (pad)) {
185 gst_element_release_request_pad (GST_ELEMENT (object), pad);
190 G_OBJECT_CLASS (parent_class)->dispose (object);
/* GObject finalize handler: releases the mutex and condition variable
 * created in gst_concat_init(), then chains up to the parent class. */
194 gst_concat_finalize (GObject * object)
196 GstConcat *self = GST_CONCAT (object);
198 g_mutex_clear (&self->lock);
199 g_cond_clear (&self->cond);
201 G_OBJECT_CLASS (parent_class)->finalize (object);
/* GstElement::request_new_pad implementation.
 *
 * Creates a new GstConcatPad named "sink_<pad_count>" from @templ, installs
 * the chain/event/query functions, makes it proxy caps and allocation
 * queries, activates it, records it in self->sinkpads and adds it to the
 * element. The first pad requested while no pad is current becomes the
 * current sink pad. */
205 gst_concat_request_new_pad (GstElement * element, GstPadTemplate * templ,
206 const gchar * name, const GstCaps * caps)
208 GstConcat *self = GST_CONCAT (element);
212 GST_DEBUG_OBJECT (element, "requesting pad");
/* The pad name is derived from the counter under the concat lock */
214 g_mutex_lock (&self->lock);
215 pad_name = g_strdup_printf ("sink_%u", self->pad_count);
217 g_mutex_unlock (&self->lock);
219 sinkpad = GST_PAD_CAST (g_object_new (GST_TYPE_CONCAT_PAD,
220 "name", pad_name, "direction", templ->direction, "template", templ,
224 gst_pad_set_chain_function (sinkpad,
225 GST_DEBUG_FUNCPTR (gst_concat_sink_chain));
226 gst_pad_set_event_function (sinkpad,
227 GST_DEBUG_FUNCPTR (gst_concat_sink_event));
228 gst_pad_set_query_function (sinkpad,
229 GST_DEBUG_FUNCPTR (gst_concat_sink_query));
230 GST_OBJECT_FLAG_SET (sinkpad, GST_PAD_FLAG_PROXY_CAPS);
231 GST_OBJECT_FLAG_SET (sinkpad, GST_PAD_FLAG_PROXY_ALLOCATION);
233 gst_pad_set_active (sinkpad, TRUE);
/* The list and current_sinkpad each hold their own reference on the pad */
235 g_mutex_lock (&self->lock);
236 self->sinkpads = g_list_prepend (self->sinkpads, gst_object_ref (sinkpad));
237 if (!self->current_sinkpad)
238 self->current_sinkpad = gst_object_ref (sinkpad);
239 g_mutex_unlock (&self->lock);
241 gst_element_add_pad (element, sinkpad);
247 gst_concat_release_pad (GstElement * element, GstPad * pad)
249 GstConcat *self = GST_CONCAT (element);
250 GstConcatPad *spad = GST_CONCAT_PAD_CAST (pad);
252 gboolean current_pad_removed = FALSE;
253 gboolean eos = FALSE;
255 GST_DEBUG_OBJECT (self, "releasing pad");
257 g_mutex_lock (&self->lock);
258 spad->flushing = TRUE;
259 g_cond_broadcast (&self->cond);
260 g_mutex_unlock (&self->lock);
262 gst_pad_set_active (pad, FALSE);
264 /* Now the pad is definitely not running anymore */
266 g_mutex_lock (&self->lock);
267 if (self->current_sinkpad == GST_PAD_CAST (spad)) {
268 eos = ! !gst_concat_switch_pad (self);
269 current_pad_removed = TRUE;
272 for (l = self->sinkpads; l; l = l->next) {
273 if ((gpointer) spad == l->data) {
274 gst_object_unref (spad);
275 self->sinkpads = g_list_delete_link (self->sinkpads, l);
279 g_mutex_unlock (&self->lock);
281 gst_element_remove_pad (GST_ELEMENT_CAST (self), pad);
283 if (GST_STATE (self) > GST_STATE_READY) {
284 if (current_pad_removed && !eos)
285 gst_element_post_message (GST_ELEMENT_CAST (self),
286 gst_message_new_duration_changed (GST_OBJECT_CAST (self)));
288 /* FIXME: Sending EOS from application thread */
290 gst_pad_push_event (self->srcpad, gst_event_new_eos ());
/* gst_concat_pad_wait:
 * Blocks the calling thread on self->cond until @spad is the element's
 * current sink pad. The pad's flushing flag is checked on entry and after
 * every wake-up, always with the concat lock held; the lock is released
 * before returning on every visible path. */
294 /* Returns FALSE if flushing
295 * Must be called from the pad's streaming thread
298 gst_concat_pad_wait (GstConcatPad * spad, GstConcat * self)
300 g_mutex_lock (&self->lock);
301 if (spad->flushing) {
302 g_mutex_unlock (&self->lock);
303 GST_DEBUG_OBJECT (spad, "Flushing");
307 while (spad != GST_CONCAT_PAD_CAST (self->current_sinkpad)) {
308 GST_TRACE_OBJECT (spad, "Not the current sinkpad - waiting");
309 g_cond_wait (&self->cond, &self->lock);
310 if (spad->flushing) {
311 g_mutex_unlock (&self->lock);
312 GST_DEBUG_OBJECT (spad, "Flushing");
316 /* This pad can only become not the current sinkpad from
317 * a) This streaming thread (we hold the stream lock)
318 * b) Releasing the pad (takes the stream lock, see above)
320 * Unlocking here is thus safe and we can safely push
321 * serialized data to our srcpad
323 GST_DEBUG_OBJECT (spad, "Now the current sinkpad");
324 g_mutex_unlock (&self->lock);
330 gst_concat_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
333 GstConcat *self = GST_CONCAT (parent);
334 GstConcatPad *spad = GST_CONCAT_PAD (pad);
336 GST_LOG_OBJECT (pad, "received buffer %p", buffer);
338 if (!gst_concat_pad_wait (spad, self))
339 return GST_FLOW_FLUSHING;
341 if (self->last_stop == GST_CLOCK_TIME_NONE)
342 self->last_stop = spad->segment.start;
344 if (self->format == GST_FORMAT_TIME) {
345 GstClockTime start_time = GST_BUFFER_TIMESTAMP (buffer);
346 GstClockTime end_time = GST_CLOCK_TIME_NONE;
348 if (start_time != GST_CLOCK_TIME_NONE)
349 end_time = start_time;
350 if (GST_BUFFER_DURATION_IS_VALID (buffer))
351 end_time += GST_BUFFER_DURATION (buffer);
353 if (end_time != GST_CLOCK_TIME_NONE && end_time > self->last_stop)
354 self->last_stop = end_time;
356 self->last_stop += gst_buffer_get_size (buffer);
359 ret = gst_pad_push (self->srcpad, buffer);
361 GST_LOG_OBJECT (pad, "handled buffer %s", gst_flow_get_name (ret));
/* gst_concat_switch_pad:
 * Finishes the current sink pad and moves on to another pad from
 * self->sinkpads. The end position of the finished stream is taken from
 * self->last_stop, falling back to the pad segment's stop and then start,
 * and is capped at the segment's stop. The resulting value is added to
 * self->current_start_offset. Waiters on self->cond are woken and
 * self->last_stop is reset for the next stream. */
366 /* Returns FALSE if no further pad, must be called with concat lock */
368 gst_concat_switch_pad (GstConcat * self)
/* Work on a copy of the finished pad's segment */
375 segment = GST_CONCAT_PAD (self->current_sinkpad)->segment;
377 last_stop = self->last_stop;
378 if (last_stop == GST_CLOCK_TIME_NONE)
379 last_stop = segment.stop;
380 if (last_stop == GST_CLOCK_TIME_NONE)
381 last_stop = segment.start;
382 g_assert (last_stop != GST_CLOCK_TIME_NONE);
384 if (last_stop > segment.stop)
385 last_stop = segment.stop;
/* TIME format converts the position to running time; the other branch
 * adjusts it by the segment start */
387 if (segment.format == GST_FORMAT_TIME)
389 gst_segment_to_running_time (&segment, segment.format, last_stop);
391 last_stop += segment.start;
393 self->current_start_offset += last_stop;
/* Locate the current pad in the list and replace it, transferring the
 * reference held in self->current_sinkpad */
395 for (l = self->sinkpads; l; l = l->next) {
396 if ((gpointer) self->current_sinkpad == l->data) {
398 GST_DEBUG_OBJECT (self,
399 "Switching from pad %" GST_PTR_FORMAT " to %" GST_PTR_FORMAT,
400 self->current_sinkpad, l ? l->data : NULL);
401 gst_object_unref (self->current_sinkpad);
402 self->current_sinkpad = l ? gst_object_ref (l->data) : NULL;
403 g_cond_broadcast (&self->cond);
408 next = self->current_sinkpad != NULL;
410 self->last_stop = GST_CLOCK_TIME_NONE;
/* Event function of the sink pads.
 *
 * Serialized events are only handled once @pad has become the current sink
 * pad (see gst_concat_pad_wait()); if the pad is flushing instead, the
 * event is dropped. Segment events are stored on the pad and replaced by a
 * segment that continues from the previous stream. Flush events update the
 * pad's flushing state. */
416 gst_concat_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
418 GstConcat *self = GST_CONCAT (parent);
419 GstConcatPad *spad = GST_CONCAT_PAD_CAST (pad);
422 GST_LOG_OBJECT (pad, "received event %" GST_PTR_FORMAT, event);
424 switch (GST_EVENT_TYPE (event)) {
425 case GST_EVENT_STREAM_START:{
/* Forwarded only once this pad is the current one */
426 if (!gst_concat_pad_wait (spad, self)) {
428 gst_event_unref (event);
430 ret = gst_pad_event_default (pad, parent, event);
434 case GST_EVENT_SEGMENT:{
435 /* Drop segment event, we create our own one */
436 gst_event_copy_segment (event, &spad->segment);
437 gst_event_unref (event);
/* The first segment fixes the element's operating format (TIME or BYTES
 * only); every later segment has to use that same format */
439 g_mutex_lock (&self->lock);
440 if (self->format == GST_FORMAT_UNDEFINED) {
441 if (spad->segment.format != GST_FORMAT_TIME
442 && spad->segment.format != GST_FORMAT_BYTES) {
443 g_mutex_unlock (&self->lock);
444 GST_ELEMENT_ERROR (self, CORE, FAILED, (NULL),
445 ("Can only operate in TIME or BYTES format"));
449 self->format = spad->segment.format;
450 GST_DEBUG_OBJECT (self, "Operating in %s format",
451 gst_format_get_name (self->format));
452 g_mutex_unlock (&self->lock);
453 } else if (self->format != spad->segment.format) {
454 g_mutex_unlock (&self->lock);
455 GST_ELEMENT_ERROR (self, CORE, FAILED, (NULL),
456 ("Operating in %s format but new pad has %s",
457 gst_format_get_name (self->format),
458 gst_format_get_name (spad->segment.format)));
461 g_mutex_unlock (&self->lock);
464 if (!gst_concat_pad_wait (spad, self)) {
/* Push a modified copy downstream; the pad keeps the original segment */
467 GstSegment segment = spad->segment;
469 /* We know no duration */
470 segment.duration = -1;
472 /* Update segment values to be continuous with last stream */
473 if (self->format == GST_FORMAT_TIME) {
474 segment.base += self->current_start_offset;
476 /* Shift start/stop byte position */
477 segment.start += self->current_start_offset;
478 if (segment.stop != -1)
479 segment.stop += self->current_start_offset;
481 gst_pad_push_event (self->srcpad, gst_event_new_segment (&segment));
/* NOTE(review): the lines below look like the EOS case - the incoming event
 * is dropped, the element switches to the next pad, and either EOS is
 * pushed downstream or a duration-changed message is posted. Confirm. */
486 gst_event_unref (event);
487 if (!gst_concat_pad_wait (spad, self)) {
492 g_mutex_lock (&self->lock);
493 next = gst_concat_switch_pad (self);
494 g_mutex_unlock (&self->lock);
498 gst_pad_push_event (self->srcpad, gst_event_new_eos ());
500 gst_element_post_message (GST_ELEMENT_CAST (self),
501 gst_message_new_duration_changed (GST_OBJECT_CAST (self)));
506 case GST_EVENT_FLUSH_START:{
/* Mark the pad flushing and wake it up if it is blocked waiting its turn */
507 g_mutex_lock (&self->lock);
508 spad->flushing = TRUE;
509 g_cond_broadcast (&self->cond);
510 g_mutex_unlock (&self->lock);
513 case GST_EVENT_FLUSH_STOP:{
/* NOTE(review): flushing is cleared here without taking the concat lock,
 * unlike FLUSH_START above - confirm this is intentional */
514 gst_segment_init (&spad->segment, GST_FORMAT_UNDEFINED);
515 spad->flushing = FALSE;
519 /* Wait for other serialized events before forwarding */
520 if (GST_EVENT_IS_SERIALIZED (event) && !gst_concat_pad_wait (spad, self)) {
521 gst_event_unref (event);
524 ret = gst_pad_event_default (pad, parent, event);
/* Query function of the sink pads. Serialized queries wait until @pad is
 * the current sink pad, mirroring the handling of serialized events;
 * queries that get past that check go to the default query handler. */
534 gst_concat_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
536 GstConcat *self = GST_CONCAT (parent);
537 GstConcatPad *spad = GST_CONCAT_PAD_CAST (pad);
540 GST_LOG_OBJECT (pad, "received query %" GST_PTR_FORMAT, query);
542 switch (GST_QUERY_TYPE (query)) {
544 /* Wait for other serialized queries before forwarding */
545 if (GST_QUERY_IS_SERIALIZED (query) && !gst_concat_pad_wait (spad, self)) {
548 ret = gst_pad_query_default (pad, parent, query);
557 gst_concat_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
559 GstConcat *self = GST_CONCAT (parent);
562 GST_LOG_OBJECT (pad, "received event %" GST_PTR_FORMAT, event);
564 switch (GST_EVENT_TYPE (event)) {
565 case GST_EVENT_SEEK:{
566 GstPad *sinkpad = NULL;
568 g_mutex_lock (&self->lock);
569 if ((sinkpad = self->current_sinkpad))
570 gst_object_ref (sinkpad);
571 g_mutex_unlock (&self->lock);
573 ret = gst_pad_push_event (sinkpad, event);
574 gst_object_unref (sinkpad);
576 gst_event_unref (event);
583 GstClockTimeDiff diff;
584 GstClockTime timestamp;
587 gst_event_parse_qos (event, &type, &proportion, &diff, ×tamp);
588 gst_event_unref (event);
590 if (timestamp != GST_CLOCK_TIME_NONE
591 && timestamp > self->current_start_offset) {
592 timestamp -= self->current_start_offset;
593 event = gst_event_new_qos (type, proportion, diff, timestamp);
594 ret = gst_pad_push_event (self->current_sinkpad, event);
601 ret = gst_pad_event_default (pad, parent, event);
/* Query function of the source pad: logs the query and passes it to the
 * default query handler. */
609 gst_concat_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
613 GST_LOG_OBJECT (pad, "received query %" GST_PTR_FORMAT, query);
615 switch (GST_QUERY_TYPE (query)) {
617 ret = gst_pad_query_default (pad, parent, query);
/* Iterator callback used on PAUSED->READY: returns a sink pad to its
 * initial state (undefined-format segment, not flushing). @user_data is
 * unused. */
625 reset_pad (const GValue * data, gpointer user_data)
627 GstPad *pad = g_value_get_object (data);
628 GstConcatPad *spad = GST_CONCAT_PAD_CAST (pad);
630 gst_segment_init (&spad->segment, GST_FORMAT_UNDEFINED);
631 spad->flushing = FALSE;
/* Iterator callback used on PAUSED->READY: marks a sink pad as flushing so
 * that a thread blocked in gst_concat_pad_wait() gives up once the
 * condition is signalled. gst_concat_change_state() invokes it with the
 * concat lock held. @user_data is unused. */
635 unblock_pad (const GValue * data, gpointer user_data)
637 GstPad *pad = g_value_get_object (data);
638 GstConcatPad *spad = GST_CONCAT_PAD_CAST (pad);
640 spad->flushing = TRUE;
/* GstElement::change_state implementation.
 *
 * READY->PAUSED resets the stream bookkeeping (format, start offset, last
 * stop). PAUSED->READY first marks every sink pad as flushing and wakes up
 * all waiters so that streaming threads can exit, then chains up to the
 * parent class, and finally resets the per-pad state.
 *
 * Returns GST_STATE_CHANGE_FAILURE if iterating the sink pads fails. */
643 static GstStateChangeReturn
644 gst_concat_change_state (GstElement * element, GstStateChange transition)
646 GstConcat *self = GST_CONCAT (element);
647 GstStateChangeReturn ret;
/* Work done before chaining up */
649 switch (transition) {
650 case GST_STATE_CHANGE_READY_TO_PAUSED:{
651 self->format = GST_FORMAT_UNDEFINED;
652 self->current_start_offset = 0;
653 self->last_stop = GST_CLOCK_TIME_NONE;
656 case GST_STATE_CHANGE_PAUSED_TO_READY:{
657 GstIterator *iter = gst_element_iterate_sink_pads (element);
658 GstIteratorResult res;
/* Set flushing on all pads under the lock, repeating while the iterator
 * reports a resync, then wake every waiting streaming thread */
660 g_mutex_lock (&self->lock);
662 res = gst_iterator_foreach (iter, unblock_pad, NULL);
663 } while (res == GST_ITERATOR_RESYNC);
665 gst_iterator_free (iter);
666 g_cond_broadcast (&self->cond);
667 g_mutex_unlock (&self->lock);
669 if (res == GST_ITERATOR_ERROR)
670 return GST_STATE_CHANGE_FAILURE;
678 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
679 if (ret == GST_STATE_CHANGE_FAILURE)
/* Work done after the parent class handled the transition */
682 switch (transition) {
683 case GST_STATE_CHANGE_PAUSED_TO_READY:{
684 GstIterator *iter = gst_element_iterate_sink_pads (element);
685 GstIteratorResult res;
/* Restore every sink pad to its initial segment and flushing state */
688 res = gst_iterator_foreach (iter, reset_pad, NULL);
689 } while (res == GST_ITERATOR_RESYNC);
691 gst_iterator_free (iter);
693 if (res == GST_ITERATOR_ERROR)
694 return GST_STATE_CHANGE_FAILURE;