2 * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 * Boston, MA 02111-1307, USA.
24 #include "gststreamsynchronizer.h"
/* File-local debug category; GST_CAT_DEFAULT points at it so the
 * GST_*_OBJECT logging calls below are emitted under this category. */
26 GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
27 #define GST_CAT_DEFAULT stream_synchronizer_debug
/* Locks the synchronizer mutex (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock),
 * emitting a log line before and after the acquisition.
 * NOTE(review): the lines that supply the thread pointer argument and the
 * closing G_STMT_END are not present in this listing. */
29 #define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START { \
30 GST_LOG_OBJECT (obj, \
31 "locking from thread %p", \
33 g_mutex_lock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
34 GST_LOG_OBJECT (obj, \
35 "locked from thread %p", \
/* Unlocks the synchronizer mutex (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock),
 * logging just before the release.
 * NOTE(review): the thread pointer argument line and the closing
 * G_STMT_END are not present in this listing. */
39 #define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START { \
40 GST_LOG_OBJECT (obj, \
41 "unlocking from thread %p", \
43 g_mutex_unlock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
/* Static pad templates for the "src_%d" and "sink_%d" pads. Pads are
 * created from them in gst_stream_synchronizer_request_new_pad().
 * NOTE(review): the remaining template arguments (direction, presence,
 * caps) are not visible in this listing. */
46 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%d",
50 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%d",
/* Compile-time constant, currently TRUE. Presumably tested right before the
 * "goto skip_adjustments" statements in the event handlers to bypass all
 * synchronization logic; the testing lines are not visible in this listing,
 * so confirm against the full file. */
55 static const gboolean passthrough = TRUE;
/* Declares the GstStreamSynchronizer GObject type with GstElement
 * (GST_TYPE_ELEMENT) as its parent, using the function prefix
 * gst_stream_synchronizer. The parent_class symbol used further down in
 * change_state() and finalize() is expected to come from this macro. */
57 GST_BOILERPLATE (GstStreamSynchronizer, gst_stream_synchronizer,
58 GstElement, GST_TYPE_ELEMENT);
/* Fragment of the per-stream state structure (GstStream). Only three of its
 * fields are visible in this listing; the code below also uses
 * stream_number, sinkpad, srcpad, segment, wait, new_stream, is_eos and
 * seen_data. */
/* Back pointer to the owning element; set in request_new_pad(). */
62 GstStreamSynchronizer *transform;
/* When TRUE, the chain function clears the DISCONT flag on the next buffer
 * and resets this field to FALSE. */
70 gboolean drop_discont;
/* Taken from segment.accum when a TIME newsegment arrives; subtracted from
 * the timestamp of upstream QoS events. */
74 gint64 running_time_diff;
/* Returns a new reference to the pad paired with @pad inside @stream: the
 * srcpad when @pad is the sinkpad, the sinkpad when @pad is the srcpad.
 * The result for a pad that matches neither is not visible in this
 * listing. */
77 /* Must be called with lock! */
79 gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
81 if (stream->sinkpad == pad)
82 return gst_object_ref (stream->srcpad);
83 else if (stream->srcpad == pad)
84 return gst_object_ref (stream->sinkpad);
/* Looks up the opposite pad of @pad without the caller holding the lock.
 *
 * Takes a reference on the parent element, then, under the synchronizer
 * lock, reads the GstStream stored as the pad's element-private data and
 * asks gst_stream_get_other_pad() for the paired pad (returned with a new
 * reference). The parent reference is dropped after unlocking.
 *
 * A warning is logged about lookups after release; the condition guarding
 * it is not visible in this listing (presumably "no pad found"). */
90 gst_stream_get_other_pad_from_pad (GstPad * pad)
92 GstStreamSynchronizer *self =
93 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
97 GST_STREAM_SYNCHRONIZER_LOCK (self);
/* Element-private data is set to the GstStream in request_new_pad() and
 * reset to NULL in release_stream(). */
98 stream = gst_pad_get_element_private (pad);
102 opad = gst_stream_get_other_pad (stream, pad);
105 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
106 gst_object_unref (self);
109 GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
114 /* Generic pad functions */
/* Internal-links function shared by sink and source pads: produces an
 * iterator containing exactly one element, the pad paired with @pad.
 * "it" starts out NULL; the lines deciding when the iterator is built and
 * what is returned are not visible in this listing. */
116 gst_stream_synchronizer_iterate_internal_links (GstPad * pad)
118 GstIterator *it = NULL;
121 opad = gst_stream_get_other_pad_from_pad (pad);
/* The iterator takes its own reference via the copy function, so the
 * reference obtained from the lookup is released right afterwards. */
123 it = gst_iterator_new_single (GST_TYPE_PAD, opad,
124 (GstCopyFunction) gst_object_ref, (GFreeFunc) gst_object_unref);
125 gst_object_unref (opad);
/* Query function shared by sink and source pads: forwards @query to the
 * peer of the pad paired with @pad. "ret" defaults to FALSE and is
 * overwritten with the result of the peer query. */
132 gst_stream_synchronizer_query (GstPad * pad, GstQuery * query)
135 gboolean ret = FALSE;
137 GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));
139 opad = gst_stream_get_other_pad_from_pad (pad);
141 ret = gst_pad_peer_query (opad, query);
142 gst_object_unref (opad);
/* Getcaps function shared by sink and source pads: asks the peer of the
 * paired pad for its caps. ANY caps are also created as a fallback; the
 * condition selecting the fallback is not visible in this listing
 * (presumably "no caps obtained"). The chosen caps are logged. */
149 gst_stream_synchronizer_getcaps (GstPad * pad)
154 opad = gst_stream_get_other_pad_from_pad (pad);
156 ret = gst_pad_peer_get_caps (opad);
157 gst_object_unref (opad);
161 ret = gst_caps_new_any ();
163 GST_LOG_OBJECT (pad, "Returning caps: %" GST_PTR_FORMAT, ret);
/* Acceptcaps function shared by sink and source pads: asks the peer of the
 * paired pad whether it accepts @caps. "ret" defaults to FALSE; the outcome
 * is logged as "Caps accepted" or "Caps not accepted". */
169 gst_stream_synchronizer_acceptcaps (GstPad * pad, GstCaps * caps)
172 gboolean ret = FALSE;
174 opad = gst_stream_get_other_pad_from_pad (pad);
176 ret = gst_pad_peer_accept_caps (opad, caps);
177 gst_object_unref (opad);
180 GST_LOG_OBJECT (pad, "Caps%s accepted: %" GST_PTR_FORMAT, (ret ? "" : " not"),
186 /* srcpad functions */
/* Event function installed on the source pads (see
 * gst_stream_synchronizer_request_new_pad()).
 *
 * QoS events get their timestamp shifted back by the stream's
 * running_time_diff (read under the synchronizer lock) and are re-created
 * from the adjusted values. The resulting event is then pushed on the
 * paired sink pad and the push result is stored in "ret" (FALSE by
 * default). The parent reference taken at entry is released at the end.
 *
 * NOTE(review): several lines of this function (the test in front of the
 * first goto, the case labels, the early exits, the final return) are not
 * present in this listing. */
188 gst_stream_synchronizer_src_event (GstPad * pad, GstEvent * event)
190 GstStreamSynchronizer *self =
191 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
193 gboolean ret = FALSE;
196 goto skip_adjustments;
198 GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
199 GST_EVENT_TYPE_NAME (event), event->structure);
201 switch (GST_EVENT_TYPE (event)) {
204 GstClockTimeDiff diff;
205 GstClockTime timestamp;
206 gint64 running_time_diff;
/* Extract the QoS values, then release the incoming event: an adjusted
 * replacement is created further down. */
209 gst_event_parse_qos (event, &proportion, &diff, &timestamp);
210 gst_event_unref (event);
/* running_time_diff is copied while holding the lock; -1 is used as the
 * marker for "no value available". */
212 GST_STREAM_SYNCHRONIZER_LOCK (self);
213 stream = gst_pad_get_element_private (pad);
215 running_time_diff = stream->running_time_diff;
217 running_time_diff = -1;
218 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
220 if (running_time_diff == -1) {
221 GST_WARNING_OBJECT (pad, "QOS event before group start");
223 } else if (timestamp < running_time_diff) {
224 GST_DEBUG_OBJECT (pad, "QOS event from previous group");
229 "Adjusting QOS event: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT " = %"
230 GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
231 GST_TIME_ARGS (running_time_diff),
232 GST_TIME_ARGS (timestamp - running_time_diff));
234 timestamp -= running_time_diff;
236 /* That case is invalid for QoS events */
237 if (diff < 0 && -diff > timestamp) {
238 GST_DEBUG_OBJECT (pad, "QOS event from previous group");
243 event = gst_event_new_qos (proportion, diff, timestamp);
/* Forward the (possibly replaced) event upstream through the sink pad. */
252 opad = gst_stream_get_other_pad_from_pad (pad);
254 ret = gst_pad_push_event (opad, event);
255 gst_object_unref (opad);
259 gst_object_unref (self);
264 /* sinkpad functions */
/* Event function installed on the sink pads (see
 * gst_stream_synchronizer_request_new_pad()).
 *
 * Visible handling, per event type:
 *  - SINK_MESSAGE carrying a "playbin2-stream-changed" structure: marks the
 *    stream as changed and, once every stream is waiting, computes a new
 *    group start time and wakes all waiters.
 *  - NEWSEGMENT: may block on stream_finish_cond, closes/advances the
 *    previous segment for a new stream, then stores the new segment.
 *  - FLUSH_STOP: resets the per-stream segment and flags.
 *  - EOS (the case label is not visible in this listing): records EOS for
 *    the stream and forwards EOS on all source pads once every stream is
 *    EOS; otherwise may push an empty PREROLL buffer instead.
 * Remaining events, and those that fall through, are pushed on the paired
 * source pad; "ret" (FALSE by default) receives the push result.
 *
 * NOTE(review): many lines (case labels, conditions, breaks, returns) are
 * missing from this listing, so the exact control flow must be confirmed
 * against the full file. */
266 gst_stream_synchronizer_sink_event (GstPad * pad, GstEvent * event)
268 GstStreamSynchronizer *self =
269 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
271 gboolean ret = FALSE;
274 goto skip_adjustments;
276 GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
277 GST_EVENT_TYPE_NAME (event), event->structure);
279 switch (GST_EVENT_TYPE (event)) {
280 case GST_EVENT_SINK_MESSAGE:{
283 gst_event_parse_sink_message (event, &message);
284 if (message->structure
285 && gst_structure_has_name (message->structure,
286 "playbin2-stream-changed")) {
289 GST_STREAM_SYNCHRONIZER_LOCK (self);
290 stream = gst_pad_get_element_private (pad);
293 gboolean all_wait = TRUE;
295 GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
297 stream->is_eos = FALSE;
299 stream->new_stream = TRUE;
/* all_wait stays TRUE only if every known stream is flagged as waiting. */
301 for (l = self->streams; l; l = l->next) {
302 GstStream *ostream = l->data;
304 all_wait = all_wait && ostream->wait;
309 gint64 last_stop = 0;
311 GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
/* Clear the wait flags and track the largest running time reached by any
 * stream (segment stop or last_stop, whichever is later). */
313 for (l = self->streams; l; l = l->next) {
314 GstStream *ostream = l->data;
315 gint64 stop_running_time;
316 gint64 last_stop_running_time;
318 ostream->wait = FALSE;
321 gst_segment_to_running_time (&ostream->segment,
322 GST_FORMAT_TIME, ostream->segment.stop);
323 last_stop_running_time =
324 gst_segment_to_running_time (&ostream->segment,
325 GST_FORMAT_TIME, ostream->segment.last_stop);
327 MAX (last_stop, MAX (stop_running_time,
328 last_stop_running_time));
/* The group start time never moves backwards. */
330 last_stop = MAX (0, last_stop);
331 self->group_start_time = MAX (self->group_start_time, last_stop);
333 GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
334 GST_TIME_ARGS (self->group_start_time));
/* Wake the streaming threads blocked in the NEWSEGMENT handler below. */
336 g_cond_broadcast (self->stream_finish_cond);
339 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
341 gst_message_unref (message);
344 case GST_EVENT_NEWSEGMENT:{
347 gdouble rate, applied_rate;
349 gint64 start, stop, position;
351 gst_event_parse_new_segment_full (event,
352 &update, &rate, &applied_rate, &format, &start, &stop, &position);
354 GST_STREAM_SYNCHRONIZER_LOCK (self);
355 stream = gst_pad_get_element_private (pad);
/* Blocks on stream_finish_cond (the mutex is released while waiting); the
 * stream pointer is re-read afterwards because the pad may have been
 * released in the meantime. */
358 GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
359 g_cond_wait (self->stream_finish_cond, self->lock);
360 stream = gst_pad_get_element_private (pad);
362 stream->wait = FALSE;
/* shutdown is set in change_state() when going PAUSED->READY: drop the
 * event instead of processing it. */
366 if (self->shutdown) {
367 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
368 gst_event_unref (event);
372 if (stream && format == GST_FORMAT_TIME) {
373 if (stream->new_stream) {
374 gint64 last_stop_running_time = 0;
375 gint64 stop_running_time = 0;
377 if (stream->segment.format == GST_FORMAT_TIME) {
378 last_stop_running_time =
379 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
380 stream->segment.last_stop);
381 last_stop_running_time = MAX (last_stop_running_time, 0);
383 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
384 stream->segment.stop);
/* Clamp the stop running time itself to 0. Clamping last_stop_running_time
 * here would overwrite the value just computed from segment.stop and make
 * both comparisons below always false. */
385 stop_running_time = MAX (stop_running_time, 0);
387 if (stop_running_time != last_stop_running_time) {
388 GST_WARNING_OBJECT (pad,
389 "Gap between last_stop and segment stop: %" GST_TIME_FORMAT
390 " != %" GST_TIME_FORMAT, GST_TIME_ARGS (stop_running_time),
391 GST_TIME_ARGS (last_stop_running_time));
/* Close the old segment at last_stop, both downstream and in the locally
 * tracked segment. */
394 if (stop_running_time < last_stop_running_time) {
395 GST_DEBUG_OBJECT (pad, "Updating stop position");
396 gst_pad_push_event (stream->srcpad,
397 gst_event_new_new_segment_full (TRUE, stream->segment.rate,
398 stream->segment.applied_rate, GST_FORMAT_TIME,
399 stream->segment.start, stream->segment.last_stop,
400 stream->segment.time));
401 gst_segment_set_newsegment_full (&stream->segment, TRUE,
402 stream->segment.rate, stream->segment.applied_rate,
403 GST_FORMAT_TIME, stream->segment.start,
404 stream->segment.last_stop, stream->segment.time);
406 stop_running_time = MAX (stop_running_time, last_stop_running_time);
407 GST_DEBUG_OBJECT (pad,
408 "Stop running time of last group: %" GST_TIME_FORMAT,
409 GST_TIME_ARGS (stop_running_time));
411 stream->new_stream = FALSE;
412 stream->drop_discont = TRUE;
/* If this stream ended before the group start time, insert a filler segment
 * of length diff so its running time catches up. */
414 if (stop_running_time < self->group_start_time) {
415 gint64 diff = self->group_start_time - stop_running_time;
417 GST_DEBUG_OBJECT (pad,
418 "Advancing running time for other streams by: %"
419 GST_TIME_FORMAT, GST_TIME_ARGS (diff));
420 gst_pad_push_event (stream->srcpad,
421 gst_event_new_new_segment_full (FALSE, 1.0, 1.0,
422 GST_FORMAT_TIME, 0, diff, 0));
423 gst_segment_set_newsegment_full (&stream->segment, FALSE, 1.0, 1.0,
424 GST_FORMAT_TIME, 0, diff, 0);
428 GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
430 gst_segment_set_newsegment_full (&stream->segment, update, rate,
431 applied_rate, format, start, stop, position);
432 GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
/* The accumulated running time at the start of this segment is what the
 * source-pad QoS handler later subtracts from QoS timestamps. */
435 GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
436 GST_TIME_ARGS (stream->segment.accum));
437 stream->running_time_diff = stream->segment.accum;
439 GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
440 gst_format_get_name (format));
441 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
443 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
446 case GST_EVENT_FLUSH_STOP:{
449 GST_STREAM_SYNCHRONIZER_LOCK (self);
450 stream = gst_pad_get_element_private (pad);
452 GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
453 stream->stream_number);
454 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
456 stream->is_eos = FALSE;
457 stream->wait = FALSE;
458 stream->new_stream = FALSE;
459 stream->drop_discont = FALSE;
460 stream->seen_data = FALSE;
462 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
468 gboolean all_eos = TRUE;
473 GST_STREAM_SYNCHRONIZER_LOCK (self);
474 stream = gst_pad_get_element_private (pad);
476 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
477 GST_WARNING_OBJECT (pad, "EOS for unknown stream");
481 GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
482 stream->is_eos = TRUE;
/* Copy what is needed after unlocking: the stream may be released once the
 * lock is dropped. */
484 seen_data = stream->seen_data;
485 srcpad = gst_object_ref (stream->srcpad);
487 for (l = self->streams; l; l = l->next) {
488 GstStream *ostream = l->data;
490 all_eos = all_eos && ostream->is_eos;
496 GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
497 for (l = self->streams; l; l = l->next) {
498 GstStream *ostream = l->data;
499 /* local snapshot of current pads */
500 gst_object_ref (ostream->srcpad);
501 pads = g_slist_prepend (pads, ostream->srcpad);
504 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
505 /* drop lock when sending eos, which may block in e.g. preroll */
514 GST_DEBUG_OBJECT (pad, "Pushing EOS");
515 ret = ret && gst_pad_push_event (pad, gst_event_new_eos ());
516 gst_object_unref (pad);
517 epad = g_slist_next (epad);
521 /* if EOS, but no data has passed, then send something to replace EOS
522 * for preroll purposes */
524 GstBuffer *buf = gst_buffer_new ();
526 GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_PREROLL);
527 gst_pad_push (srcpad, buf);
530 gst_object_unref (srcpad);
/* Default path: forward the event downstream through the paired source
 * pad. */
540 opad = gst_stream_get_other_pad_from_pad (pad);
542 ret = gst_pad_push_event (opad, event);
543 gst_object_unref (opad);
547 gst_object_unref (self);
/* Buffer-alloc function installed on the sink pads: delegates the
 * allocation to gst_pad_alloc_buffer() on the paired source pad and logs
 * the flow return. "ret" defaults to GST_FLOW_ERROR, which is what remains
 * if the allocation call does not run. */
553 gst_stream_synchronizer_sink_bufferalloc (GstPad * pad, guint64 offset,
554 guint size, GstCaps * caps, GstBuffer ** buf)
557 GstFlowReturn ret = GST_FLOW_ERROR;
559 GST_LOG_OBJECT (pad, "Allocating buffer: size=%u", size);
561 opad = gst_stream_get_other_pad_from_pad (pad);
563 ret = gst_pad_alloc_buffer (opad, offset, size, caps, buf);
564 gst_object_unref (opad);
567 GST_LOG_OBJECT (pad, "Allocation: %s", gst_flow_get_name (ret));
/* Chain function installed on the sink pads.
 *
 * Records the buffer timestamp as the stream's last_stop, optionally clears
 * the DISCONT flag (drop_discont), pushes the buffer on the paired source
 * pad and, on GST_FLOW_OK, advances last_stop to the buffer end time. It
 * then sends segment updates on EOS streams that lag more than one second
 * behind this buffer's end time.
 *
 * NOTE(review): several lines (conditions, declarations, the return) are
 * missing from this listing; the first push below is presumably the
 * passthrough shortcut, but its guarding condition is not visible. */
573 gst_stream_synchronizer_sink_chain (GstPad * pad, GstBuffer * buffer)
575 GstStreamSynchronizer *self =
576 GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
578 GstFlowReturn ret = GST_FLOW_ERROR;
580 GstClockTime timestamp = GST_CLOCK_TIME_NONE;
581 GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
584 opad = gst_stream_get_other_pad_from_pad (pad);
586 ret = gst_pad_push (opad, buffer);
587 gst_object_unref (opad);
/* NOTE(review): "size=%u" is paired with gst_buffer_get_size(); confirm the
 * return type matches the format specifier for the GStreamer version in
 * use. */
592 GST_LOG_OBJECT (pad, "Handling buffer %p: size=%u, timestamp=%"
593 GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
594 " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
595 buffer, gst_buffer_get_size (buffer),
596 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
597 GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
598 GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));
/* timestamp_end stays GST_CLOCK_TIME_NONE unless both timestamp and
 * duration are valid. */
600 timestamp = GST_BUFFER_TIMESTAMP (buffer);
601 if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)
602 && GST_BUFFER_DURATION_IS_VALID (buffer))
603 timestamp_end = timestamp + GST_BUFFER_DURATION (buffer);
605 GST_STREAM_SYNCHRONIZER_LOCK (self);
606 stream = gst_pad_get_element_private (pad);
608 stream->seen_data = TRUE;
/* drop_discont is set by the NEWSEGMENT handler for a new stream; the flag
 * is cleared on one buffer only. */
609 if (stream && stream->drop_discont) {
610 buffer = gst_buffer_make_writable (buffer);
611 GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
612 stream->drop_discont = FALSE;
615 if (stream && stream->segment.format == GST_FORMAT_TIME
616 && GST_CLOCK_TIME_IS_VALID (timestamp)) {
618 "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
619 GST_TIME_ARGS (stream->segment.last_stop), GST_TIME_ARGS (timestamp));
620 gst_segment_set_last_stop (&stream->segment, GST_FORMAT_TIME, timestamp);
/* The lock is released before pushing downstream. */
622 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
624 opad = gst_stream_get_other_pad_from_pad (pad);
626 ret = gst_pad_push (opad, buffer);
627 gst_object_unref (opad);
630 GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
631 if (ret == GST_FLOW_OK) {
/* Re-read the stream under the lock: the pad may have been released while
 * the buffer was being pushed. */
634 GST_STREAM_SYNCHRONIZER_LOCK (self);
635 stream = gst_pad_get_element_private (pad);
636 if (stream && stream->segment.format == GST_FORMAT_TIME
637 && GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
639 "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
640 GST_TIME_ARGS (stream->segment.last_stop),
641 GST_TIME_ARGS (timestamp_end));
642 gst_segment_set_last_stop (&stream->segment, GST_FORMAT_TIME,
646 /* Advance EOS streams if necessary. For non-EOS
647 * streams the demuxers should already do this! */
648 for (l = self->streams; l; l = l->next) {
649 GstStream *ostream = l->data;
/* Only streams that are EOS and have a TIME segment are considered. */
652 if (!ostream->is_eos || ostream->segment.format != GST_FORMAT_TIME)
655 if (ostream->segment.last_stop != -1)
656 last_stop = ostream->segment.last_stop;
658 last_stop = ostream->segment.start;
660 /* Is there a 1 second lag? */
661 if (last_stop != -1 && last_stop + GST_SECOND < timestamp_end) {
662 gint64 new_start, new_stop;
/* Move the EOS stream's segment to one second before this buffer's end. */
664 new_start = timestamp_end - GST_SECOND;
665 if (ostream->segment.stop == -1)
668 new_stop = MAX (new_start, ostream->segment.stop);
670 GST_DEBUG_OBJECT (ostream->sinkpad,
671 "Advancing stream %u from %" GST_TIME_FORMAT " to %"
672 GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (last_stop),
673 GST_TIME_ARGS (new_start));
/* The same segment update is sent downstream and applied to the locally
 * tracked segment. */
675 gst_pad_push_event (ostream->srcpad,
676 gst_event_new_new_segment_full (TRUE, ostream->segment.rate,
677 ostream->segment.applied_rate, ostream->segment.format,
678 new_start, new_stop, new_start));
679 gst_segment_set_newsegment_full (&ostream->segment, TRUE,
680 ostream->segment.rate, ostream->segment.applied_rate,
681 ostream->segment.format, new_start, new_stop, new_start);
682 gst_segment_set_last_stop (&ostream->segment, GST_FORMAT_TIME,
686 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
691 gst_object_unref (self);
696 /* GstElement vfuncs */
/* GstElement::request_new_pad implementation.
 *
 * Allocates a zeroed GstStream, creates a "sink_N"/"src_N" pad pair from
 * the static templates (N = current_stream_number), stores the stream as
 * element-private data on both pads, installs the pad functions, prepends
 * the stream to self->streams and increments the stream counter. The pads
 * are then added to the element, and activated first unless the element's
 * target state is NULL.
 *
 * Returns the new sink pad. The @temp and @name arguments are not used by
 * any line visible in this listing. */
698 gst_stream_synchronizer_request_new_pad (GstElement * element,
699 GstPadTemplate * temp, const gchar * name)
701 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
705 GST_STREAM_SYNCHRONIZER_LOCK (self);
706 GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
707 self->current_stream_number);
709 stream = g_slice_new0 (GstStream);
710 stream->transform = self;
711 stream->stream_number = self->current_stream_number;
/* NOTE(review): tmp is allocated with g_strdup_printf(); no matching
 * g_free() is visible in this listing -- confirm in the full file. */
713 tmp = g_strdup_printf ("sink_%d", self->current_stream_number);
714 stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
716 gst_pad_set_element_private (stream->sinkpad, stream);
717 gst_pad_set_iterate_internal_links_function (stream->sinkpad,
718 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
719 gst_pad_set_query_function (stream->sinkpad,
720 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
721 gst_pad_set_getcaps_function (stream->sinkpad,
722 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
723 gst_pad_set_acceptcaps_function (stream->sinkpad,
724 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
725 gst_pad_set_event_function (stream->sinkpad,
726 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
727 gst_pad_set_chain_function (stream->sinkpad,
728 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
729 gst_pad_set_bufferalloc_function (stream->sinkpad,
730 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_bufferalloc));
/* The source pad shares the generic functions but gets its own event
 * handler and no chain/bufferalloc functions. */
732 tmp = g_strdup_printf ("src_%d", self->current_stream_number);
733 stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
735 gst_pad_set_element_private (stream->srcpad, stream);
736 gst_pad_set_iterate_internal_links_function (stream->srcpad,
737 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
738 gst_pad_set_query_function (stream->srcpad,
739 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
740 gst_pad_set_getcaps_function (stream->srcpad,
741 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
742 gst_pad_set_acceptcaps_function (stream->srcpad,
743 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
744 gst_pad_set_event_function (stream->srcpad,
745 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
/* An UNDEFINED segment format marks "no segment received yet". */
747 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
749 self->streams = g_list_prepend (self->streams, stream);
750 self->current_stream_number++;
751 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
753 /* Add pads and activate unless we're going to NULL */
754 g_static_rec_mutex_lock (GST_STATE_GET_LOCK (self));
755 if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
756 gst_pad_set_active (stream->srcpad, TRUE);
757 gst_pad_set_active (stream->sinkpad, TRUE);
759 gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
760 gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
761 g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
763 return stream->sinkpad;
/* Removes @stream from self->streams, detaches and removes both of its
 * pads, folds the stream's final running time into self->group_start_time
 * (TIME segments only) and frees the GstStream.
 *
 * The synchronizer lock is held on entry, released while the pads are torn
 * down, and re-acquired before the end of the function, so callers still
 * hold it afterwards. */
766 /* Must be called with lock! */
768 gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
773 GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
/* Unlink the list node holding this stream; the assertion below requires
 * that the stream was found. */
775 for (l = self->streams; l; l = l->next) {
776 if (l->data == stream) {
777 self->streams = g_list_delete_link (self->streams, l);
781 g_assert (l != NULL);
783 /* we can drop the lock, since stream exists now only local.
784 * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
785 * (due to reverse lock order) when deactivating pads */
786 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
/* Clearing the element-private data makes later lookups on these pads see
 * a NULL stream. */
788 gst_pad_set_element_private (stream->srcpad, NULL);
789 gst_pad_set_element_private (stream->sinkpad, NULL);
790 gst_pad_set_active (stream->srcpad, FALSE);
791 gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
792 gst_pad_set_active (stream->sinkpad, FALSE);
793 gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
/* Keep the group start time at least as late as the point this stream
 * reached (the later of segment stop and last_stop, in running time). */
795 if (stream->segment.format == GST_FORMAT_TIME) {
796 gint64 stop_running_time;
797 gint64 last_stop_running_time;
800 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
801 stream->segment.stop);
802 last_stop_running_time =
803 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
804 stream->segment.last_stop);
805 stop_running_time = MAX (stop_running_time, last_stop_running_time);
807 GST_DEBUG_OBJECT (stream->sinkpad,
808 "Stop running time was: %" GST_TIME_FORMAT,
809 GST_TIME_ARGS (stop_running_time));
811 self->group_start_time = MAX (self->group_start_time, stop_running_time);
814 g_slice_free (GstStream, stream);
816 /* NOTE: In theory we have to check here if all streams
817 * are EOS but the one that was removed wasn't and then
818 * send EOS downstream. But due to the way how playsink
819 * works this is not necessary and will only cause problems
820 * for gapless playback. playsink will only add/remove pads
821 * when it's reconfigured, which happens when the streams
825 /* lock for good measure, since the caller had it */
826 GST_STREAM_SYNCHRONIZER_LOCK (self);
/* GstElement::release_pad implementation: under the synchronizer lock,
 * fetches the GstStream attached to @pad, asserts that @pad is that
 * stream's sink pad and releases the whole stream (both pads) through
 * gst_stream_synchronizer_release_stream().
 * NOTE(review): any NULL check on "stream" is not visible in this
 * listing. */
830 gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
832 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
835 GST_STREAM_SYNCHRONIZER_LOCK (self);
836 stream = gst_pad_get_element_private (pad);
838 g_assert (stream->sinkpad == pad);
840 gst_stream_synchronizer_release_stream (self, stream);
842 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
/* GstElement::change_state implementation.
 *
 * Before chaining up: clears the shutdown flag on NULL->READY and
 * READY->PAUSED (the latter also resets group_start_time); on
 * PAUSED->READY it wakes every thread blocked on stream_finish_cond and
 * sets shutdown so the NEWSEGMENT handler drops its event.
 *
 * After the parent class handled the transition: PAUSED->READY resets
 * group_start_time and the per-stream segment/flags; READY->NULL releases
 * all streams and resets the stream counter.
 *
 * NOTE(review): the break/default/return lines are not present in this
 * listing, including the value returned when the parent class fails. */
845 static GstStateChangeReturn
846 gst_stream_synchronizer_change_state (GstElement * element,
847 GstStateChange transition)
849 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
850 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
852 switch (transition) {
853 case GST_STATE_CHANGE_NULL_TO_READY:
854 GST_DEBUG_OBJECT (self, "State change NULL->READY");
855 self->shutdown = FALSE;
857 case GST_STATE_CHANGE_READY_TO_PAUSED:
858 GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
859 self->group_start_time = 0;
860 self->shutdown = FALSE;
862 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
863 GST_DEBUG_OBJECT (self, "State change PAUSED->PLAYING");
865 case GST_STATE_CHANGE_PAUSED_TO_READY:
/* Log text names the transition actually being handled in this case. */
866 GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
/* Both the wake-up and the flag update happen under the lock, so waiters
 * observe shutdown == TRUE once they re-acquire the mutex. */
868 GST_STREAM_SYNCHRONIZER_LOCK (self);
869 g_cond_broadcast (self->stream_finish_cond);
870 self->shutdown = TRUE;
871 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
877 GstStateChangeReturn bret;
879 bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
880 GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", bret);
881 if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
885 switch (transition) {
886 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
887 GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
889 case GST_STATE_CHANGE_PAUSED_TO_READY:{
892 GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
893 self->group_start_time = 0;
/* Return every stream to its initial, segment-less state. */
895 GST_STREAM_SYNCHRONIZER_LOCK (self);
896 for (l = self->streams; l; l = l->next) {
897 GstStream *stream = l->data;
899 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
900 stream->wait = FALSE;
901 stream->new_stream = FALSE;
902 stream->drop_discont = FALSE;
903 stream->is_eos = FALSE;
905 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
908 case GST_STATE_CHANGE_READY_TO_NULL:{
909 GST_DEBUG_OBJECT (self, "State change READY->NULL");
/* release_stream() removes the head of the list on each iteration and
 * returns with the lock held again. */
911 GST_STREAM_SYNCHRONIZER_LOCK (self);
912 while (self->streams)
913 gst_stream_synchronizer_release_stream (self, self->streams->data);
914 self->current_stream_number = 0;
915 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
/* GObject::finalize implementation: frees the mutex and the condition
 * variable created in gst_stream_synchronizer_init(), then chains up to the
 * parent class finalize.
 * NOTE(review): the guard around g_mutex_free() is not visible in this
 * listing. */
927 gst_stream_synchronizer_finalize (GObject * object)
929 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
932 g_mutex_free (self->lock);
936 if (self->stream_finish_cond) {
937 g_cond_free (self->stream_finish_cond);
938 self->stream_finish_cond = NULL;
941 G_OBJECT_CLASS (parent_class)->finalize (object);
944 /* GObject type initialization */
/* Instance initializer: allocates the mutex protecting the stream list and
 * per-stream state, and the condition variable that the NEWSEGMENT handler
 * waits on. Both are released in gst_stream_synchronizer_finalize(). */
946 gst_stream_synchronizer_init (GstStreamSynchronizer * self,
947 GstStreamSynchronizerClass * klass)
949 self->lock = g_mutex_new ();
950 self->stream_finish_cond = g_cond_new ();
/* Base initializer: registers the source and sink pad templates on the
 * element class and sets the element details (long name, classification,
 * description, author). */
954 gst_stream_synchronizer_base_init (gpointer g_class)
956 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
958 gst_element_class_add_pad_template (gstelement_class,
959 gst_static_pad_template_get (&srctemplate));
960 gst_element_class_add_pad_template (gstelement_class,
961 gst_static_pad_template_get (&sinktemplate));
963 gst_element_class_set_details_simple (gstelement_class,
964 "Stream Synchronizer", "Generic",
965 "Synchronizes a group of streams to have equal durations and starting points",
966 "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
/* Class initializer: sets up the "streamsynchronizer" debug category and
 * installs the GObject finalize handler plus the GstElement change_state,
 * request_new_pad and release_pad virtual functions. */
970 gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
972 GObjectClass *gobject_class = (GObjectClass *) klass;
973 GstElementClass *element_class = (GstElementClass *) klass;
975 GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
976 "streamsynchronizer", 0, "Stream Synchronizer");
978 gobject_class->finalize = gst_stream_synchronizer_finalize;
980 element_class->change_state =
981 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
982 element_class->request_new_pad =
983 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
984 element_class->release_pad =
985 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);