2 * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 * Boston, MA 02111-1307, USA.
24 #include "gststreamsynchronizer.h"
26 GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
27 #define GST_CAT_DEFAULT stream_synchronizer_debug
29 #define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START { \
30 GST_LOG_OBJECT (obj, \
31 "locking from thread %p", \
33 g_mutex_lock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
34 GST_LOG_OBJECT (obj, \
35 "locked from thread %p", \
39 #define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START { \
40 GST_LOG_OBJECT (obj, \
41 "unlocking from thread %p", \
43 g_mutex_unlock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
46 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%d",
50 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%d",
55 static const gboolean passthrough = TRUE;
57 #define gst_stream_synchronizer_parent_class parent_class
58 G_DEFINE_TYPE (GstStreamSynchronizer, gst_stream_synchronizer,
63 GstStreamSynchronizer *transform;
71 gboolean drop_discont;
75 gint64 running_time_diff;
78 /* Must be called with lock! */
80 gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
82 if (stream->sinkpad == pad)
83 return gst_object_ref (stream->srcpad);
84 else if (stream->srcpad == pad)
85 return gst_object_ref (stream->sinkpad);
91 gst_stream_get_other_pad_from_pad (GstPad * pad)
93 GstStreamSynchronizer *self =
94 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
98 GST_STREAM_SYNCHRONIZER_LOCK (self);
99 stream = gst_pad_get_element_private (pad);
103 opad = gst_stream_get_other_pad (stream, pad);
106 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
107 gst_object_unref (self);
110 GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
115 /* Generic pad functions */
117 gst_stream_synchronizer_iterate_internal_links (GstPad * pad)
119 GstIterator *it = NULL;
122 opad = gst_stream_get_other_pad_from_pad (pad);
124 GValue value = { 0, };
126 g_value_init (&value, GST_TYPE_PAD);
127 g_value_set_object (&value, opad);
128 it = gst_iterator_new_single (GST_TYPE_PAD, &value);
129 g_value_unset (&value);
130 gst_object_unref (opad);
137 gst_stream_synchronizer_query (GstPad * pad, GstQuery * query)
140 gboolean ret = FALSE;
142 GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));
144 opad = gst_stream_get_other_pad_from_pad (pad);
146 ret = gst_pad_peer_query (opad, query);
147 gst_object_unref (opad);
154 gst_stream_synchronizer_getcaps (GstPad * pad, GstCaps * filter)
159 opad = gst_stream_get_other_pad_from_pad (pad);
161 ret = gst_pad_peer_get_caps (opad, filter);
162 gst_object_unref (opad);
166 ret = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
168 GST_LOG_OBJECT (pad, "Returning caps: %" GST_PTR_FORMAT, ret);
174 gst_stream_synchronizer_acceptcaps (GstPad * pad, GstCaps * caps)
177 gboolean ret = FALSE;
179 opad = gst_stream_get_other_pad_from_pad (pad);
181 ret = gst_pad_peer_accept_caps (opad, caps);
182 gst_object_unref (opad);
185 GST_LOG_OBJECT (pad, "Caps%s accepted: %" GST_PTR_FORMAT, (ret ? "" : " not"),
191 /* srcpad functions */
193 gst_stream_synchronizer_src_event (GstPad * pad, GstEvent * event)
195 GstStreamSynchronizer *self =
196 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
198 gboolean ret = FALSE;
201 goto skip_adjustments;
203 GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
204 GST_EVENT_TYPE_NAME (event), event);
206 switch (GST_EVENT_TYPE (event)) {
209 GstClockTimeDiff diff;
210 GstClockTime timestamp;
211 gint64 running_time_diff;
214 gst_event_parse_qos (event, NULL, &proportion, &diff, ×tamp);
215 gst_event_unref (event);
217 GST_STREAM_SYNCHRONIZER_LOCK (self);
218 stream = gst_pad_get_element_private (pad);
220 running_time_diff = stream->running_time_diff;
222 running_time_diff = -1;
223 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
225 if (running_time_diff == -1) {
226 GST_WARNING_OBJECT (pad, "QOS event before group start");
228 } else if (timestamp < running_time_diff) {
229 GST_DEBUG_OBJECT (pad, "QOS event from previous group");
234 "Adjusting QOS event: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT " = %"
235 GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
236 GST_TIME_ARGS (running_time_diff),
237 GST_TIME_ARGS (timestamp - running_time_diff));
239 timestamp -= running_time_diff;
241 /* That case is invalid for QoS events */
242 if (diff < 0 && -diff > timestamp) {
243 GST_DEBUG_OBJECT (pad, "QOS event from previous group");
249 gst_event_new_qos (GST_QOS_TYPE_UNDERFLOW, proportion, diff,
259 opad = gst_stream_get_other_pad_from_pad (pad);
261 ret = gst_pad_push_event (opad, event);
262 gst_object_unref (opad);
266 gst_object_unref (self);
271 /* sinkpad functions */
273 gst_stream_synchronizer_sink_event (GstPad * pad, GstEvent * event)
275 GstStreamSynchronizer *self =
276 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
278 gboolean ret = FALSE;
281 goto skip_adjustments;
283 GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
284 GST_EVENT_TYPE_NAME (event), event);
286 switch (GST_EVENT_TYPE (event)) {
287 case GST_EVENT_SINK_MESSAGE:{
290 gst_event_parse_sink_message (event, &message);
291 if (gst_message_has_name (message, "playbin-stream-changed")) {
294 GST_STREAM_SYNCHRONIZER_LOCK (self);
295 stream = gst_pad_get_element_private (pad);
298 gboolean all_wait = TRUE;
300 GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
302 stream->is_eos = FALSE;
304 stream->new_stream = TRUE;
306 for (l = self->streams; l; l = l->next) {
307 GstStream *ostream = l->data;
309 all_wait = all_wait && ostream->wait;
316 GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
318 for (l = self->streams; l; l = l->next) {
319 GstStream *ostream = l->data;
320 gint64 stop_running_time;
321 gint64 position_running_time;
323 ostream->wait = FALSE;
326 gst_segment_to_running_time (&ostream->segment,
327 GST_FORMAT_TIME, ostream->segment.stop);
328 position_running_time =
329 gst_segment_to_running_time (&ostream->segment,
330 GST_FORMAT_TIME, ostream->segment.position);
332 MAX (position, MAX (stop_running_time,
333 position_running_time));
335 position = MAX (0, position);
336 self->group_start_time = MAX (self->group_start_time, position);
338 GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
339 GST_TIME_ARGS (self->group_start_time));
341 g_cond_broadcast (self->stream_finish_cond);
344 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
346 gst_message_unref (message);
349 case GST_EVENT_SEGMENT:{
353 gst_event_copy_segment (event, &segment);
355 GST_STREAM_SYNCHRONIZER_LOCK (self);
356 stream = gst_pad_get_element_private (pad);
359 GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
360 g_cond_wait (self->stream_finish_cond, self->lock);
361 stream = gst_pad_get_element_private (pad);
363 stream->wait = FALSE;
367 if (self->shutdown) {
368 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
369 gst_event_unref (event);
373 if (stream && segment.format == GST_FORMAT_TIME) {
374 if (stream->new_stream) {
375 gint64 position_running_time = 0;
376 gint64 stop_running_time = 0;
378 if (stream->segment.format == GST_FORMAT_TIME) {
379 position_running_time =
380 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
381 stream->segment.position);
382 position_running_time = MAX (position_running_time, 0);
384 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
385 stream->segment.stop);
386 stop_running_time = MAX (position_running_time, 0);
388 if (stop_running_time != position_running_time) {
389 GST_WARNING_OBJECT (pad,
390 "Gap between position and segment stop: %" GST_TIME_FORMAT
391 " != %" GST_TIME_FORMAT, GST_TIME_ARGS (stop_running_time),
392 GST_TIME_ARGS (position_running_time));
395 if (stop_running_time < position_running_time) {
396 GST_DEBUG_OBJECT (pad, "Updating stop position");
397 stream->segment.stop = stream->segment.position;
398 gst_pad_push_event (stream->srcpad,
399 gst_event_new_segment (&stream->segment));
401 stop_running_time = MAX (stop_running_time, position_running_time);
402 GST_DEBUG_OBJECT (pad,
403 "Stop running time of last group: %" GST_TIME_FORMAT,
404 GST_TIME_ARGS (stop_running_time));
406 stream->new_stream = FALSE;
407 stream->drop_discont = TRUE;
409 if (stop_running_time < self->group_start_time) {
410 gint64 diff = self->group_start_time - stop_running_time;
412 GST_DEBUG_OBJECT (pad,
413 "Advancing running time for other streams by: %"
414 GST_TIME_FORMAT, GST_TIME_ARGS (diff));
416 segment.base += diff;
420 GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
422 gst_segment_copy_into (&segment, &stream->segment);
423 GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
426 GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
427 GST_TIME_ARGS (stream->segment.base));
428 stream->running_time_diff = stream->segment.base;
430 GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
431 gst_format_get_name (segment.format));
432 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
434 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
437 case GST_EVENT_FLUSH_STOP:{
440 GST_STREAM_SYNCHRONIZER_LOCK (self);
441 stream = gst_pad_get_element_private (pad);
443 GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
444 stream->stream_number);
445 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
447 stream->is_eos = FALSE;
448 stream->wait = FALSE;
449 stream->new_stream = FALSE;
450 stream->drop_discont = FALSE;
451 stream->seen_data = FALSE;
453 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
459 gboolean all_eos = TRUE;
464 GST_STREAM_SYNCHRONIZER_LOCK (self);
465 stream = gst_pad_get_element_private (pad);
467 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
468 GST_WARNING_OBJECT (pad, "EOS for unknown stream");
472 GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
473 stream->is_eos = TRUE;
475 seen_data = stream->seen_data;
476 srcpad = gst_object_ref (stream->srcpad);
478 for (l = self->streams; l; l = l->next) {
479 GstStream *ostream = l->data;
481 all_eos = all_eos && ostream->is_eos;
487 GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
488 for (l = self->streams; l; l = l->next) {
489 GstStream *ostream = l->data;
490 /* local snapshot of current pads */
491 gst_object_ref (ostream->srcpad);
492 pads = g_slist_prepend (pads, ostream->srcpad);
495 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
496 /* drop lock when sending eos, which may block in e.g. preroll */
505 GST_DEBUG_OBJECT (pad, "Pushing EOS");
506 ret = ret && gst_pad_push_event (pad, gst_event_new_eos ());
507 gst_object_unref (pad);
508 epad = g_slist_next (epad);
512 /* if EOS, but no data has passed, then send something to replace EOS
513 * for preroll purposes */
515 GstBuffer *buf = gst_buffer_new ();
517 GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_PREROLL);
518 gst_pad_push (srcpad, buf);
521 gst_object_unref (srcpad);
531 opad = gst_stream_get_other_pad_from_pad (pad);
533 ret = gst_pad_push_event (opad, event);
534 gst_object_unref (opad);
538 gst_object_unref (self);
544 gst_stream_synchronizer_sink_chain (GstPad * pad, GstBuffer * buffer)
546 GstStreamSynchronizer *self =
547 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
549 GstFlowReturn ret = GST_FLOW_ERROR;
551 GstClockTime timestamp = GST_CLOCK_TIME_NONE;
552 GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
555 opad = gst_stream_get_other_pad_from_pad (pad);
557 ret = gst_pad_push (opad, buffer);
558 gst_object_unref (opad);
563 GST_LOG_OBJECT (pad, "Handling buffer %p: size=%u, timestamp=%"
564 GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
565 " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
566 buffer, gst_buffer_get_size (buffer),
567 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
568 GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
569 GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));
571 timestamp = GST_BUFFER_TIMESTAMP (buffer);
572 if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)
573 && GST_BUFFER_DURATION_IS_VALID (buffer))
574 timestamp_end = timestamp + GST_BUFFER_DURATION (buffer);
576 GST_STREAM_SYNCHRONIZER_LOCK (self);
577 stream = gst_pad_get_element_private (pad);
579 stream->seen_data = TRUE;
580 if (stream && stream->drop_discont) {
581 buffer = gst_buffer_make_writable (buffer);
582 GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
583 stream->drop_discont = FALSE;
586 if (stream && stream->segment.format == GST_FORMAT_TIME
587 && GST_CLOCK_TIME_IS_VALID (timestamp)) {
589 "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
590 GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
591 stream->segment.position = timestamp;
593 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
595 opad = gst_stream_get_other_pad_from_pad (pad);
597 ret = gst_pad_push (opad, buffer);
598 gst_object_unref (opad);
601 GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
602 if (ret == GST_FLOW_OK) {
605 GST_STREAM_SYNCHRONIZER_LOCK (self);
606 stream = gst_pad_get_element_private (pad);
607 if (stream && stream->segment.format == GST_FORMAT_TIME
608 && GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
610 "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
611 GST_TIME_ARGS (stream->segment.position),
612 GST_TIME_ARGS (timestamp_end));
613 stream->segment.position = timestamp_end;
616 /* Advance EOS streams if necessary. For non-EOS
617 * streams the demuxers should already do this! */
618 for (l = self->streams; l; l = l->next) {
619 GstStream *ostream = l->data;
622 if (!ostream->is_eos || ostream->segment.format != GST_FORMAT_TIME)
625 if (ostream->segment.position != -1)
626 position = ostream->segment.position;
628 position = ostream->segment.start;
630 /* Is there a 1 second lag? */
631 if (position != -1 && position + GST_SECOND < timestamp_end) {
632 gint64 new_start, new_stop;
634 new_start = timestamp_end - GST_SECOND;
635 if (ostream->segment.stop == -1)
638 new_stop = MAX (new_start, ostream->segment.stop);
640 GST_DEBUG_OBJECT (ostream->sinkpad,
641 "Advancing stream %u from %" GST_TIME_FORMAT " to %"
642 GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (position),
643 GST_TIME_ARGS (new_start));
645 ostream->segment.start = new_start;
646 ostream->segment.stop = new_stop;
647 ostream->segment.time = new_start;
648 ostream->segment.position = new_start;
650 gst_pad_push_event (ostream->srcpad,
651 gst_event_new_segment (&ostream->segment));
654 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
659 gst_object_unref (self);
664 /* GstElement vfuncs */
666 gst_stream_synchronizer_request_new_pad (GstElement * element,
667 GstPadTemplate * temp, const gchar * name, const GstCaps * caps)
669 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
673 GST_STREAM_SYNCHRONIZER_LOCK (self);
674 GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
675 self->current_stream_number);
677 stream = g_slice_new0 (GstStream);
678 stream->transform = self;
679 stream->stream_number = self->current_stream_number;
681 tmp = g_strdup_printf ("sink_%d", self->current_stream_number);
682 stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
684 gst_pad_set_element_private (stream->sinkpad, stream);
685 gst_pad_set_iterate_internal_links_function (stream->sinkpad,
686 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
687 gst_pad_set_query_function (stream->sinkpad,
688 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
689 gst_pad_set_getcaps_function (stream->sinkpad,
690 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
691 gst_pad_set_acceptcaps_function (stream->sinkpad,
692 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
693 gst_pad_set_event_function (stream->sinkpad,
694 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
695 gst_pad_set_chain_function (stream->sinkpad,
696 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
698 tmp = g_strdup_printf ("src_%d", self->current_stream_number);
699 stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
701 gst_pad_set_element_private (stream->srcpad, stream);
702 gst_pad_set_iterate_internal_links_function (stream->srcpad,
703 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
704 gst_pad_set_query_function (stream->srcpad,
705 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
706 gst_pad_set_getcaps_function (stream->srcpad,
707 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
708 gst_pad_set_acceptcaps_function (stream->srcpad,
709 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
710 gst_pad_set_event_function (stream->srcpad,
711 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
713 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
715 self->streams = g_list_prepend (self->streams, stream);
716 self->current_stream_number++;
717 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
719 /* Add pads and activate unless we're going to NULL */
720 g_static_rec_mutex_lock (GST_STATE_GET_LOCK (self));
721 if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
722 gst_pad_set_active (stream->srcpad, TRUE);
723 gst_pad_set_active (stream->sinkpad, TRUE);
725 gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
726 gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
727 g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
729 return stream->sinkpad;
732 /* Must be called with lock! */
734 gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
739 GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
741 for (l = self->streams; l; l = l->next) {
742 if (l->data == stream) {
743 self->streams = g_list_delete_link (self->streams, l);
747 g_assert (l != NULL);
749 /* we can drop the lock, since stream exists now only local.
750 * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
751 * (due to reverse lock order) when deactivating pads */
752 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
754 gst_pad_set_element_private (stream->srcpad, NULL);
755 gst_pad_set_element_private (stream->sinkpad, NULL);
756 gst_pad_set_active (stream->srcpad, FALSE);
757 gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
758 gst_pad_set_active (stream->sinkpad, FALSE);
759 gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
761 if (stream->segment.format == GST_FORMAT_TIME) {
762 gint64 stop_running_time;
763 gint64 position_running_time;
766 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
767 stream->segment.stop);
768 position_running_time =
769 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
770 stream->segment.position);
771 stop_running_time = MAX (stop_running_time, position_running_time);
773 GST_DEBUG_OBJECT (stream->sinkpad,
774 "Stop running time was: %" GST_TIME_FORMAT,
775 GST_TIME_ARGS (stop_running_time));
777 self->group_start_time = MAX (self->group_start_time, stop_running_time);
780 g_slice_free (GstStream, stream);
782 /* NOTE: In theory we have to check here if all streams
783 * are EOS but the one that was removed wasn't and then
784 * send EOS downstream. But due to the way how playsink
785 * works this is not necessary and will only cause problems
786 * for gapless playback. playsink will only add/remove pads
787 * when it's reconfigured, which happens when the streams
791 /* lock for good measure, since the caller had it */
792 GST_STREAM_SYNCHRONIZER_LOCK (self);
796 gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
798 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
801 GST_STREAM_SYNCHRONIZER_LOCK (self);
802 stream = gst_pad_get_element_private (pad);
804 g_assert (stream->sinkpad == pad);
806 gst_stream_synchronizer_release_stream (self, stream);
808 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
811 static GstStateChangeReturn
812 gst_stream_synchronizer_change_state (GstElement * element,
813 GstStateChange transition)
815 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
816 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
818 switch (transition) {
819 case GST_STATE_CHANGE_NULL_TO_READY:
820 GST_DEBUG_OBJECT (self, "State change NULL->READY");
821 self->shutdown = FALSE;
823 case GST_STATE_CHANGE_READY_TO_PAUSED:
824 GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
825 self->group_start_time = 0;
826 self->shutdown = FALSE;
828 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
829 GST_DEBUG_OBJECT (self, "State change PAUSED->PLAYING");
831 case GST_STATE_CHANGE_PAUSED_TO_READY:
832 GST_DEBUG_OBJECT (self, "State change READY->NULL");
834 GST_STREAM_SYNCHRONIZER_LOCK (self);
835 g_cond_broadcast (self->stream_finish_cond);
836 self->shutdown = TRUE;
837 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
843 GstStateChangeReturn bret;
845 bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
846 GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", bret);
847 if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
851 switch (transition) {
852 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
853 GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
855 case GST_STATE_CHANGE_PAUSED_TO_READY:{
858 GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
859 self->group_start_time = 0;
861 GST_STREAM_SYNCHRONIZER_LOCK (self);
862 for (l = self->streams; l; l = l->next) {
863 GstStream *stream = l->data;
865 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
866 stream->wait = FALSE;
867 stream->new_stream = FALSE;
868 stream->drop_discont = FALSE;
869 stream->is_eos = FALSE;
871 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
874 case GST_STATE_CHANGE_READY_TO_NULL:{
875 GST_DEBUG_OBJECT (self, "State change READY->NULL");
877 GST_STREAM_SYNCHRONIZER_LOCK (self);
878 while (self->streams)
879 gst_stream_synchronizer_release_stream (self, self->streams->data);
880 self->current_stream_number = 0;
881 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
893 gst_stream_synchronizer_finalize (GObject * object)
895 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
898 g_mutex_free (self->lock);
902 if (self->stream_finish_cond) {
903 g_cond_free (self->stream_finish_cond);
904 self->stream_finish_cond = NULL;
907 G_OBJECT_CLASS (parent_class)->finalize (object);
910 /* GObject type initialization */
912 gst_stream_synchronizer_init (GstStreamSynchronizer * self)
914 self->lock = g_mutex_new ();
915 self->stream_finish_cond = g_cond_new ();
919 gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
921 GObjectClass *gobject_class = (GObjectClass *) klass;
922 GstElementClass *element_class = (GstElementClass *) klass;
924 GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
925 "streamsynchronizer", 0, "Stream Synchronizer");
927 gobject_class->finalize = gst_stream_synchronizer_finalize;
929 gst_element_class_add_pad_template (element_class,
930 gst_static_pad_template_get (&srctemplate));
931 gst_element_class_add_pad_template (element_class,
932 gst_static_pad_template_get (&sinktemplate));
934 gst_element_class_set_details_simple (element_class,
935 "Stream Synchronizer", "Generic",
936 "Synchronizes a group of streams to have equal durations and starting points",
937 "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
939 element_class->change_state =
940 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
941 element_class->request_new_pad =
942 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
943 element_class->release_pad =
944 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);