2 * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 * Boston, MA 02111-1307, USA.
24 #include "gststreamsynchronizer.h"
26 GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
27 #define GST_CAT_DEFAULT stream_synchronizer_debug
/* Take the synchronizer mutex (the lock member of the object), emitting a
 * log line before and after the g_mutex_lock() call. */
29 #define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START { \
30 GST_LOG_OBJECT (obj, \
31 "locking from thread %p", \
33 g_mutex_lock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
34 GST_LOG_OBJECT (obj, \
35 "locked from thread %p", \
/* Release the synchronizer mutex taken by the LOCK macro, logging the
 * calling thread first. */
39 #define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START { \
40 GST_LOG_OBJECT (obj, \
41 "unlocking from thread %p", \
43 g_mutex_unlock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
/* Pad templates for the numbered source and sink pads. Matching pad names
 * are generated with the same patterns in the request-pad handler. */
46 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%d",
50 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%d",
/* Compile-time switch, currently TRUE.
 * NOTE(review): presumably this guards the skip_adjustments shortcuts in the
 * event and chain functions; the guarding conditions are on lines that are
 * not present in this listing - confirm. */
55 static const gboolean passthrough = TRUE;
/* Register the GObject type; parent_class is aliased to the name that
 * G_DEFINE_TYPE generates. */
57 #define gst_stream_synchronizer_parent_class parent_class
58 G_DEFINE_TYPE (GstStreamSynchronizer, gst_stream_synchronizer,
/* Per-stream bookkeeping record (only some of its fields are visible in this
 * listing). One record is allocated per requested sink pad and attached to
 * both pads of the stream as element-private data. */
/* Owning synchronizer element. */
63 GstStreamSynchronizer *transform;
/* When set, the chain function clears the DISCONT flag on the next buffer
 * and resets this field. */
71 gboolean drop_discont;
/* Copied from segment.accum when a TIME newsegment is handled; subtracted
 * from QOS timestamps in the source pad event handler. */
75 gint64 running_time_diff;
/* Map one pad of a stream to the pad on its opposite side: the sink pad
 * yields the source pad and the source pad yields the sink pad. The result
 * carries a new reference (gst_object_ref) that the caller must release. */
78 /* Must be called with lock! */
80 gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
82 if (stream->sinkpad == pad)
83 return gst_object_ref (stream->srcpad);
84 else if (stream->srcpad == pad)
85 return gst_object_ref (stream->sinkpad);
/* Find the opposite pad starting from a pad alone. Takes a temporary
 * reference on the parent element, looks up the stream record under the
 * synchronizer lock and delegates to gst_stream_get_other_pad(). The caller
 * must not already hold the synchronizer lock, since it is taken here. */
91 gst_stream_get_other_pad_from_pad (GstPad * pad)
93 GstStreamSynchronizer *self =
94 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
/* The stream record lives in the element-private slot of the pad. */
98 GST_STREAM_SYNCHRONIZER_LOCK (self);
99 stream = gst_pad_get_element_private (pad);
103 opad = gst_stream_get_other_pad (stream, pad);
106 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
107 gst_object_unref (self);
/* NOTE(review): presumably reached only when no opposite pad was found;
 * the condition is on a line missing from this listing - confirm. */
110 GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
115 /* Generic pad functions */
/* Internal-links handler installed on both sink and source pads: builds a
 * single-element iterator holding the opposite pad of the same stream. */
117 gst_stream_synchronizer_iterate_internal_links (GstPad * pad)
119 GstIterator *it = NULL;
122 opad = gst_stream_get_other_pad_from_pad (pad);
124 GValue value = { 0, };
/* Wrap the pad in a GValue for the iterator, then drop the GValue and the
 * reference obtained from the lookup above. */
126 g_value_init (&value, GST_TYPE_PAD);
127 g_value_set_object (&value, opad);
128 it = gst_iterator_new_single (GST_TYPE_PAD, &value);
129 g_value_unset (&value);
130 gst_object_unref (opad);
/* Query handler installed on both sink and source pads: forwards the query
 * to the peer of the opposite pad. The result starts out as FALSE and is
 * replaced by the answer of gst_pad_peer_query(). */
137 gst_stream_synchronizer_query (GstPad * pad, GstQuery * query)
140 gboolean ret = FALSE;
142 GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));
144 opad = gst_stream_get_other_pad_from_pad (pad);
146 ret = gst_pad_peer_query (opad, query);
147 gst_object_unref (opad);
/* Getcaps handler installed on both sink and source pads: asks the peer of
 * the opposite pad for its caps and logs the caps being returned. */
154 gst_stream_synchronizer_getcaps (GstPad * pad)
159 opad = gst_stream_get_other_pad_from_pad (pad);
161 ret = gst_pad_peer_get_caps (opad);
162 gst_object_unref (opad);
/* NOTE(review): ANY caps look like a fallback for a missing opposite pad or
 * an empty peer answer; the condition is not visible here - confirm. */
166 ret = gst_caps_new_any ();
168 GST_LOG_OBJECT (pad, "Returning caps: %" GST_PTR_FORMAT, ret);
/* Acceptcaps handler installed on both sink and source pads: asks the peer
 * of the opposite pad whether it accepts the caps. The result starts out as
 * FALSE and the outcome is logged. */
174 gst_stream_synchronizer_acceptcaps (GstPad * pad, GstCaps * caps)
177 gboolean ret = FALSE;
179 opad = gst_stream_get_other_pad_from_pad (pad);
181 ret = gst_pad_peer_accept_caps (opad, caps);
182 gst_object_unref (opad);
185 GST_LOG_OBJECT (pad, "Caps%s accepted: %" GST_PTR_FORMAT, (ret ? "" : " not"),
191 /* srcpad functions */
/* Event handler for source pads (upstream events). The visible special case
 * is QOS: the event is parsed and dropped, its timestamp is shifted back by
 * the running_time_diff recorded for the stream, and a new QOS event is
 * built from the adjusted values. Events are finally pushed on the opposite
 * (sink) pad. A reference on the parent element is held for the duration of
 * the call.
 * NOTE(review): this listing omits a number of source lines (case labels,
 * braces, returns), so the exact control flow between the visible
 * statements should be confirmed against the full file. */
193 gst_stream_synchronizer_src_event (GstPad * pad, GstEvent * event)
195 GstStreamSynchronizer *self =
196 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
198 gboolean ret = FALSE;
201 goto skip_adjustments;
203 GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
204 GST_EVENT_TYPE_NAME (event), event->structure);
206 switch (GST_EVENT_TYPE (event)) {
209 GstClockTimeDiff diff;
210 GstClockTime timestamp;
211 gint64 running_time_diff;
/* NOTE(review): the last argument below looks like a mis-encoded
 * address-of-timestamp expression (an HTML entity artifact) - confirm
 * against the real file. The original event is released right after
 * parsing because a replacement is created further down. */
214 gst_event_parse_qos (event, NULL, &proportion, &diff, ×tamp);
215 gst_event_unref (event);
/* Snapshot the running time offset of the stream under the lock; -1 is used
 * as the marker for an unknown offset. */
217 GST_STREAM_SYNCHRONIZER_LOCK (self);
218 stream = gst_pad_get_element_private (pad);
220 running_time_diff = stream->running_time_diff;
222 running_time_diff = -1;
223 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
225 if (running_time_diff == -1) {
226 GST_WARNING_OBJECT (pad, "QOS event before group start");
228 } else if (timestamp < running_time_diff) {
229 GST_DEBUG_OBJECT (pad, "QOS event from previous group");
234 "Adjusting QOS event: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT " = %"
235 GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
236 GST_TIME_ARGS (running_time_diff),
237 GST_TIME_ARGS (timestamp - running_time_diff));
239 timestamp -= running_time_diff;
241 /* That case is invalid for QoS events */
242 if (diff < 0 && -diff > timestamp) {
243 GST_DEBUG_OBJECT (pad, "QOS event from previous group");
249 gst_event_new_qos (GST_QOS_TYPE_UNDERFLOW, proportion, diff,
/* Forward the (possibly rewritten) event through the opposite pad. */
259 opad = gst_stream_get_other_pad_from_pad (pad);
261 ret = gst_pad_push_event (opad, event);
262 gst_object_unref (opad);
266 gst_object_unref (self);
271 /* sinkpad functions */
/* Event handler for sink pads (downstream events). Visible handling:
 *  - SINK_MESSAGE carrying a playbin2-stream-changed structure: marks the
 *    stream as changed and, once every stream waits, computes a new group
 *    start time and wakes all waiting streams.
 *  - NEWSEGMENT: may block on stream_finish_cond, then closes and advances
 *    the previous segment before applying the new one to the stream.
 *  - FLUSH_STOP: resets the segment and the per-stream flags.
 *  - an EOS branch (its case label is not in this listing): holds EOS back
 *    until all streams are EOS, then pushes EOS on every source pad.
 * Afterwards the event is pushed on the opposite (source) pad. A reference
 * on the parent element is held for the duration of the call.
 * NOTE(review): this listing omits many source lines (conditions, braces,
 * break statements, labels); confirm control flow against the full file. */
273 gst_stream_synchronizer_sink_event (GstPad * pad, GstEvent * event)
275 GstStreamSynchronizer *self =
276 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
278 gboolean ret = FALSE;
281 goto skip_adjustments;
283 GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
284 GST_EVENT_TYPE_NAME (event), event->structure);
286 switch (GST_EVENT_TYPE (event)) {
/* Stream-changed notification wrapped in a sink message. */
287 case GST_EVENT_SINK_MESSAGE:{
290 gst_event_parse_sink_message (event, &message);
291 if (message->structure
292 && gst_structure_has_name (message->structure,
293 "playbin2-stream-changed")) {
296 GST_STREAM_SYNCHRONIZER_LOCK (self);
297 stream = gst_pad_get_element_private (pad);
300 gboolean all_wait = TRUE;
302 GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
304 stream->is_eos = FALSE;
306 stream->new_stream = TRUE;
/* all_wait stays TRUE only if every known stream has its wait flag set. */
308 for (l = self->streams; l; l = l->next) {
309 GstStream *ostream = l->data;
311 all_wait = all_wait && ostream->wait;
316 gint64 last_stop = 0;
318 GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
/* Clear the wait flags and find the largest running time any stream has
 * reached, considering both segment.stop and segment.last_stop. */
320 for (l = self->streams; l; l = l->next) {
321 GstStream *ostream = l->data;
322 gint64 stop_running_time;
323 gint64 last_stop_running_time;
325 ostream->wait = FALSE;
328 gst_segment_to_running_time (&ostream->segment,
329 GST_FORMAT_TIME, ostream->segment.stop);
330 last_stop_running_time =
331 gst_segment_to_running_time (&ostream->segment,
332 GST_FORMAT_TIME, ostream->segment.last_stop);
334 MAX (last_stop, MAX (stop_running_time,
335 last_stop_running_time));
/* The group start time never moves backwards. */
337 last_stop = MAX (0, last_stop);
338 self->group_start_time = MAX (self->group_start_time, last_stop);
340 GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
341 GST_TIME_ARGS (self->group_start_time));
/* Wake every thread blocked in the NEWSEGMENT branch below. */
343 g_cond_broadcast (self->stream_finish_cond);
346 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
348 gst_message_unref (message);
/* New segment: possibly wait for the other streams, then do the segment
 * bookkeeping for this stream. */
351 case GST_EVENT_NEWSEGMENT:{
354 gdouble rate, applied_rate;
356 gint64 start, stop, position;
358 gst_event_parse_new_segment (event,
359 &update, &rate, &applied_rate, &format, &start, &stop, &position);
361 GST_STREAM_SYNCHRONIZER_LOCK (self);
362 stream = gst_pad_get_element_private (pad);
365 GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
/* g_cond_wait releases the lock while blocked, so the stream record is
 * fetched again afterwards.
 * NOTE(review): no predicate loop around this wait is visible in the
 * listing; GLib documents that condition waits may wake spuriously -
 * confirm how the surrounding (omitted) lines deal with that. */
366 g_cond_wait (self->stream_finish_cond, self->lock);
367 stream = gst_pad_get_element_private (pad);
369 stream->wait = FALSE;
/* The state change handler sets shutdown and broadcasts the condition, so a
 * woken thread lands here and releases the event. */
373 if (self->shutdown) {
374 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
375 gst_event_unref (event);
379 if (stream && format == GST_FORMAT_TIME) {
380 if (stream->new_stream) {
381 gint64 last_stop_running_time = 0;
382 gint64 stop_running_time = 0;
384 if (stream->segment.format == GST_FORMAT_TIME) {
385 last_stop_running_time =
386 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
387 stream->segment.last_stop);
388 last_stop_running_time = MAX (last_stop_running_time, 0);
390 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
391 stream->segment.stop);
/* NOTE(review): the next statement assigns the clamped
 * last_stop_running_time, not a clamped stop_running_time. After it both
 * variables are equal, so the two comparisons that follow can never be
 * true and the stop position update is never sent. This looks like a
 * copy and paste slip where stop_running_time was meant - confirm. */
392 stop_running_time = MAX (last_stop_running_time, 0);
394 if (stop_running_time != last_stop_running_time) {
395 GST_WARNING_OBJECT (pad,
396 "Gap between last_stop and segment stop: %" GST_TIME_FORMAT
397 " != %" GST_TIME_FORMAT, GST_TIME_ARGS (stop_running_time),
398 GST_TIME_ARGS (last_stop_running_time));
401 if (stop_running_time < last_stop_running_time) {
402 GST_DEBUG_OBJECT (pad, "Updating stop position");
/* Send a segment update that ends at last_stop and mirror it locally. */
403 gst_pad_push_event (stream->srcpad,
404 gst_event_new_new_segment (TRUE, stream->segment.rate,
405 stream->segment.applied_rate, GST_FORMAT_TIME,
406 stream->segment.start, stream->segment.last_stop,
407 stream->segment.time));
408 gst_segment_set_newsegment (&stream->segment, TRUE,
409 stream->segment.rate, stream->segment.applied_rate,
410 GST_FORMAT_TIME, stream->segment.start,
411 stream->segment.last_stop, stream->segment.time);
413 stop_running_time = MAX (stop_running_time, last_stop_running_time);
414 GST_DEBUG_OBJECT (pad,
415 "Stop running time of last group: %" GST_TIME_FORMAT,
416 GST_TIME_ARGS (stop_running_time));
418 stream->new_stream = FALSE;
419 stream->drop_discont = TRUE;
/* If this stream ended before the group start time, push a filler segment
 * of the missing length (0 to diff) and apply it to the local segment. */
421 if (stop_running_time < self->group_start_time) {
422 gint64 diff = self->group_start_time - stop_running_time;
424 GST_DEBUG_OBJECT (pad,
425 "Advancing running time for other streams by: %"
426 GST_TIME_FORMAT, GST_TIME_ARGS (diff));
427 gst_pad_push_event (stream->srcpad,
428 gst_event_new_new_segment (FALSE, 1.0, 1.0,
429 GST_FORMAT_TIME, 0, diff, 0));
430 gst_segment_set_newsegment (&stream->segment, FALSE, 1.0, 1.0,
431 GST_FORMAT_TIME, 0, diff, 0);
/* Apply the incoming segment and remember the accumulated running time as
 * the offset used for QOS timestamp adjustment. */
435 GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
437 gst_segment_set_newsegment (&stream->segment, update, rate,
438 applied_rate, format, start, stop, position);
439 GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
442 GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
443 GST_TIME_ARGS (stream->segment.accum));
444 stream->running_time_diff = stream->segment.accum;
/* Segments in a format other than TIME are not tracked: the stored segment
 * is reset to the undefined format. */
446 GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
447 gst_format_get_name (format));
448 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
450 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
/* Flush stop: forget the segment and clear all per-stream flags. */
453 case GST_EVENT_FLUSH_STOP:{
456 GST_STREAM_SYNCHRONIZER_LOCK (self);
457 stream = gst_pad_get_element_private (pad);
459 GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
460 stream->stream_number);
461 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
463 stream->is_eos = FALSE;
464 stream->wait = FALSE;
465 stream->new_stream = FALSE;
466 stream->drop_discont = FALSE;
467 stream->seen_data = FALSE;
469 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
/* EOS branch (judging by the log messages below; the case label itself is
 * not part of this listing). */
475 gboolean all_eos = TRUE;
480 GST_STREAM_SYNCHRONIZER_LOCK (self);
481 stream = gst_pad_get_element_private (pad);
483 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
484 GST_WARNING_OBJECT (pad, "EOS for unknown stream");
488 GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
489 stream->is_eos = TRUE;
/* Copy what is needed after the lock is dropped; srcpad gets its own
 * reference. */
491 seen_data = stream->seen_data;
492 srcpad = gst_object_ref (stream->srcpad);
494 for (l = self->streams; l; l = l->next) {
495 GstStream *ostream = l->data;
497 all_eos = all_eos && ostream->is_eos;
503 GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
504 for (l = self->streams; l; l = l->next) {
505 GstStream *ostream = l->data;
506 /* local snapshot of current pads */
507 gst_object_ref (ostream->srcpad);
508 pads = g_slist_prepend (pads, ostream->srcpad);
511 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
512 /* drop lock when sending eos, which may block in e.g. preroll */
521 GST_DEBUG_OBJECT (pad, "Pushing EOS");
/* NOTE(review): ret is initialised to FALSE at the top of this function;
 * unless an omitted line sets it to TRUE before this loop, the logical AND
 * below short-circuits and EOS is never pushed - confirm. */
522 ret = ret && gst_pad_push_event (pad, gst_event_new_eos ());
523 gst_object_unref (pad);
524 epad = g_slist_next (epad);
528 /* if EOS, but no data has passed, then send something to replace EOS
529 * for preroll purposes */
531 GstBuffer *buf = gst_buffer_new ();
533 GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_PREROLL);
534 gst_pad_push (srcpad, buf);
537 gst_object_unref (srcpad);
/* Forward the event through the opposite (source) pad. */
547 opad = gst_stream_get_other_pad_from_pad (pad);
549 ret = gst_pad_push_event (opad, event);
550 gst_object_unref (opad);
554 gst_object_unref (self);
/* Chain function for sink pads. Records the buffer timestamp as the stream
 * position, optionally strips the DISCONT flag, pushes the buffer on the
 * opposite (source) pad and, when the push returned GST_FLOW_OK, moves the
 * position to the end of the buffer and advances streams that are already
 * EOS so they do not lag more than one second behind. The result starts as
 * GST_FLOW_ERROR and is replaced by the return value of gst_pad_push().
 * NOTE(review): this listing omits several source lines (conditions,
 * braces, labels); confirm control flow against the full file. */
560 gst_stream_synchronizer_sink_chain (GstPad * pad, GstBuffer * buffer)
562 GstStreamSynchronizer *self =
563 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
565 GstFlowReturn ret = GST_FLOW_ERROR;
567 GstClockTime timestamp = GST_CLOCK_TIME_NONE;
568 GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
/* NOTE(review): this first push looks like a shortcut path; the condition
 * guarding it is on a line missing from this listing - confirm. */
571 opad = gst_stream_get_other_pad_from_pad (pad);
573 ret = gst_pad_push (opad, buffer);
574 gst_object_unref (opad);
579 GST_LOG_OBJECT (pad, "Handling buffer %p: size=%u, timestamp=%"
580 GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
581 " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
582 buffer, gst_buffer_get_size (buffer),
583 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
584 GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
585 GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));
/* timestamp_end is only computed when both timestamp and duration are
 * valid; otherwise it keeps its GST_CLOCK_TIME_NONE initial value. */
587 timestamp = GST_BUFFER_TIMESTAMP (buffer);
588 if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)
589 && GST_BUFFER_DURATION_IS_VALID (buffer))
590 timestamp_end = timestamp + GST_BUFFER_DURATION (buffer);
592 GST_STREAM_SYNCHRONIZER_LOCK (self);
593 stream = gst_pad_get_element_private (pad);
595 stream->seen_data = TRUE;
/* drop_discont is set by the NEWSEGMENT handling of a new stream; it is
 * consumed here by clearing DISCONT on one buffer. */
596 if (stream && stream->drop_discont) {
597 buffer = gst_buffer_make_writable (buffer);
598 GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
599 stream->drop_discont = FALSE;
602 if (stream && stream->segment.format == GST_FORMAT_TIME
603 && GST_CLOCK_TIME_IS_VALID (timestamp)) {
605 "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
606 GST_TIME_ARGS (stream->segment.last_stop), GST_TIME_ARGS (timestamp));
607 gst_segment_set_last_stop (&stream->segment, GST_FORMAT_TIME, timestamp);
/* The lock is released around the push and taken again afterwards. */
609 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
611 opad = gst_stream_get_other_pad_from_pad (pad);
613 ret = gst_pad_push (opad, buffer);
614 gst_object_unref (opad);
617 GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
618 if (ret == GST_FLOW_OK) {
/* The stream record is looked up again because it may have been released
 * while the lock was not held. */
621 GST_STREAM_SYNCHRONIZER_LOCK (self);
622 stream = gst_pad_get_element_private (pad);
623 if (stream && stream->segment.format == GST_FORMAT_TIME
624 && GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
626 "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
627 GST_TIME_ARGS (stream->segment.last_stop),
628 GST_TIME_ARGS (timestamp_end));
629 gst_segment_set_last_stop (&stream->segment, GST_FORMAT_TIME,
633 /* Advance EOS streams if necessary. For non-EOS
634 * streams the demuxers should already do this! */
635 for (l = self->streams; l; l = l->next) {
636 GstStream *ostream = l->data;
/* Only streams that are EOS and track a TIME segment are considered. */
639 if (!ostream->is_eos || ostream->segment.format != GST_FORMAT_TIME)
642 if (ostream->segment.last_stop != -1)
643 last_stop = ostream->segment.last_stop;
645 last_stop = ostream->segment.start;
/* NOTE(review): timestamp_end may still be GST_CLOCK_TIME_NONE here when
 * the buffer had no valid timestamp or duration; no validity check around
 * this loop is visible in the listing - confirm one exists, otherwise the
 * comparison and new_start below operate on the invalid marker value. */
647 /* Is there a 1 second lag? */
648 if (last_stop != -1 && last_stop + GST_SECOND < timestamp_end) {
649 gint64 new_start, new_stop;
651 new_start = timestamp_end - GST_SECOND;
652 if (ostream->segment.stop == -1)
655 new_stop = MAX (new_start, ostream->segment.stop);
657 GST_DEBUG_OBJECT (ostream->sinkpad,
658 "Advancing stream %u from %" GST_TIME_FORMAT " to %"
659 GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (last_stop),
660 GST_TIME_ARGS (new_start));
/* Push a segment update starting at new_start on the lagging stream and
 * mirror the same update in its stored segment. */
662 gst_pad_push_event (ostream->srcpad,
663 gst_event_new_new_segment (TRUE, ostream->segment.rate,
664 ostream->segment.applied_rate, ostream->segment.format,
665 new_start, new_stop, new_start));
666 gst_segment_set_newsegment (&ostream->segment, TRUE,
667 ostream->segment.rate, ostream->segment.applied_rate,
668 ostream->segment.format, new_start, new_stop, new_start);
669 gst_segment_set_last_stop (&ostream->segment, GST_FORMAT_TIME,
673 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
678 gst_object_unref (self);
683 /* GstElement vfuncs */
/* Request-pad handler: allocates a new stream record, creates its sink and
 * source pads (named after the current stream number), installs the pad
 * functions, registers the stream in the list of streams and adds both
 * pads to the element. The sink pad of the new stream is returned. */
685 gst_stream_synchronizer_request_new_pad (GstElement * element,
686 GstPadTemplate * temp, const gchar * name)
688 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
692 GST_STREAM_SYNCHRONIZER_LOCK (self);
693 GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
694 self->current_stream_number);
/* Zero-initialised record, so all flags start out FALSE. */
696 stream = g_slice_new0 (GstStream);
697 stream->transform = self;
698 stream->stream_number = self->current_stream_number;
/* NOTE(review): tmp comes from g_strdup_printf and needs a g_free; the
 * matching calls are presumably on lines omitted from this listing. */
700 tmp = g_strdup_printf ("sink_%d", self->current_stream_number);
701 stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
/* Sink pad: stream record as private data plus the generic pad functions,
 * the sink event handler and the chain function. */
703 gst_pad_set_element_private (stream->sinkpad, stream);
704 gst_pad_set_iterate_internal_links_function (stream->sinkpad,
705 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
706 gst_pad_set_query_function (stream->sinkpad,
707 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
708 gst_pad_set_getcaps_function (stream->sinkpad,
709 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
710 gst_pad_set_acceptcaps_function (stream->sinkpad,
711 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
712 gst_pad_set_event_function (stream->sinkpad,
713 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
714 gst_pad_set_chain_function (stream->sinkpad,
715 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
717 tmp = g_strdup_printf ("src_%d", self->current_stream_number);
718 stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
/* Source pad: same private data and generic functions, with the source
 * event handler and no chain function. */
720 gst_pad_set_element_private (stream->srcpad, stream);
721 gst_pad_set_iterate_internal_links_function (stream->srcpad,
722 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
723 gst_pad_set_query_function (stream->srcpad,
724 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
725 gst_pad_set_getcaps_function (stream->srcpad,
726 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
727 gst_pad_set_acceptcaps_function (stream->srcpad,
728 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
729 gst_pad_set_event_function (stream->srcpad,
730 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
/* The segment stays in the undefined format until a newsegment arrives. */
732 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
734 self->streams = g_list_prepend (self->streams, stream);
735 self->current_stream_number++;
736 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
738 /* Add pads and activate unless we're going to NULL */
739 g_static_rec_mutex_lock (GST_STATE_GET_LOCK (self));
740 if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
741 gst_pad_set_active (stream->srcpad, TRUE);
742 gst_pad_set_active (stream->sinkpad, TRUE);
744 gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
745 gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
746 g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
748 return stream->sinkpad;
/* Remove one stream from the synchronizer: unlink it from the list of
 * streams, detach and remove both of its pads, fold its final running time
 * into group_start_time and free the record. The synchronizer lock is held
 * on entry and on return, but is released in between.
 * NOTE(review): group_start_time is written in that unlocked section, while
 * other code in this file updates it with the lock held - confirm that this
 * is safe. */
751 /* Must be called with lock! */
753 gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
758 GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
760 for (l = self->streams; l; l = l->next) {
761 if (l->data == stream) {
762 self->streams = g_list_delete_link (self->streams, l);
/* The stream must have been found in the list. */
766 g_assert (l != NULL);
768 /* we can drop the lock, since stream exists now only local.
769 * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
770 * (due to reverse lock order) when deactivating pads */
771 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
/* Clearing the private data first makes later lookups on these pads see
 * no stream record. */
773 gst_pad_set_element_private (stream->srcpad, NULL);
774 gst_pad_set_element_private (stream->sinkpad, NULL);
775 gst_pad_set_active (stream->srcpad, FALSE);
776 gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
777 gst_pad_set_active (stream->sinkpad, FALSE);
778 gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
/* Only TIME segments contribute to the group start time. */
780 if (stream->segment.format == GST_FORMAT_TIME) {
781 gint64 stop_running_time;
782 gint64 last_stop_running_time;
785 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
786 stream->segment.stop);
787 last_stop_running_time =
788 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
789 stream->segment.last_stop);
790 stop_running_time = MAX (stop_running_time, last_stop_running_time);
792 GST_DEBUG_OBJECT (stream->sinkpad,
793 "Stop running time was: %" GST_TIME_FORMAT,
794 GST_TIME_ARGS (stop_running_time));
796 self->group_start_time = MAX (self->group_start_time, stop_running_time);
/* The record was allocated with g_slice_new0 in the request-pad handler. */
799 g_slice_free (GstStream, stream);
801 /* NOTE: In theory we have to check here if all streams
802 * are EOS but the one that was removed wasn't and then
803 * send EOS downstream. But due to the way how playsink
804 * works this is not necessary and will only cause problems
805 * for gapless playback. playsink will only add/remove pads
806 * when it's reconfigured, which happens when the streams
810 /* lock for good measure, since the caller had it */
811 GST_STREAM_SYNCHRONIZER_LOCK (self);
/* Release-pad handler: looks up the stream record attached to the pad and
 * releases the whole stream under the synchronizer lock. The pad handed in
 * is asserted to be the sink pad of that stream. */
815 gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
817 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
820 GST_STREAM_SYNCHRONIZER_LOCK (self);
821 stream = gst_pad_get_element_private (pad);
823 g_assert (stream->sinkpad == pad);
/* release_stream drops and re-takes the lock internally. */
825 gst_stream_synchronizer_release_stream (self, stream);
827 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
/* State change handler. A first switch runs before chaining up to the
 * parent class and maintains the shutdown flag and group start time; a
 * second switch runs afterwards and resets or releases the streams.
 * NOTE(review): break statements, the default cases and the final return
 * are on lines omitted from this listing. */
830 static GstStateChangeReturn
831 gst_stream_synchronizer_change_state (GstElement * element,
832 GstStateChange transition)
834 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
835 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
837 switch (transition) {
838 case GST_STATE_CHANGE_NULL_TO_READY:
839 GST_DEBUG_OBJECT (self, "State change NULL->READY");
840 self->shutdown = FALSE;
842 case GST_STATE_CHANGE_READY_TO_PAUSED:
843 GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
844 self->group_start_time = 0;
845 self->shutdown = FALSE;
847 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
848 GST_DEBUG_OBJECT (self, "State change PAUSED->PLAYING");
/* NOTE(review): the log text in this case names a different transition than
 * the case label (which is PAUSED_TO_READY); it looks like a copy and paste
 * slip in the message - confirm. */
850 case GST_STATE_CHANGE_PAUSED_TO_READY:
851 GST_DEBUG_OBJECT (self, "State change READY->NULL");
/* Wake threads blocked on stream_finish_cond in the sink event handler and
 * flag the shutdown; both happen under the lock, so a woken thread sees
 * shutdown set once it re-acquires the lock. */
853 GST_STREAM_SYNCHRONIZER_LOCK (self);
854 g_cond_broadcast (self->stream_finish_cond);
855 self->shutdown = TRUE;
856 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
862 GstStateChangeReturn bret;
/* Chain up to the parent class. */
864 bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
865 GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", bret);
866 if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
870 switch (transition) {
871 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
872 GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
874 case GST_STATE_CHANGE_PAUSED_TO_READY:{
877 GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
878 self->group_start_time = 0;
/* Reset the segment and flags of every stream. */
880 GST_STREAM_SYNCHRONIZER_LOCK (self);
881 for (l = self->streams; l; l = l->next) {
882 GstStream *stream = l->data;
884 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
885 stream->wait = FALSE;
886 stream->new_stream = FALSE;
887 stream->drop_discont = FALSE;
888 stream->is_eos = FALSE;
890 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
893 case GST_STATE_CHANGE_READY_TO_NULL:{
894 GST_DEBUG_OBJECT (self, "State change READY->NULL");
/* Release every remaining stream; each call removes the head of the list,
 * so the loop ends once the list is empty. */
896 GST_STREAM_SYNCHRONIZER_LOCK (self);
897 while (self->streams)
898 gst_stream_synchronizer_release_stream (self, self->streams->data);
899 self->current_stream_number = 0;
900 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
/* GObject finalize: frees the mutex and the condition variable created in
 * the instance init function, then chains up to the parent class. */
912 gst_stream_synchronizer_finalize (GObject * object)
914 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
917 g_mutex_free (self->lock);
/* The pointer is cleared after freeing so it cannot be freed twice. */
921 if (self->stream_finish_cond) {
922 g_cond_free (self->stream_finish_cond);
923 self->stream_finish_cond = NULL;
926 G_OBJECT_CLASS (parent_class)->finalize (object);
929 /* GObject type initialization */
/* Instance init: creates the mutex used by the LOCK and UNLOCK macros and
 * the condition variable that streams wait on in the sink event handler.
 * Both are freed again in finalize. */
931 gst_stream_synchronizer_init (GstStreamSynchronizer * self)
933 self->lock = g_mutex_new ();
934 self->stream_finish_cond = g_cond_new ();
/* Class init: sets up the debug category, installs the finalize handler,
 * registers both pad templates and the element details, and hooks up the
 * element virtual functions implemented in this file. */
938 gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
940 GObjectClass *gobject_class = (GObjectClass *) klass;
941 GstElementClass *element_class = (GstElementClass *) klass;
943 GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
944 "streamsynchronizer", 0, "Stream Synchronizer");
946 gobject_class->finalize = gst_stream_synchronizer_finalize;
948 gst_element_class_add_pad_template (element_class,
949 gst_static_pad_template_get (&srctemplate));
950 gst_element_class_add_pad_template (element_class,
951 gst_static_pad_template_get (&sinktemplate));
953 gst_element_class_set_details_simple (element_class,
954 "Stream Synchronizer", "Generic",
955 "Synchronizes a group of streams to have equal durations and starting points",
956 "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
/* Element virtual functions: state changes and request pad handling. */
958 element_class->change_state =
959 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
960 element_class->request_new_pad =
961 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
962 element_class->release_pad =
963 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);