2 * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 * Boston, MA 02111-1307, USA.
24 /* FIXME 0.11: suppress warnings for deprecated API such as GStaticRecMutex
25 * with newer GLib versions (>= 2.31.0) */
26 #define GLIB_DISABLE_DEPRECATION_WARNINGS
28 #include "gststreamsynchronizer.h"
29 #include "gst/glib-compat-private.h"
/* File-local debug category; it is registered under the name
 * "streamsynchronizer" in gst_stream_synchronizer_class_init() and made the
 * default category for the logging calls in this file. */
31 GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
32 #define GST_CAT_DEFAULT stream_synchronizer_debug
/* Locking helpers for the element's ->lock mutex.
 * GST_STREAM_SYNCHRONIZER_LOCK logs "locking from thread", takes the mutex
 * with g_mutex_lock() and then logs "locked from thread".
 * GST_STREAM_SYNCHRONIZER_UNLOCK logs "unlocking from thread" and releases the
 * mutex with g_mutex_unlock().
 * NOTE(review): several continuation lines of both macros (the argument for
 * the %p in each log call and the end of each statement block) are not present
 * in this copy, so the definitions are incomplete as shown. */
34 #define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START { \
35 GST_LOG_OBJECT (obj, \
36 "locking from thread %p", \
38 g_mutex_lock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
39 GST_LOG_OBJECT (obj, \
40 "locked from thread %p", \
44 #define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START { \
45 GST_LOG_OBJECT (obj, \
46 "unlocking from thread %p", \
48 g_mutex_unlock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
/* Static pad templates for the "src_%d" and "sink_%d" pads; instances are
 * created from them in gst_stream_synchronizer_request_new_pad().
 * NOTE(review): the remaining template arguments (direction, presence, caps)
 * are not present in this copy. */
51 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%d",
55 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%d",
/* Compile-time constant, TRUE here.
 * NOTE(review): the statements that test it are not visible in this copy;
 * presumably it guards the `goto skip_adjustments` shortcuts in the event
 * handlers and the early push in the chain function -- confirm. */
60 static const gboolean passthrough = TRUE;
/* Type boilerplate: GstStreamSynchronizer with function prefix
 * gst_stream_synchronizer, deriving from GstElement. */
62 GST_BOILERPLATE (GstStreamSynchronizer, gst_stream_synchronizer,
63 GstElement, GST_TYPE_ELEMENT);
/* Visible members of the per-stream state that the code below accesses as
 * `stream->...`. NOTE(review): the struct header and the other members
 * (sinkpad, srcpad, segment, wait, is_eos, ...) are not present in this copy. */
/* Owning element; assigned in gst_stream_synchronizer_request_new_pad(). */
67 GstStreamSynchronizer *transform;
/* When TRUE the chain function clears GST_BUFFER_FLAG_DISCONT on the next
 * buffer and resets this flag. */
75 gboolean drop_discont;
/* Copied from segment.accum when a TIME newsegment is handled; the src event
 * handler subtracts it from QoS timestamps. */
79 gint64 running_time_diff;
82 /* Must be called with lock! */
/* Returns a new reference to the pad paired with @pad inside @stream: the src
 * pad when @pad is the stream's sink pad, the sink pad when @pad is the
 * stream's src pad.
 * NOTE(review): the return for a pad that matches neither is not visible in
 * this copy. */
84 gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
86 if (stream->sinkpad == pad)
87 return gst_object_ref (stream->srcpad);
88 else if (stream->srcpad == pad)
89 return gst_object_ref (stream->sinkpad);
/* Convenience lookup of the opposite pad starting from @pad alone.
 * It fetches the pad's parent element, takes the element lock, reads the
 * GstStream stored as the pad's element-private data and asks
 * gst_stream_get_other_pad() for the paired pad. The lock is released and the
 * parent reference is dropped before returning.
 * NOTE(review): the branch conditions, labels and return statements are not
 * present in this copy; the warning at the end is presumably the path taken
 * when the pad has no parent any more. */
95 gst_stream_get_other_pad_from_pad (GstPad * pad)
97 GstObject *parent = gst_pad_get_parent (pad);
98 GstStreamSynchronizer *self;
102 /* released pad does not have parent anymore */
103 if (!G_LIKELY (parent))
106 self = GST_STREAM_SYNCHRONIZER (parent);
107 GST_STREAM_SYNCHRONIZER_LOCK (self);
108 stream = gst_pad_get_element_private (pad);
112 opad = gst_stream_get_other_pad (stream, pad);
115 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
/* self is the cast of parent, so this drops the gst_pad_get_parent() ref. */
116 gst_object_unref (self);
120 GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
125 /* Generic pad functions */
/* Internal-links function installed on both pad directions: builds a
 * single-element iterator over the pad paired with @pad. The iterator is given
 * gst_object_ref / gst_object_unref as copy and free functions, and the local
 * reference on the paired pad is dropped afterwards.
 * NOTE(review): the check on opad and the return statement are not present in
 * this copy; `it` starts out as NULL. */
127 gst_stream_synchronizer_iterate_internal_links (GstPad * pad)
129 GstIterator *it = NULL;
132 opad = gst_stream_get_other_pad_from_pad (pad);
134 it = gst_iterator_new_single (GST_TYPE_PAD, opad,
135 (GstCopyFunction) gst_object_ref, (GFreeFunc) gst_object_unref);
136 gst_object_unref (opad);
/* Query function installed on both pad directions: forwards @query to the
 * peer of the pad paired with @pad via gst_pad_peer_query() and releases the
 * paired-pad reference. `ret` starts out as FALSE.
 * NOTE(review): the check on opad and the return statement are not present in
 * this copy. */
143 gst_stream_synchronizer_query (GstPad * pad, GstQuery * query)
146 gboolean ret = FALSE;
148 GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));
150 opad = gst_stream_get_other_pad_from_pad (pad);
152 ret = gst_pad_peer_query (opad, query);
153 gst_object_unref (opad);
/* Getcaps function installed on both pad directions: asks the peer of the
 * paired pad for its caps with gst_pad_peer_get_caps() and logs the result.
 * ANY caps are used as a fallback.
 * NOTE(review): the conditions selecting the fallback and the return
 * statement are not present in this copy. */
160 gst_stream_synchronizer_getcaps (GstPad * pad)
165 opad = gst_stream_get_other_pad_from_pad (pad);
167 ret = gst_pad_peer_get_caps (opad);
168 gst_object_unref (opad);
172 ret = gst_caps_new_any ();
174 GST_LOG_OBJECT (pad, "Returning caps: %" GST_PTR_FORMAT, ret);
/* Acceptcaps function installed on both pad directions: asks the peer of the
 * paired pad whether it accepts @caps via gst_pad_peer_accept_caps() and logs
 * the outcome. `ret` starts out as FALSE.
 * NOTE(review): the check on opad, the tail of the log statement and the
 * return statement are not present in this copy. */
180 gst_stream_synchronizer_acceptcaps (GstPad * pad, GstCaps * caps)
183 gboolean ret = FALSE;
185 opad = gst_stream_get_other_pad_from_pad (pad);
187 ret = gst_pad_peer_accept_caps (opad, caps);
188 gst_object_unref (opad);
191 GST_LOG_OBJECT (pad, "Caps%s accepted: %" GST_PTR_FORMAT, (ret ? "" : " not"),
197 /* srcpad functions */
/* Event function installed on the src pads (upstream-travelling events).
 * The visible handling rewrites QoS events: the timestamp is shifted back by
 * the stream's running_time_diff before a new QoS event is created and pushed
 * on the paired sink pad. The parent reference taken at entry is dropped at
 * the end.
 * NOTE(review): case labels, several conditions, the bodies of the
 * "previous group" branches and the return statement are not present in this
 * copy. */
199 gst_stream_synchronizer_src_event (GstPad * pad, GstEvent * event)
201 GstStreamSynchronizer *self =
202 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
204 gboolean ret = FALSE;
207 goto skip_adjustments;
209 GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
210 GST_EVENT_TYPE_NAME (event), event->structure);
212 switch (GST_EVENT_TYPE (event)) {
215 GstClockTimeDiff diff;
216 GstClockTime timestamp;
217 gint64 running_time_diff;
/* NOTE(review): the last argument below reads "×tamp"; this looks like a
 * mis-decoded "&timestamp" (the variable declared above) and will not compile
 * as written -- confirm against the pristine source. */
220 gst_event_parse_qos (event, &proportion, &diff, ×tamp);
/* The incoming event is released here; a fresh one is created further down. */
221 gst_event_unref (event);
/* Snapshot the stream's offset under the lock; -1 is the "no value" marker
 * tested right after the lock is released. */
223 GST_STREAM_SYNCHRONIZER_LOCK (self);
224 stream = gst_pad_get_element_private (pad);
226 running_time_diff = stream->running_time_diff;
228 running_time_diff = -1;
229 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
231 if (running_time_diff == -1) {
232 GST_WARNING_OBJECT (pad, "QOS event before group start");
234 } else if (timestamp < running_time_diff) {
235 GST_DEBUG_OBJECT (pad, "QOS event from previous group");
240 "Adjusting QOS event: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT " = %"
241 GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
242 GST_TIME_ARGS (running_time_diff),
243 GST_TIME_ARGS (timestamp - running_time_diff));
245 timestamp -= running_time_diff;
247 /* That case is invalid for QoS events */
248 if (diff < 0 && -diff > timestamp) {
249 GST_DEBUG_OBJECT (pad, "QOS event from previous group");
254 event = gst_event_new_qos (proportion, diff, timestamp);
/* Forward the (possibly rebuilt) event through the paired pad. */
263 opad = gst_stream_get_other_pad_from_pad (pad);
265 ret = gst_pad_push_event (opad, event);
266 gst_object_unref (opad);
270 gst_object_unref (self);
275 /* sinkpad functions */
/* Event function installed on the sink pads (downstream-travelling events).
 * Visible handling:
 *  - SINK_MESSAGE named "playbin2-stream-changed": marks the stream as a new
 *    stream and, once every stream is waiting, computes a new group start
 *    time and wakes the waiters.
 *  - NEWSEGMENT: may block on stream_finish_cond, closes/extends the previous
 *    segment for a new stream and records the segment and running_time_diff.
 *  - FLUSH_STOP: resets the per-stream state.
 *  - an EOS section: marks the stream EOS and pushes EOS on all src pads once
 *    every stream is EOS.
 * Afterwards the event is pushed on the paired src pad and the parent
 * reference taken at entry is dropped.
 * NOTE(review): many lines (case labels, conditions, break/goto statements,
 * the return) are not present in this copy, so the exact control flow cannot
 * be read from here. */
277 gst_stream_synchronizer_sink_event (GstPad * pad, GstEvent * event)
279 GstStreamSynchronizer *self =
280 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
282 gboolean ret = FALSE;
285 goto skip_adjustments;
287 GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
288 GST_EVENT_TYPE_NAME (event), event->structure);
290 switch (GST_EVENT_TYPE (event)) {
291 case GST_EVENT_SINK_MESSAGE:{
294 gst_event_parse_sink_message (event, &message);
295 if (message->structure
296 && gst_structure_has_name (message->structure,
297 "playbin2-stream-changed")) {
300 GST_STREAM_SYNCHRONIZER_LOCK (self);
301 stream = gst_pad_get_element_private (pad);
304 gboolean all_wait = TRUE;
306 GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
308 stream->is_eos = FALSE;
310 stream->new_stream = TRUE;
/* all_wait stays TRUE only if every registered stream has ->wait set. */
312 for (l = self->streams; l; l = l->next) {
313 GstStream *ostream = l->data;
315 all_wait = all_wait && ostream->wait;
320 gint64 last_stop = 0;
322 GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
/* Clear ->wait on every stream and track the largest running time reached by
 * any of them (segment stop or last_stop, whichever is larger). */
324 for (l = self->streams; l; l = l->next) {
325 GstStream *ostream = l->data;
326 gint64 stop_running_time;
327 gint64 last_stop_running_time;
329 ostream->wait = FALSE;
332 gst_segment_to_running_time (&ostream->segment,
333 GST_FORMAT_TIME, ostream->segment.stop);
334 last_stop_running_time =
335 gst_segment_to_running_time (&ostream->segment,
336 GST_FORMAT_TIME, ostream->segment.last_stop);
338 MAX (last_stop, MAX (stop_running_time,
339 last_stop_running_time));
/* Clamp to zero and never move the group start time backwards. */
341 last_stop = MAX (0, last_stop);
342 self->group_start_time = MAX (self->group_start_time, last_stop);
344 GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
345 GST_TIME_ARGS (self->group_start_time));
/* Wake every thread blocked in the NEWSEGMENT handler below. */
347 g_cond_broadcast (self->stream_finish_cond);
350 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
352 gst_message_unref (message);
355 case GST_EVENT_NEWSEGMENT:{
358 gdouble rate, applied_rate;
360 gint64 start, stop, position;
362 gst_event_parse_new_segment_full (event,
363 &update, &rate, &applied_rate, &format, &start, &stop, &position);
365 GST_STREAM_SYNCHRONIZER_LOCK (self);
366 stream = gst_pad_get_element_private (pad);
/* Block on stream_finish_cond (g_cond_wait releases self->lock while
 * sleeping). The stream pointer is re-read afterwards, presumably because the
 * pad may have been released while the lock was not held -- confirm.
 * NOTE(review): the loop/branch condition around the wait is not present in
 * this copy. */
369 GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
370 g_cond_wait (self->stream_finish_cond, self->lock);
371 stream = gst_pad_get_element_private (pad);
373 stream->wait = FALSE;
/* Shutdown was requested by the state change: drop the event. */
377 if (self->shutdown) {
378 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
379 gst_event_unref (event);
383 if (stream && format == GST_FORMAT_TIME) {
384 if (stream->new_stream) {
385 gint64 last_stop_running_time = 0;
386 gint64 stop_running_time = 0;
388 if (stream->segment.format == GST_FORMAT_TIME) {
389 last_stop_running_time =
390 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
391 stream->segment.last_stop);
392 last_stop_running_time = MAX (last_stop_running_time, 0);
394 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
395 stream->segment.stop);
/* NOTE(review): this clamps last_stop_running_time, not the value derived
 * from segment.stop just above (whose assignment target is missing in this
 * copy, presumably stop_running_time). As written stop_running_time always
 * equals last_stop_running_time afterwards, so the two comparisons that
 * follow can never be true. Presumably MAX (stop_running_time, 0) was
 * intended -- confirm. */
396 stop_running_time = MAX (last_stop_running_time, 0);
398 if (stop_running_time != last_stop_running_time) {
399 GST_WARNING_OBJECT (pad,
400 "Gap between last_stop and segment stop: %" GST_TIME_FORMAT
401 " != %" GST_TIME_FORMAT, GST_TIME_ARGS (stop_running_time),
402 GST_TIME_ARGS (last_stop_running_time));
/* Extend the old segment's stop to last_stop with an update newsegment and
 * mirror the same change into the locally tracked segment.
 * NOTE(review): this push (and the one further down) happens between the
 * LOCK above and the UNLOCK at the end of this case, i.e. apparently with the
 * element lock held -- confirm this cannot block. */
405 if (stop_running_time < last_stop_running_time) {
406 GST_DEBUG_OBJECT (pad, "Updating stop position");
407 gst_pad_push_event (stream->srcpad,
408 gst_event_new_new_segment_full (TRUE, stream->segment.rate,
409 stream->segment.applied_rate, GST_FORMAT_TIME,
410 stream->segment.start, stream->segment.last_stop,
411 stream->segment.time));
412 gst_segment_set_newsegment_full (&stream->segment, TRUE,
413 stream->segment.rate, stream->segment.applied_rate,
414 GST_FORMAT_TIME, stream->segment.start,
415 stream->segment.last_stop, stream->segment.time);
417 stop_running_time = MAX (stop_running_time, last_stop_running_time);
418 GST_DEBUG_OBJECT (pad,
419 "Stop running time of last group: %" GST_TIME_FORMAT,
420 GST_TIME_ARGS (stop_running_time));
/* The stream is no longer "new"; the next buffer gets its DISCONT flag
 * cleared by the chain function. */
422 stream->new_stream = FALSE;
423 stream->drop_discont = TRUE;
/* This stream ended before the group start time: push a filler segment
 * [0, diff] so its running time catches up, and mirror it locally. */
425 if (stop_running_time < self->group_start_time) {
426 gint64 diff = self->group_start_time - stop_running_time;
428 GST_DEBUG_OBJECT (pad,
429 "Advancing running time for other streams by: %"
430 GST_TIME_FORMAT, GST_TIME_ARGS (diff));
431 gst_pad_push_event (stream->srcpad,
432 gst_event_new_new_segment_full (FALSE, 1.0, 1.0,
433 GST_FORMAT_TIME, 0, diff, 0));
434 gst_segment_set_newsegment_full (&stream->segment, FALSE, 1.0, 1.0,
435 GST_FORMAT_TIME, 0, diff, 0);
439 GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
441 gst_segment_set_newsegment_full (&stream->segment, update, rate,
442 applied_rate, format, start, stop, position);
443 GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
446 GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
447 GST_TIME_ARGS (stream->segment.accum));
/* Remembered for the QoS timestamp adjustment in the src event handler. */
448 stream->running_time_diff = stream->segment.accum;
450 GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
451 gst_format_get_name (format));
452 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
454 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
457 case GST_EVENT_FLUSH_STOP:{
460 GST_STREAM_SYNCHRONIZER_LOCK (self);
461 stream = gst_pad_get_element_private (pad);
463 GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
464 stream->stream_number);
465 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
467 stream->is_eos = FALSE;
468 stream->wait = FALSE;
469 stream->new_stream = FALSE;
470 stream->drop_discont = FALSE;
471 stream->seen_data = FALSE;
473 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
/* EOS handling (the case label itself is not present in this copy). */
479 gboolean all_eos = TRUE;
484 GST_STREAM_SYNCHRONIZER_LOCK (self);
485 stream = gst_pad_get_element_private (pad);
487 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
488 GST_WARNING_OBJECT (pad, "EOS for unknown stream");
492 GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
493 stream->is_eos = TRUE;
/* Copy what is needed after the lock is dropped; srcpad is ref'd here and
 * unref'd at the end of this section. */
495 seen_data = stream->seen_data;
496 srcpad = gst_object_ref (stream->srcpad);
498 for (l = self->streams; l; l = l->next) {
499 GstStream *ostream = l->data;
501 all_eos = all_eos && ostream->is_eos;
507 GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
508 for (l = self->streams; l; l = l->next) {
509 GstStream *ostream = l->data;
510 /* local snapshot of current pads */
511 gst_object_ref (ostream->srcpad);
512 pads = g_slist_prepend (pads, ostream->srcpad);
515 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
516 /* drop lock when sending eos, which may block in e.g. preroll */
525 GST_DEBUG_OBJECT (pad, "Pushing EOS");
/* NOTE(review): because of the short-circuit &&, no EOS is pushed by this
 * statement whenever ret is already FALSE (e.g. after one failed push). ret
 * is initialised to FALSE at function entry, so this relies on an assignment
 * that is not visible in this copy -- confirm. */
526 ret = ret && gst_pad_push_event (pad, gst_event_new_eos ());
527 gst_object_unref (pad);
528 epad = g_slist_next (epad);
532 /* if EOS, but no data has passed, then send something to replace EOS
533 * for preroll purposes */
535 GstBuffer *buf = gst_buffer_new ();
537 GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_PREROLL);
538 gst_pad_push (srcpad, buf);
541 gst_object_unref (srcpad);
/* Forward the event through the paired src pad. */
551 opad = gst_stream_get_other_pad_from_pad (pad);
553 ret = gst_pad_push_event (opad, event);
554 gst_object_unref (opad);
558 gst_object_unref (self);
/* Bufferalloc function installed on the sink pads: delegates the allocation
 * to gst_pad_alloc_buffer() on the paired src pad and logs the resulting flow
 * name. `ret` starts out as GST_FLOW_OK.
 * NOTE(review): the check on opad, the fallback statement announced by the
 * inner comment and the return statement are not present in this copy. */
564 gst_stream_synchronizer_sink_bufferalloc (GstPad * pad, guint64 offset,
565 guint size, GstCaps * caps, GstBuffer ** buf)
568 GstFlowReturn ret = GST_FLOW_OK;
570 GST_LOG_OBJECT (pad, "Allocating buffer: size=%u", size);
572 opad = gst_stream_get_other_pad_from_pad (pad);
574 ret = gst_pad_alloc_buffer (opad, offset, size, caps, buf);
575 gst_object_unref (opad);
577 /* may have been released during shutdown;
578 * silently trigger fallback */
582 GST_LOG_OBJECT (pad, "Allocation: %s", gst_flow_get_name (ret));
/* Chain function installed on the sink pads.
 * Visible steps: (1) an early path that pushes the buffer straight to the
 * paired src pad (presumably the passthrough shortcut -- confirm); (2) under
 * the lock, mark the stream as having seen data, clear the DISCONT flag once
 * if drop_discont is set and move segment.last_stop to the buffer timestamp;
 * (3) push the buffer on the paired src pad; (4) on GST_FLOW_OK move
 * last_stop to the buffer end time and advance EOS streams that lag by more
 * than one second. The parent reference taken at entry is dropped at the end.
 * NOTE(review): several conditions, declarations and the return statement are
 * not present in this copy. */
588 gst_stream_synchronizer_sink_chain (GstPad * pad, GstBuffer * buffer)
590 GstStreamSynchronizer *self =
591 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
593 GstFlowReturn ret = GST_FLOW_ERROR;
595 GstClockTime timestamp = GST_CLOCK_TIME_NONE;
596 GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
599 opad = gst_stream_get_other_pad_from_pad (pad);
601 ret = gst_pad_push (opad, buffer);
602 gst_object_unref (opad);
607 GST_LOG_OBJECT (pad, "Handling buffer %p: size=%u, timestamp=%"
608 GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
609 " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
610 buffer, GST_BUFFER_SIZE (buffer),
611 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
612 GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
613 GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));
/* timestamp_end is only set when both timestamp and duration are valid;
 * otherwise it keeps its GST_CLOCK_TIME_NONE initial value. */
615 timestamp = GST_BUFFER_TIMESTAMP (buffer);
616 if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)
617 && GST_BUFFER_DURATION_IS_VALID (buffer))
618 timestamp_end = timestamp + GST_BUFFER_DURATION (buffer);
620 GST_STREAM_SYNCHRONIZER_LOCK (self);
621 stream = gst_pad_get_element_private (pad);
624 stream->seen_data = TRUE;
/* One-shot: the flag is armed by the NEWSEGMENT handler for a new stream. */
625 if (stream && stream->drop_discont) {
626 buffer = gst_buffer_make_metadata_writable (buffer);
627 GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
628 stream->drop_discont = FALSE;
631 if (stream && stream->segment.format == GST_FORMAT_TIME
632 && GST_CLOCK_TIME_IS_VALID (timestamp)) {
634 "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
635 GST_TIME_ARGS (stream->segment.last_stop), GST_TIME_ARGS (timestamp));
636 gst_segment_set_last_stop (&stream->segment, GST_FORMAT_TIME, timestamp);
638 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
/* The buffer is pushed without the element lock held. */
640 opad = gst_stream_get_other_pad_from_pad (pad);
642 ret = gst_pad_push (opad, buffer);
643 gst_object_unref (opad);
646 GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
647 if (ret == GST_FLOW_OK) {
/* The stream is looked up again because the lock was released for the push. */
650 GST_STREAM_SYNCHRONIZER_LOCK (self);
651 stream = gst_pad_get_element_private (pad);
652 if (stream && stream->segment.format == GST_FORMAT_TIME
653 && GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
655 "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
656 GST_TIME_ARGS (stream->segment.last_stop),
657 GST_TIME_ARGS (timestamp_end));
658 gst_segment_set_last_stop (&stream->segment, GST_FORMAT_TIME,
662 /* Advance EOS streams if necessary. For non-EOS
663 * streams the demuxers should already do this! */
664 for (l = self->streams; l; l = l->next) {
665 GstStream *ostream = l->data;
/* Only EOS streams with a TIME segment are candidates. */
668 if (!ostream->is_eos || ostream->segment.format != GST_FORMAT_TIME)
671 if (ostream->segment.last_stop != -1)
672 last_stop = ostream->segment.last_stop;
674 last_stop = ostream->segment.start;
/* NOTE(review): timestamp_end can still be GST_CLOCK_TIME_NONE here when the
 * buffer lacked a valid timestamp or duration; whether this loop is guarded
 * against that is not visible in this copy -- confirm. */
676 /* Is there a 1 second lag? */
677 if (last_stop != -1 && last_stop + GST_SECOND < timestamp_end) {
678 gint64 new_start, new_stop;
680 new_start = timestamp_end - GST_SECOND;
681 if (ostream->segment.stop == -1)
684 new_stop = MAX (new_start, ostream->segment.stop);
686 GST_DEBUG_OBJECT (ostream->sinkpad,
687 "Advancing stream %u from %" GST_TIME_FORMAT " to %"
688 GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (last_stop),
689 GST_TIME_ARGS (new_start));
/* Push an update newsegment starting at new_start and mirror it into the
 * locally tracked segment.
 * NOTE(review): this push sits between the LOCK above and the UNLOCK below,
 * i.e. apparently with the element lock held -- confirm this cannot block. */
691 gst_pad_push_event (ostream->srcpad,
692 gst_event_new_new_segment_full (TRUE, ostream->segment.rate,
693 ostream->segment.applied_rate, ostream->segment.format,
694 new_start, new_stop, new_start));
695 gst_segment_set_newsegment_full (&ostream->segment, TRUE,
696 ostream->segment.rate, ostream->segment.applied_rate,
697 ostream->segment.format, new_start, new_stop, new_start);
698 gst_segment_set_last_stop (&ostream->segment, GST_FORMAT_TIME,
702 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
707 gst_object_unref (self);
712 /* GstElement vfuncs */
/* request_new_pad vfunc: creates a new GstStream together with its
 * "sink_<n>" / "src_<n>" pad pair, where <n> is current_stream_number.
 * Under the element lock the stream is allocated, both pads are created from
 * the static templates, the stream is stored as element-private data on both
 * pads, the pad functions are installed, and the stream is prepended to
 * self->streams. Afterwards, under the state lock, the pads are activated
 * unless the target state is NULL and then added to the element.
 * Returns the new sink pad; @temp and @name are not used in the visible code.
 * NOTE(review): g_strdup_printf() allocates tmp twice; the matching g_free()
 * calls are not visible in this copy -- confirm they exist. */
714 gst_stream_synchronizer_request_new_pad (GstElement * element,
715 GstPadTemplate * temp, const gchar * name)
717 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
721 GST_STREAM_SYNCHRONIZER_LOCK (self);
722 GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
723 self->current_stream_number);
/* Zero-initialised allocation, so all flags start out FALSE. */
725 stream = g_slice_new0 (GstStream);
726 stream->transform = self;
727 stream->stream_number = self->current_stream_number;
/* Sink pad: generic functions plus event, chain and bufferalloc handlers. */
729 tmp = g_strdup_printf ("sink_%d", self->current_stream_number);
730 stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
732 gst_pad_set_element_private (stream->sinkpad, stream);
733 gst_pad_set_iterate_internal_links_function (stream->sinkpad,
734 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
735 gst_pad_set_query_function (stream->sinkpad,
736 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
737 gst_pad_set_getcaps_function (stream->sinkpad,
738 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
739 gst_pad_set_acceptcaps_function (stream->sinkpad,
740 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
741 gst_pad_set_event_function (stream->sinkpad,
742 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
743 gst_pad_set_chain_function (stream->sinkpad,
744 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
745 gst_pad_set_bufferalloc_function (stream->sinkpad,
746 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_bufferalloc));
/* Src pad: generic functions plus the src event handler. */
748 tmp = g_strdup_printf ("src_%d", self->current_stream_number);
749 stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
751 gst_pad_set_element_private (stream->srcpad, stream);
752 gst_pad_set_iterate_internal_links_function (stream->srcpad,
753 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
754 gst_pad_set_query_function (stream->srcpad,
755 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
756 gst_pad_set_getcaps_function (stream->srcpad,
757 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
758 gst_pad_set_acceptcaps_function (stream->srcpad,
759 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
760 gst_pad_set_event_function (stream->srcpad,
761 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
763 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
765 self->streams = g_list_prepend (self->streams, stream);
766 self->current_stream_number++;
767 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
769 /* Add pads and activate unless we're going to NULL */
770 g_static_rec_mutex_lock (GST_STATE_GET_LOCK (self));
771 if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
772 gst_pad_set_active (stream->srcpad, TRUE);
773 gst_pad_set_active (stream->sinkpad, TRUE);
775 gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
776 gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
777 g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
779 return stream->sinkpad;
782 /* Must be called with lock! */
/* Removes @stream from self->streams, detaches and removes both of its pads
 * from the element, folds the stream's final running time into
 * self->group_start_time and frees the GstStream.
 * The element lock is released for the pad teardown and re-taken before
 * returning, so the caller still holds it afterwards.
 * NOTE(review): the second parameter line and a few statements are not
 * present in this copy. */
784 gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
789 GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
791 for (l = self->streams; l; l = l->next) {
792 if (l->data == stream) {
793 self->streams = g_list_delete_link (self->streams, l);
/* The stream must have been found in the list. */
797 g_assert (l != NULL);
799 /* we can drop the lock, since stream exists now only local.
800 * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
801 * (due to reverse lock order) when deactivating pads */
802 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
/* Clear the element-private pointers first so pad functions no longer find
 * the stream, then deactivate and remove each pad. */
804 gst_pad_set_element_private (stream->srcpad, NULL);
805 gst_pad_set_element_private (stream->sinkpad, NULL);
806 gst_pad_set_active (stream->srcpad, FALSE);
807 gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
808 gst_pad_set_active (stream->sinkpad, FALSE);
809 gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
811 if (stream->segment.format == GST_FORMAT_TIME) {
812 gint64 stop_running_time;
813 gint64 last_stop_running_time;
816 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
817 stream->segment.stop);
818 last_stop_running_time =
819 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
820 stream->segment.last_stop);
821 stop_running_time = MAX (stop_running_time, last_stop_running_time);
823 GST_DEBUG_OBJECT (stream->sinkpad,
824 "Stop running time was: %" GST_TIME_FORMAT,
825 GST_TIME_ARGS (stop_running_time));
/* NOTE(review): group_start_time is written here after the element lock was
 * released above and before it is re-taken below -- confirm that no other
 * thread can touch it concurrently. */
827 self->group_start_time = MAX (self->group_start_time, stop_running_time);
830 g_slice_free (GstStream, stream);
832 /* NOTE: In theory we have to check here if all streams
833 * are EOS but the one that was removed wasn't and then
834 * send EOS downstream. But due to the way how playsink
835 * works this is not necessary and will only cause problems
836 * for gapless playback. playsink will only add/remove pads
837 * when it's reconfigured, which happens when the streams
 * (the remainder of this note is not present in this copy) */
841 /* lock for good measure, since the caller had it */
842 GST_STREAM_SYNCHRONIZER_LOCK (self);
/* release_pad vfunc: under the element lock, looks up the GstStream attached
 * to @pad, asserts that @pad is that stream's sink pad and releases the whole
 * stream (both pads) via gst_stream_synchronizer_release_stream().
 * NOTE(review): the NULL check on stream, if any, is not present in this
 * copy. */
846 gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
848 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
851 GST_STREAM_SYNCHRONIZER_LOCK (self);
852 stream = gst_pad_get_element_private (pad);
854 g_assert (stream->sinkpad == pad);
856 gst_stream_synchronizer_release_stream (self, stream);
858 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
/* change_state vfunc.
 * Before chaining up: clears the shutdown flag on NULL->READY and
 * READY->PAUSED (the latter also resets group_start_time); on PAUSED->READY
 * it wakes all threads waiting on stream_finish_cond and sets shutdown.
 * After the parent class handled the transition: PAUSED->READY resets
 * group_start_time and every stream's segment and flags; READY->NULL releases
 * all remaining streams and resets current_stream_number.
 * NOTE(review): break statements, default cases and the return statements are
 * not present in this copy. */
861 static GstStateChangeReturn
862 gst_stream_synchronizer_change_state (GstElement * element,
863 GstStateChange transition)
865 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
866 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
868 switch (transition) {
869 case GST_STATE_CHANGE_NULL_TO_READY:
870 GST_DEBUG_OBJECT (self, "State change NULL->READY");
871 self->shutdown = FALSE;
873 case GST_STATE_CHANGE_READY_TO_PAUSED:
874 GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
875 self->group_start_time = 0;
876 self->shutdown = FALSE;
878 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
879 GST_DEBUG_OBJECT (self, "State change PAUSED->PLAYING");
/* NOTE(review): the log text below says "READY->NULL" although this is the
 * PAUSED_TO_READY case; the message looks like a copy/paste slip. */
881 case GST_STATE_CHANGE_PAUSED_TO_READY:
882 GST_DEBUG_OBJECT (self, "State change READY->NULL");
/* Broadcast and flag update both happen under the lock, so a waiter that
 * wakes up observes shutdown == TRUE once it re-acquires the lock. */
884 GST_STREAM_SYNCHRONIZER_LOCK (self);
885 g_cond_broadcast (self->stream_finish_cond);
886 self->shutdown = TRUE;
887 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
893 GstStateChangeReturn bret;
895 bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
896 GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", bret);
897 if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
901 switch (transition) {
902 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
903 GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
905 case GST_STATE_CHANGE_PAUSED_TO_READY:{
908 GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
909 self->group_start_time = 0;
911 GST_STREAM_SYNCHRONIZER_LOCK (self);
912 for (l = self->streams; l; l = l->next) {
913 GstStream *stream = l->data;
915 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
916 stream->wait = FALSE;
917 stream->new_stream = FALSE;
918 stream->drop_discont = FALSE;
919 stream->is_eos = FALSE;
921 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
924 case GST_STATE_CHANGE_READY_TO_NULL:{
925 GST_DEBUG_OBJECT (self, "State change READY->NULL");
/* release_stream() removes the head from self->streams on every iteration
 * and returns with the lock held again, so the loop terminates when the list
 * is empty. */
927 GST_STREAM_SYNCHRONIZER_LOCK (self);
928 while (self->streams)
929 gst_stream_synchronizer_release_stream (self, self->streams->data);
930 self->current_stream_number = 0;
931 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
/* GObject finalize: frees the mutex and the condition variable created in
 * gst_stream_synchronizer_init(), then chains up to the parent class.
 * NOTE(review): the guard around g_mutex_free() and the reset of self->lock,
 * if any, are not present in this copy. */
943 gst_stream_synchronizer_finalize (GObject * object)
945 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
948 g_mutex_free (self->lock);
952 if (self->stream_finish_cond) {
953 g_cond_free (self->stream_finish_cond);
954 self->stream_finish_cond = NULL;
957 G_OBJECT_CLASS (parent_class)->finalize (object);
960 /* GObject type initialization */
/* Instance init: allocates the element mutex and the condition variable that
 * streams wait on in the NEWSEGMENT handler. Both are released in
 * gst_stream_synchronizer_finalize(). @klass is not used in the visible
 * code. */
962 gst_stream_synchronizer_init (GstStreamSynchronizer * self,
963 GstStreamSynchronizerClass * klass)
965 self->lock = g_mutex_new ();
966 self->stream_finish_cond = g_cond_new ();
/* Class base init: registers the src and sink static pad templates on the
 * element class and sets the element details (long name, classification,
 * description, author). */
970 gst_stream_synchronizer_base_init (gpointer g_class)
972 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
974 gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
975 gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
977 gst_element_class_set_details_simple (gstelement_class,
978 "Stream Synchronizer", "Generic",
979 "Synchronizes a group of streams to have equal durations and starting points",
980 "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
/* Class init: registers the "streamsynchronizer" debug category and installs
 * the finalize, change_state, request_new_pad and release_pad
 * implementations defined in this file. */
984 gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
986 GObjectClass *gobject_class = (GObjectClass *) klass;
987 GstElementClass *element_class = (GstElementClass *) klass;
989 GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
990 "streamsynchronizer", 0, "Stream Synchronizer");
992 gobject_class->finalize = gst_stream_synchronizer_finalize;
994 element_class->change_state =
995 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
996 element_class->request_new_pad =
997 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
998 element_class->release_pad =
999 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);