2 * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 * Boston, MA 02111-1307, USA.
24 #include "gststreamsynchronizer.h"
26 GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
27 #define GST_CAT_DEFAULT stream_synchronizer_debug
29 #define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START { \
30 GST_LOG_OBJECT (obj, \
31 "locking from thread %p", \
33 g_mutex_lock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
34 GST_LOG_OBJECT (obj, \
35 "locked from thread %p", \
39 #define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START { \
40 GST_LOG_OBJECT (obj, \
41 "unlocking from thread %p", \
43 g_mutex_unlock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
46 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%d",
50 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%d",
55 static const gboolean passthrough = TRUE;
57 #define gst_stream_synchronizer_parent_class parent_class
58 G_DEFINE_TYPE (GstStreamSynchronizer, gst_stream_synchronizer,
63 GstStreamSynchronizer *transform;
71 gboolean drop_discont;
75 gint64 running_time_diff;
78 /* Must be called with lock! */
80 gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
82 if (stream->sinkpad == pad)
83 return gst_object_ref (stream->srcpad);
84 else if (stream->srcpad == pad)
85 return gst_object_ref (stream->sinkpad);
91 gst_stream_get_other_pad_from_pad (GstPad * pad)
93 GstStreamSynchronizer *self =
94 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
98 GST_STREAM_SYNCHRONIZER_LOCK (self);
99 stream = gst_pad_get_element_private (pad);
103 opad = gst_stream_get_other_pad (stream, pad);
106 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
107 gst_object_unref (self);
110 GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
115 /* Generic pad functions */
117 gst_stream_synchronizer_iterate_internal_links (GstPad * pad)
119 GstIterator *it = NULL;
122 opad = gst_stream_get_other_pad_from_pad (pad);
124 GValue value = { 0, };
126 g_value_init (&value, GST_TYPE_PAD);
127 g_value_set_object (&value, opad);
128 it = gst_iterator_new_single (GST_TYPE_PAD, &value);
129 g_value_unset (&value);
130 gst_object_unref (opad);
137 gst_stream_synchronizer_query (GstPad * pad, GstQuery * query)
140 gboolean ret = FALSE;
142 GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));
144 opad = gst_stream_get_other_pad_from_pad (pad);
146 ret = gst_pad_peer_query (opad, query);
147 gst_object_unref (opad);
154 gst_stream_synchronizer_getcaps (GstPad * pad, GstCaps * filter)
159 opad = gst_stream_get_other_pad_from_pad (pad);
161 ret = gst_pad_peer_get_caps (opad, filter);
162 gst_object_unref (opad);
166 ret = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
168 GST_LOG_OBJECT (pad, "Returning caps: %" GST_PTR_FORMAT, ret);
174 gst_stream_synchronizer_acceptcaps (GstPad * pad, GstCaps * caps)
177 gboolean ret = FALSE;
179 opad = gst_stream_get_other_pad_from_pad (pad);
181 ret = gst_pad_peer_accept_caps (opad, caps);
182 gst_object_unref (opad);
185 GST_LOG_OBJECT (pad, "Caps%s accepted: %" GST_PTR_FORMAT, (ret ? "" : " not"),
191 /* srcpad functions */
193 gst_stream_synchronizer_src_event (GstPad * pad, GstEvent * event)
195 GstStreamSynchronizer *self =
196 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
198 gboolean ret = FALSE;
201 goto skip_adjustments;
203 GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
204 GST_EVENT_TYPE_NAME (event), event);
206 switch (GST_EVENT_TYPE (event)) {
209 GstClockTimeDiff diff;
210 GstClockTime timestamp;
211 gint64 running_time_diff;
214 gst_event_parse_qos (event, NULL, &proportion, &diff, ×tamp);
215 gst_event_unref (event);
217 GST_STREAM_SYNCHRONIZER_LOCK (self);
218 stream = gst_pad_get_element_private (pad);
220 running_time_diff = stream->running_time_diff;
222 running_time_diff = -1;
223 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
225 if (running_time_diff == -1) {
226 GST_WARNING_OBJECT (pad, "QOS event before group start");
228 } else if (timestamp < running_time_diff) {
229 GST_DEBUG_OBJECT (pad, "QOS event from previous group");
234 "Adjusting QOS event: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT " = %"
235 GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
236 GST_TIME_ARGS (running_time_diff),
237 GST_TIME_ARGS (timestamp - running_time_diff));
239 timestamp -= running_time_diff;
241 /* That case is invalid for QoS events */
242 if (diff < 0 && -diff > timestamp) {
243 GST_DEBUG_OBJECT (pad, "QOS event from previous group");
249 gst_event_new_qos (GST_QOS_TYPE_UNDERFLOW, proportion, diff,
259 opad = gst_stream_get_other_pad_from_pad (pad);
261 ret = gst_pad_push_event (opad, event);
262 gst_object_unref (opad);
266 gst_object_unref (self);
271 /* sinkpad functions */
273 gst_stream_synchronizer_sink_event (GstPad * pad, GstEvent * event)
275 GstStreamSynchronizer *self =
276 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
278 gboolean ret = FALSE;
281 goto skip_adjustments;
283 GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
284 GST_EVENT_TYPE_NAME (event), event);
286 switch (GST_EVENT_TYPE (event)) {
287 case GST_EVENT_SINK_MESSAGE:{
290 gst_event_parse_sink_message (event, &message);
291 if (gst_message_has_name (message, "playbin-stream-changed")) {
294 GST_STREAM_SYNCHRONIZER_LOCK (self);
295 stream = gst_pad_get_element_private (pad);
298 gboolean all_wait = TRUE;
300 GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
302 stream->is_eos = FALSE;
304 stream->new_stream = TRUE;
306 for (l = self->streams; l; l = l->next) {
307 GstStream *ostream = l->data;
309 all_wait = all_wait && ostream->wait;
316 GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
318 for (l = self->streams; l; l = l->next) {
319 GstStream *ostream = l->data;
320 gint64 stop_running_time;
321 gint64 position_running_time;
323 ostream->wait = FALSE;
326 gst_segment_to_running_time (&ostream->segment,
327 GST_FORMAT_TIME, ostream->segment.stop);
328 position_running_time =
329 gst_segment_to_running_time (&ostream->segment,
330 GST_FORMAT_TIME, ostream->segment.position);
332 MAX (position, MAX (stop_running_time,
333 position_running_time));
335 position = MAX (0, position);
336 self->group_start_time = MAX (self->group_start_time, position);
338 GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
339 GST_TIME_ARGS (self->group_start_time));
341 g_cond_broadcast (self->stream_finish_cond);
344 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
346 gst_message_unref (message);
349 case GST_EVENT_SEGMENT:{
353 gst_event_copy_segment (event, &segment);
355 GST_STREAM_SYNCHRONIZER_LOCK (self);
356 stream = gst_pad_get_element_private (pad);
359 GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
360 g_cond_wait (self->stream_finish_cond, self->lock);
361 stream = gst_pad_get_element_private (pad);
363 stream->wait = FALSE;
367 if (self->shutdown) {
368 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
369 gst_event_unref (event);
373 if (stream && segment.format == GST_FORMAT_TIME) {
374 if (stream->new_stream) {
375 gint64 position_running_time = 0;
376 gint64 stop_running_time = 0;
378 if (stream->segment.format == GST_FORMAT_TIME) {
379 position_running_time =
380 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
381 stream->segment.position);
382 position_running_time = MAX (position_running_time, 0);
384 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
385 stream->segment.stop);
386 stop_running_time = MAX (position_running_time, 0);
388 if (stop_running_time != position_running_time) {
389 GST_WARNING_OBJECT (pad,
390 "Gap between position and segment stop: %" GST_TIME_FORMAT
391 " != %" GST_TIME_FORMAT, GST_TIME_ARGS (stop_running_time),
392 GST_TIME_ARGS (position_running_time));
395 if (stop_running_time < position_running_time) {
396 GST_DEBUG_OBJECT (pad, "Updating stop position");
397 stream->segment.stop = stream->segment.position;
398 gst_pad_push_event (stream->srcpad,
399 gst_event_new_segment (&stream->segment));
401 stop_running_time = MAX (stop_running_time, position_running_time);
402 GST_DEBUG_OBJECT (pad,
403 "Stop running time of last group: %" GST_TIME_FORMAT,
404 GST_TIME_ARGS (stop_running_time));
406 stream->new_stream = FALSE;
407 stream->drop_discont = TRUE;
409 if (stop_running_time < self->group_start_time) {
410 gint64 diff = self->group_start_time - stop_running_time;
412 GST_DEBUG_OBJECT (pad,
413 "Advancing running time for other streams by: %"
414 GST_TIME_FORMAT, GST_TIME_ARGS (diff));
416 segment.base += diff;
420 GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
422 gst_segment_copy_into (&segment, &stream->segment);
423 GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
426 GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
427 GST_TIME_ARGS (stream->segment.base));
428 stream->running_time_diff = stream->segment.base;
430 GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
431 gst_format_get_name (segment.format));
432 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
434 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
437 case GST_EVENT_FLUSH_STOP:{
440 GST_STREAM_SYNCHRONIZER_LOCK (self);
441 stream = gst_pad_get_element_private (pad);
443 GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
444 stream->stream_number);
445 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
447 stream->is_eos = FALSE;
448 stream->wait = FALSE;
449 stream->new_stream = FALSE;
450 stream->drop_discont = FALSE;
451 stream->seen_data = FALSE;
453 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
459 gboolean all_eos = TRUE;
464 GST_STREAM_SYNCHRONIZER_LOCK (self);
465 stream = gst_pad_get_element_private (pad);
467 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
468 GST_WARNING_OBJECT (pad, "EOS for unknown stream");
472 GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
473 stream->is_eos = TRUE;
475 seen_data = stream->seen_data;
476 srcpad = gst_object_ref (stream->srcpad);
478 for (l = self->streams; l; l = l->next) {
479 GstStream *ostream = l->data;
481 all_eos = all_eos && ostream->is_eos;
487 GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
488 for (l = self->streams; l; l = l->next) {
489 GstStream *ostream = l->data;
490 /* local snapshot of current pads */
491 gst_object_ref (ostream->srcpad);
492 pads = g_slist_prepend (pads, ostream->srcpad);
495 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
496 /* drop lock when sending eos, which may block in e.g. preroll */
505 GST_DEBUG_OBJECT (pad, "Pushing EOS");
506 ret = ret && gst_pad_push_event (pad, gst_event_new_eos ());
507 gst_object_unref (pad);
508 epad = g_slist_next (epad);
512 /* if EOS, but no data has passed, then send something to replace EOS
513 * for preroll purposes */
515 GstBuffer *buf = gst_buffer_new ();
517 gst_pad_push (srcpad, buf);
520 gst_object_unref (srcpad);
530 opad = gst_stream_get_other_pad_from_pad (pad);
532 ret = gst_pad_push_event (opad, event);
533 gst_object_unref (opad);
537 gst_object_unref (self);
543 gst_stream_synchronizer_sink_chain (GstPad * pad, GstBuffer * buffer)
545 GstStreamSynchronizer *self =
546 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
548 GstFlowReturn ret = GST_FLOW_ERROR;
550 GstClockTime timestamp = GST_CLOCK_TIME_NONE;
551 GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
554 opad = gst_stream_get_other_pad_from_pad (pad);
556 ret = gst_pad_push (opad, buffer);
557 gst_object_unref (opad);
562 GST_LOG_OBJECT (pad, "Handling buffer %p: size=%" G_GSIZE_FORMAT
563 ", timestamp=%" GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
564 " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
565 buffer, gst_buffer_get_size (buffer),
566 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
567 GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
568 GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));
570 timestamp = GST_BUFFER_TIMESTAMP (buffer);
571 if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)
572 && GST_BUFFER_DURATION_IS_VALID (buffer))
573 timestamp_end = timestamp + GST_BUFFER_DURATION (buffer);
575 GST_STREAM_SYNCHRONIZER_LOCK (self);
576 stream = gst_pad_get_element_private (pad);
578 stream->seen_data = TRUE;
579 if (stream && stream->drop_discont) {
580 buffer = gst_buffer_make_writable (buffer);
581 GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
582 stream->drop_discont = FALSE;
585 if (stream && stream->segment.format == GST_FORMAT_TIME
586 && GST_CLOCK_TIME_IS_VALID (timestamp)) {
588 "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
589 GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
590 stream->segment.position = timestamp;
592 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
594 opad = gst_stream_get_other_pad_from_pad (pad);
596 ret = gst_pad_push (opad, buffer);
597 gst_object_unref (opad);
600 GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
601 if (ret == GST_FLOW_OK) {
604 GST_STREAM_SYNCHRONIZER_LOCK (self);
605 stream = gst_pad_get_element_private (pad);
606 if (stream && stream->segment.format == GST_FORMAT_TIME
607 && GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
609 "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
610 GST_TIME_ARGS (stream->segment.position),
611 GST_TIME_ARGS (timestamp_end));
612 stream->segment.position = timestamp_end;
615 /* Advance EOS streams if necessary. For non-EOS
616 * streams the demuxers should already do this! */
617 for (l = self->streams; l; l = l->next) {
618 GstStream *ostream = l->data;
621 if (!ostream->is_eos || ostream->segment.format != GST_FORMAT_TIME)
624 if (ostream->segment.position != -1)
625 position = ostream->segment.position;
627 position = ostream->segment.start;
629 /* Is there a 1 second lag? */
630 if (position != -1 && position + GST_SECOND < timestamp_end) {
631 gint64 new_start, new_stop;
633 new_start = timestamp_end - GST_SECOND;
634 if (ostream->segment.stop == -1)
637 new_stop = MAX (new_start, ostream->segment.stop);
639 GST_DEBUG_OBJECT (ostream->sinkpad,
640 "Advancing stream %u from %" GST_TIME_FORMAT " to %"
641 GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (position),
642 GST_TIME_ARGS (new_start));
644 ostream->segment.start = new_start;
645 ostream->segment.stop = new_stop;
646 ostream->segment.time = new_start;
647 ostream->segment.position = new_start;
649 gst_pad_push_event (ostream->srcpad,
650 gst_event_new_segment (&ostream->segment));
653 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
658 gst_object_unref (self);
663 /* GstElement vfuncs */
665 gst_stream_synchronizer_request_new_pad (GstElement * element,
666 GstPadTemplate * temp, const gchar * name, const GstCaps * caps)
668 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
672 GST_STREAM_SYNCHRONIZER_LOCK (self);
673 GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
674 self->current_stream_number);
676 stream = g_slice_new0 (GstStream);
677 stream->transform = self;
678 stream->stream_number = self->current_stream_number;
680 tmp = g_strdup_printf ("sink_%d", self->current_stream_number);
681 stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
683 gst_pad_set_element_private (stream->sinkpad, stream);
684 gst_pad_set_iterate_internal_links_function (stream->sinkpad,
685 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
686 gst_pad_set_query_function (stream->sinkpad,
687 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
688 gst_pad_set_getcaps_function (stream->sinkpad,
689 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
690 gst_pad_set_acceptcaps_function (stream->sinkpad,
691 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
692 gst_pad_set_event_function (stream->sinkpad,
693 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
694 gst_pad_set_chain_function (stream->sinkpad,
695 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
697 tmp = g_strdup_printf ("src_%d", self->current_stream_number);
698 stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
700 gst_pad_set_element_private (stream->srcpad, stream);
701 gst_pad_set_iterate_internal_links_function (stream->srcpad,
702 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
703 gst_pad_set_query_function (stream->srcpad,
704 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
705 gst_pad_set_getcaps_function (stream->srcpad,
706 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
707 gst_pad_set_acceptcaps_function (stream->srcpad,
708 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
709 gst_pad_set_event_function (stream->srcpad,
710 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
712 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
714 self->streams = g_list_prepend (self->streams, stream);
715 self->current_stream_number++;
716 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
718 /* Add pads and activate unless we're going to NULL */
719 g_static_rec_mutex_lock (GST_STATE_GET_LOCK (self));
720 if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
721 gst_pad_set_active (stream->srcpad, TRUE);
722 gst_pad_set_active (stream->sinkpad, TRUE);
724 gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
725 gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
726 g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
728 return stream->sinkpad;
731 /* Must be called with lock! */
733 gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
738 GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
740 for (l = self->streams; l; l = l->next) {
741 if (l->data == stream) {
742 self->streams = g_list_delete_link (self->streams, l);
746 g_assert (l != NULL);
748 /* we can drop the lock, since stream exists now only local.
749 * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
750 * (due to reverse lock order) when deactivating pads */
751 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
753 gst_pad_set_element_private (stream->srcpad, NULL);
754 gst_pad_set_element_private (stream->sinkpad, NULL);
755 gst_pad_set_active (stream->srcpad, FALSE);
756 gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
757 gst_pad_set_active (stream->sinkpad, FALSE);
758 gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
760 if (stream->segment.format == GST_FORMAT_TIME) {
761 gint64 stop_running_time;
762 gint64 position_running_time;
765 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
766 stream->segment.stop);
767 position_running_time =
768 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
769 stream->segment.position);
770 stop_running_time = MAX (stop_running_time, position_running_time);
772 GST_DEBUG_OBJECT (stream->sinkpad,
773 "Stop running time was: %" GST_TIME_FORMAT,
774 GST_TIME_ARGS (stop_running_time));
776 self->group_start_time = MAX (self->group_start_time, stop_running_time);
779 g_slice_free (GstStream, stream);
781 /* NOTE: In theory we have to check here if all streams
782 * are EOS but the one that was removed wasn't and then
783 * send EOS downstream. But due to the way how playsink
784 * works this is not necessary and will only cause problems
785 * for gapless playback. playsink will only add/remove pads
786 * when it's reconfigured, which happens when the streams
790 /* lock for good measure, since the caller had it */
791 GST_STREAM_SYNCHRONIZER_LOCK (self);
795 gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
797 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
800 GST_STREAM_SYNCHRONIZER_LOCK (self);
801 stream = gst_pad_get_element_private (pad);
803 g_assert (stream->sinkpad == pad);
805 gst_stream_synchronizer_release_stream (self, stream);
807 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
810 static GstStateChangeReturn
811 gst_stream_synchronizer_change_state (GstElement * element,
812 GstStateChange transition)
814 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
815 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
817 switch (transition) {
818 case GST_STATE_CHANGE_NULL_TO_READY:
819 GST_DEBUG_OBJECT (self, "State change NULL->READY");
820 self->shutdown = FALSE;
822 case GST_STATE_CHANGE_READY_TO_PAUSED:
823 GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
824 self->group_start_time = 0;
825 self->shutdown = FALSE;
827 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
828 GST_DEBUG_OBJECT (self, "State change PAUSED->PLAYING");
830 case GST_STATE_CHANGE_PAUSED_TO_READY:
831 GST_DEBUG_OBJECT (self, "State change READY->NULL");
833 GST_STREAM_SYNCHRONIZER_LOCK (self);
834 g_cond_broadcast (self->stream_finish_cond);
835 self->shutdown = TRUE;
836 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
842 GstStateChangeReturn bret;
844 bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
845 GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", bret);
846 if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
850 switch (transition) {
851 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
852 GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
854 case GST_STATE_CHANGE_PAUSED_TO_READY:{
857 GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
858 self->group_start_time = 0;
860 GST_STREAM_SYNCHRONIZER_LOCK (self);
861 for (l = self->streams; l; l = l->next) {
862 GstStream *stream = l->data;
864 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
865 stream->wait = FALSE;
866 stream->new_stream = FALSE;
867 stream->drop_discont = FALSE;
868 stream->is_eos = FALSE;
870 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
873 case GST_STATE_CHANGE_READY_TO_NULL:{
874 GST_DEBUG_OBJECT (self, "State change READY->NULL");
876 GST_STREAM_SYNCHRONIZER_LOCK (self);
877 while (self->streams)
878 gst_stream_synchronizer_release_stream (self, self->streams->data);
879 self->current_stream_number = 0;
880 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
892 gst_stream_synchronizer_finalize (GObject * object)
894 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
897 g_mutex_free (self->lock);
901 if (self->stream_finish_cond) {
902 g_cond_free (self->stream_finish_cond);
903 self->stream_finish_cond = NULL;
906 G_OBJECT_CLASS (parent_class)->finalize (object);
909 /* GObject type initialization */
911 gst_stream_synchronizer_init (GstStreamSynchronizer * self)
913 self->lock = g_mutex_new ();
914 self->stream_finish_cond = g_cond_new ();
918 gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
920 GObjectClass *gobject_class = (GObjectClass *) klass;
921 GstElementClass *element_class = (GstElementClass *) klass;
923 GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
924 "streamsynchronizer", 0, "Stream Synchronizer");
926 gobject_class->finalize = gst_stream_synchronizer_finalize;
928 gst_element_class_add_pad_template (element_class,
929 gst_static_pad_template_get (&srctemplate));
930 gst_element_class_add_pad_template (element_class,
931 gst_static_pad_template_get (&sinktemplate));
933 gst_element_class_set_details_simple (element_class,
934 "Stream Synchronizer", "Generic",
935 "Synchronizes a group of streams to have equal durations and starting points",
936 "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
938 element_class->change_state =
939 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
940 element_class->request_new_pad =
941 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
942 element_class->release_pad =
943 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);