d916286d0967cfbd1cd07afc84b770a37026a1f5
[platform/upstream/gstreamer.git] / gst / playback / gststreamsynchronizer.c
1 /* GStreamer
2  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 #ifdef HAVE_CONFIG_H
21 #include "config.h"
22 #endif
23
24 #include "gststreamsynchronizer.h"
25
26 GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
27 #define GST_CAT_DEFAULT stream_synchronizer_debug
28
29 #define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START {                   \
30     GST_LOG_OBJECT (obj,                                                \
31                     "locking from thread %p",                           \
32                     g_thread_self ());                                  \
33     g_mutex_lock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);                \
34     GST_LOG_OBJECT (obj,                                                \
35                     "locked from thread %p",                            \
36                     g_thread_self ());                                  \
37 } G_STMT_END
38
39 #define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START {                 \
40     GST_LOG_OBJECT (obj,                                                \
41                     "unlocking from thread %p",                         \
42                     g_thread_self ());                                  \
43     g_mutex_unlock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);              \
44 } G_STMT_END
45
46 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
47     GST_PAD_SRC,
48     GST_PAD_SOMETIMES,
49     GST_STATIC_CAPS_ANY);
50 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
51     GST_PAD_SINK,
52     GST_PAD_REQUEST,
53     GST_STATIC_CAPS_ANY);
54
55 #define gst_stream_synchronizer_parent_class parent_class
56 G_DEFINE_TYPE (GstStreamSynchronizer, gst_stream_synchronizer,
57     GST_TYPE_ELEMENT);
58
59 typedef struct
60 {
61   GstStreamSynchronizer *transform;
62   guint stream_number;
63   GstPad *srcpad;
64   GstPad *sinkpad;
65   GstSegment segment;
66
67   gboolean wait;                /* TRUE if waiting/blocking */
68   gboolean new_stream;
69   gboolean drop_discont;
70   gboolean is_eos;              /* TRUE if EOS was received */
71   gboolean seen_data;
72
73   GCond stream_finish_cond;
74
75   /* seqnum of the previously received STREAM_START
76    * default: G_MAXUINT32 */
77   guint32 stream_start_seqnum;
78   guint32 segment_seqnum;
79 } GstStream;
80
81 /* Must be called with lock! */
82 static inline GstPad *
83 gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
84 {
85   if (stream->sinkpad == pad)
86     return gst_object_ref (stream->srcpad);
87   if (stream->srcpad == pad)
88     return gst_object_ref (stream->sinkpad);
89
90   return NULL;
91 }
92
93 static GstPad *
94 gst_stream_get_other_pad_from_pad (GstStreamSynchronizer * self, GstPad * pad)
95 {
96   GstStream *stream;
97   GstPad *opad = NULL;
98
99   GST_STREAM_SYNCHRONIZER_LOCK (self);
100   stream = gst_pad_get_element_private (pad);
101   if (!stream)
102     goto out;
103
104   opad = gst_stream_get_other_pad (stream, pad);
105
106 out:
107   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
108
109   if (!opad)
110     GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
111
112   return opad;
113 }
114
115 /* Generic pad functions */
116 static GstIterator *
117 gst_stream_synchronizer_iterate_internal_links (GstPad * pad,
118     GstObject * parent)
119 {
120   GstIterator *it = NULL;
121   GstPad *opad;
122
123   opad =
124       gst_stream_get_other_pad_from_pad (GST_STREAM_SYNCHRONIZER (parent), pad);
125   if (opad) {
126     GValue value = { 0, };
127
128     g_value_init (&value, GST_TYPE_PAD);
129     g_value_set_object (&value, opad);
130     it = gst_iterator_new_single (GST_TYPE_PAD, &value);
131     g_value_unset (&value);
132     gst_object_unref (opad);
133   }
134
135   return it;
136 }
137
138 static gboolean
139 gst_stream_synchronizer_query (GstPad * pad, GstObject * parent,
140     GstQuery * query)
141 {
142   GstPad *opad;
143   gboolean ret = FALSE;
144
145   GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));
146
147   opad =
148       gst_stream_get_other_pad_from_pad (GST_STREAM_SYNCHRONIZER (parent), pad);
149   if (opad) {
150     ret = gst_pad_peer_query (opad, query);
151     gst_object_unref (opad);
152   }
153
154   return ret;
155 }
156
157 /* srcpad functions */
158 static gboolean
159 gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent,
160     GstEvent * event)
161 {
162   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
163   GstPad *opad;
164   gboolean ret = FALSE;
165
166   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
167       GST_EVENT_TYPE_NAME (event), event);
168
169   switch (GST_EVENT_TYPE (event)) {
170     case GST_EVENT_QOS:{
171       gdouble proportion;
172       GstClockTimeDiff diff;
173       GstClockTime timestamp;
174       gint64 running_time_diff = -1;
175       GstStream *stream;
176
177       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
178       gst_event_unref (event);
179
180       GST_STREAM_SYNCHRONIZER_LOCK (self);
181       stream = gst_pad_get_element_private (pad);
182       if (stream)
183         running_time_diff = stream->segment.base;
184       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
185
186       if (running_time_diff == -1) {
187         GST_WARNING_OBJECT (pad, "QOS event before group start");
188         goto out;
189       }
190       if (timestamp < running_time_diff) {
191         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
192         goto out;
193       }
194
195       GST_LOG_OBJECT (pad,
196           "Adjusting QOS event: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT " = %"
197           GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
198           GST_TIME_ARGS (running_time_diff),
199           GST_TIME_ARGS (timestamp - running_time_diff));
200
201       timestamp -= running_time_diff;
202
203       /* That case is invalid for QoS events */
204       if (diff < 0 && -diff > timestamp) {
205         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
206         ret = TRUE;
207         goto out;
208       }
209
210       event =
211           gst_event_new_qos (GST_QOS_TYPE_UNDERFLOW, proportion, diff,
212           timestamp);
213       break;
214     }
215     default:
216       break;
217   }
218
219   opad = gst_stream_get_other_pad_from_pad (self, pad);
220   if (opad) {
221     ret = gst_pad_push_event (opad, event);
222     gst_object_unref (opad);
223   }
224
225 out:
226   return ret;
227 }
228
229 /* sinkpad functions */
230 static gboolean
231 gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
232     GstEvent * event)
233 {
234   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
235   GstPad *opad;
236   gboolean ret = FALSE;
237
238   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
239       GST_EVENT_TYPE_NAME (event), event);
240
241   switch (GST_EVENT_TYPE (event)) {
242     case GST_EVENT_STREAM_START:
243     {
244       GstStream *stream;
245       guint32 seqnum = gst_event_get_seqnum (event);
246       GList *l;
247       gboolean all_wait = TRUE;
248
249       GST_STREAM_SYNCHRONIZER_LOCK (self);
250       stream = gst_pad_get_element_private (pad);
251       if (stream && stream->stream_start_seqnum != seqnum) {
252
253         GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
254
255         stream->is_eos = FALSE;
256         stream->wait = TRUE;
257         stream->new_stream = TRUE;
258
259         for (l = self->streams; l; l = l->next) {
260           GstStream *ostream = l->data;
261
262           all_wait = all_wait && ostream->wait;
263           if (!all_wait)
264             break;
265         }
266         if (all_wait) {
267           gint64 position = 0;
268
269           GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
270
271           for (l = self->streams; l; l = l->next) {
272             GstStream *ostream = l->data;
273             gint64 stop_running_time;
274             gint64 position_running_time;
275
276             ostream->wait = FALSE;
277
278             if (ostream->segment.format == GST_FORMAT_TIME) {
279               stop_running_time =
280                   gst_segment_to_running_time (&ostream->segment,
281                   GST_FORMAT_TIME, ostream->segment.stop);
282               position_running_time =
283                   gst_segment_to_running_time (&ostream->segment,
284                   GST_FORMAT_TIME, ostream->segment.position);
285               position =
286                   MAX (position, MAX (stop_running_time,
287                       position_running_time));
288             }
289           }
290           position = MAX (0, position);
291           self->group_start_time = MAX (self->group_start_time, position);
292
293           GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
294               GST_TIME_ARGS (self->group_start_time));
295
296           for (l = self->streams; l; l = l->next) {
297             GstStream *ostream = l->data;
298             g_cond_broadcast (&ostream->stream_finish_cond);
299           }
300         }
301       } else
302         GST_DEBUG_OBJECT (self, "No stream or STREAM_START from same source");
303       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
304     }
305       break;
306     case GST_EVENT_SEGMENT:{
307       GstStream *stream;
308       GstSegment segment;
309
310       gst_event_copy_segment (event, &segment);
311
312       GST_STREAM_SYNCHRONIZER_LOCK (self);
313       stream = gst_pad_get_element_private (pad);
314       if (stream) {
315         if (stream->wait) {
316           GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
317           g_cond_wait (&stream->stream_finish_cond, &self->lock);
318           stream = gst_pad_get_element_private (pad);
319           if (stream)
320             stream->wait = FALSE;
321         }
322       }
323
324       if (self->shutdown) {
325         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
326         gst_event_unref (event);
327         goto done;
328       }
329
330       if (stream && segment.format == GST_FORMAT_TIME) {
331         if (stream->new_stream) {
332           stream->new_stream = FALSE;
333           stream->drop_discont = TRUE;
334           segment.base = self->group_start_time;
335         }
336
337         GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
338             &stream->segment);
339         gst_segment_copy_into (&segment, &stream->segment);
340         GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
341             &stream->segment);
342         stream->segment_seqnum = gst_event_get_seqnum (event);
343
344         GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
345             GST_TIME_ARGS (stream->segment.base));
346         {
347           GstEvent *tmpev;
348
349           tmpev = gst_event_new_segment (&stream->segment);
350           gst_event_set_seqnum (tmpev, stream->segment_seqnum);
351           gst_event_unref (event);
352           event = tmpev;
353         }
354
355       } else if (stream) {
356         GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
357             gst_format_get_name (segment.format));
358         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
359         /* Since this stream is not time-based, we mark it so that
360          * other streams don't wait forever on it */
361         stream->wait = TRUE;
362       }
363       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
364       break;
365     }
366     case GST_EVENT_FLUSH_START:{
367       GstStream *stream;
368
369       GST_STREAM_SYNCHRONIZER_LOCK (self);
370       stream = gst_pad_get_element_private (pad);
371       if (stream) {
372         GST_DEBUG_OBJECT (pad, "Flushing streams");
373         g_cond_broadcast (&stream->stream_finish_cond);
374       }
375       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
376       break;
377     }
378     case GST_EVENT_FLUSH_STOP:{
379       GstStream *stream;
380
381       GST_STREAM_SYNCHRONIZER_LOCK (self);
382       stream = gst_pad_get_element_private (pad);
383       if (stream) {
384         GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
385             stream->stream_number);
386         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
387
388         stream->is_eos = FALSE;
389         stream->wait = FALSE;
390         stream->new_stream = FALSE;
391         stream->drop_discont = FALSE;
392         stream->seen_data = FALSE;
393         g_cond_broadcast (&stream->stream_finish_cond);
394       }
395       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
396       break;
397     }
398     case GST_EVENT_EOS:{
399       GstStream *stream;
400       GList *l;
401       gboolean all_eos = TRUE;
402       gboolean seen_data;
403       GSList *pads = NULL;
404       GstPad *srcpad;
405       GstClockTime timestamp;
406
407       GST_STREAM_SYNCHRONIZER_LOCK (self);
408       stream = gst_pad_get_element_private (pad);
409       if (!stream) {
410         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
411         GST_WARNING_OBJECT (pad, "EOS for unknown stream");
412         break;
413       }
414
415       GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
416       stream->is_eos = TRUE;
417
418       seen_data = stream->seen_data;
419       srcpad = gst_object_ref (stream->srcpad);
420
421       if (seen_data && stream->segment.position != -1)
422         timestamp = stream->segment.position;
423       else if (stream->segment.rate < 0.0 || stream->segment.stop == -1)
424         timestamp = stream->segment.start;
425       else
426         timestamp = stream->segment.stop;
427
428       for (l = self->streams; l; l = l->next) {
429         GstStream *ostream = l->data;
430
431         all_eos = all_eos && ostream->is_eos;
432         if (!all_eos)
433           break;
434       }
435
436       if (all_eos) {
437         GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
438         for (l = self->streams; l; l = l->next) {
439           GstStream *ostream = l->data;
440           /* local snapshot of current pads */
441           gst_object_ref (ostream->srcpad);
442           pads = g_slist_prepend (pads, ostream->srcpad);
443         }
444       }
445       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
446       /* drop lock when sending eos, which may block in e.g. preroll */
447       if (pads) {
448         GstPad *pad;
449         GSList *epad;
450
451         ret = TRUE;
452         epad = pads;
453         while (epad) {
454           pad = epad->data;
455           GST_DEBUG_OBJECT (pad, "Pushing EOS");
456           ret = ret && gst_pad_push_event (pad, gst_event_new_eos ());
457           gst_object_unref (pad);
458           epad = g_slist_next (epad);
459         }
460         g_slist_free (pads);
461       } else {
462         /* if EOS, but no data has passed, then send something to replace EOS
463          * for preroll purposes */
464         if (!seen_data) {
465           GstEvent *gap_event;
466
467           gap_event = gst_event_new_gap (timestamp, GST_CLOCK_TIME_NONE);
468           ret = gst_pad_push_event (srcpad, gap_event);
469         } else {
470           GstEvent *gap_event;
471
472           /* FIXME: Also send a GAP event to let audio sinks start their
473            * clock in case they did not have enough data yet */
474           gap_event = gst_event_new_gap (timestamp, GST_CLOCK_TIME_NONE);
475           ret = gst_pad_push_event (srcpad, gap_event);
476         }
477       }
478       gst_object_unref (srcpad);
479       gst_event_unref (event);
480       goto done;
481     }
482     default:
483       break;
484   }
485
486   opad = gst_stream_get_other_pad_from_pad (self, pad);
487   if (opad) {
488     ret = gst_pad_push_event (opad, event);
489     gst_object_unref (opad);
490   }
491
492 done:
493
494   return ret;
495 }
496
497 static GstFlowReturn
498 gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
499     GstBuffer * buffer)
500 {
501   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
502   GstPad *opad;
503   GstFlowReturn ret = GST_FLOW_ERROR;
504   GstStream *stream;
505   GstClockTime timestamp = GST_CLOCK_TIME_NONE;
506   GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
507
508   GST_LOG_OBJECT (pad, "Handling buffer %p: size=%" G_GSIZE_FORMAT
509       ", timestamp=%" GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
510       " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
511       buffer, gst_buffer_get_size (buffer),
512       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
513       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
514       GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));
515
516   timestamp = GST_BUFFER_TIMESTAMP (buffer);
517   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)
518       && GST_BUFFER_DURATION_IS_VALID (buffer))
519     timestamp_end = timestamp + GST_BUFFER_DURATION (buffer);
520
521   GST_STREAM_SYNCHRONIZER_LOCK (self);
522   stream = gst_pad_get_element_private (pad);
523
524   if (stream) {
525     stream->seen_data = TRUE;
526     if (stream->drop_discont) {
527       if (GST_BUFFER_IS_DISCONT (buffer)) {
528         GST_DEBUG_OBJECT (pad, "removing DISCONT from buffer %p", buffer);
529         buffer = gst_buffer_make_writable (buffer);
530         GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
531       }
532       stream->drop_discont = FALSE;
533     }
534
535     if (stream->segment.format == GST_FORMAT_TIME
536         && GST_CLOCK_TIME_IS_VALID (timestamp)) {
537       GST_LOG_OBJECT (pad,
538           "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
539           GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
540       stream->segment.position = timestamp;
541     }
542   }
543   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
544
545   opad = gst_stream_get_other_pad_from_pad (self, pad);
546   if (opad) {
547     ret = gst_pad_push (opad, buffer);
548     gst_object_unref (opad);
549   }
550
551   GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
552   if (ret == GST_FLOW_OK) {
553     GList *l;
554
555     GST_STREAM_SYNCHRONIZER_LOCK (self);
556     stream = gst_pad_get_element_private (pad);
557     if (stream && stream->segment.format == GST_FORMAT_TIME
558         && GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
559       GST_LOG_OBJECT (pad,
560           "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
561           GST_TIME_ARGS (stream->segment.position),
562           GST_TIME_ARGS (timestamp_end));
563       stream->segment.position = timestamp_end;
564     }
565
566     /* Advance EOS streams if necessary. For non-EOS
567      * streams the demuxers should already do this! */
568     for (l = self->streams; l; l = l->next) {
569       GstStream *ostream = l->data;
570       gint64 position;
571
572       if (!ostream->is_eos || ostream->segment.format != GST_FORMAT_TIME)
573         continue;
574
575       if (ostream->segment.position != -1)
576         position = ostream->segment.position;
577       else
578         position = ostream->segment.start;
579
580       /* Is there a 1 second lag? */
581       if (position != -1 && position + GST_SECOND < timestamp_end) {
582         gint64 new_start;
583
584         new_start = timestamp_end - GST_SECOND;
585
586         GST_DEBUG_OBJECT (ostream->sinkpad,
587             "Advancing stream %u from %" GST_TIME_FORMAT " to %"
588             GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (position),
589             GST_TIME_ARGS (new_start));
590
591         ostream->segment.position = new_start;
592
593         gst_pad_push_event (ostream->srcpad,
594             gst_event_new_gap (position, new_start - position));
595       }
596     }
597     GST_STREAM_SYNCHRONIZER_UNLOCK (self);
598   }
599
600   return ret;
601 }
602
603 /* GstElement vfuncs */
604 static GstPad *
605 gst_stream_synchronizer_request_new_pad (GstElement * element,
606     GstPadTemplate * temp, const gchar * name, const GstCaps * caps)
607 {
608   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
609   GstStream *stream;
610   gchar *tmp;
611
612   GST_STREAM_SYNCHRONIZER_LOCK (self);
613   GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
614       self->current_stream_number);
615
616   stream = g_slice_new0 (GstStream);
617   stream->transform = self;
618   stream->stream_number = self->current_stream_number;
619   g_cond_init (&stream->stream_finish_cond);
620   stream->stream_start_seqnum = G_MAXUINT32;
621   stream->segment_seqnum = G_MAXUINT32;
622
623   tmp = g_strdup_printf ("sink_%u", self->current_stream_number);
624   stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
625   g_free (tmp);
626   gst_pad_set_element_private (stream->sinkpad, stream);
627   gst_pad_set_iterate_internal_links_function (stream->sinkpad,
628       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
629   gst_pad_set_query_function (stream->sinkpad,
630       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
631   gst_pad_set_event_function (stream->sinkpad,
632       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
633   gst_pad_set_chain_function (stream->sinkpad,
634       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
635
636   tmp = g_strdup_printf ("src_%u", self->current_stream_number);
637   stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
638   g_free (tmp);
639   gst_pad_set_element_private (stream->srcpad, stream);
640   gst_pad_set_iterate_internal_links_function (stream->srcpad,
641       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
642   gst_pad_set_query_function (stream->srcpad,
643       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
644   gst_pad_set_event_function (stream->srcpad,
645       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
646
647   gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
648
649   self->streams = g_list_prepend (self->streams, stream);
650   self->current_stream_number++;
651   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
652
653   /* Add pads and activate unless we're going to NULL */
654   g_rec_mutex_lock (GST_STATE_GET_LOCK (self));
655   if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
656     gst_pad_set_active (stream->srcpad, TRUE);
657     gst_pad_set_active (stream->sinkpad, TRUE);
658   }
659   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
660   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
661   g_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
662
663   return stream->sinkpad;
664 }
665
666 /* Must be called with lock! */
667 static void
668 gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
669     GstStream * stream)
670 {
671   GList *l;
672
673   GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
674
675   for (l = self->streams; l; l = l->next) {
676     if (l->data == stream) {
677       self->streams = g_list_delete_link (self->streams, l);
678       break;
679     }
680   }
681   g_assert (l != NULL);
682
683   /* we can drop the lock, since stream exists now only local.
684    * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
685    * (due to reverse lock order) when deactivating pads */
686   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
687
688   gst_pad_set_element_private (stream->srcpad, NULL);
689   gst_pad_set_element_private (stream->sinkpad, NULL);
690   gst_pad_set_active (stream->srcpad, FALSE);
691   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
692   gst_pad_set_active (stream->sinkpad, FALSE);
693   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
694
695   if (stream->segment.format == GST_FORMAT_TIME) {
696     gint64 stop_running_time;
697     gint64 position_running_time;
698
699     stop_running_time =
700         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
701         stream->segment.stop);
702     position_running_time =
703         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
704         stream->segment.position);
705     stop_running_time = MAX (stop_running_time, position_running_time);
706
707     if (stop_running_time > self->group_start_time) {
708       GST_DEBUG_OBJECT (stream->sinkpad,
709           "Updating global start running time from %" GST_TIME_FORMAT " to %"
710           GST_TIME_FORMAT, GST_TIME_ARGS (self->group_start_time),
711           GST_TIME_ARGS (stop_running_time));
712
713       self->group_start_time = stop_running_time;
714     }
715   }
716
717   g_cond_clear (&stream->stream_finish_cond);
718   g_slice_free (GstStream, stream);
719
720   /* NOTE: In theory we have to check here if all streams
721    * are EOS but the one that was removed wasn't and then
722    * send EOS downstream. But due to the way how playsink
723    * works this is not necessary and will only cause problems
724    * for gapless playback. playsink will only add/remove pads
725    * when it's reconfigured, which happens when the streams
726    * change
727    */
728
729   /* lock for good measure, since the caller had it */
730   GST_STREAM_SYNCHRONIZER_LOCK (self);
731 }
732
733 static void
734 gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
735 {
736   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
737   GstStream *stream;
738
739   GST_STREAM_SYNCHRONIZER_LOCK (self);
740   stream = gst_pad_get_element_private (pad);
741   if (stream) {
742     g_assert (stream->sinkpad == pad);
743
744     gst_stream_synchronizer_release_stream (self, stream);
745   }
746   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
747 }
748
749 static GstStateChangeReturn
750 gst_stream_synchronizer_change_state (GstElement * element,
751     GstStateChange transition)
752 {
753   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
754   GstStateChangeReturn ret;
755
756   switch (transition) {
757     case GST_STATE_CHANGE_NULL_TO_READY:
758       GST_DEBUG_OBJECT (self, "State change NULL->READY");
759       self->shutdown = FALSE;
760       break;
761     case GST_STATE_CHANGE_READY_TO_PAUSED:
762       GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
763       self->group_start_time = 0;
764       self->shutdown = FALSE;
765       break;
766     case GST_STATE_CHANGE_PAUSED_TO_READY:{
767       GList *l;
768
769       GST_DEBUG_OBJECT (self, "State change READY->NULL");
770
771       GST_STREAM_SYNCHRONIZER_LOCK (self);
772       for (l = self->streams; l; l = l->next) {
773         GstStream *ostream = l->data;
774         g_cond_broadcast (&ostream->stream_finish_cond);
775       }
776       self->shutdown = TRUE;
777       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
778     }
779     default:
780       break;
781   }
782
783   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
784   GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", ret);
785   if (G_UNLIKELY (ret != GST_STATE_CHANGE_SUCCESS))
786     return ret;
787
788   switch (transition) {
789     case GST_STATE_CHANGE_PAUSED_TO_READY:{
790       GList *l;
791
792       GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
793       self->group_start_time = 0;
794
795       GST_STREAM_SYNCHRONIZER_LOCK (self);
796       for (l = self->streams; l; l = l->next) {
797         GstStream *stream = l->data;
798
799         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
800         stream->wait = FALSE;
801         stream->new_stream = FALSE;
802         stream->drop_discont = FALSE;
803         stream->is_eos = FALSE;
804       }
805       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
806       break;
807     }
808     case GST_STATE_CHANGE_READY_TO_NULL:{
809       GST_DEBUG_OBJECT (self, "State change READY->NULL");
810
811       GST_STREAM_SYNCHRONIZER_LOCK (self);
812       self->current_stream_number = 0;
813       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
814       break;
815     }
816     default:
817       break;
818   }
819
820   return ret;
821 }
822
823 /* GObject vfuncs */
824 static void
825 gst_stream_synchronizer_finalize (GObject * object)
826 {
827   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
828
829   g_mutex_clear (&self->lock);
830
831   G_OBJECT_CLASS (parent_class)->finalize (object);
832 }
833
834 /* GObject type initialization */
835 static void
836 gst_stream_synchronizer_init (GstStreamSynchronizer * self)
837 {
838   g_mutex_init (&self->lock);
839 }
840
841 static void
842 gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
843 {
844   GObjectClass *gobject_class = (GObjectClass *) klass;
845   GstElementClass *element_class = (GstElementClass *) klass;
846
847   gobject_class->finalize = gst_stream_synchronizer_finalize;
848
849   gst_element_class_add_pad_template (element_class,
850       gst_static_pad_template_get (&srctemplate));
851   gst_element_class_add_pad_template (element_class,
852       gst_static_pad_template_get (&sinktemplate));
853
854   gst_element_class_set_static_metadata (element_class,
855       "Stream Synchronizer", "Generic",
856       "Synchronizes a group of streams to have equal durations and starting points",
857       "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
858
859   element_class->change_state =
860       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
861   element_class->request_new_pad =
862       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
863   element_class->release_pad =
864       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);
865 }
866
867 gboolean
868 gst_stream_synchronizer_plugin_init (GstPlugin * plugin)
869 {
870   GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
871       "streamsynchronizer", 0, "Stream Synchronizer");
872
873   return gst_element_register (plugin, "streamsynchronizer", GST_RANK_NONE,
874       GST_TYPE_STREAM_SYNCHRONIZER);
875 }