streamsynchronizer: do not leak EOS events
[platform/upstream/gstreamer.git] / gst / playback / gststreamsynchronizer.c
1 /* GStreamer
2  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #ifdef HAVE_CONFIG_H
21 #include "config.h"
22 #endif
23
24 #include "gststreamsynchronizer.h"
25
26 GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
27 #define GST_CAT_DEFAULT stream_synchronizer_debug
28
29 #define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START {                   \
30     GST_LOG_OBJECT (obj,                                                \
31                     "locking from thread %p",                           \
32                     g_thread_self ());                                  \
33     g_mutex_lock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);                \
34     GST_LOG_OBJECT (obj,                                                \
35                     "locked from thread %p",                            \
36                     g_thread_self ());                                  \
37 } G_STMT_END
38
39 #define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START {                 \
40     GST_LOG_OBJECT (obj,                                                \
41                     "unlocking from thread %p",                         \
42                     g_thread_self ());                                  \
43     g_mutex_unlock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);              \
44 } G_STMT_END
45
46 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
47     GST_PAD_SRC,
48     GST_PAD_SOMETIMES,
49     GST_STATIC_CAPS_ANY);
50 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
51     GST_PAD_SINK,
52     GST_PAD_REQUEST,
53     GST_STATIC_CAPS_ANY);
54
55 #define gst_stream_synchronizer_parent_class parent_class
56 G_DEFINE_TYPE (GstStreamSynchronizer, gst_stream_synchronizer,
57     GST_TYPE_ELEMENT);
58
59 typedef struct
60 {
61   GstStreamSynchronizer *transform;
62   guint stream_number;
63   GstPad *srcpad;
64   GstPad *sinkpad;
65   GstSegment segment;
66
67   gboolean wait;                /* TRUE if waiting/blocking */
68   gboolean new_stream;
69   gboolean drop_discont;
70   gboolean is_eos;              /* TRUE if EOS was received */
71   gboolean seen_data;
72
73   GCond stream_finish_cond;
74
75   /* seqnum of the previously received STREAM_START
76    * default: G_MAXUINT32 */
77   guint32 stream_start_seqnum;
78   guint32 segment_seqnum;
79 } GstStream;
80
81 /* Must be called with lock! */
82 static inline GstPad *
83 gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
84 {
85   if (stream->sinkpad == pad)
86     return gst_object_ref (stream->srcpad);
87   if (stream->srcpad == pad)
88     return gst_object_ref (stream->sinkpad);
89
90   return NULL;
91 }
92
93 static GstPad *
94 gst_stream_get_other_pad_from_pad (GstStreamSynchronizer * self, GstPad * pad)
95 {
96   GstStream *stream;
97   GstPad *opad = NULL;
98
99   GST_STREAM_SYNCHRONIZER_LOCK (self);
100   stream = gst_pad_get_element_private (pad);
101   if (!stream)
102     goto out;
103
104   opad = gst_stream_get_other_pad (stream, pad);
105
106 out:
107   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
108
109   if (!opad)
110     GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
111
112   return opad;
113 }
114
115 /* Generic pad functions */
116 static GstIterator *
117 gst_stream_synchronizer_iterate_internal_links (GstPad * pad,
118     GstObject * parent)
119 {
120   GstIterator *it = NULL;
121   GstPad *opad;
122
123   opad =
124       gst_stream_get_other_pad_from_pad (GST_STREAM_SYNCHRONIZER (parent), pad);
125   if (opad) {
126     GValue value = { 0, };
127
128     g_value_init (&value, GST_TYPE_PAD);
129     g_value_set_object (&value, opad);
130     it = gst_iterator_new_single (GST_TYPE_PAD, &value);
131     g_value_unset (&value);
132     gst_object_unref (opad);
133   }
134
135   return it;
136 }
137
138 static gboolean
139 gst_stream_synchronizer_query (GstPad * pad, GstObject * parent,
140     GstQuery * query)
141 {
142   GstPad *opad;
143   gboolean ret = FALSE;
144
145   GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));
146
147   opad =
148       gst_stream_get_other_pad_from_pad (GST_STREAM_SYNCHRONIZER (parent), pad);
149   if (opad) {
150     ret = gst_pad_peer_query (opad, query);
151     gst_object_unref (opad);
152   }
153
154   return ret;
155 }
156
157 /* srcpad functions */
158 static gboolean
159 gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent,
160     GstEvent * event)
161 {
162   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
163   GstPad *opad;
164   gboolean ret = FALSE;
165
166   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
167       GST_EVENT_TYPE_NAME (event), event);
168
169   switch (GST_EVENT_TYPE (event)) {
170     case GST_EVENT_QOS:{
171       gdouble proportion;
172       GstClockTimeDiff diff;
173       GstClockTime timestamp;
174       gint64 running_time_diff = -1;
175       GstStream *stream;
176
177       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
178       gst_event_unref (event);
179
180       GST_STREAM_SYNCHRONIZER_LOCK (self);
181       stream = gst_pad_get_element_private (pad);
182       if (stream)
183         running_time_diff = stream->segment.base;
184       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
185
186       if (running_time_diff == -1) {
187         GST_WARNING_OBJECT (pad, "QOS event before group start");
188         goto out;
189       }
190       if (timestamp < running_time_diff) {
191         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
192         goto out;
193       }
194
195       GST_LOG_OBJECT (pad,
196           "Adjusting QOS event: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT " = %"
197           GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
198           GST_TIME_ARGS (running_time_diff),
199           GST_TIME_ARGS (timestamp - running_time_diff));
200
201       timestamp -= running_time_diff;
202
203       /* That case is invalid for QoS events */
204       if (diff < 0 && -diff > timestamp) {
205         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
206         ret = TRUE;
207         goto out;
208       }
209
210       event =
211           gst_event_new_qos (GST_QOS_TYPE_UNDERFLOW, proportion, diff,
212           timestamp);
213       break;
214     }
215     default:
216       break;
217   }
218
219   opad = gst_stream_get_other_pad_from_pad (self, pad);
220   if (opad) {
221     ret = gst_pad_push_event (opad, event);
222     gst_object_unref (opad);
223   }
224
225 out:
226   return ret;
227 }
228
229 /* sinkpad functions */
230 static gboolean
231 gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
232     GstEvent * event)
233 {
234   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
235   GstPad *opad;
236   gboolean ret = FALSE;
237
238   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
239       GST_EVENT_TYPE_NAME (event), event);
240
241   switch (GST_EVENT_TYPE (event)) {
242     case GST_EVENT_STREAM_START:
243     {
244       GstStream *stream;
245       guint32 seqnum = gst_event_get_seqnum (event);
246       GList *l;
247       gboolean all_wait = TRUE;
248
249       GST_STREAM_SYNCHRONIZER_LOCK (self);
250       stream = gst_pad_get_element_private (pad);
251       if (stream && stream->stream_start_seqnum != seqnum) {
252
253         GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
254
255         stream->is_eos = FALSE;
256         stream->wait = TRUE;
257         stream->new_stream = TRUE;
258
259         for (l = self->streams; l; l = l->next) {
260           GstStream *ostream = l->data;
261
262           all_wait = all_wait && ostream->wait;
263           if (!all_wait)
264             break;
265         }
266         if (all_wait) {
267           gint64 position = 0;
268
269           GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
270
271           for (l = self->streams; l; l = l->next) {
272             GstStream *ostream = l->data;
273             gint64 stop_running_time;
274             gint64 position_running_time;
275
276             ostream->wait = FALSE;
277
278             if (ostream->segment.format == GST_FORMAT_TIME) {
279               stop_running_time =
280                   gst_segment_to_running_time (&ostream->segment,
281                   GST_FORMAT_TIME, ostream->segment.stop);
282               position_running_time =
283                   gst_segment_to_running_time (&ostream->segment,
284                   GST_FORMAT_TIME, ostream->segment.position);
285               position =
286                   MAX (position, MAX (stop_running_time,
287                       position_running_time));
288             }
289           }
290           position = MAX (0, position);
291           self->group_start_time = MAX (self->group_start_time, position);
292
293           GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
294               GST_TIME_ARGS (self->group_start_time));
295
296           for (l = self->streams; l; l = l->next) {
297             GstStream *ostream = l->data;
298             g_cond_broadcast (&ostream->stream_finish_cond);
299           }
300         }
301       } else
302         GST_DEBUG_OBJECT (self, "No stream or STREAM_START from same source");
303       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
304     }
305       break;
306     case GST_EVENT_SEGMENT:{
307       GstStream *stream;
308       GstSegment segment;
309
310       gst_event_copy_segment (event, &segment);
311
312       GST_STREAM_SYNCHRONIZER_LOCK (self);
313       stream = gst_pad_get_element_private (pad);
314       if (stream) {
315         if (stream->wait) {
316           GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
317           g_cond_wait (&stream->stream_finish_cond, &self->lock);
318           stream = gst_pad_get_element_private (pad);
319           if (stream)
320             stream->wait = FALSE;
321         }
322       }
323
324       if (self->shutdown) {
325         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
326         gst_event_unref (event);
327         goto done;
328       }
329
330       if (stream && segment.format == GST_FORMAT_TIME) {
331         if (stream->new_stream) {
332           stream->new_stream = FALSE;
333           stream->drop_discont = TRUE;
334           segment.base = self->group_start_time;
335         }
336
337         GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
338             &stream->segment);
339         gst_segment_copy_into (&segment, &stream->segment);
340         GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
341             &stream->segment);
342         stream->segment_seqnum = gst_event_get_seqnum (event);
343
344         GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
345             GST_TIME_ARGS (stream->segment.base));
346         {
347           GstEvent *tmpev;
348
349           tmpev = gst_event_new_segment (&stream->segment);
350           gst_event_set_seqnum (tmpev, stream->segment_seqnum);
351           gst_event_unref (event);
352           event = tmpev;
353         }
354
355       } else if (stream) {
356         GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
357             gst_format_get_name (segment.format));
358         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
359       }
360       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
361       break;
362     }
363     case GST_EVENT_FLUSH_START:{
364       GstStream *stream;
365
366       GST_STREAM_SYNCHRONIZER_LOCK (self);
367       stream = gst_pad_get_element_private (pad);
368       if (stream) {
369         GST_DEBUG_OBJECT (pad, "Flushing streams");
370         g_cond_broadcast (&stream->stream_finish_cond);
371       }
372       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
373       break;
374     }
375     case GST_EVENT_FLUSH_STOP:{
376       GstStream *stream;
377
378       GST_STREAM_SYNCHRONIZER_LOCK (self);
379       stream = gst_pad_get_element_private (pad);
380       if (stream) {
381         GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
382             stream->stream_number);
383         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
384
385         stream->is_eos = FALSE;
386         stream->wait = FALSE;
387         stream->new_stream = FALSE;
388         stream->drop_discont = FALSE;
389         stream->seen_data = FALSE;
390         g_cond_broadcast (&stream->stream_finish_cond);
391       }
392       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
393       break;
394     }
395     case GST_EVENT_EOS:{
396       GstStream *stream;
397       GList *l;
398       gboolean all_eos = TRUE;
399       gboolean seen_data;
400       GSList *pads = NULL;
401       GstPad *srcpad;
402
403       GST_STREAM_SYNCHRONIZER_LOCK (self);
404       stream = gst_pad_get_element_private (pad);
405       if (!stream) {
406         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
407         GST_WARNING_OBJECT (pad, "EOS for unknown stream");
408         break;
409       }
410
411       GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
412       stream->is_eos = TRUE;
413
414       seen_data = stream->seen_data;
415       srcpad = gst_object_ref (stream->srcpad);
416
417       for (l = self->streams; l; l = l->next) {
418         GstStream *ostream = l->data;
419
420         all_eos = all_eos && ostream->is_eos;
421         if (!all_eos)
422           break;
423       }
424
425       if (all_eos) {
426         GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
427         for (l = self->streams; l; l = l->next) {
428           GstStream *ostream = l->data;
429           /* local snapshot of current pads */
430           gst_object_ref (ostream->srcpad);
431           pads = g_slist_prepend (pads, ostream->srcpad);
432         }
433       }
434       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
435       /* drop lock when sending eos, which may block in e.g. preroll */
436       if (pads) {
437         GstPad *pad;
438         GSList *epad;
439
440         ret = TRUE;
441         epad = pads;
442         while (epad) {
443           pad = epad->data;
444           GST_DEBUG_OBJECT (pad, "Pushing EOS");
445           ret = ret && gst_pad_push_event (pad, gst_event_new_eos ());
446           gst_object_unref (pad);
447           epad = g_slist_next (epad);
448         }
449         g_slist_free (pads);
450       } else {
451         /* if EOS, but no data has passed, then send something to replace EOS
452          * for preroll purposes */
453         if (!seen_data) {
454           GstEvent *event;
455
456           event = gst_event_new_gap (0, 0);
457           gst_pad_push_event (srcpad, event);
458         }
459       }
460       gst_object_unref (srcpad);
461       gst_event_unref (event);
462       goto done;
463       break;
464     }
465     default:
466       break;
467   }
468
469   opad = gst_stream_get_other_pad_from_pad (self, pad);
470   if (opad) {
471     ret = gst_pad_push_event (opad, event);
472     gst_object_unref (opad);
473   }
474
475 done:
476
477   return ret;
478 }
479
480 static GstFlowReturn
481 gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
482     GstBuffer * buffer)
483 {
484   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
485   GstPad *opad;
486   GstFlowReturn ret = GST_FLOW_ERROR;
487   GstStream *stream;
488   GstClockTime timestamp = GST_CLOCK_TIME_NONE;
489   GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
490
491   GST_LOG_OBJECT (pad, "Handling buffer %p: size=%" G_GSIZE_FORMAT
492       ", timestamp=%" GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
493       " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
494       buffer, gst_buffer_get_size (buffer),
495       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
496       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
497       GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));
498
499   timestamp = GST_BUFFER_TIMESTAMP (buffer);
500   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)
501       && GST_BUFFER_DURATION_IS_VALID (buffer))
502     timestamp_end = timestamp + GST_BUFFER_DURATION (buffer);
503
504   GST_STREAM_SYNCHRONIZER_LOCK (self);
505   stream = gst_pad_get_element_private (pad);
506
507   if (stream) {
508     stream->seen_data = TRUE;
509     if (stream->drop_discont) {
510       if (GST_BUFFER_IS_DISCONT (buffer)) {
511         GST_DEBUG_OBJECT (pad, "removing DISCONT from buffer %p", buffer);
512         buffer = gst_buffer_make_writable (buffer);
513         GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
514       }
515       stream->drop_discont = FALSE;
516     }
517
518     if (stream->segment.format == GST_FORMAT_TIME
519         && GST_CLOCK_TIME_IS_VALID (timestamp)) {
520       GST_LOG_OBJECT (pad,
521           "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
522           GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
523       stream->segment.position = timestamp;
524     }
525   }
526   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
527
528   opad = gst_stream_get_other_pad_from_pad (self, pad);
529   if (opad) {
530     ret = gst_pad_push (opad, buffer);
531     gst_object_unref (opad);
532   }
533
534   GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
535   if (ret == GST_FLOW_OK) {
536     GList *l;
537
538     GST_STREAM_SYNCHRONIZER_LOCK (self);
539     stream = gst_pad_get_element_private (pad);
540     if (stream && stream->segment.format == GST_FORMAT_TIME
541         && GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
542       GST_LOG_OBJECT (pad,
543           "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
544           GST_TIME_ARGS (stream->segment.position),
545           GST_TIME_ARGS (timestamp_end));
546       stream->segment.position = timestamp_end;
547     }
548
549     /* Advance EOS streams if necessary. For non-EOS
550      * streams the demuxers should already do this! */
551     for (l = self->streams; l; l = l->next) {
552       GstStream *ostream = l->data;
553       gint64 position;
554
555       if (!ostream->is_eos || ostream->segment.format != GST_FORMAT_TIME)
556         continue;
557
558       if (ostream->segment.position != -1)
559         position = ostream->segment.position;
560       else
561         position = ostream->segment.start;
562
563       /* Is there a 1 second lag? */
564       if (position != -1 && position + GST_SECOND < timestamp_end) {
565         gint64 new_start, new_stop;
566
567         new_start = timestamp_end - GST_SECOND;
568         if (ostream->segment.stop == -1)
569           new_stop = -1;
570         else
571           new_stop = MAX (new_start, ostream->segment.stop);
572
573         GST_DEBUG_OBJECT (ostream->sinkpad,
574             "Advancing stream %u from %" GST_TIME_FORMAT " to %"
575             GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (position),
576             GST_TIME_ARGS (new_start));
577
578         ostream->segment.start = new_start;
579         ostream->segment.stop = new_stop;
580         ostream->segment.time = new_start;
581         ostream->segment.position = new_start;
582
583         gst_pad_push_event (ostream->srcpad,
584             gst_event_new_segment (&ostream->segment));
585       }
586     }
587     GST_STREAM_SYNCHRONIZER_UNLOCK (self);
588   }
589
590   return ret;
591 }
592
593 /* GstElement vfuncs */
594 static GstPad *
595 gst_stream_synchronizer_request_new_pad (GstElement * element,
596     GstPadTemplate * temp, const gchar * name, const GstCaps * caps)
597 {
598   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
599   GstStream *stream;
600   gchar *tmp;
601
602   GST_STREAM_SYNCHRONIZER_LOCK (self);
603   GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
604       self->current_stream_number);
605
606   stream = g_slice_new0 (GstStream);
607   stream->transform = self;
608   stream->stream_number = self->current_stream_number;
609   g_cond_init (&stream->stream_finish_cond);
610   stream->stream_start_seqnum = G_MAXUINT32;
611   stream->segment_seqnum = G_MAXUINT32;
612
613   tmp = g_strdup_printf ("sink_%u", self->current_stream_number);
614   stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
615   g_free (tmp);
616   gst_pad_set_element_private (stream->sinkpad, stream);
617   gst_pad_set_iterate_internal_links_function (stream->sinkpad,
618       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
619   gst_pad_set_query_function (stream->sinkpad,
620       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
621   gst_pad_set_event_function (stream->sinkpad,
622       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
623   gst_pad_set_chain_function (stream->sinkpad,
624       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
625
626   tmp = g_strdup_printf ("src_%u", self->current_stream_number);
627   stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
628   g_free (tmp);
629   gst_pad_set_element_private (stream->srcpad, stream);
630   gst_pad_set_iterate_internal_links_function (stream->srcpad,
631       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
632   gst_pad_set_query_function (stream->srcpad,
633       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
634   gst_pad_set_event_function (stream->srcpad,
635       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
636
637   gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
638
639   self->streams = g_list_prepend (self->streams, stream);
640   self->current_stream_number++;
641   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
642
643   /* Add pads and activate unless we're going to NULL */
644   g_rec_mutex_lock (GST_STATE_GET_LOCK (self));
645   if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
646     gst_pad_set_active (stream->srcpad, TRUE);
647     gst_pad_set_active (stream->sinkpad, TRUE);
648   }
649   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
650   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
651   g_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
652
653   return stream->sinkpad;
654 }
655
656 /* Must be called with lock! */
657 static void
658 gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
659     GstStream * stream)
660 {
661   GList *l;
662
663   GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
664
665   for (l = self->streams; l; l = l->next) {
666     if (l->data == stream) {
667       self->streams = g_list_delete_link (self->streams, l);
668       break;
669     }
670   }
671   g_assert (l != NULL);
672
673   /* we can drop the lock, since stream exists now only local.
674    * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
675    * (due to reverse lock order) when deactivating pads */
676   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
677
678   gst_pad_set_element_private (stream->srcpad, NULL);
679   gst_pad_set_element_private (stream->sinkpad, NULL);
680   gst_pad_set_active (stream->srcpad, FALSE);
681   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
682   gst_pad_set_active (stream->sinkpad, FALSE);
683   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
684
685   if (stream->segment.format == GST_FORMAT_TIME) {
686     gint64 stop_running_time;
687     gint64 position_running_time;
688
689     stop_running_time =
690         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
691         stream->segment.stop);
692     position_running_time =
693         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
694         stream->segment.position);
695     stop_running_time = MAX (stop_running_time, position_running_time);
696
697     if (stop_running_time > self->group_start_time) {
698       GST_DEBUG_OBJECT (stream->sinkpad,
699           "Updating global start running time from %" GST_TIME_FORMAT " to %"
700           GST_TIME_FORMAT, GST_TIME_ARGS (self->group_start_time),
701           GST_TIME_ARGS (stop_running_time));
702
703       self->group_start_time = stop_running_time;
704     }
705   }
706
707   g_cond_clear (&stream->stream_finish_cond);
708   g_slice_free (GstStream, stream);
709
710   /* NOTE: In theory we have to check here if all streams
711    * are EOS but the one that was removed wasn't and then
712    * send EOS downstream. But due to the way how playsink
713    * works this is not necessary and will only cause problems
714    * for gapless playback. playsink will only add/remove pads
715    * when it's reconfigured, which happens when the streams
716    * change
717    */
718
719   /* lock for good measure, since the caller had it */
720   GST_STREAM_SYNCHRONIZER_LOCK (self);
721 }
722
723 static void
724 gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
725 {
726   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
727   GstStream *stream;
728
729   GST_STREAM_SYNCHRONIZER_LOCK (self);
730   stream = gst_pad_get_element_private (pad);
731   if (stream) {
732     g_assert (stream->sinkpad == pad);
733
734     gst_stream_synchronizer_release_stream (self, stream);
735   }
736   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
737 }
738
739 static GstStateChangeReturn
740 gst_stream_synchronizer_change_state (GstElement * element,
741     GstStateChange transition)
742 {
743   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
744   GstStateChangeReturn ret;
745
746   switch (transition) {
747     case GST_STATE_CHANGE_NULL_TO_READY:
748       GST_DEBUG_OBJECT (self, "State change NULL->READY");
749       self->shutdown = FALSE;
750       break;
751     case GST_STATE_CHANGE_READY_TO_PAUSED:
752       GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
753       self->group_start_time = 0;
754       self->shutdown = FALSE;
755       break;
756     case GST_STATE_CHANGE_PAUSED_TO_READY:{
757       GList *l;
758
759       GST_DEBUG_OBJECT (self, "State change READY->NULL");
760
761       GST_STREAM_SYNCHRONIZER_LOCK (self);
762       for (l = self->streams; l; l = l->next) {
763         GstStream *ostream = l->data;
764         g_cond_broadcast (&ostream->stream_finish_cond);
765       }
766       self->shutdown = TRUE;
767       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
768     }
769     default:
770       break;
771   }
772
773   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
774   GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", ret);
775   if (G_UNLIKELY (ret != GST_STATE_CHANGE_SUCCESS))
776     return ret;
777
778   switch (transition) {
779     case GST_STATE_CHANGE_PAUSED_TO_READY:{
780       GList *l;
781
782       GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
783       self->group_start_time = 0;
784
785       GST_STREAM_SYNCHRONIZER_LOCK (self);
786       for (l = self->streams; l; l = l->next) {
787         GstStream *stream = l->data;
788
789         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
790         stream->wait = FALSE;
791         stream->new_stream = FALSE;
792         stream->drop_discont = FALSE;
793         stream->is_eos = FALSE;
794       }
795       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
796       break;
797     }
798     case GST_STATE_CHANGE_READY_TO_NULL:{
799       GST_DEBUG_OBJECT (self, "State change READY->NULL");
800
801       GST_STREAM_SYNCHRONIZER_LOCK (self);
802       self->current_stream_number = 0;
803       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
804       break;
805     }
806     default:
807       break;
808   }
809
810   return ret;
811 }
812
813 /* GObject vfuncs */
814 static void
815 gst_stream_synchronizer_finalize (GObject * object)
816 {
817   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
818
819   g_mutex_clear (&self->lock);
820
821   G_OBJECT_CLASS (parent_class)->finalize (object);
822 }
823
824 /* GObject type initialization */
825 static void
826 gst_stream_synchronizer_init (GstStreamSynchronizer * self)
827 {
828   g_mutex_init (&self->lock);
829 }
830
831 static void
832 gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
833 {
834   GObjectClass *gobject_class = (GObjectClass *) klass;
835   GstElementClass *element_class = (GstElementClass *) klass;
836
837   GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
838       "streamsynchronizer", 0, "Stream Synchronizer");
839
840   gobject_class->finalize = gst_stream_synchronizer_finalize;
841
842   gst_element_class_add_pad_template (element_class,
843       gst_static_pad_template_get (&srctemplate));
844   gst_element_class_add_pad_template (element_class,
845       gst_static_pad_template_get (&sinktemplate));
846
847   gst_element_class_set_static_metadata (element_class,
848       "Stream Synchronizer", "Generic",
849       "Synchronizes a group of streams to have equal durations and starting points",
850       "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
851
852   element_class->change_state =
853       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
854   element_class->request_new_pad =
855       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
856   element_class->release_pad =
857       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);
858 }