/* GStreamer concat element
 *
 * Copyright (c) 2014 Sebastian Dröge <sebastian@centricular.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */
26 #include "gstconcat.h"
28 GST_DEBUG_CATEGORY_STATIC (gst_concat_debug);
29 #define GST_CAT_DEFAULT gst_concat_debug
31 G_GNUC_INTERNAL GType gst_concat_pad_get_type (void);
33 #define GST_TYPE_CONCAT_PAD (gst_concat_pad_get_type())
34 #define GST_CONCAT_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_CONCAT_PAD, GstConcatPad))
35 #define GST_CONCAT_PAD_CAST(obj) ((GstConcatPad *)(obj))
36 #define GST_CONCAT_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_CONCAT_PAD, GstConcatPadClass))
37 #define GST_IS_CONCAT_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_CONCAT_PAD))
38 #define GST_IS_CONCAT_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_CONCAT_PAD))
40 typedef struct _GstConcatPad GstConcatPad;
41 typedef struct _GstConcatPadClass GstConcatPadClass;
49 /* Protected by the concat lock */
53 struct _GstConcatPadClass
58 G_DEFINE_TYPE (GstConcatPad, gst_concat_pad, GST_TYPE_PAD);
61 gst_concat_pad_class_init (GstConcatPadClass * klass)
66 gst_concat_pad_init (GstConcatPad * self)
68 gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
69 self->flushing = FALSE;
72 static GstStaticPadTemplate concat_sink_template =
73 GST_STATIC_PAD_TEMPLATE ("sink_%u",
78 static GstStaticPadTemplate concat_src_template =
79 GST_STATIC_PAD_TEMPLATE ("src",
85 GST_DEBUG_CATEGORY_INIT (gst_concat_debug, "concat", 0, "concat element");
86 #define gst_concat_parent_class parent_class
87 G_DEFINE_TYPE_WITH_CODE (GstConcat, gst_concat, GST_TYPE_ELEMENT, _do_init);
89 static void gst_concat_dispose (GObject * object);
90 static void gst_concat_finalize (GObject * object);
92 static GstStateChangeReturn gst_concat_change_state (GstElement * element,
93 GstStateChange transition);
94 static GstPad *gst_concat_request_new_pad (GstElement * element,
95 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
96 static void gst_concat_release_pad (GstElement * element, GstPad * pad);
98 static GstFlowReturn gst_concat_sink_chain (GstPad * pad, GstObject * parent,
100 static gboolean gst_concat_sink_event (GstPad * pad, GstObject * parent,
102 static gboolean gst_concat_sink_query (GstPad * pad, GstObject * parent,
105 static gboolean gst_concat_src_event (GstPad * pad, GstObject * parent,
107 static gboolean gst_concat_src_query (GstPad * pad, GstObject * parent,
110 static gboolean gst_concat_switch_pad (GstConcat * self);
113 gst_concat_class_init (GstConcatClass * klass)
115 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
116 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
118 gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_concat_dispose);
119 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_concat_finalize);
121 gst_element_class_set_static_metadata (gstelement_class,
122 "Concat", "Generic", "Concatenate multiple streams",
123 "Sebastian Dröge <sebastian@centricular.com>");
125 gst_element_class_add_pad_template (gstelement_class,
126 gst_static_pad_template_get (&concat_sink_template));
127 gst_element_class_add_pad_template (gstelement_class,
128 gst_static_pad_template_get (&concat_src_template));
130 gstelement_class->request_new_pad =
131 GST_DEBUG_FUNCPTR (gst_concat_request_new_pad);
132 gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_concat_release_pad);
133 gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_concat_change_state);
137 gst_concat_init (GstConcat * self)
139 g_mutex_init (&self->lock);
140 g_cond_init (&self->cond);
142 self->srcpad = gst_pad_new_from_static_template (&concat_src_template, "src");
143 gst_pad_set_event_function (self->srcpad,
144 GST_DEBUG_FUNCPTR (gst_concat_src_event));
145 gst_pad_set_query_function (self->srcpad,
146 GST_DEBUG_FUNCPTR (gst_concat_src_query));
147 gst_pad_use_fixed_caps (self->srcpad);
149 gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
153 gst_concat_dispose (GObject * object)
155 GstConcat *self = GST_CONCAT (object);
158 gst_object_replace ((GstObject **) & self->current_sinkpad, NULL);
161 for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
162 GstPad *pad = GST_PAD (item->data);
164 if (GST_PAD_IS_SINK (pad)) {
165 gst_element_release_request_pad (GST_ELEMENT (object), pad);
170 G_OBJECT_CLASS (parent_class)->dispose (object);
174 gst_concat_finalize (GObject * object)
176 GstConcat *self = GST_CONCAT (object);
178 g_mutex_clear (&self->lock);
179 g_cond_clear (&self->cond);
181 G_OBJECT_CLASS (parent_class)->finalize (object);
185 gst_concat_request_new_pad (GstElement * element, GstPadTemplate * templ,
186 const gchar * name, const GstCaps * caps)
188 GstConcat *self = GST_CONCAT (element);
192 GST_DEBUG_OBJECT (element, "requesting pad");
194 g_mutex_lock (&self->lock);
195 pad_name = g_strdup_printf ("sink_%u", self->pad_count);
197 g_mutex_unlock (&self->lock);
199 sinkpad = GST_PAD_CAST (g_object_new (GST_TYPE_CONCAT_PAD,
200 "name", pad_name, "direction", templ->direction, "template", templ,
204 gst_pad_set_chain_function (sinkpad,
205 GST_DEBUG_FUNCPTR (gst_concat_sink_chain));
206 gst_pad_set_event_function (sinkpad,
207 GST_DEBUG_FUNCPTR (gst_concat_sink_event));
208 gst_pad_set_query_function (sinkpad,
209 GST_DEBUG_FUNCPTR (gst_concat_sink_query));
210 GST_OBJECT_FLAG_SET (sinkpad, GST_PAD_FLAG_PROXY_CAPS);
211 GST_OBJECT_FLAG_SET (sinkpad, GST_PAD_FLAG_PROXY_ALLOCATION);
213 gst_pad_set_active (sinkpad, TRUE);
215 g_mutex_lock (&self->lock);
216 self->sinkpads = g_list_prepend (self->sinkpads, gst_object_ref (sinkpad));
217 if (!self->current_sinkpad)
218 self->current_sinkpad = gst_object_ref (sinkpad);
219 g_mutex_unlock (&self->lock);
221 gst_element_add_pad (element, sinkpad);
227 gst_concat_release_pad (GstElement * element, GstPad * pad)
229 GstConcat *self = GST_CONCAT (element);
230 GstConcatPad *spad = GST_CONCAT_PAD_CAST (pad);
232 gboolean current_pad_removed = FALSE;
233 gboolean eos = FALSE;
235 GST_DEBUG_OBJECT (self, "releasing pad");
237 g_mutex_lock (&self->lock);
238 spad->flushing = TRUE;
239 g_cond_broadcast (&self->cond);
240 g_mutex_unlock (&self->lock);
242 gst_pad_set_active (pad, FALSE);
244 /* Now the pad is definitely not running anymore */
246 g_mutex_lock (&self->lock);
247 if (self->current_sinkpad == GST_PAD_CAST (spad)) {
248 eos = ! !gst_concat_switch_pad (self);
249 current_pad_removed = TRUE;
252 for (l = self->sinkpads; l; l = l->next) {
253 if ((gpointer) spad == l->data) {
254 gst_object_unref (spad);
255 self->sinkpads = g_list_delete_link (self->sinkpads, l);
259 g_mutex_unlock (&self->lock);
261 gst_element_remove_pad (GST_ELEMENT_CAST (self), pad);
263 if (GST_STATE (self) > GST_STATE_READY) {
264 if (current_pad_removed && !eos)
265 gst_element_post_message (GST_ELEMENT_CAST (self),
266 gst_message_new_duration_changed (GST_OBJECT_CAST (self)));
268 /* FIXME: Sending EOS from application thread */
270 gst_pad_push_event (self->srcpad, gst_event_new_eos ());
274 /* Returns FALSE if flushing
275 * Must be called from the pad's streaming thread
278 gst_concat_pad_wait (GstConcatPad * spad, GstConcat * self)
280 g_mutex_lock (&self->lock);
281 if (spad->flushing) {
282 g_mutex_unlock (&self->lock);
283 GST_DEBUG_OBJECT (spad, "Flushing");
287 while (spad != GST_CONCAT_PAD_CAST (self->current_sinkpad)) {
288 GST_TRACE_OBJECT (spad, "Not the current sinkpad - waiting");
289 g_cond_wait (&self->cond, &self->lock);
290 if (spad->flushing) {
291 g_mutex_unlock (&self->lock);
292 GST_DEBUG_OBJECT (spad, "Flushing");
296 /* This pad can only become not the current sinkpad from
297 * a) This streaming thread (we hold the stream lock)
298 * b) Releasing the pad (takes the stream lock, see above)
300 * Unlocking here is thus safe and we can safely push
301 * serialized data to our srcpad
303 GST_DEBUG_OBJECT (spad, "Now the current sinkpad");
304 g_mutex_unlock (&self->lock);
310 gst_concat_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
313 GstConcat *self = GST_CONCAT (parent);
314 GstConcatPad *spad = GST_CONCAT_PAD (pad);
316 GST_LOG_OBJECT (pad, "received buffer %p", buffer);
318 if (!gst_concat_pad_wait (spad, self))
319 return GST_FLOW_FLUSHING;
321 if (self->last_stop == GST_CLOCK_TIME_NONE)
322 self->last_stop = spad->segment.start;
324 if (self->format == GST_FORMAT_TIME) {
325 GstClockTime start_time = GST_BUFFER_TIMESTAMP (buffer);
326 GstClockTime end_time = GST_CLOCK_TIME_NONE;
328 if (start_time != GST_CLOCK_TIME_NONE)
329 end_time = start_time;
330 if (GST_BUFFER_DURATION_IS_VALID (buffer))
331 end_time += GST_BUFFER_DURATION (buffer);
333 if (end_time != GST_CLOCK_TIME_NONE && end_time > self->last_stop)
334 self->last_stop = end_time;
336 self->last_stop += gst_buffer_get_size (buffer);
339 ret = gst_pad_push (self->srcpad, buffer);
341 GST_LOG_OBJECT (pad, "handled buffer %s", gst_flow_get_name (ret));
346 /* Returns FALSE if no further pad, must be called with concat lock */
348 gst_concat_switch_pad (GstConcat * self)
355 segment = GST_CONCAT_PAD (self->current_sinkpad)->segment;
357 last_stop = self->last_stop;
358 if (last_stop == GST_CLOCK_TIME_NONE)
359 last_stop = segment.stop;
360 if (last_stop == GST_CLOCK_TIME_NONE)
361 last_stop = segment.start;
362 g_assert (last_stop != GST_CLOCK_TIME_NONE);
364 if (last_stop > segment.stop)
365 last_stop = segment.stop;
367 if (segment.format == GST_FORMAT_TIME)
369 gst_segment_to_running_time (&segment, segment.format, last_stop);
371 last_stop += segment.start;
373 self->current_start_offset += last_stop;
375 for (l = self->sinkpads; l; l = l->next) {
376 if ((gpointer) self->current_sinkpad == l->data) {
378 GST_DEBUG_OBJECT (self,
379 "Switching from pad %" GST_PTR_FORMAT " to %" GST_PTR_FORMAT,
380 self->current_sinkpad, l ? l->data : NULL);
381 gst_object_unref (self->current_sinkpad);
382 self->current_sinkpad = l ? gst_object_ref (l->data) : NULL;
383 g_cond_broadcast (&self->cond);
388 next = self->current_sinkpad != NULL;
390 self->last_stop = GST_CLOCK_TIME_NONE;
396 gst_concat_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
398 GstConcat *self = GST_CONCAT (parent);
399 GstConcatPad *spad = GST_CONCAT_PAD_CAST (pad);
402 GST_LOG_OBJECT (pad, "received event %" GST_PTR_FORMAT, event);
404 switch (GST_EVENT_TYPE (event)) {
405 case GST_EVENT_STREAM_START:{
406 if (!gst_concat_pad_wait (spad, self)) {
408 gst_event_unref (event);
410 ret = gst_pad_event_default (pad, parent, event);
414 case GST_EVENT_SEGMENT:{
415 /* Drop segment event, we create our own one */
416 gst_event_copy_segment (event, &spad->segment);
417 gst_event_unref (event);
419 g_mutex_lock (&self->lock);
420 if (self->format == GST_FORMAT_UNDEFINED) {
421 if (spad->segment.format != GST_FORMAT_TIME
422 && spad->segment.format != GST_FORMAT_BYTES) {
423 g_mutex_unlock (&self->lock);
424 GST_ELEMENT_ERROR (self, CORE, FAILED, (NULL),
425 ("Can only operate in TIME or BYTES format"));
429 self->format = spad->segment.format;
430 GST_DEBUG_OBJECT (self, "Operating in %s format",
431 gst_format_get_name (self->format));
432 g_mutex_unlock (&self->lock);
433 } else if (self->format != spad->segment.format) {
434 g_mutex_unlock (&self->lock);
435 GST_ELEMENT_ERROR (self, CORE, FAILED, (NULL),
436 ("Operating in %s format but new pad has %s",
437 gst_format_get_name (self->format),
438 gst_format_get_name (spad->segment.format)));
441 g_mutex_unlock (&self->lock);
444 if (!gst_concat_pad_wait (spad, self)) {
447 GstSegment segment = spad->segment;
449 /* We know no duration */
450 segment.duration = -1;
452 /* Update segment values to be contiguous with last stream */
453 if (self->format == GST_FORMAT_TIME) {
454 segment.base += self->current_start_offset;
456 /* Shift start/stop byte position */
457 segment.start += self->current_start_offset;
458 if (segment.stop != -1)
459 segment.stop += self->current_start_offset;
461 gst_pad_push_event (self->srcpad, gst_event_new_segment (&segment));
466 gst_event_unref (event);
467 if (!gst_concat_pad_wait (spad, self)) {
472 g_mutex_lock (&self->lock);
473 next = gst_concat_switch_pad (self);
474 g_mutex_unlock (&self->lock);
478 gst_pad_push_event (self->srcpad, gst_event_new_eos ());
480 gst_element_post_message (GST_ELEMENT_CAST (self),
481 gst_message_new_duration_changed (GST_OBJECT_CAST (self)));
486 case GST_EVENT_FLUSH_START:{
487 g_mutex_lock (&self->lock);
488 spad->flushing = TRUE;
489 g_cond_broadcast (&self->cond);
490 g_mutex_unlock (&self->lock);
493 case GST_EVENT_FLUSH_STOP:{
494 gst_segment_init (&spad->segment, GST_FORMAT_UNDEFINED);
495 spad->flushing = FALSE;
499 /* Wait for other serialized events before forwarding */
500 if (GST_EVENT_IS_SERIALIZED (event) && !gst_concat_pad_wait (spad, self)) {
501 gst_event_unref (event);
504 ret = gst_pad_event_default (pad, parent, event);
514 gst_concat_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
516 GstConcat *self = GST_CONCAT (parent);
517 GstConcatPad *spad = GST_CONCAT_PAD_CAST (pad);
520 GST_LOG_OBJECT (pad, "received query %" GST_PTR_FORMAT, query);
522 switch (GST_QUERY_TYPE (query)) {
524 /* Wait for other serialized queries before forwarding */
525 if (GST_QUERY_IS_SERIALIZED (query) && !gst_concat_pad_wait (spad, self)) {
528 ret = gst_pad_query_default (pad, parent, query);
537 gst_concat_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
539 GstConcat *self = GST_CONCAT (parent);
542 GST_LOG_OBJECT (pad, "received event %" GST_PTR_FORMAT, event);
544 switch (GST_EVENT_TYPE (event)) {
545 case GST_EVENT_SEEK:{
546 /* We don't support seeking */
547 gst_event_unref (event);
553 GstClockTimeDiff diff;
554 GstClockTime timestamp;
557 gst_event_parse_qos (event, &type, &proportion, &diff, ×tamp);
558 gst_event_unref (event);
560 if (timestamp != GST_CLOCK_TIME_NONE
561 && timestamp > self->current_start_offset) {
562 timestamp -= self->current_start_offset;
563 event = gst_event_new_qos (type, proportion, diff, timestamp);
564 ret = gst_pad_push_event (self->current_sinkpad, event);
571 ret = gst_pad_event_default (pad, parent, event);
579 gst_concat_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
583 GST_LOG_OBJECT (pad, "received query %" GST_PTR_FORMAT, query);
585 switch (GST_QUERY_TYPE (query)) {
587 ret = gst_pad_query_default (pad, parent, query);
595 reset_pad (const GValue * data, gpointer user_data)
597 GstPad *pad = g_value_get_object (data);
598 GstConcatPad *spad = GST_CONCAT_PAD_CAST (pad);
600 gst_segment_init (&spad->segment, GST_FORMAT_UNDEFINED);
601 spad->flushing = FALSE;
605 unblock_pad (const GValue * data, gpointer user_data)
607 GstPad *pad = g_value_get_object (data);
608 GstConcatPad *spad = GST_CONCAT_PAD_CAST (pad);
610 spad->flushing = TRUE;
613 static GstStateChangeReturn
614 gst_concat_change_state (GstElement * element, GstStateChange transition)
616 GstConcat *self = GST_CONCAT (element);
617 GstStateChangeReturn ret;
619 switch (transition) {
620 case GST_STATE_CHANGE_READY_TO_PAUSED:{
621 self->format = GST_FORMAT_UNDEFINED;
622 self->current_start_offset = 0;
623 self->last_stop = GST_CLOCK_TIME_NONE;
626 case GST_STATE_CHANGE_PAUSED_TO_READY:{
627 GstIterator *iter = gst_element_iterate_sink_pads (element);
628 GstIteratorResult res;
630 g_mutex_lock (&self->lock);
632 res = gst_iterator_foreach (iter, unblock_pad, NULL);
633 } while (res == GST_ITERATOR_RESYNC);
635 gst_iterator_free (iter);
636 g_cond_broadcast (&self->cond);
637 g_mutex_unlock (&self->lock);
639 if (res == GST_ITERATOR_ERROR)
640 return GST_STATE_CHANGE_FAILURE;
648 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
649 if (ret == GST_STATE_CHANGE_FAILURE)
652 switch (transition) {
653 case GST_STATE_CHANGE_PAUSED_TO_READY:{
654 GstIterator *iter = gst_element_iterate_sink_pads (element);
655 GstIteratorResult res;
658 res = gst_iterator_foreach (iter, reset_pad, NULL);
659 } while (res == GST_ITERATOR_RESYNC);
661 gst_iterator_free (iter);
663 if (res == GST_ITERATOR_ERROR)
664 return GST_STATE_CHANGE_FAILURE;