streamsynchronizer: reset eos on STREAM_START
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-base / gst / playback / gststreamsynchronizer.c
1 /* GStreamer
2  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 #ifdef HAVE_CONFIG_H
21 #include "config.h"
22 #endif
23
24 #include "gstplaybackelements.h"
25 #include "gststreamsynchronizer.h"
26
27 GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
28 #define GST_CAT_DEFAULT stream_synchronizer_debug
29
30 #define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START {                \
31     GST_TRACE_OBJECT (obj,                                              \
32                     "locking from thread %p",                           \
33                     g_thread_self ());                                  \
34     g_mutex_lock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);            \
35     GST_TRACE_OBJECT (obj,                                              \
36                     "locked from thread %p",                            \
37                     g_thread_self ());                                  \
38 } G_STMT_END
39
40 #define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START {              \
41     GST_TRACE_OBJECT (obj,                                              \
42                     "unlocking from thread %p",                         \
43                     g_thread_self ());                                  \
44     g_mutex_unlock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);              \
45 } G_STMT_END
46
47 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
48     GST_PAD_SRC,
49     GST_PAD_SOMETIMES,
50     GST_STATIC_CAPS_ANY);
51 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
52     GST_PAD_SINK,
53     GST_PAD_REQUEST,
54     GST_STATIC_CAPS_ANY);
55
56 #define gst_stream_synchronizer_parent_class parent_class
57 G_DEFINE_TYPE (GstStreamSynchronizer, gst_stream_synchronizer,
58     GST_TYPE_ELEMENT);
59 #define _do_init \
60     playback_element_init (plugin);
61 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (streamsynchronizer, "streamsynchronizer",
62     GST_RANK_NONE, GST_TYPE_STREAM_SYNCHRONIZER, _do_init);
63
64 typedef struct
65 {
66   GstStreamSynchronizer *transform;
67   guint stream_number;
68   GstPad *srcpad;
69   GstPad *sinkpad;
70   GstSegment segment;
71
72   gboolean wait;                /* TRUE if waiting/blocking */
73   gboolean is_eos;              /* TRUE if EOS was received */
74   gboolean eos_sent;            /* when EOS was sent downstream */
75   gboolean flushing;            /* set after flush-start and before flush-stop */
76   gboolean seen_data;
77   gboolean send_gap_event;
78   GstClockTime gap_duration;
79
80   GstStreamFlags flags;
81
82   GCond stream_finish_cond;
83
84   /* seqnum of the previously received STREAM_START
85    * default: G_MAXUINT32 */
86   guint32 stream_start_seqnum;
87   guint32 segment_seqnum;
88   guint group_id;
89
90   gint refcount;
91 } GstSyncStream;
92
93 static GstSyncStream *
94 gst_syncstream_ref (GstSyncStream * stream)
95 {
96   g_return_val_if_fail (stream != NULL, NULL);
97   g_atomic_int_add (&stream->refcount, 1);
98   return stream;
99 }
100
101 static void
102 gst_syncstream_unref (GstSyncStream * stream)
103 {
104   g_return_if_fail (stream != NULL);
105   g_return_if_fail (stream->refcount > 0);
106
107   if (g_atomic_int_dec_and_test (&stream->refcount))
108     g_slice_free (GstSyncStream, stream);
109 }
110
111 G_BEGIN_DECLS
112 #define GST_TYPE_STREAMSYNC_PAD              (gst_streamsync_pad_get_type ())
113 #define GST_IS_STREAMSYNC_PAD(obj)           (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_STREAMSYNC_PAD))
114 #define GST_IS_STREAMSYNC_PAD_CLASS(klass)   (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_STREAMSYNC_PAD))
115 #define GST_STREAMSYNC_PAD(obj)              (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_STREAMSYNC_PAD, GstStreamSyncPad))
116 #define GST_STREAMSYNC_PAD_CLASS(klass)      (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_STREAMSYNC_PAD, GstStreamSyncPadClass))
117 typedef struct _GstStreamSyncPad GstStreamSyncPad;
118 typedef struct _GstStreamSyncPadClass GstStreamSyncPadClass;
119
120 struct _GstStreamSyncPad
121 {
122   GstPad parent;
123
124   GstSyncStream *stream;
125
126   /* Since we need to access data associated with a pad in this
127    * element, it's important to manage the respective lifetimes of the
128    * stored pad data and the pads themselves. Pad deactivation happens
129    * without mutual exclusion to the use of pad data in this element.
130    *
131    * The approach here is to have the sinkpad (the request pad) hold a
132    * strong reference onto the srcpad (so that it stays alive until
133    * the last pad is destroyed). Similarly the srcpad has a weak
134    * reference to the sinkpad (request pad) to ensure it knows when
135    * the pads are destroyed, since the pad data may be requested from
136    * either the srcpad or the sinkpad. This avoids a nasty set of
137    * potential race conditions.
138    *
139    * The code is arranged so that in the srcpad, the pad pointer is
140    * always NULL (not used) and in the sinkpad, the otherpad is always
141    * NULL. */
142   GstPad *pad;
143   GWeakRef otherpad;
144 };
145
146 struct _GstStreamSyncPadClass
147 {
148   GstPadClass parent_class;
149 };
150
151 static GType gst_streamsync_pad_get_type (void);
152 static GstSyncStream *gst_streamsync_pad_get_stream (GstPad * pad);
153
154 G_END_DECLS
155 #define GST_STREAMSYNC_PAD_CAST(obj)         ((GstStreamSyncPad *)obj)
156   G_DEFINE_TYPE (GstStreamSyncPad, gst_streamsync_pad, GST_TYPE_PAD);
157
158 static void gst_streamsync_pad_dispose (GObject * object);
159
160 static void
161 gst_streamsync_pad_class_init (GstStreamSyncPadClass * klass)
162 {
163   GObjectClass *gobject_class;
164   gobject_class = G_OBJECT_CLASS (klass);
165   gobject_class->dispose = gst_streamsync_pad_dispose;
166 }
167
168 static void
169 gst_streamsync_pad_init (GstStreamSyncPad * ppad)
170 {
171 }
172
173 static void
174 gst_streamsync_pad_dispose (GObject * object)
175 {
176   GstStreamSyncPad *spad = GST_STREAMSYNC_PAD_CAST (object);
177
178   if (GST_PAD_DIRECTION (spad) == GST_PAD_SINK)
179     gst_clear_object (&spad->pad);
180   else
181     g_weak_ref_clear (&spad->otherpad);
182
183   g_clear_pointer (&spad->stream, gst_syncstream_unref);
184
185   G_OBJECT_CLASS (gst_streamsync_pad_parent_class)->dispose (object);
186 }
187
188 static GstPad *
189 gst_streamsync_pad_new_from_template (GstPadTemplate * templ,
190     const gchar * name)
191 {
192   g_return_val_if_fail (GST_IS_PAD_TEMPLATE (templ), NULL);
193
194   return GST_PAD_CAST (g_object_new (GST_TYPE_STREAMSYNC_PAD,
195           "name", name, "direction", templ->direction, "template", templ,
196           NULL));
197 }
198
199 static GstPad *
200 gst_streamsync_pad_new_from_static_template (GstStaticPadTemplate * templ,
201     const gchar * name)
202 {
203   GstPad *pad;
204   GstPadTemplate *template;
205
206   template = gst_static_pad_template_get (templ);
207   pad = gst_streamsync_pad_new_from_template (template, name);
208   gst_object_unref (template);
209
210   return pad;
211 }
212
213 static GstSyncStream *
214 gst_streamsync_pad_get_stream (GstPad * pad)
215 {
216   GstStreamSyncPad *spad = GST_STREAMSYNC_PAD_CAST (pad);
217   return gst_syncstream_ref (spad->stream);
218 }
219
220 static GstPad *
221 gst_stream_get_other_pad_from_pad (GstStreamSynchronizer * self, GstPad * pad)
222 {
223   GstStreamSyncPad *spad = GST_STREAMSYNC_PAD_CAST (pad);
224   GstPad *opad = NULL;
225
226   if (GST_PAD_DIRECTION (pad) == GST_PAD_SINK)
227     opad = gst_object_ref (spad->pad);
228   else
229     opad = g_weak_ref_get (&spad->otherpad);
230
231   if (!opad)
232     GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
233
234   return opad;
235 }
236
237 /* Generic pad functions */
238 static GstIterator *
239 gst_stream_synchronizer_iterate_internal_links (GstPad * pad,
240     GstObject * parent)
241 {
242   GstIterator *it = NULL;
243   GstPad *opad;
244
245   opad =
246       gst_stream_get_other_pad_from_pad (GST_STREAM_SYNCHRONIZER (parent), pad);
247   if (opad) {
248     GValue value = { 0, };
249
250     g_value_init (&value, GST_TYPE_PAD);
251     g_value_set_object (&value, opad);
252     it = gst_iterator_new_single (GST_TYPE_PAD, &value);
253     g_value_unset (&value);
254     gst_object_unref (opad);
255   }
256
257   return it;
258 }
259
260 static GstEvent *
261 set_event_rt_offset (GstStreamSynchronizer * self, GstPad * pad,
262     GstEvent * event)
263 {
264   gint64 running_time_diff;
265   GstSyncStream *stream;
266
267   GST_STREAM_SYNCHRONIZER_LOCK (self);
268   stream = gst_streamsync_pad_get_stream (pad);
269   running_time_diff = stream->segment.base;
270   gst_syncstream_unref (stream);
271   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
272
273   if (running_time_diff != -1) {
274     gint64 offset;
275
276     event = gst_event_make_writable (event);
277     offset = gst_event_get_running_time_offset (event);
278     if (GST_PAD_IS_SRC (pad))
279       offset -= running_time_diff;
280     else
281       offset += running_time_diff;
282
283     gst_event_set_running_time_offset (event, offset);
284   }
285
286   return event;
287 }
288
289 /* srcpad functions */
290 static gboolean
291 gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent,
292     GstEvent * event)
293 {
294   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
295   gboolean ret = FALSE;
296
297   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
298       GST_EVENT_TYPE_NAME (event), event);
299
300   event = set_event_rt_offset (self, pad, event);
301
302   ret = gst_pad_event_default (pad, parent, event);
303
304   return ret;
305 }
306
307 /* must be called with the STREAM_SYNCHRONIZER_LOCK */
308 static gboolean
309 gst_stream_synchronizer_wait (GstStreamSynchronizer * self, GstPad * pad)
310 {
311   gboolean ret = FALSE;
312   GstSyncStream *stream;
313
314   stream = gst_streamsync_pad_get_stream (pad);
315
316   while (!self->eos && !self->flushing) {
317     if (stream->flushing) {
318       GST_DEBUG_OBJECT (pad, "Flushing");
319       break;
320     }
321     if (!stream->wait) {
322       GST_DEBUG_OBJECT (pad, "Stream not waiting anymore");
323       break;
324     }
325
326     if (stream->send_gap_event) {
327       GstEvent *event;
328
329       if (!GST_CLOCK_TIME_IS_VALID (stream->segment.position)) {
330         GST_WARNING_OBJECT (pad, "Have no position and can't send GAP event");
331         stream->send_gap_event = FALSE;
332         continue;
333       }
334
335       event =
336           gst_event_new_gap (stream->segment.position, stream->gap_duration);
337       GST_DEBUG_OBJECT (pad,
338           "Send GAP event, position: %" GST_TIME_FORMAT " duration: %"
339           GST_TIME_FORMAT, GST_TIME_ARGS (stream->segment.position),
340           GST_TIME_ARGS (stream->gap_duration));
341
342       /* drop lock when sending GAP event, which may block in e.g. preroll */
343       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
344       ret = gst_pad_push_event (pad, event);
345       GST_STREAM_SYNCHRONIZER_LOCK (self);
346       if (!ret) {
347         gst_syncstream_unref (stream);
348         return ret;
349       }
350       stream->send_gap_event = FALSE;
351
352       /* force a check on the loop conditions as we unlocked a
353        * few lines above and those variables could have changed */
354       continue;
355     }
356
357     g_cond_wait (&stream->stream_finish_cond, &self->lock);
358   }
359
360   gst_syncstream_unref (stream);
361   return TRUE;
362 }
363
364 /* sinkpad functions */
365 static gboolean
366 gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
367     GstEvent * event)
368 {
369   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
370   gboolean ret = FALSE;
371
372   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
373       GST_EVENT_TYPE_NAME (event), event);
374
375   switch (GST_EVENT_TYPE (event)) {
376     case GST_EVENT_STREAM_START:
377     {
378       GstSyncStream *stream, *ostream;
379       guint32 seqnum = gst_event_get_seqnum (event);
380       guint group_id;
381       gboolean have_group_id;
382       GList *l;
383       gboolean all_wait = TRUE;
384       gboolean new_stream = TRUE;
385
386       have_group_id = gst_event_parse_group_id (event, &group_id);
387
388       GST_STREAM_SYNCHRONIZER_LOCK (self);
389       self->have_group_id &= have_group_id;
390       have_group_id = self->have_group_id;
391       self->eos = FALSE;
392
393       stream = gst_streamsync_pad_get_stream (pad);
394
395       gst_event_parse_stream_flags (event, &stream->flags);
396
397       if ((have_group_id && stream->group_id != group_id) || (!have_group_id
398               && stream->stream_start_seqnum != seqnum)) {
399         stream->is_eos = FALSE;
400         stream->eos_sent = FALSE;
401         stream->flushing = FALSE;
402         stream->stream_start_seqnum = seqnum;
403         stream->group_id = group_id;
404
405         if (!have_group_id) {
406           /* Check if this belongs to a stream that is already there,
407            * e.g. we got the visualizations for an audio stream */
408           for (l = self->streams; l; l = l->next) {
409             ostream = l->data;
410
411             if (ostream != stream && ostream->stream_start_seqnum == seqnum
412                 && !ostream->wait) {
413               new_stream = FALSE;
414               break;
415             }
416           }
417
418           if (!new_stream) {
419             GST_DEBUG_OBJECT (pad,
420                 "Stream %d belongs to running stream %d, no waiting",
421                 stream->stream_number, ostream->stream_number);
422             stream->wait = FALSE;
423             gst_syncstream_unref (stream);
424             GST_STREAM_SYNCHRONIZER_UNLOCK (self);
425             break;
426           }
427         } else if (group_id == self->group_id) {
428           GST_DEBUG_OBJECT (pad, "Stream %d belongs to running group %d, "
429               "no waiting", stream->stream_number, group_id);
430           gst_syncstream_unref (stream);
431           GST_STREAM_SYNCHRONIZER_UNLOCK (self);
432           break;
433         }
434
435         GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
436
437         stream->wait = TRUE;
438
439         for (l = self->streams; l; l = l->next) {
440           GstSyncStream *ostream = l->data;
441
442           all_wait = all_wait && ((ostream->flags & GST_STREAM_FLAG_SPARSE)
443               || (ostream->wait && (!have_group_id
444                       || ostream->group_id == group_id)));
445           if (!all_wait)
446             break;
447         }
448
449         if (all_wait) {
450           gint64 position = 0;
451
452           if (have_group_id)
453             GST_DEBUG_OBJECT (self,
454                 "All streams have changed to group id %u -- unblocking",
455                 group_id);
456           else
457             GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
458
459           self->group_id = group_id;
460
461           for (l = self->streams; l; l = l->next) {
462             GstSyncStream *ostream = l->data;
463             gint64 stop_running_time;
464             gint64 position_running_time;
465
466             ostream->wait = FALSE;
467
468             if (ostream->segment.format == GST_FORMAT_TIME) {
469               if (ostream->segment.rate > 0)
470                 stop_running_time =
471                     gst_segment_to_running_time (&ostream->segment,
472                     GST_FORMAT_TIME, ostream->segment.stop);
473               else
474                 stop_running_time =
475                     gst_segment_to_running_time (&ostream->segment,
476                     GST_FORMAT_TIME, ostream->segment.start);
477
478               position_running_time =
479                   gst_segment_to_running_time (&ostream->segment,
480                   GST_FORMAT_TIME, ostream->segment.position);
481
482               position_running_time =
483                   MAX (position_running_time, stop_running_time);
484
485               if (ostream->segment.rate > 0)
486                 position_running_time -=
487                     gst_segment_to_running_time (&ostream->segment,
488                     GST_FORMAT_TIME, ostream->segment.start);
489               else
490                 position_running_time -=
491                     gst_segment_to_running_time (&ostream->segment,
492                     GST_FORMAT_TIME, ostream->segment.stop);
493
494               position_running_time = MAX (0, position_running_time);
495
496               position = MAX (position, position_running_time);
497             }
498           }
499
500           self->group_start_time += position;
501
502           GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
503               GST_TIME_ARGS (self->group_start_time));
504
505           for (l = self->streams; l; l = l->next) {
506             GstSyncStream *ostream = l->data;
507             ostream->wait = FALSE;
508             g_cond_broadcast (&ostream->stream_finish_cond);
509           }
510         }
511       }
512
513       gst_syncstream_unref (stream);
514       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
515       break;
516     }
517     case GST_EVENT_SEGMENT:{
518       GstSyncStream *stream;
519       GstSegment segment;
520
521       gst_event_copy_segment (event, &segment);
522
523       GST_STREAM_SYNCHRONIZER_LOCK (self);
524
525       gst_stream_synchronizer_wait (self, pad);
526
527       if (self->shutdown) {
528         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
529         gst_event_unref (event);
530         goto done;
531       }
532
533       stream = gst_streamsync_pad_get_stream (pad);
534       if (segment.format == GST_FORMAT_TIME) {
535         GST_DEBUG_OBJECT (pad,
536             "New stream, updating base from %" GST_TIME_FORMAT " to %"
537             GST_TIME_FORMAT, GST_TIME_ARGS (segment.base),
538             GST_TIME_ARGS (segment.base + self->group_start_time));
539         segment.base += self->group_start_time;
540
541         GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
542             &stream->segment);
543         gst_segment_copy_into (&segment, &stream->segment);
544         GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
545             &stream->segment);
546         stream->segment_seqnum = gst_event_get_seqnum (event);
547
548         GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
549             GST_TIME_ARGS (stream->segment.base));
550         {
551           GstEvent *tmpev;
552
553           tmpev = gst_event_new_segment (&stream->segment);
554           gst_event_set_seqnum (tmpev, stream->segment_seqnum);
555           gst_event_unref (event);
556           event = tmpev;
557         }
558       } else if (stream) {
559         GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
560             gst_format_get_name (segment.format));
561         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
562       }
563       gst_syncstream_unref (stream);
564       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
565       break;
566     }
567     case GST_EVENT_FLUSH_START:{
568       GstSyncStream *stream;
569
570       GST_STREAM_SYNCHRONIZER_LOCK (self);
571       stream = gst_streamsync_pad_get_stream (pad);
572       self->eos = FALSE;
573       GST_DEBUG_OBJECT (pad, "Flushing streams");
574       stream->flushing = TRUE;
575       g_cond_broadcast (&stream->stream_finish_cond);
576       gst_syncstream_unref (stream);
577       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
578       break;
579     }
580     case GST_EVENT_FLUSH_STOP:{
581       GstSyncStream *stream;
582       GList *l;
583       GstClockTime new_group_start_time = 0;
584
585       GST_STREAM_SYNCHRONIZER_LOCK (self);
586       stream = gst_streamsync_pad_get_stream (pad);
587       GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
588           stream->stream_number);
589       gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
590
591       stream->is_eos = FALSE;
592       stream->eos_sent = FALSE;
593       stream->flushing = FALSE;
594       stream->wait = FALSE;
595       g_cond_broadcast (&stream->stream_finish_cond);
596
597       for (l = self->streams; l; l = l->next) {
598         GstSyncStream *ostream = l->data;
599         GstClockTime start_running_time;
600
601         if (ostream == stream || ostream->flushing)
602           continue;
603
604         if (ostream->segment.format == GST_FORMAT_TIME) {
605           if (ostream->segment.rate > 0)
606             start_running_time =
607                 gst_segment_to_running_time (&ostream->segment,
608                 GST_FORMAT_TIME, ostream->segment.start);
609           else
610             start_running_time =
611                 gst_segment_to_running_time (&ostream->segment,
612                 GST_FORMAT_TIME, ostream->segment.stop);
613
614           new_group_start_time = MAX (new_group_start_time, start_running_time);
615         }
616       }
617
618       GST_DEBUG_OBJECT (pad,
619           "Updating group start time from %" GST_TIME_FORMAT " to %"
620           GST_TIME_FORMAT, GST_TIME_ARGS (self->group_start_time),
621           GST_TIME_ARGS (new_group_start_time));
622       self->group_start_time = new_group_start_time;
623
624       gst_syncstream_unref (stream);
625       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
626       break;
627     }
628       /* unblocking EOS wait when track switch. */
629     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:{
630       if (gst_event_has_name (event, "playsink-custom-video-flush")
631           || gst_event_has_name (event, "playsink-custom-audio-flush")
632           || gst_event_has_name (event, "playsink-custom-subtitle-flush")) {
633         GstSyncStream *stream;
634
635         GST_STREAM_SYNCHRONIZER_LOCK (self);
636         stream = gst_streamsync_pad_get_stream (pad);
637         stream->is_eos = FALSE;
638         stream->eos_sent = FALSE;
639         stream->wait = FALSE;
640         g_cond_broadcast (&stream->stream_finish_cond);
641         gst_syncstream_unref (stream);
642         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
643       }
644       break;
645     }
646     case GST_EVENT_EOS:{
647       GstSyncStream *stream;
648       GList *l;
649       gboolean all_eos = TRUE;
650       gboolean seen_data;
651       GSList *pads = NULL;
652       GstPad *srcpad;
653       GstClockTime timestamp;
654       guint32 seqnum;
655
656       GST_STREAM_SYNCHRONIZER_LOCK (self);
657       stream = gst_streamsync_pad_get_stream (pad);
658
659       GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
660       stream->is_eos = TRUE;
661
662       seen_data = stream->seen_data;
663       srcpad = gst_object_ref (stream->srcpad);
664       seqnum = stream->segment_seqnum;
665
666       if (seen_data && stream->segment.position != -1)
667         timestamp = stream->segment.position;
668       else if (stream->segment.rate < 0.0 || stream->segment.stop == -1)
669         timestamp = stream->segment.start;
670       else
671         timestamp = stream->segment.stop;
672
673       stream->segment.position = timestamp;
674
675       for (l = self->streams; l; l = l->next) {
676         GstSyncStream *ostream = l->data;
677
678         all_eos = all_eos && ostream->is_eos;
679         if (!all_eos)
680           break;
681       }
682
683       if (all_eos) {
684         GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
685         self->eos = TRUE;
686         for (l = self->streams; l; l = l->next) {
687           GstSyncStream *ostream = l->data;
688           /* local snapshot of current pads */
689           gst_object_ref (ostream->srcpad);
690           pads = g_slist_prepend (pads, ostream->srcpad);
691         }
692       }
693       if (pads) {
694         GstPad *pad;
695         GSList *epad;
696         GstSyncStream *ostream;
697
698         ret = TRUE;
699         epad = pads;
700         while (epad) {
701           pad = epad->data;
702           ostream = gst_streamsync_pad_get_stream (pad);
703           g_cond_broadcast (&ostream->stream_finish_cond);
704           gst_syncstream_unref (ostream);
705           gst_object_unref (pad);
706           epad = g_slist_next (epad);
707         }
708         g_slist_free (pads);
709       } else {
710         if (seen_data) {
711           stream->send_gap_event = TRUE;
712           stream->gap_duration = GST_CLOCK_TIME_NONE;
713           stream->wait = TRUE;
714           ret = gst_stream_synchronizer_wait (self, srcpad);
715         }
716       }
717
718       /* send eos if haven't seen data. seen_data will be true if data buffer
719        * of the track have received in anytime. sink is ready if seen_data is
720        * true, so can send GAP event. Will send EOS if sink isn't ready. The
721        * scenario for the case is one track haven't any media data and then
722        * send EOS. Or no any valid media data in one track, so decoder can't
723        * get valid CAPS for the track. sink can't ready without received CAPS.*/
724       if (!seen_data || self->eos) {
725         GstEvent *topush;
726         GST_DEBUG_OBJECT (pad, "send EOS event");
727         /* drop lock when sending eos, which may block in e.g. preroll */
728         topush = gst_event_new_eos ();
729         gst_event_set_seqnum (topush, seqnum);
730         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
731         ret = gst_pad_push_event (srcpad, topush);
732         GST_STREAM_SYNCHRONIZER_LOCK (self);
733         stream = gst_streamsync_pad_get_stream (pad);
734         stream->eos_sent = TRUE;
735         gst_syncstream_unref (stream);
736       }
737
738       gst_object_unref (srcpad);
739       gst_event_unref (event);
740       gst_syncstream_unref (stream);
741       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
742       goto done;
743     }
744     default:
745       break;
746   }
747
748   event = set_event_rt_offset (self, pad, event);
749
750   ret = gst_pad_event_default (pad, parent, event);
751
752 done:
753
754   return ret;
755 }
756
757 static GstFlowReturn
758 gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
759     GstBuffer * buffer)
760 {
761   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
762   GstPad *opad;
763   GstFlowReturn ret = GST_FLOW_ERROR;
764   GstSyncStream *stream;
765   GstClockTime duration = GST_CLOCK_TIME_NONE;
766   GstClockTime timestamp = GST_CLOCK_TIME_NONE;
767   GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
768
769   GST_LOG_OBJECT (pad, "Handling buffer %p: size=%" G_GSIZE_FORMAT
770       ", timestamp=%" GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
771       " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
772       buffer, gst_buffer_get_size (buffer),
773       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
774       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
775       GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));
776
777   timestamp = GST_BUFFER_TIMESTAMP (buffer);
778   duration = GST_BUFFER_DURATION (buffer);
779   if (GST_CLOCK_TIME_IS_VALID (timestamp)
780       && GST_CLOCK_TIME_IS_VALID (duration))
781     timestamp_end = timestamp + duration;
782
783   GST_STREAM_SYNCHRONIZER_LOCK (self);
784   stream = gst_streamsync_pad_get_stream (pad);
785
786   stream->seen_data = TRUE;
787   if (stream->segment.format == GST_FORMAT_TIME
788       && GST_CLOCK_TIME_IS_VALID (timestamp)) {
789     GST_LOG_OBJECT (pad,
790         "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
791         GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
792     if (stream->segment.rate > 0.0)
793       stream->segment.position = timestamp;
794     else
795       stream->segment.position = timestamp_end;
796   }
797
798   gst_syncstream_unref (stream);
799   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
800
801   opad = gst_stream_get_other_pad_from_pad (self, pad);
802   if (opad) {
803     ret = gst_pad_push (opad, buffer);
804     gst_object_unref (opad);
805   }
806
807   GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
808   if (ret == GST_FLOW_OK) {
809     GList *l;
810
811     GST_STREAM_SYNCHRONIZER_LOCK (self);
812     stream = gst_streamsync_pad_get_stream (pad);
813     if (stream->segment.format == GST_FORMAT_TIME) {
814       GstClockTime position;
815
816       if (stream->segment.rate > 0.0)
817         position = timestamp_end;
818       else
819         position = timestamp;
820
821       if (GST_CLOCK_TIME_IS_VALID (position)) {
822         GST_LOG_OBJECT (pad,
823             "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
824             GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (position));
825         stream->segment.position = position;
826       }
827     }
828
829     /* Advance EOS streams if necessary. For non-EOS
830      * streams the demuxers should already do this! */
831     if (!GST_CLOCK_TIME_IS_VALID (timestamp_end) &&
832         GST_CLOCK_TIME_IS_VALID (timestamp)) {
833       timestamp_end = timestamp + GST_SECOND;
834     }
835
836     for (l = self->streams; l; l = l->next) {
837       GstSyncStream *ostream = l->data;
838       gint64 position;
839
840       if (!ostream->is_eos || ostream->eos_sent ||
841           ostream->segment.format != GST_FORMAT_TIME)
842         continue;
843
844       if (ostream->segment.position != -1)
845         position = ostream->segment.position;
846       else
847         position = ostream->segment.start;
848
849       /* Is there a 1 second lag? */
850       if (position != -1 && GST_CLOCK_TIME_IS_VALID (timestamp_end) &&
851           position + GST_SECOND < timestamp_end) {
852         gint64 new_start;
853
854         new_start = timestamp_end - GST_SECOND;
855
856         GST_DEBUG_OBJECT (ostream->sinkpad,
857             "Advancing stream %u from %" GST_TIME_FORMAT " to %"
858             GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (position),
859             GST_TIME_ARGS (new_start));
860
861         ostream->segment.position = new_start;
862
863         ostream->send_gap_event = TRUE;
864         ostream->gap_duration = new_start - position;
865         g_cond_broadcast (&ostream->stream_finish_cond);
866       }
867     }
868     gst_syncstream_unref (stream);
869     GST_STREAM_SYNCHRONIZER_UNLOCK (self);
870   }
871
872   return ret;
873 }
874
875 /* Must be called with lock! */
876 static GstPad *
877 gst_stream_synchronizer_new_pad (GstStreamSynchronizer * sync)
878 {
879   GstSyncStream *stream = NULL;
880   GstStreamSyncPad *sinkpad, *srcpad;
881   gchar *tmp;
882
883   stream = g_slice_new0 (GstSyncStream);
884   stream->transform = sync;
885   stream->stream_number = sync->current_stream_number;
886   g_cond_init (&stream->stream_finish_cond);
887   stream->stream_start_seqnum = G_MAXUINT32;
888   stream->segment_seqnum = G_MAXUINT32;
889   stream->group_id = G_MAXUINT;
890   stream->seen_data = FALSE;
891   stream->send_gap_event = FALSE;
892   stream->refcount = 1;
893
894   tmp = g_strdup_printf ("sink_%u", sync->current_stream_number);
895   stream->sinkpad =
896       gst_streamsync_pad_new_from_static_template (&sinktemplate, tmp);
897   g_free (tmp);
898
899   GST_STREAMSYNC_PAD_CAST (stream->sinkpad)->stream =
900       gst_syncstream_ref (stream);
901
902   gst_pad_set_iterate_internal_links_function (stream->sinkpad,
903       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
904   gst_pad_set_event_function (stream->sinkpad,
905       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
906   gst_pad_set_chain_function (stream->sinkpad,
907       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
908   GST_PAD_SET_PROXY_CAPS (stream->sinkpad);
909   GST_PAD_SET_PROXY_ALLOCATION (stream->sinkpad);
910   GST_PAD_SET_PROXY_SCHEDULING (stream->sinkpad);
911
912   tmp = g_strdup_printf ("src_%u", sync->current_stream_number);
913   stream->srcpad =
914       gst_streamsync_pad_new_from_static_template (&srctemplate, tmp);
915   g_free (tmp);
916
917   GST_STREAMSYNC_PAD_CAST (stream->srcpad)->stream =
918       gst_syncstream_ref (stream);
919
920   sinkpad = GST_STREAMSYNC_PAD_CAST (stream->sinkpad);
921   srcpad = GST_STREAMSYNC_PAD_CAST (stream->srcpad);
922   /* Hold a strong reference from the sink (request pad) to the src to
923    * ensure a predicatable destruction order */
924   sinkpad->pad = gst_object_ref (srcpad);
925   /* And a weak reference from the src to the sink, to know when pad
926    * release is occuring, and to ensure we do not try and take
927    * references to inactive / destructing streams. */
928   g_weak_ref_init (&srcpad->otherpad, stream->sinkpad);
929
930   gst_pad_set_iterate_internal_links_function (stream->srcpad,
931       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
932   gst_pad_set_event_function (stream->srcpad,
933       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
934   GST_PAD_SET_PROXY_CAPS (stream->srcpad);
935   GST_PAD_SET_PROXY_ALLOCATION (stream->srcpad);
936   GST_PAD_SET_PROXY_SCHEDULING (stream->srcpad);
937
938   gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
939
940   GST_STREAM_SYNCHRONIZER_UNLOCK (sync);
941
942   /* Add pads and activate unless we're going to NULL */
943   g_rec_mutex_lock (GST_STATE_GET_LOCK (sync));
944   if (GST_STATE_TARGET (sync) != GST_STATE_NULL) {
945     gst_pad_set_active (stream->srcpad, TRUE);
946     gst_pad_set_active (stream->sinkpad, TRUE);
947   }
948   gst_element_add_pad (GST_ELEMENT_CAST (sync), stream->srcpad);
949   gst_element_add_pad (GST_ELEMENT_CAST (sync), stream->sinkpad);
950   g_rec_mutex_unlock (GST_STATE_GET_LOCK (sync));
951
952   GST_STREAM_SYNCHRONIZER_LOCK (sync);
953
954   sync->streams = g_list_prepend (sync->streams, g_steal_pointer (&stream));
955   sync->current_stream_number++;
956
957   return GST_PAD_CAST (sinkpad);
958 }
959
960 /* GstElement vfuncs */
961 static GstPad *
962 gst_stream_synchronizer_request_new_pad (GstElement * element,
963     GstPadTemplate * temp, const gchar * name, const GstCaps * caps)
964 {
965   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
966   GstPad *request_pad;
967
968   GST_STREAM_SYNCHRONIZER_LOCK (self);
969   GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
970       self->current_stream_number);
971
972   request_pad = gst_stream_synchronizer_new_pad (self);
973
974   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
975
976   return request_pad;
977 }
978
979 /* Must be called with lock! */
980 static void
981 gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
982     GstSyncStream * stream)
983 {
984   GList *l;
985
986   GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
987
988   for (l = self->streams; l; l = l->next) {
989     if (l->data == stream) {
990       self->streams = g_list_delete_link (self->streams, l);
991       break;
992     }
993   }
994   g_assert (l != NULL);
995   if (self->streams == NULL) {
996     self->have_group_id = TRUE;
997     self->group_id = G_MAXUINT;
998   }
999
1000   /* we can drop the lock, since stream exists now only local.
1001    * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
1002    * (due to reverse lock order) when deactivating pads */
1003   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
1004
1005   gst_pad_set_active (stream->srcpad, FALSE);
1006   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
1007   gst_pad_set_active (stream->sinkpad, FALSE);
1008   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
1009
1010   g_cond_clear (&stream->stream_finish_cond);
1011
1012   /* Release the ref maintaining validity in the streams list */
1013   gst_syncstream_unref (stream);
1014
1015   /* NOTE: In theory we have to check here if all streams
1016    * are EOS but the one that was removed wasn't and then
1017    * send EOS downstream. But due to the way how playsink
1018    * works this is not necessary and will only cause problems
1019    * for gapless playback. playsink will only add/remove pads
1020    * when it's reconfigured, which happens when the streams
1021    * change
1022    */
1023
1024   /* lock for good measure, since the caller had it */
1025   GST_STREAM_SYNCHRONIZER_LOCK (self);
1026 }
1027
1028 static void
1029 gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
1030 {
1031   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
1032   GstSyncStream *stream;
1033
1034   GST_STREAM_SYNCHRONIZER_LOCK (self);
1035   stream = gst_streamsync_pad_get_stream (pad);
1036   g_assert (stream->sinkpad == pad);
1037
1038   gst_stream_synchronizer_release_stream (self, stream);
1039
1040   gst_syncstream_unref (stream);
1041   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
1042 }
1043
1044 static GstStateChangeReturn
1045 gst_stream_synchronizer_change_state (GstElement * element,
1046     GstStateChange transition)
1047 {
1048   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
1049   GstStateChangeReturn ret;
1050
1051   switch (transition) {
1052     case GST_STATE_CHANGE_NULL_TO_READY:
1053       GST_DEBUG_OBJECT (self, "State change NULL->READY");
1054       self->shutdown = FALSE;
1055       break;
1056     case GST_STATE_CHANGE_READY_TO_PAUSED:
1057       GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
1058       self->group_start_time = 0;
1059       self->have_group_id = TRUE;
1060       self->group_id = G_MAXUINT;
1061       self->shutdown = FALSE;
1062       self->flushing = FALSE;
1063       self->eos = FALSE;
1064       break;
1065     case GST_STATE_CHANGE_PAUSED_TO_READY:{
1066       GList *l;
1067
1068       GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
1069
1070       GST_STREAM_SYNCHRONIZER_LOCK (self);
1071       self->flushing = TRUE;
1072       self->shutdown = TRUE;
1073       for (l = self->streams; l; l = l->next) {
1074         GstSyncStream *ostream = l->data;
1075         g_cond_broadcast (&ostream->stream_finish_cond);
1076       }
1077       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
1078     }
1079     default:
1080       break;
1081   }
1082
1083   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1084   GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", ret);
1085   if (G_UNLIKELY (ret != GST_STATE_CHANGE_SUCCESS))
1086     return ret;
1087
1088   switch (transition) {
1089     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:{
1090       GList *l;
1091
1092       GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
1093
1094       GST_STREAM_SYNCHRONIZER_LOCK (self);
1095       for (l = self->streams; l; l = l->next) {
1096         GstSyncStream *stream = l->data;
1097         /* send GAP event to sink to finished pre-roll. The reason is function
1098          * chain () will be blocked on pad_push (), so can't trigger the track
1099          * which reach EOS to send GAP event. */
1100         if (stream->is_eos && !stream->eos_sent) {
1101           stream->send_gap_event = TRUE;
1102           stream->gap_duration = GST_CLOCK_TIME_NONE;
1103           g_cond_broadcast (&stream->stream_finish_cond);
1104         }
1105       }
1106       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
1107       break;
1108     }
1109     case GST_STATE_CHANGE_PAUSED_TO_READY:{
1110       GList *l;
1111
1112       GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
1113       self->group_start_time = 0;
1114
1115       GST_STREAM_SYNCHRONIZER_LOCK (self);
1116       for (l = self->streams; l; l = l->next) {
1117         GstSyncStream *stream = l->data;
1118
1119         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
1120         stream->gap_duration = GST_CLOCK_TIME_NONE;
1121         stream->wait = FALSE;
1122         stream->is_eos = FALSE;
1123         stream->eos_sent = FALSE;
1124         stream->flushing = FALSE;
1125         stream->send_gap_event = FALSE;
1126       }
1127       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
1128       break;
1129     }
1130     case GST_STATE_CHANGE_READY_TO_NULL:{
1131       GST_DEBUG_OBJECT (self, "State change READY->NULL");
1132
1133       GST_STREAM_SYNCHRONIZER_LOCK (self);
1134       self->current_stream_number = 0;
1135       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
1136       break;
1137     }
1138     default:
1139       break;
1140   }
1141
1142   return ret;
1143 }
1144
1145 /* GObject vfuncs */
1146 static void
1147 gst_stream_synchronizer_finalize (GObject * object)
1148 {
1149   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
1150
1151   g_mutex_clear (&self->lock);
1152
1153   G_OBJECT_CLASS (parent_class)->finalize (object);
1154 }
1155
1156 /* GObject type initialization */
1157 static void
1158 gst_stream_synchronizer_init (GstStreamSynchronizer * self)
1159 {
1160   g_mutex_init (&self->lock);
1161 }
1162
1163 static void
1164 gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
1165 {
1166   GObjectClass *gobject_class = (GObjectClass *) klass;
1167   GstElementClass *element_class = (GstElementClass *) klass;
1168
1169   gobject_class->finalize = gst_stream_synchronizer_finalize;
1170
1171   gst_element_class_add_static_pad_template (element_class, &srctemplate);
1172   gst_element_class_add_static_pad_template (element_class, &sinktemplate);
1173
1174   gst_element_class_set_static_metadata (element_class,
1175       "Stream Synchronizer", "Generic",
1176       "Synchronizes a group of streams to have equal durations and starting points",
1177       "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
1178
1179   element_class->change_state =
1180       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
1181   element_class->request_new_pad =
1182       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
1183   element_class->release_pad =
1184       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);
1185
1186   GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
1187       "streamsynchronizer", 0, "Stream Synchronizer");
1188 }