2 * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 * Boston, MA 02111-1307, USA.
24 #include "gststreamsynchronizer.h"
26 GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
27 #define GST_CAT_DEFAULT stream_synchronizer_debug
29 #define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START { \
30 GST_LOG_OBJECT (obj, \
31 "locking from thread %p", \
33 g_mutex_lock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
34 GST_LOG_OBJECT (obj, \
35 "locked from thread %p", \
39 #define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START { \
40 GST_LOG_OBJECT (obj, \
41 "unlocking from thread %p", \
43 g_mutex_unlock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
46 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%d",
50 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%d",
55 GST_BOILERPLATE (GstStreamSynchronizer, gst_stream_synchronizer,
56 GstElement, GST_TYPE_ELEMENT);
60 GstStreamSynchronizer *transform;
68 gboolean drop_discont;
72 gint64 running_time_diff;
75 /* Must be called with lock! */
77 gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
79 if (stream->sinkpad == pad)
80 return gst_object_ref (stream->srcpad);
81 else if (stream->srcpad == pad)
82 return gst_object_ref (stream->sinkpad);
88 gst_stream_get_other_pad_from_pad (GstPad * pad)
90 GstStreamSynchronizer *self =
91 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
95 GST_STREAM_SYNCHRONIZER_LOCK (self);
96 stream = gst_pad_get_element_private (pad);
100 opad = gst_stream_get_other_pad (stream, pad);
103 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
104 gst_object_unref (self);
107 GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
112 /* Generic pad functions */
114 gst_stream_synchronizer_iterate_internal_links (GstPad * pad)
116 GstIterator *it = NULL;
119 opad = gst_stream_get_other_pad_from_pad (pad);
121 it = gst_iterator_new_single (GST_TYPE_PAD, opad,
122 (GstCopyFunction) gst_object_ref, (GFreeFunc) gst_object_unref);
123 gst_object_unref (opad);
130 gst_stream_synchronizer_query (GstPad * pad, GstQuery * query)
133 gboolean ret = FALSE;
135 GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));
137 opad = gst_stream_get_other_pad_from_pad (pad);
139 ret = gst_pad_peer_query (opad, query);
140 gst_object_unref (opad);
147 gst_stream_synchronizer_getcaps (GstPad * pad)
152 opad = gst_stream_get_other_pad_from_pad (pad);
154 ret = gst_pad_peer_get_caps (opad);
155 gst_object_unref (opad);
159 ret = gst_caps_new_any ();
161 GST_LOG_OBJECT (pad, "Returning caps: %" GST_PTR_FORMAT, ret);
167 gst_stream_synchronizer_acceptcaps (GstPad * pad, GstCaps * caps)
170 gboolean ret = FALSE;
172 opad = gst_stream_get_other_pad_from_pad (pad);
174 ret = gst_pad_peer_accept_caps (opad, caps);
175 gst_object_unref (opad);
178 GST_LOG_OBJECT (pad, "Caps%s accepted: %" GST_PTR_FORMAT, (ret ? "" : " not"),
184 /* srcpad functions */
186 gst_stream_synchronizer_src_event (GstPad * pad, GstEvent * event)
188 GstStreamSynchronizer *self =
189 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
191 gboolean ret = FALSE;
193 GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
194 GST_EVENT_TYPE_NAME (event), event->structure);
196 switch (GST_EVENT_TYPE (event)) {
199 GstClockTimeDiff diff;
200 GstClockTime timestamp;
201 gint64 running_time_diff;
204 gst_event_parse_qos (event, &proportion, &diff, ×tamp);
205 gst_event_unref (event);
207 GST_STREAM_SYNCHRONIZER_LOCK (self);
208 stream = gst_pad_get_element_private (pad);
210 running_time_diff = stream->running_time_diff;
212 running_time_diff = -1;
213 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
215 if (running_time_diff == -1) {
216 GST_WARNING_OBJECT (pad, "QOS event before group start");
218 } else if (timestamp < running_time_diff) {
219 GST_DEBUG_OBJECT (pad, "QOS event from previous group");
224 "Adjusting QOS event: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT " = %"
225 GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
226 GST_TIME_ARGS (running_time_diff),
227 GST_TIME_ARGS (timestamp - running_time_diff));
229 timestamp -= running_time_diff;
231 /* That case is invalid for QoS events */
232 if (diff < 0 && -diff > timestamp) {
233 GST_DEBUG_OBJECT (pad, "QOS event from previous group");
238 event = gst_event_new_qos (proportion, diff, timestamp);
245 opad = gst_stream_get_other_pad_from_pad (pad);
247 ret = gst_pad_push_event (opad, event);
248 gst_object_unref (opad);
252 gst_object_unref (self);
257 /* sinkpad functions */
259 gst_stream_synchronizer_sink_event (GstPad * pad, GstEvent * event)
261 GstStreamSynchronizer *self =
262 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
264 gboolean ret = FALSE;
266 GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
267 GST_EVENT_TYPE_NAME (event), event->structure);
269 switch (GST_EVENT_TYPE (event)) {
270 case GST_EVENT_SINK_MESSAGE:{
273 gst_event_parse_sink_message (event, &message);
274 if (message->structure
275 && gst_structure_has_name (message->structure,
276 "playbin2-stream-changed")) {
279 GST_STREAM_SYNCHRONIZER_LOCK (self);
280 stream = gst_pad_get_element_private (pad);
283 gboolean all_wait = TRUE;
285 GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
287 stream->is_eos = FALSE;
289 stream->new_stream = TRUE;
291 for (l = self->streams; l; l = l->next) {
292 GstStream *ostream = l->data;
294 all_wait = all_wait && ostream->wait;
299 gint64 last_stop = 0;
301 GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
303 for (l = self->streams; l; l = l->next) {
304 GstStream *ostream = l->data;
305 gint64 stop_running_time;
306 gint64 last_stop_running_time;
308 ostream->wait = FALSE;
311 gst_segment_to_running_time (&ostream->segment,
312 GST_FORMAT_TIME, ostream->segment.stop);
313 last_stop_running_time =
314 gst_segment_to_running_time (&ostream->segment,
315 GST_FORMAT_TIME, ostream->segment.last_stop);
317 MAX (last_stop, MAX (stop_running_time,
318 last_stop_running_time));
320 last_stop = MAX (0, last_stop);
321 self->group_start_time = MAX (self->group_start_time, last_stop);
323 GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
324 GST_TIME_ARGS (self->group_start_time));
326 g_cond_broadcast (self->stream_finish_cond);
329 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
331 gst_message_unref (message);
334 case GST_EVENT_NEWSEGMENT:{
337 gdouble rate, applied_rate;
339 gint64 start, stop, position;
341 gst_event_parse_new_segment_full (event,
342 &update, &rate, &applied_rate, &format, &start, &stop, &position);
344 GST_STREAM_SYNCHRONIZER_LOCK (self);
345 stream = gst_pad_get_element_private (pad);
348 GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
349 g_cond_wait (self->stream_finish_cond, self->lock);
350 stream = gst_pad_get_element_private (pad);
352 stream->wait = FALSE;
356 if (self->shutdown) {
357 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
358 gst_event_unref (event);
362 if (stream && format == GST_FORMAT_TIME) {
363 if (stream->new_stream) {
364 gint64 last_stop_running_time = 0;
365 gint64 stop_running_time = 0;
367 if (stream->segment.format == GST_FORMAT_TIME) {
368 last_stop_running_time =
369 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
370 stream->segment.last_stop);
371 last_stop_running_time = MAX (last_stop_running_time, 0);
373 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
374 stream->segment.stop);
375 stop_running_time = MAX (last_stop_running_time, 0);
377 if (stop_running_time != last_stop_running_time) {
378 GST_WARNING_OBJECT (pad,
379 "Gap between last_stop and segment stop: %" GST_TIME_FORMAT
380 " != %" GST_TIME_FORMAT, GST_TIME_ARGS (stop_running_time),
381 GST_TIME_ARGS (last_stop_running_time));
384 if (stop_running_time < last_stop_running_time) {
385 GST_DEBUG_OBJECT (pad, "Updating stop position");
386 gst_pad_push_event (stream->srcpad,
387 gst_event_new_new_segment_full (TRUE, stream->segment.rate,
388 stream->segment.applied_rate, GST_FORMAT_TIME,
389 stream->segment.start, stream->segment.last_stop,
390 stream->segment.time));
391 gst_segment_set_newsegment_full (&stream->segment, TRUE,
392 stream->segment.rate, stream->segment.applied_rate,
393 GST_FORMAT_TIME, stream->segment.start,
394 stream->segment.last_stop, stream->segment.time);
396 stop_running_time = MAX (stop_running_time, last_stop_running_time);
397 GST_DEBUG_OBJECT (pad,
398 "Stop running time of last group: %" GST_TIME_FORMAT,
399 GST_TIME_ARGS (stop_running_time));
401 stream->new_stream = FALSE;
402 stream->drop_discont = TRUE;
404 if (stop_running_time < self->group_start_time) {
405 gint64 diff = self->group_start_time - stop_running_time;
407 GST_DEBUG_OBJECT (pad,
408 "Advancing running time for other streams by: %"
409 GST_TIME_FORMAT, GST_TIME_ARGS (diff));
410 gst_pad_push_event (stream->srcpad,
411 gst_event_new_new_segment_full (FALSE, 1.0, 1.0,
412 GST_FORMAT_TIME, 0, diff, 0));
413 gst_segment_set_newsegment_full (&stream->segment, FALSE, 1.0, 1.0,
414 GST_FORMAT_TIME, 0, diff, 0);
418 GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
420 gst_segment_set_newsegment_full (&stream->segment, update, rate,
421 applied_rate, format, start, stop, position);
422 GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
425 GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
426 GST_TIME_ARGS (stream->segment.accum));
427 stream->running_time_diff = stream->segment.accum;
429 GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
430 gst_format_get_name (format));
431 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
433 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
436 case GST_EVENT_FLUSH_STOP:{
439 GST_STREAM_SYNCHRONIZER_LOCK (self);
440 stream = gst_pad_get_element_private (pad);
442 GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
443 stream->stream_number);
444 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
446 stream->is_eos = FALSE;
447 stream->wait = FALSE;
448 stream->new_stream = FALSE;
449 stream->drop_discont = FALSE;
450 stream->seen_data = FALSE;
452 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
458 gboolean all_eos = TRUE;
463 GST_STREAM_SYNCHRONIZER_LOCK (self);
464 stream = gst_pad_get_element_private (pad);
466 GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
467 stream->is_eos = TRUE;
470 seen_data = stream->seen_data;
471 srcpad = gst_object_ref (stream->srcpad);
473 for (l = self->streams; l; l = l->next) {
474 GstStream *ostream = l->data;
476 all_eos = all_eos && ostream->is_eos;
482 GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
483 for (l = self->streams; l; l = l->next) {
484 GstStream *ostream = l->data;
485 /* local snapshot of current pads */
486 gst_object_ref (ostream->srcpad);
487 pads = g_slist_prepend (pads, ostream->srcpad);
490 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
491 /* drop lock when sending eos, which may block in e.g. preroll */
500 GST_DEBUG_OBJECT (pad, "Pushing EOS");
501 ret = ret && gst_pad_push_event (pad, gst_event_new_eos ());
502 gst_object_unref (pad);
503 epad = g_slist_next (epad);
507 /* if EOS, but no data has passed, then send something to replace EOS
508 * for preroll purposes */
510 GstBuffer *buf = gst_buffer_new ();
512 GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_PREROLL);
513 gst_pad_push (srcpad, buf);
516 gst_object_unref (srcpad);
524 opad = gst_stream_get_other_pad_from_pad (pad);
526 ret = gst_pad_push_event (opad, event);
527 gst_object_unref (opad);
531 gst_object_unref (self);
537 gst_stream_synchronizer_sink_bufferalloc (GstPad * pad, guint64 offset,
538 guint size, GstCaps * caps, GstBuffer ** buf)
541 GstFlowReturn ret = GST_FLOW_ERROR;
543 GST_LOG_OBJECT (pad, "Allocating buffer: size=%u", size);
545 opad = gst_stream_get_other_pad_from_pad (pad);
547 ret = gst_pad_alloc_buffer (opad, offset, size, caps, buf);
548 gst_object_unref (opad);
551 GST_LOG_OBJECT (pad, "Allocation: %s", gst_flow_get_name (ret));
557 gst_stream_synchronizer_sink_chain (GstPad * pad, GstBuffer * buffer)
559 GstStreamSynchronizer *self =
560 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
562 GstFlowReturn ret = GST_FLOW_ERROR;
564 GstClockTime timestamp = GST_CLOCK_TIME_NONE;
565 GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
567 GST_LOG_OBJECT (pad, "Handling buffer %p: size=%u, timestamp=%"
568 GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
569 " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
570 buffer, GST_BUFFER_SIZE (buffer),
571 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
572 GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
573 GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));
575 timestamp = GST_BUFFER_TIMESTAMP (buffer);
576 if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)
577 && GST_BUFFER_DURATION_IS_VALID (buffer))
578 timestamp_end = timestamp + GST_BUFFER_DURATION (buffer);
580 GST_STREAM_SYNCHRONIZER_LOCK (self);
581 stream = gst_pad_get_element_private (pad);
583 stream->seen_data = TRUE;
584 if (stream && stream->drop_discont) {
585 buffer = gst_buffer_make_metadata_writable (buffer);
586 GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
587 stream->drop_discont = FALSE;
590 if (stream && stream->segment.format == GST_FORMAT_TIME
591 && GST_CLOCK_TIME_IS_VALID (timestamp)) {
593 "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
594 GST_TIME_ARGS (stream->segment.last_stop), GST_TIME_ARGS (timestamp));
595 gst_segment_set_last_stop (&stream->segment, GST_FORMAT_TIME, timestamp);
597 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
599 opad = gst_stream_get_other_pad_from_pad (pad);
601 ret = gst_pad_push (opad, buffer);
602 gst_object_unref (opad);
605 GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
606 if (ret == GST_FLOW_OK) {
609 GST_STREAM_SYNCHRONIZER_LOCK (self);
610 stream = gst_pad_get_element_private (pad);
611 if (stream && stream->segment.format == GST_FORMAT_TIME
612 && GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
614 "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
615 GST_TIME_ARGS (stream->segment.last_stop),
616 GST_TIME_ARGS (timestamp_end));
617 gst_segment_set_last_stop (&stream->segment, GST_FORMAT_TIME,
621 /* Advance EOS streams if necessary. For non-EOS
622 * streams the demuxers should already do this! */
623 for (l = self->streams; l; l = l->next) {
624 GstStream *ostream = l->data;
627 if (!ostream->is_eos || ostream->segment.format != GST_FORMAT_TIME)
630 if (ostream->segment.last_stop != -1)
631 last_stop = ostream->segment.last_stop;
633 last_stop = ostream->segment.start;
635 /* Is there a 1 second lag? */
636 if (last_stop != -1 && last_stop + GST_SECOND < timestamp_end) {
639 new_start = timestamp_end - GST_SECOND;
641 GST_DEBUG_OBJECT (ostream->sinkpad,
642 "Advancing stream %u from %" GST_TIME_FORMAT " to %"
643 GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (last_stop),
644 GST_TIME_ARGS (new_start));
646 gst_pad_push_event (ostream->srcpad,
647 gst_event_new_new_segment_full (TRUE, ostream->segment.rate,
648 ostream->segment.applied_rate, ostream->segment.format,
649 new_start, ostream->segment.stop, new_start));
650 gst_segment_set_newsegment_full (&ostream->segment, TRUE,
651 ostream->segment.rate, ostream->segment.applied_rate,
652 ostream->segment.format, new_start, ostream->segment.stop,
654 gst_segment_set_last_stop (&ostream->segment, GST_FORMAT_TIME,
658 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
661 gst_object_unref (self);
666 /* GstElement vfuncs */
668 gst_stream_synchronizer_request_new_pad (GstElement * element,
669 GstPadTemplate * temp, const gchar * name)
671 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
675 GST_STREAM_SYNCHRONIZER_LOCK (self);
676 GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
677 self->current_stream_number);
679 stream = g_slice_new0 (GstStream);
680 stream->transform = self;
681 stream->stream_number = self->current_stream_number;
683 tmp = g_strdup_printf ("sink_%d", self->current_stream_number);
684 stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
686 gst_pad_set_element_private (stream->sinkpad, stream);
687 gst_pad_set_iterate_internal_links_function (stream->sinkpad,
688 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
689 gst_pad_set_query_function (stream->sinkpad,
690 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
691 gst_pad_set_getcaps_function (stream->sinkpad,
692 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
693 gst_pad_set_acceptcaps_function (stream->sinkpad,
694 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
695 gst_pad_set_event_function (stream->sinkpad,
696 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
697 gst_pad_set_chain_function (stream->sinkpad,
698 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
699 gst_pad_set_bufferalloc_function (stream->sinkpad,
700 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_bufferalloc));
702 tmp = g_strdup_printf ("src_%d", self->current_stream_number);
703 stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
705 gst_pad_set_element_private (stream->srcpad, stream);
706 gst_pad_set_iterate_internal_links_function (stream->srcpad,
707 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
708 gst_pad_set_query_function (stream->srcpad,
709 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
710 gst_pad_set_getcaps_function (stream->srcpad,
711 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
712 gst_pad_set_acceptcaps_function (stream->srcpad,
713 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
714 gst_pad_set_event_function (stream->srcpad,
715 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
717 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
719 self->streams = g_list_prepend (self->streams, stream);
720 self->current_stream_number++;
721 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
723 /* Add pads and activate unless we're going to NULL */
724 g_static_rec_mutex_lock (GST_STATE_GET_LOCK (self));
725 if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
726 gst_pad_set_active (stream->srcpad, TRUE);
727 gst_pad_set_active (stream->sinkpad, TRUE);
729 gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
730 gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
731 g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
733 return stream->sinkpad;
736 /* Must be called with lock! */
738 gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
743 GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
745 for (l = self->streams; l; l = l->next) {
746 if (l->data == stream) {
747 self->streams = g_list_delete_link (self->streams, l);
751 g_assert (l != NULL);
753 /* we can drop the lock, since stream exists now only local.
754 * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
755 * (due to reverse lock order) when deactivating pads */
756 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
758 gst_pad_set_element_private (stream->srcpad, NULL);
759 gst_pad_set_element_private (stream->sinkpad, NULL);
760 gst_pad_set_active (stream->srcpad, FALSE);
761 gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
762 gst_pad_set_active (stream->sinkpad, FALSE);
763 gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
765 if (stream->segment.format == GST_FORMAT_TIME) {
766 gint64 stop_running_time;
767 gint64 last_stop_running_time;
770 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
771 stream->segment.stop);
772 last_stop_running_time =
773 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
774 stream->segment.last_stop);
775 stop_running_time = MAX (stop_running_time, last_stop_running_time);
777 GST_DEBUG_OBJECT (stream->sinkpad,
778 "Stop running time was: %" GST_TIME_FORMAT,
779 GST_TIME_ARGS (stop_running_time));
781 self->group_start_time = MAX (self->group_start_time, stop_running_time);
784 g_slice_free (GstStream, stream);
786 /* NOTE: In theory we have to check here if all streams
787 * are EOS but the one that was removed wasn't and then
788 * send EOS downstream. But due to the way how playsink
789 * works this is not necessary and will only cause problems
790 * for gapless playback. playsink will only add/remove pads
791 * when it's reconfigured, which happens when the streams
795 /* lock for good measure, since the caller had it */
796 GST_STREAM_SYNCHRONIZER_LOCK (self);
800 gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
802 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
805 GST_STREAM_SYNCHRONIZER_LOCK (self);
806 stream = gst_pad_get_element_private (pad);
808 g_assert (stream->sinkpad == pad);
810 gst_stream_synchronizer_release_stream (self, stream);
812 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
815 static GstStateChangeReturn
816 gst_stream_synchronizer_change_state (GstElement * element,
817 GstStateChange transition)
819 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
820 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
822 switch (transition) {
823 case GST_STATE_CHANGE_NULL_TO_READY:
824 GST_DEBUG_OBJECT (self, "State change NULL->READY");
825 self->shutdown = FALSE;
827 case GST_STATE_CHANGE_READY_TO_PAUSED:
828 GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
829 self->group_start_time = 0;
830 self->shutdown = FALSE;
832 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
833 GST_DEBUG_OBJECT (self, "State change PAUSED->PLAYING");
835 case GST_STATE_CHANGE_PAUSED_TO_READY:
836 GST_DEBUG_OBJECT (self, "State change READY->NULL");
838 GST_STREAM_SYNCHRONIZER_LOCK (self);
839 g_cond_broadcast (self->stream_finish_cond);
840 self->shutdown = TRUE;
841 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
847 GstStateChangeReturn bret;
849 bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
850 GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", bret);
851 if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
855 switch (transition) {
856 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
857 GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
859 case GST_STATE_CHANGE_PAUSED_TO_READY:{
862 GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
863 self->group_start_time = 0;
865 GST_STREAM_SYNCHRONIZER_LOCK (self);
866 for (l = self->streams; l; l = l->next) {
867 GstStream *stream = l->data;
869 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
870 stream->wait = FALSE;
871 stream->new_stream = FALSE;
872 stream->drop_discont = FALSE;
873 stream->is_eos = FALSE;
875 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
878 case GST_STATE_CHANGE_READY_TO_NULL:{
879 GST_DEBUG_OBJECT (self, "State change READY->NULL");
881 GST_STREAM_SYNCHRONIZER_LOCK (self);
882 while (self->streams)
883 gst_stream_synchronizer_release_stream (self, self->streams->data);
884 self->current_stream_number = 0;
885 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
897 gst_stream_synchronizer_finalize (GObject * object)
899 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
902 g_mutex_free (self->lock);
906 if (self->stream_finish_cond) {
907 g_cond_free (self->stream_finish_cond);
908 self->stream_finish_cond = NULL;
911 G_OBJECT_CLASS (parent_class)->finalize (object);
914 /* GObject type initialization */
916 gst_stream_synchronizer_init (GstStreamSynchronizer * self,
917 GstStreamSynchronizerClass * klass)
919 self->lock = g_mutex_new ();
920 self->stream_finish_cond = g_cond_new ();
924 gst_stream_synchronizer_base_init (gpointer g_class)
926 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
928 gst_element_class_add_pad_template (gstelement_class,
929 gst_static_pad_template_get (&srctemplate));
930 gst_element_class_add_pad_template (gstelement_class,
931 gst_static_pad_template_get (&sinktemplate));
933 gst_element_class_set_details_simple (gstelement_class,
934 "Stream Synchronizer", "Generic",
935 "Synchronizes a group of streams to have equal durations and starting points",
936 "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
940 gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
942 GObjectClass *gobject_class = (GObjectClass *) klass;
943 GstElementClass *element_class = (GstElementClass *) klass;
945 GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
946 "streamsynchronizer", 0, "Stream Synchronizer");
948 gobject_class->finalize = gst_stream_synchronizer_finalize;
950 element_class->change_state =
951 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
952 element_class->request_new_pad =
953 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
954 element_class->release_pad =
955 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);