2 * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
24 #include "gstplaybackelements.h"
25 #include "gststreamsynchronizer.h"
27 GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
28 #define GST_CAT_DEFAULT stream_synchronizer_debug
30 #define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START { \
31 GST_TRACE_OBJECT (obj, \
32 "locking from thread %p", \
34 g_mutex_lock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
35 GST_TRACE_OBJECT (obj, \
36 "locked from thread %p", \
40 #define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START { \
41 GST_TRACE_OBJECT (obj, \
42 "unlocking from thread %p", \
44 g_mutex_unlock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
47 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
51 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
56 #define gst_stream_synchronizer_parent_class parent_class
57 G_DEFINE_TYPE (GstStreamSynchronizer, gst_stream_synchronizer,
60 playback_element_init (plugin);
61 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (streamsynchronizer, "streamsynchronizer",
62 GST_RANK_NONE, GST_TYPE_STREAM_SYNCHRONIZER, _do_init);
66 GstStreamSynchronizer *transform;
72 gboolean wait; /* TRUE if waiting/blocking */
73 gboolean is_eos; /* TRUE if EOS was received */
74 gboolean eos_sent; /* when EOS was sent downstream */
75 gboolean flushing; /* set after flush-start and before flush-stop */
77 gboolean send_gap_event;
78 GstClockTime gap_duration;
82 GCond stream_finish_cond;
84 /* seqnum of the previously received STREAM_START
85 * default: G_MAXUINT32 */
86 guint32 stream_start_seqnum;
87 guint32 segment_seqnum;
93 static GstSyncStream *
94 gst_syncstream_ref (GstSyncStream * stream)
96 g_return_val_if_fail (stream != NULL, NULL);
97 g_atomic_int_add (&stream->refcount, 1);
102 gst_syncstream_unref (GstSyncStream * stream)
104 g_return_if_fail (stream != NULL);
105 g_return_if_fail (stream->refcount > 0);
107 if (g_atomic_int_dec_and_test (&stream->refcount))
108 g_slice_free (GstSyncStream, stream);
112 #define GST_TYPE_STREAMSYNC_PAD (gst_streamsync_pad_get_type ())
113 #define GST_IS_STREAMSYNC_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_STREAMSYNC_PAD))
114 #define GST_IS_STREAMSYNC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_STREAMSYNC_PAD))
115 #define GST_STREAMSYNC_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_STREAMSYNC_PAD, GstStreamSyncPad))
116 #define GST_STREAMSYNC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_STREAMSYNC_PAD, GstStreamSyncPadClass))
117 typedef struct _GstStreamSyncPad GstStreamSyncPad;
118 typedef struct _GstStreamSyncPadClass GstStreamSyncPadClass;
120 struct _GstStreamSyncPad
124 GstSyncStream *stream;
126 /* Since we need to access data associated with a pad in this
127 * element, it's important to manage the respective lifetimes of the
128 * stored pad data and the pads themselves. Pad deactivation happens
129 * without mutual exclusion to the use of pad data in this element.
131 * The approach here is to have the sinkpad (the request pad) hold a
132 * strong reference onto the srcpad (so that it stays alive until
133 * the last pad is destroyed). Similarly the srcpad has a weak
134 * reference to the sinkpad (request pad) to ensure it knows when
135 * the pads are destroyed, since the pad data may be requested from
136 * either the srcpad or the sinkpad. This avoids a nasty set of
137 * potential race conditions.
139 * The code is arranged so that in the srcpad, the pad pointer is
140 * always NULL (not used) and in the sinkpad, the otherpad is always
146 struct _GstStreamSyncPadClass
148 GstPadClass parent_class;
151 static GType gst_streamsync_pad_get_type (void);
152 static GstSyncStream *gst_streamsync_pad_get_stream (GstPad * pad);
155 #define GST_STREAMSYNC_PAD_CAST(obj) ((GstStreamSyncPad *)obj)
156 G_DEFINE_TYPE (GstStreamSyncPad, gst_streamsync_pad, GST_TYPE_PAD);
158 static void gst_streamsync_pad_dispose (GObject * object);
161 gst_streamsync_pad_class_init (GstStreamSyncPadClass * klass)
163 GObjectClass *gobject_class;
164 gobject_class = G_OBJECT_CLASS (klass);
165 gobject_class->dispose = gst_streamsync_pad_dispose;
169 gst_streamsync_pad_init (GstStreamSyncPad * ppad)
174 gst_streamsync_pad_dispose (GObject * object)
176 GstStreamSyncPad *spad = GST_STREAMSYNC_PAD_CAST (object);
178 if (GST_PAD_DIRECTION (spad) == GST_PAD_SINK)
179 gst_clear_object (&spad->pad);
181 g_weak_ref_clear (&spad->otherpad);
183 g_clear_pointer (&spad->stream, gst_syncstream_unref);
185 G_OBJECT_CLASS (gst_streamsync_pad_parent_class)->dispose (object);
189 gst_streamsync_pad_new_from_template (GstPadTemplate * templ,
192 g_return_val_if_fail (GST_IS_PAD_TEMPLATE (templ), NULL);
194 return GST_PAD_CAST (g_object_new (GST_TYPE_STREAMSYNC_PAD,
195 "name", name, "direction", templ->direction, "template", templ,
200 gst_streamsync_pad_new_from_static_template (GstStaticPadTemplate * templ,
204 GstPadTemplate *template;
206 template = gst_static_pad_template_get (templ);
207 pad = gst_streamsync_pad_new_from_template (template, name);
208 gst_object_unref (template);
213 static GstSyncStream *
214 gst_streamsync_pad_get_stream (GstPad * pad)
216 GstStreamSyncPad *spad = GST_STREAMSYNC_PAD_CAST (pad);
217 return gst_syncstream_ref (spad->stream);
221 gst_stream_get_other_pad_from_pad (GstStreamSynchronizer * self, GstPad * pad)
223 GstStreamSyncPad *spad = GST_STREAMSYNC_PAD_CAST (pad);
226 if (GST_PAD_DIRECTION (pad) == GST_PAD_SINK)
227 opad = gst_object_ref (spad->pad);
229 opad = g_weak_ref_get (&spad->otherpad);
232 GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
237 /* Generic pad functions */
239 gst_stream_synchronizer_iterate_internal_links (GstPad * pad,
242 GstIterator *it = NULL;
246 gst_stream_get_other_pad_from_pad (GST_STREAM_SYNCHRONIZER (parent), pad);
248 GValue value = { 0, };
250 g_value_init (&value, GST_TYPE_PAD);
251 g_value_set_object (&value, opad);
252 it = gst_iterator_new_single (GST_TYPE_PAD, &value);
253 g_value_unset (&value);
254 gst_object_unref (opad);
261 set_event_rt_offset (GstStreamSynchronizer * self, GstPad * pad,
264 gint64 running_time_diff;
265 GstSyncStream *stream;
267 GST_STREAM_SYNCHRONIZER_LOCK (self);
268 stream = gst_streamsync_pad_get_stream (pad);
269 running_time_diff = stream->segment.base;
270 gst_syncstream_unref (stream);
271 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
273 if (running_time_diff != -1) {
276 event = gst_event_make_writable (event);
277 offset = gst_event_get_running_time_offset (event);
278 if (GST_PAD_IS_SRC (pad))
279 offset -= running_time_diff;
281 offset += running_time_diff;
283 gst_event_set_running_time_offset (event, offset);
289 /* srcpad functions */
291 gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent,
294 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
295 gboolean ret = FALSE;
297 GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
298 GST_EVENT_TYPE_NAME (event), event);
300 event = set_event_rt_offset (self, pad, event);
302 ret = gst_pad_event_default (pad, parent, event);
307 /* must be called with the STREAM_SYNCHRONIZER_LOCK */
309 gst_stream_synchronizer_wait (GstStreamSynchronizer * self, GstPad * pad)
311 gboolean ret = FALSE;
312 GstSyncStream *stream;
314 stream = gst_streamsync_pad_get_stream (pad);
316 while (!self->eos && !self->flushing) {
317 if (stream->flushing) {
318 GST_DEBUG_OBJECT (pad, "Flushing");
322 GST_DEBUG_OBJECT (pad, "Stream not waiting anymore");
326 if (stream->send_gap_event) {
329 if (!GST_CLOCK_TIME_IS_VALID (stream->segment.position)) {
330 GST_WARNING_OBJECT (pad, "Have no position and can't send GAP event");
331 stream->send_gap_event = FALSE;
336 gst_event_new_gap (stream->segment.position, stream->gap_duration);
337 GST_DEBUG_OBJECT (pad,
338 "Send GAP event, position: %" GST_TIME_FORMAT " duration: %"
339 GST_TIME_FORMAT, GST_TIME_ARGS (stream->segment.position),
340 GST_TIME_ARGS (stream->gap_duration));
342 /* drop lock when sending GAP event, which may block in e.g. preroll */
343 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
344 ret = gst_pad_push_event (pad, event);
345 GST_STREAM_SYNCHRONIZER_LOCK (self);
347 gst_syncstream_unref (stream);
350 stream->send_gap_event = FALSE;
352 /* force a check on the loop conditions as we unlocked a
353 * few lines above and those variables could have changed */
357 g_cond_wait (&stream->stream_finish_cond, &self->lock);
360 gst_syncstream_unref (stream);
364 /* sinkpad functions */
366 gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
369 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
370 gboolean ret = FALSE;
372 GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
373 GST_EVENT_TYPE_NAME (event), event);
375 switch (GST_EVENT_TYPE (event)) {
376 case GST_EVENT_STREAM_START:
378 GstSyncStream *stream, *ostream;
379 guint32 seqnum = gst_event_get_seqnum (event);
381 gboolean have_group_id;
383 gboolean all_wait = TRUE;
384 gboolean new_stream = TRUE;
386 have_group_id = gst_event_parse_group_id (event, &group_id);
388 GST_STREAM_SYNCHRONIZER_LOCK (self);
389 self->have_group_id &= have_group_id;
390 have_group_id = self->have_group_id;
393 stream = gst_streamsync_pad_get_stream (pad);
395 gst_event_parse_stream_flags (event, &stream->flags);
397 if ((have_group_id && stream->group_id != group_id) || (!have_group_id
398 && stream->stream_start_seqnum != seqnum)) {
399 stream->is_eos = FALSE;
400 stream->eos_sent = FALSE;
401 stream->flushing = FALSE;
402 stream->stream_start_seqnum = seqnum;
403 stream->group_id = group_id;
405 if (!have_group_id) {
406 /* Check if this belongs to a stream that is already there,
407 * e.g. we got the visualizations for an audio stream */
408 for (l = self->streams; l; l = l->next) {
411 if (ostream != stream && ostream->stream_start_seqnum == seqnum
419 GST_DEBUG_OBJECT (pad,
420 "Stream %d belongs to running stream %d, no waiting",
421 stream->stream_number, ostream->stream_number);
422 stream->wait = FALSE;
423 gst_syncstream_unref (stream);
424 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
427 } else if (group_id == self->group_id) {
428 GST_DEBUG_OBJECT (pad, "Stream %d belongs to running group %d, "
429 "no waiting", stream->stream_number, group_id);
430 gst_syncstream_unref (stream);
431 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
435 GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
439 for (l = self->streams; l; l = l->next) {
440 GstSyncStream *ostream = l->data;
442 all_wait = all_wait && ((ostream->flags & GST_STREAM_FLAG_SPARSE)
443 || (ostream->wait && (!have_group_id
444 || ostream->group_id == group_id)));
453 GST_DEBUG_OBJECT (self,
454 "All streams have changed to group id %u -- unblocking",
457 GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
459 self->group_id = group_id;
461 for (l = self->streams; l; l = l->next) {
462 GstSyncStream *ostream = l->data;
463 gint64 stop_running_time;
464 gint64 position_running_time;
466 ostream->wait = FALSE;
468 if (ostream->segment.format == GST_FORMAT_TIME) {
469 if (ostream->segment.rate > 0)
471 gst_segment_to_running_time (&ostream->segment,
472 GST_FORMAT_TIME, ostream->segment.stop);
475 gst_segment_to_running_time (&ostream->segment,
476 GST_FORMAT_TIME, ostream->segment.start);
478 position_running_time =
479 gst_segment_to_running_time (&ostream->segment,
480 GST_FORMAT_TIME, ostream->segment.position);
482 position_running_time =
483 MAX (position_running_time, stop_running_time);
485 if (ostream->segment.rate > 0)
486 position_running_time -=
487 gst_segment_to_running_time (&ostream->segment,
488 GST_FORMAT_TIME, ostream->segment.start);
490 position_running_time -=
491 gst_segment_to_running_time (&ostream->segment,
492 GST_FORMAT_TIME, ostream->segment.stop);
494 position_running_time = MAX (0, position_running_time);
496 position = MAX (position, position_running_time);
500 self->group_start_time += position;
502 GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
503 GST_TIME_ARGS (self->group_start_time));
505 for (l = self->streams; l; l = l->next) {
506 GstSyncStream *ostream = l->data;
507 ostream->wait = FALSE;
508 g_cond_broadcast (&ostream->stream_finish_cond);
513 gst_syncstream_unref (stream);
514 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
517 case GST_EVENT_SEGMENT:{
518 GstSyncStream *stream;
521 gst_event_copy_segment (event, &segment);
523 GST_STREAM_SYNCHRONIZER_LOCK (self);
525 gst_stream_synchronizer_wait (self, pad);
527 if (self->shutdown) {
528 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
529 gst_event_unref (event);
533 stream = gst_streamsync_pad_get_stream (pad);
534 if (segment.format == GST_FORMAT_TIME) {
535 GST_DEBUG_OBJECT (pad,
536 "New stream, updating base from %" GST_TIME_FORMAT " to %"
537 GST_TIME_FORMAT, GST_TIME_ARGS (segment.base),
538 GST_TIME_ARGS (segment.base + self->group_start_time));
539 segment.base += self->group_start_time;
541 GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
543 gst_segment_copy_into (&segment, &stream->segment);
544 GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
546 stream->segment_seqnum = gst_event_get_seqnum (event);
548 GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
549 GST_TIME_ARGS (stream->segment.base));
553 tmpev = gst_event_new_segment (&stream->segment);
554 gst_event_set_seqnum (tmpev, stream->segment_seqnum);
555 gst_event_unref (event);
559 GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
560 gst_format_get_name (segment.format));
561 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
563 gst_syncstream_unref (stream);
564 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
567 case GST_EVENT_FLUSH_START:{
568 GstSyncStream *stream;
570 GST_STREAM_SYNCHRONIZER_LOCK (self);
571 stream = gst_streamsync_pad_get_stream (pad);
573 GST_DEBUG_OBJECT (pad, "Flushing streams");
574 stream->flushing = TRUE;
575 g_cond_broadcast (&stream->stream_finish_cond);
576 gst_syncstream_unref (stream);
577 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
580 case GST_EVENT_FLUSH_STOP:{
581 GstSyncStream *stream;
583 GstClockTime new_group_start_time = 0;
585 GST_STREAM_SYNCHRONIZER_LOCK (self);
586 stream = gst_streamsync_pad_get_stream (pad);
587 GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
588 stream->stream_number);
589 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
591 stream->is_eos = FALSE;
592 stream->eos_sent = FALSE;
593 stream->flushing = FALSE;
594 stream->wait = FALSE;
595 g_cond_broadcast (&stream->stream_finish_cond);
597 for (l = self->streams; l; l = l->next) {
598 GstSyncStream *ostream = l->data;
599 GstClockTime start_running_time;
601 if (ostream == stream || ostream->flushing)
604 if (ostream->segment.format == GST_FORMAT_TIME) {
605 if (ostream->segment.rate > 0)
607 gst_segment_to_running_time (&ostream->segment,
608 GST_FORMAT_TIME, ostream->segment.start);
611 gst_segment_to_running_time (&ostream->segment,
612 GST_FORMAT_TIME, ostream->segment.stop);
614 new_group_start_time = MAX (new_group_start_time, start_running_time);
618 GST_DEBUG_OBJECT (pad,
619 "Updating group start time from %" GST_TIME_FORMAT " to %"
620 GST_TIME_FORMAT, GST_TIME_ARGS (self->group_start_time),
621 GST_TIME_ARGS (new_group_start_time));
622 self->group_start_time = new_group_start_time;
624 gst_syncstream_unref (stream);
625 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
628 /* unblocking EOS wait when track switch. */
629 case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:{
630 if (gst_event_has_name (event, "playsink-custom-video-flush")
631 || gst_event_has_name (event, "playsink-custom-audio-flush")
632 || gst_event_has_name (event, "playsink-custom-subtitle-flush")) {
633 GstSyncStream *stream;
635 GST_STREAM_SYNCHRONIZER_LOCK (self);
636 stream = gst_streamsync_pad_get_stream (pad);
637 stream->is_eos = FALSE;
638 stream->eos_sent = FALSE;
639 stream->wait = FALSE;
640 g_cond_broadcast (&stream->stream_finish_cond);
641 gst_syncstream_unref (stream);
642 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
647 GstSyncStream *stream;
649 gboolean all_eos = TRUE;
653 GstClockTime timestamp;
656 GST_STREAM_SYNCHRONIZER_LOCK (self);
657 stream = gst_streamsync_pad_get_stream (pad);
659 GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
660 stream->is_eos = TRUE;
662 seen_data = stream->seen_data;
663 srcpad = gst_object_ref (stream->srcpad);
664 seqnum = stream->segment_seqnum;
666 if (seen_data && stream->segment.position != -1)
667 timestamp = stream->segment.position;
668 else if (stream->segment.rate < 0.0 || stream->segment.stop == -1)
669 timestamp = stream->segment.start;
671 timestamp = stream->segment.stop;
673 stream->segment.position = timestamp;
675 for (l = self->streams; l; l = l->next) {
676 GstSyncStream *ostream = l->data;
678 all_eos = all_eos && ostream->is_eos;
684 GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
686 for (l = self->streams; l; l = l->next) {
687 GstSyncStream *ostream = l->data;
688 /* local snapshot of current pads */
689 gst_object_ref (ostream->srcpad);
690 pads = g_slist_prepend (pads, ostream->srcpad);
696 GstSyncStream *ostream;
702 ostream = gst_streamsync_pad_get_stream (pad);
703 g_cond_broadcast (&ostream->stream_finish_cond);
704 gst_syncstream_unref (ostream);
705 gst_object_unref (pad);
706 epad = g_slist_next (epad);
711 stream->send_gap_event = TRUE;
712 stream->gap_duration = GST_CLOCK_TIME_NONE;
714 ret = gst_stream_synchronizer_wait (self, srcpad);
718 /* send eos if haven't seen data. seen_data will be true if data buffer
719 * of the track have received in anytime. sink is ready if seen_data is
720 * true, so can send GAP event. Will send EOS if sink isn't ready. The
721 * scenario for the case is one track haven't any media data and then
722 * send EOS. Or no any valid media data in one track, so decoder can't
723 * get valid CAPS for the track. sink can't ready without received CAPS.*/
724 if (!seen_data || self->eos) {
726 GST_DEBUG_OBJECT (pad, "send EOS event");
727 /* drop lock when sending eos, which may block in e.g. preroll */
728 topush = gst_event_new_eos ();
729 gst_event_set_seqnum (topush, seqnum);
730 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
731 ret = gst_pad_push_event (srcpad, topush);
732 GST_STREAM_SYNCHRONIZER_LOCK (self);
733 stream = gst_streamsync_pad_get_stream (pad);
734 stream->eos_sent = TRUE;
735 gst_syncstream_unref (stream);
738 gst_object_unref (srcpad);
739 gst_event_unref (event);
740 gst_syncstream_unref (stream);
741 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
748 event = set_event_rt_offset (self, pad, event);
750 ret = gst_pad_event_default (pad, parent, event);
758 gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
761 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
763 GstFlowReturn ret = GST_FLOW_ERROR;
764 GstSyncStream *stream;
765 GstClockTime duration = GST_CLOCK_TIME_NONE;
766 GstClockTime timestamp = GST_CLOCK_TIME_NONE;
767 GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
769 GST_LOG_OBJECT (pad, "Handling buffer %p: size=%" G_GSIZE_FORMAT
770 ", timestamp=%" GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
771 " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
772 buffer, gst_buffer_get_size (buffer),
773 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
774 GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
775 GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));
777 timestamp = GST_BUFFER_TIMESTAMP (buffer);
778 duration = GST_BUFFER_DURATION (buffer);
779 if (GST_CLOCK_TIME_IS_VALID (timestamp)
780 && GST_CLOCK_TIME_IS_VALID (duration))
781 timestamp_end = timestamp + duration;
783 GST_STREAM_SYNCHRONIZER_LOCK (self);
784 stream = gst_streamsync_pad_get_stream (pad);
786 stream->seen_data = TRUE;
787 if (stream->segment.format == GST_FORMAT_TIME
788 && GST_CLOCK_TIME_IS_VALID (timestamp)) {
790 "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
791 GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
792 if (stream->segment.rate > 0.0)
793 stream->segment.position = timestamp;
795 stream->segment.position = timestamp_end;
798 gst_syncstream_unref (stream);
799 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
801 opad = gst_stream_get_other_pad_from_pad (self, pad);
803 ret = gst_pad_push (opad, buffer);
804 gst_object_unref (opad);
807 GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
808 if (ret == GST_FLOW_OK) {
811 GST_STREAM_SYNCHRONIZER_LOCK (self);
812 stream = gst_streamsync_pad_get_stream (pad);
813 if (stream->segment.format == GST_FORMAT_TIME) {
814 GstClockTime position;
816 if (stream->segment.rate > 0.0)
817 position = timestamp_end;
819 position = timestamp;
821 if (GST_CLOCK_TIME_IS_VALID (position)) {
823 "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
824 GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (position));
825 stream->segment.position = position;
829 /* Advance EOS streams if necessary. For non-EOS
830 * streams the demuxers should already do this! */
831 if (!GST_CLOCK_TIME_IS_VALID (timestamp_end) &&
832 GST_CLOCK_TIME_IS_VALID (timestamp)) {
833 timestamp_end = timestamp + GST_SECOND;
836 for (l = self->streams; l; l = l->next) {
837 GstSyncStream *ostream = l->data;
840 if (!ostream->is_eos || ostream->eos_sent ||
841 ostream->segment.format != GST_FORMAT_TIME)
844 if (ostream->segment.position != -1)
845 position = ostream->segment.position;
847 position = ostream->segment.start;
849 /* Is there a 1 second lag? */
850 if (position != -1 && GST_CLOCK_TIME_IS_VALID (timestamp_end) &&
851 position + GST_SECOND < timestamp_end) {
854 new_start = timestamp_end - GST_SECOND;
856 GST_DEBUG_OBJECT (ostream->sinkpad,
857 "Advancing stream %u from %" GST_TIME_FORMAT " to %"
858 GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (position),
859 GST_TIME_ARGS (new_start));
861 ostream->segment.position = new_start;
863 ostream->send_gap_event = TRUE;
864 ostream->gap_duration = new_start - position;
865 g_cond_broadcast (&ostream->stream_finish_cond);
868 gst_syncstream_unref (stream);
869 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
875 /* Must be called with lock! */
877 gst_stream_synchronizer_new_pad (GstStreamSynchronizer * sync)
879 GstSyncStream *stream = NULL;
880 GstStreamSyncPad *sinkpad, *srcpad;
883 stream = g_slice_new0 (GstSyncStream);
884 stream->transform = sync;
885 stream->stream_number = sync->current_stream_number;
886 g_cond_init (&stream->stream_finish_cond);
887 stream->stream_start_seqnum = G_MAXUINT32;
888 stream->segment_seqnum = G_MAXUINT32;
889 stream->group_id = G_MAXUINT;
890 stream->seen_data = FALSE;
891 stream->send_gap_event = FALSE;
892 stream->refcount = 1;
894 tmp = g_strdup_printf ("sink_%u", sync->current_stream_number);
896 gst_streamsync_pad_new_from_static_template (&sinktemplate, tmp);
899 GST_STREAMSYNC_PAD_CAST (stream->sinkpad)->stream =
900 gst_syncstream_ref (stream);
902 gst_pad_set_iterate_internal_links_function (stream->sinkpad,
903 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
904 gst_pad_set_event_function (stream->sinkpad,
905 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
906 gst_pad_set_chain_function (stream->sinkpad,
907 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
908 GST_PAD_SET_PROXY_CAPS (stream->sinkpad);
909 GST_PAD_SET_PROXY_ALLOCATION (stream->sinkpad);
910 GST_PAD_SET_PROXY_SCHEDULING (stream->sinkpad);
912 tmp = g_strdup_printf ("src_%u", sync->current_stream_number);
914 gst_streamsync_pad_new_from_static_template (&srctemplate, tmp);
917 GST_STREAMSYNC_PAD_CAST (stream->srcpad)->stream =
918 gst_syncstream_ref (stream);
920 sinkpad = GST_STREAMSYNC_PAD_CAST (stream->sinkpad);
921 srcpad = GST_STREAMSYNC_PAD_CAST (stream->srcpad);
922 /* Hold a strong reference from the sink (request pad) to the src to
923 * ensure a predicatable destruction order */
924 sinkpad->pad = gst_object_ref (srcpad);
925 /* And a weak reference from the src to the sink, to know when pad
926 * release is occuring, and to ensure we do not try and take
927 * references to inactive / destructing streams. */
928 g_weak_ref_init (&srcpad->otherpad, stream->sinkpad);
930 gst_pad_set_iterate_internal_links_function (stream->srcpad,
931 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
932 gst_pad_set_event_function (stream->srcpad,
933 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
934 GST_PAD_SET_PROXY_CAPS (stream->srcpad);
935 GST_PAD_SET_PROXY_ALLOCATION (stream->srcpad);
936 GST_PAD_SET_PROXY_SCHEDULING (stream->srcpad);
938 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
940 GST_STREAM_SYNCHRONIZER_UNLOCK (sync);
942 /* Add pads and activate unless we're going to NULL */
943 g_rec_mutex_lock (GST_STATE_GET_LOCK (sync));
944 if (GST_STATE_TARGET (sync) != GST_STATE_NULL) {
945 gst_pad_set_active (stream->srcpad, TRUE);
946 gst_pad_set_active (stream->sinkpad, TRUE);
948 gst_element_add_pad (GST_ELEMENT_CAST (sync), stream->srcpad);
949 gst_element_add_pad (GST_ELEMENT_CAST (sync), stream->sinkpad);
950 g_rec_mutex_unlock (GST_STATE_GET_LOCK (sync));
952 GST_STREAM_SYNCHRONIZER_LOCK (sync);
954 sync->streams = g_list_prepend (sync->streams, g_steal_pointer (&stream));
955 sync->current_stream_number++;
957 return GST_PAD_CAST (sinkpad);
960 /* GstElement vfuncs */
962 gst_stream_synchronizer_request_new_pad (GstElement * element,
963 GstPadTemplate * temp, const gchar * name, const GstCaps * caps)
965 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
968 GST_STREAM_SYNCHRONIZER_LOCK (self);
969 GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
970 self->current_stream_number);
972 request_pad = gst_stream_synchronizer_new_pad (self);
974 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
979 /* Must be called with lock! */
981 gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
982 GstSyncStream * stream)
986 GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
988 for (l = self->streams; l; l = l->next) {
989 if (l->data == stream) {
990 self->streams = g_list_delete_link (self->streams, l);
994 g_assert (l != NULL);
995 if (self->streams == NULL) {
996 self->have_group_id = TRUE;
997 self->group_id = G_MAXUINT;
1000 /* we can drop the lock, since stream exists now only local.
1001 * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
1002 * (due to reverse lock order) when deactivating pads */
1003 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
1005 gst_pad_set_active (stream->srcpad, FALSE);
1006 gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
1007 gst_pad_set_active (stream->sinkpad, FALSE);
1008 gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
1010 g_cond_clear (&stream->stream_finish_cond);
1012 /* Release the ref maintaining validity in the streams list */
1013 gst_syncstream_unref (stream);
1015 /* NOTE: In theory we have to check here if all streams
1016 * are EOS but the one that was removed wasn't and then
1017 * send EOS downstream. But due to the way how playsink
1018 * works this is not necessary and will only cause problems
1019 * for gapless playback. playsink will only add/remove pads
1020 * when it's reconfigured, which happens when the streams
1024 /* lock for good measure, since the caller had it */
1025 GST_STREAM_SYNCHRONIZER_LOCK (self);
1029 gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
1031 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
1032 GstSyncStream *stream;
1034 GST_STREAM_SYNCHRONIZER_LOCK (self);
1035 stream = gst_streamsync_pad_get_stream (pad);
1036 g_assert (stream->sinkpad == pad);
1038 gst_stream_synchronizer_release_stream (self, stream);
1040 gst_syncstream_unref (stream);
1041 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
1044 static GstStateChangeReturn
1045 gst_stream_synchronizer_change_state (GstElement * element,
1046 GstStateChange transition)
1048 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
1049 GstStateChangeReturn ret;
1051 switch (transition) {
1052 case GST_STATE_CHANGE_NULL_TO_READY:
1053 GST_DEBUG_OBJECT (self, "State change NULL->READY");
1054 self->shutdown = FALSE;
1056 case GST_STATE_CHANGE_READY_TO_PAUSED:
1057 GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
1058 self->group_start_time = 0;
1059 self->have_group_id = TRUE;
1060 self->group_id = G_MAXUINT;
1061 self->shutdown = FALSE;
1062 self->flushing = FALSE;
1065 case GST_STATE_CHANGE_PAUSED_TO_READY:{
1068 GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
1070 GST_STREAM_SYNCHRONIZER_LOCK (self);
1071 self->flushing = TRUE;
1072 self->shutdown = TRUE;
1073 for (l = self->streams; l; l = l->next) {
1074 GstSyncStream *ostream = l->data;
1075 g_cond_broadcast (&ostream->stream_finish_cond);
1077 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
1083 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1084 GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", ret);
1085 if (G_UNLIKELY (ret != GST_STATE_CHANGE_SUCCESS))
1088 switch (transition) {
1089 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:{
1092 GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
1094 GST_STREAM_SYNCHRONIZER_LOCK (self);
1095 for (l = self->streams; l; l = l->next) {
1096 GstSyncStream *stream = l->data;
1097 /* send GAP event to sink to finished pre-roll. The reason is function
1098 * chain () will be blocked on pad_push (), so can't trigger the track
1099 * which reach EOS to send GAP event. */
1100 if (stream->is_eos && !stream->eos_sent) {
1101 stream->send_gap_event = TRUE;
1102 stream->gap_duration = GST_CLOCK_TIME_NONE;
1103 g_cond_broadcast (&stream->stream_finish_cond);
1106 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
1109 case GST_STATE_CHANGE_PAUSED_TO_READY:{
1112 GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
1113 self->group_start_time = 0;
1115 GST_STREAM_SYNCHRONIZER_LOCK (self);
1116 for (l = self->streams; l; l = l->next) {
1117 GstSyncStream *stream = l->data;
1119 gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
1120 stream->gap_duration = GST_CLOCK_TIME_NONE;
1121 stream->wait = FALSE;
1122 stream->is_eos = FALSE;
1123 stream->eos_sent = FALSE;
1124 stream->flushing = FALSE;
1125 stream->send_gap_event = FALSE;
1127 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
1130 case GST_STATE_CHANGE_READY_TO_NULL:{
1131 GST_DEBUG_OBJECT (self, "State change READY->NULL");
1133 GST_STREAM_SYNCHRONIZER_LOCK (self);
1134 self->current_stream_number = 0;
1135 GST_STREAM_SYNCHRONIZER_UNLOCK (self);
1145 /* GObject vfuncs */
1147 gst_stream_synchronizer_finalize (GObject * object)
1149 GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
1151 g_mutex_clear (&self->lock);
1153 G_OBJECT_CLASS (parent_class)->finalize (object);
1156 /* GObject type initialization */
1158 gst_stream_synchronizer_init (GstStreamSynchronizer * self)
1160 g_mutex_init (&self->lock);
1164 gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
1166 GObjectClass *gobject_class = (GObjectClass *) klass;
1167 GstElementClass *element_class = (GstElementClass *) klass;
1169 gobject_class->finalize = gst_stream_synchronizer_finalize;
1171 gst_element_class_add_static_pad_template (element_class, &srctemplate);
1172 gst_element_class_add_static_pad_template (element_class, &sinktemplate);
1174 gst_element_class_set_static_metadata (element_class,
1175 "Stream Synchronizer", "Generic",
1176 "Synchronizes a group of streams to have equal durations and starting points",
1177 "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
1179 element_class->change_state =
1180 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
1181 element_class->request_new_pad =
1182 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
1183 element_class->release_pad =
1184 GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);
1186 GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
1187 "streamsynchronizer", 0, "Stream Synchronizer");