b6ec66cc0dab7b4114c39d59c1234fdc2c788109
[platform/upstream/gstreamer.git] / gst / playback / gststreamsynchronizer.c
1 /* GStreamer
2  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #ifdef HAVE_CONFIG_H
21 #include "config.h"
22 #endif
23
24 #include "gststreamsynchronizer.h"
25
26 GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
27 #define GST_CAT_DEFAULT stream_synchronizer_debug
28
29 #define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START {                   \
30     GST_LOG_OBJECT (obj,                                                \
31                     "locking from thread %p",                           \
32                     g_thread_self ());                                  \
33     g_mutex_lock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);                \
34     GST_LOG_OBJECT (obj,                                                \
35                     "locked from thread %p",                            \
36                     g_thread_self ());                                  \
37 } G_STMT_END
38
39 #define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START {                 \
40     GST_LOG_OBJECT (obj,                                                \
41                     "unlocking from thread %p",                         \
42                     g_thread_self ());                                  \
43     g_mutex_unlock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);              \
44 } G_STMT_END
45
46 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%d",
47     GST_PAD_SRC,
48     GST_PAD_SOMETIMES,
49     GST_STATIC_CAPS_ANY);
50 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%d",
51     GST_PAD_SINK,
52     GST_PAD_REQUEST,
53     GST_STATIC_CAPS_ANY);
54
55 static const gboolean passthrough = TRUE;
56
57 #define gst_stream_synchronizer_parent_class parent_class
58 G_DEFINE_TYPE (GstStreamSynchronizer, gst_stream_synchronizer,
59     GST_TYPE_ELEMENT);
60
61 typedef struct
62 {
63   GstStreamSynchronizer *transform;
64   guint stream_number;
65   GstPad *srcpad;
66   GstPad *sinkpad;
67   GstSegment segment;
68
69   gboolean wait;
70   gboolean new_stream;
71   gboolean drop_discont;
72   gboolean is_eos;
73   gboolean seen_data;
74
75   gint64 running_time_diff;
76 } GstStream;
77
78 /* Must be called with lock! */
79 static GstPad *
80 gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
81 {
82   if (stream->sinkpad == pad)
83     return gst_object_ref (stream->srcpad);
84   else if (stream->srcpad == pad)
85     return gst_object_ref (stream->sinkpad);
86
87   return NULL;
88 }
89
90 static GstPad *
91 gst_stream_get_other_pad_from_pad (GstPad * pad)
92 {
93   GstStreamSynchronizer *self =
94       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
95   GstStream *stream;
96   GstPad *opad = NULL;
97
98   GST_STREAM_SYNCHRONIZER_LOCK (self);
99   stream = gst_pad_get_element_private (pad);
100   if (!stream)
101     goto out;
102
103   opad = gst_stream_get_other_pad (stream, pad);
104
105 out:
106   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
107   gst_object_unref (self);
108
109   if (!opad)
110     GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
111
112   return opad;
113 }
114
115 /* Generic pad functions */
116 static GstIterator *
117 gst_stream_synchronizer_iterate_internal_links (GstPad * pad)
118 {
119   GstIterator *it = NULL;
120   GstPad *opad;
121
122   opad = gst_stream_get_other_pad_from_pad (pad);
123   if (opad) {
124     GValue value = { 0, };
125
126     g_value_init (&value, GST_TYPE_PAD);
127     g_value_set_object (&value, opad);
128     it = gst_iterator_new_single (GST_TYPE_PAD, &value);
129     g_value_unset (&value);
130     gst_object_unref (opad);
131   }
132
133   return it;
134 }
135
136 static gboolean
137 gst_stream_synchronizer_query (GstPad * pad, GstQuery * query)
138 {
139   GstPad *opad;
140   gboolean ret = FALSE;
141
142   GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));
143
144   opad = gst_stream_get_other_pad_from_pad (pad);
145   if (opad) {
146     ret = gst_pad_peer_query (opad, query);
147     gst_object_unref (opad);
148   }
149
150   return ret;
151 }
152
153 static GstCaps *
154 gst_stream_synchronizer_getcaps (GstPad * pad)
155 {
156   GstPad *opad;
157   GstCaps *ret = NULL;
158
159   opad = gst_stream_get_other_pad_from_pad (pad);
160   if (opad) {
161     ret = gst_pad_peer_get_caps (opad);
162     gst_object_unref (opad);
163   }
164
165   if (ret == NULL)
166     ret = gst_caps_new_any ();
167
168   GST_LOG_OBJECT (pad, "Returning caps: %" GST_PTR_FORMAT, ret);
169
170   return ret;
171 }
172
173 static gboolean
174 gst_stream_synchronizer_acceptcaps (GstPad * pad, GstCaps * caps)
175 {
176   GstPad *opad;
177   gboolean ret = FALSE;
178
179   opad = gst_stream_get_other_pad_from_pad (pad);
180   if (opad) {
181     ret = gst_pad_peer_accept_caps (opad, caps);
182     gst_object_unref (opad);
183   }
184
185   GST_LOG_OBJECT (pad, "Caps%s accepted: %" GST_PTR_FORMAT, (ret ? "" : " not"),
186       caps);
187
188   return ret;
189 }
190
191 /* srcpad functions */
192 static gboolean
193 gst_stream_synchronizer_src_event (GstPad * pad, GstEvent * event)
194 {
195   GstStreamSynchronizer *self =
196       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
197   GstPad *opad;
198   gboolean ret = FALSE;
199
200   if (passthrough)
201     goto skip_adjustments;
202
203   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
204       GST_EVENT_TYPE_NAME (event), event->structure);
205
206   switch (GST_EVENT_TYPE (event)) {
207     case GST_EVENT_QOS:{
208       gdouble proportion;
209       GstClockTimeDiff diff;
210       GstClockTime timestamp;
211       gint64 running_time_diff;
212       GstStream *stream;
213
214       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
215       gst_event_unref (event);
216
217       GST_STREAM_SYNCHRONIZER_LOCK (self);
218       stream = gst_pad_get_element_private (pad);
219       if (stream)
220         running_time_diff = stream->running_time_diff;
221       else
222         running_time_diff = -1;
223       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
224
225       if (running_time_diff == -1) {
226         GST_WARNING_OBJECT (pad, "QOS event before group start");
227         goto out;
228       } else if (timestamp < running_time_diff) {
229         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
230         goto out;
231       }
232
233       GST_LOG_OBJECT (pad,
234           "Adjusting QOS event: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT " = %"
235           GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
236           GST_TIME_ARGS (running_time_diff),
237           GST_TIME_ARGS (timestamp - running_time_diff));
238
239       timestamp -= running_time_diff;
240
241       /* That case is invalid for QoS events */
242       if (diff < 0 && -diff > timestamp) {
243         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
244         ret = TRUE;
245         goto out;
246       }
247
248       event =
249           gst_event_new_qos (GST_QOS_TYPE_UNDERFLOW, proportion, diff,
250           timestamp);
251       break;
252     }
253     default:
254       break;
255   }
256
257 skip_adjustments:
258
259   opad = gst_stream_get_other_pad_from_pad (pad);
260   if (opad) {
261     ret = gst_pad_push_event (opad, event);
262     gst_object_unref (opad);
263   }
264
265 out:
266   gst_object_unref (self);
267
268   return ret;
269 }
270
271 /* sinkpad functions */
272 static gboolean
273 gst_stream_synchronizer_sink_event (GstPad * pad, GstEvent * event)
274 {
275   GstStreamSynchronizer *self =
276       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
277   GstPad *opad;
278   gboolean ret = FALSE;
279
280   if (passthrough)
281     goto skip_adjustments;
282
283   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
284       GST_EVENT_TYPE_NAME (event), event->structure);
285
286   switch (GST_EVENT_TYPE (event)) {
287     case GST_EVENT_SINK_MESSAGE:{
288       GstMessage *message;
289
290       gst_event_parse_sink_message (event, &message);
291       if (message->structure
292           && gst_structure_has_name (message->structure,
293               "playbin2-stream-changed")) {
294         GstStream *stream;
295
296         GST_STREAM_SYNCHRONIZER_LOCK (self);
297         stream = gst_pad_get_element_private (pad);
298         if (stream) {
299           GList *l;
300           gboolean all_wait = TRUE;
301
302           GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
303
304           stream->is_eos = FALSE;
305           stream->wait = TRUE;
306           stream->new_stream = TRUE;
307
308           for (l = self->streams; l; l = l->next) {
309             GstStream *ostream = l->data;
310
311             all_wait = all_wait && ostream->wait;
312             if (!all_wait)
313               break;
314           }
315           if (all_wait) {
316             gint64 last_stop = 0;
317
318             GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
319
320             for (l = self->streams; l; l = l->next) {
321               GstStream *ostream = l->data;
322               gint64 stop_running_time;
323               gint64 last_stop_running_time;
324
325               ostream->wait = FALSE;
326
327               stop_running_time =
328                   gst_segment_to_running_time (&ostream->segment,
329                   GST_FORMAT_TIME, ostream->segment.stop);
330               last_stop_running_time =
331                   gst_segment_to_running_time (&ostream->segment,
332                   GST_FORMAT_TIME, ostream->segment.last_stop);
333               last_stop =
334                   MAX (last_stop, MAX (stop_running_time,
335                       last_stop_running_time));
336             }
337             last_stop = MAX (0, last_stop);
338             self->group_start_time = MAX (self->group_start_time, last_stop);
339
340             GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
341                 GST_TIME_ARGS (self->group_start_time));
342
343             g_cond_broadcast (self->stream_finish_cond);
344           }
345         }
346         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
347       }
348       gst_message_unref (message);
349       break;
350     }
351     case GST_EVENT_NEWSEGMENT:{
352       GstStream *stream;
353       gboolean update;
354       gdouble rate, applied_rate;
355       GstFormat format;
356       gint64 start, stop, position;
357
358       gst_event_parse_new_segment (event,
359           &update, &rate, &applied_rate, &format, &start, &stop, &position);
360
361       GST_STREAM_SYNCHRONIZER_LOCK (self);
362       stream = gst_pad_get_element_private (pad);
363       if (stream) {
364         if (stream->wait) {
365           GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
366           g_cond_wait (self->stream_finish_cond, self->lock);
367           stream = gst_pad_get_element_private (pad);
368           if (stream)
369             stream->wait = FALSE;
370         }
371       }
372
373       if (self->shutdown) {
374         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
375         gst_event_unref (event);
376         goto done;
377       }
378
379       if (stream && format == GST_FORMAT_TIME) {
380         if (stream->new_stream) {
381           gint64 last_stop_running_time = 0;
382           gint64 stop_running_time = 0;
383
384           if (stream->segment.format == GST_FORMAT_TIME) {
385             last_stop_running_time =
386                 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
387                 stream->segment.last_stop);
388             last_stop_running_time = MAX (last_stop_running_time, 0);
389             stop_running_time =
390                 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
391                 stream->segment.stop);
392             stop_running_time = MAX (last_stop_running_time, 0);
393
394             if (stop_running_time != last_stop_running_time) {
395               GST_WARNING_OBJECT (pad,
396                   "Gap between last_stop and segment stop: %" GST_TIME_FORMAT
397                   " != %" GST_TIME_FORMAT, GST_TIME_ARGS (stop_running_time),
398                   GST_TIME_ARGS (last_stop_running_time));
399             }
400
401             if (stop_running_time < last_stop_running_time) {
402               GST_DEBUG_OBJECT (pad, "Updating stop position");
403               gst_pad_push_event (stream->srcpad,
404                   gst_event_new_new_segment (TRUE, stream->segment.rate,
405                       stream->segment.applied_rate, GST_FORMAT_TIME,
406                       stream->segment.start, stream->segment.last_stop,
407                       stream->segment.time));
408               gst_segment_set_newsegment (&stream->segment, TRUE,
409                   stream->segment.rate, stream->segment.applied_rate,
410                   GST_FORMAT_TIME, stream->segment.start,
411                   stream->segment.last_stop, stream->segment.time);
412             }
413             stop_running_time = MAX (stop_running_time, last_stop_running_time);
414             GST_DEBUG_OBJECT (pad,
415                 "Stop running time of last group: %" GST_TIME_FORMAT,
416                 GST_TIME_ARGS (stop_running_time));
417           }
418           stream->new_stream = FALSE;
419           stream->drop_discont = TRUE;
420
421           if (stop_running_time < self->group_start_time) {
422             gint64 diff = self->group_start_time - stop_running_time;
423
424             GST_DEBUG_OBJECT (pad,
425                 "Advancing running time for other streams by: %"
426                 GST_TIME_FORMAT, GST_TIME_ARGS (diff));
427             gst_pad_push_event (stream->srcpad,
428                 gst_event_new_new_segment (FALSE, 1.0, 1.0,
429                     GST_FORMAT_TIME, 0, diff, 0));
430             gst_segment_set_newsegment (&stream->segment, FALSE, 1.0, 1.0,
431                 GST_FORMAT_TIME, 0, diff, 0);
432           }
433         }
434
435         GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
436             &stream->segment);
437         gst_segment_set_newsegment (&stream->segment, update, rate,
438             applied_rate, format, start, stop, position);
439         GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
440             &stream->segment);
441
442         GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
443             GST_TIME_ARGS (stream->segment.accum));
444         stream->running_time_diff = stream->segment.accum;
445       } else if (stream) {
446         GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
447             gst_format_get_name (format));
448         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
449       }
450       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
451       break;
452     }
453     case GST_EVENT_FLUSH_STOP:{
454       GstStream *stream;
455
456       GST_STREAM_SYNCHRONIZER_LOCK (self);
457       stream = gst_pad_get_element_private (pad);
458       if (stream) {
459         GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
460             stream->stream_number);
461         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
462
463         stream->is_eos = FALSE;
464         stream->wait = FALSE;
465         stream->new_stream = FALSE;
466         stream->drop_discont = FALSE;
467         stream->seen_data = FALSE;
468       }
469       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
470       break;
471     }
472     case GST_EVENT_EOS:{
473       GstStream *stream;
474       GList *l;
475       gboolean all_eos = TRUE;
476       gboolean seen_data;
477       GSList *pads = NULL;
478       GstPad *srcpad;
479
480       GST_STREAM_SYNCHRONIZER_LOCK (self);
481       stream = gst_pad_get_element_private (pad);
482       if (!stream) {
483         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
484         GST_WARNING_OBJECT (pad, "EOS for unknown stream");
485         break;
486       }
487
488       GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
489       stream->is_eos = TRUE;
490
491       seen_data = stream->seen_data;
492       srcpad = gst_object_ref (stream->srcpad);
493
494       for (l = self->streams; l; l = l->next) {
495         GstStream *ostream = l->data;
496
497         all_eos = all_eos && ostream->is_eos;
498         if (!all_eos)
499           break;
500       }
501
502       if (all_eos) {
503         GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
504         for (l = self->streams; l; l = l->next) {
505           GstStream *ostream = l->data;
506           /* local snapshot of current pads */
507           gst_object_ref (ostream->srcpad);
508           pads = g_slist_prepend (pads, ostream->srcpad);
509         }
510       }
511       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
512       /* drop lock when sending eos, which may block in e.g. preroll */
513       if (pads) {
514         GstPad *pad;
515         GSList *epad;
516
517         ret = TRUE;
518         epad = pads;
519         while (epad) {
520           pad = epad->data;
521           GST_DEBUG_OBJECT (pad, "Pushing EOS");
522           ret = ret && gst_pad_push_event (pad, gst_event_new_eos ());
523           gst_object_unref (pad);
524           epad = g_slist_next (epad);
525         }
526         g_slist_free (pads);
527       } else {
528         /* if EOS, but no data has passed, then send something to replace EOS
529          * for preroll purposes */
530         if (!seen_data) {
531           GstBuffer *buf = gst_buffer_new ();
532
533           GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_PREROLL);
534           gst_pad_push (srcpad, buf);
535         }
536       }
537       gst_object_unref (srcpad);
538       goto done;
539       break;
540     }
541     default:
542       break;
543   }
544
545 skip_adjustments:
546
547   opad = gst_stream_get_other_pad_from_pad (pad);
548   if (opad) {
549     ret = gst_pad_push_event (opad, event);
550     gst_object_unref (opad);
551   }
552
553 done:
554   gst_object_unref (self);
555
556   return ret;
557 }
558
559 static GstFlowReturn
560 gst_stream_synchronizer_sink_chain (GstPad * pad, GstBuffer * buffer)
561 {
562   GstStreamSynchronizer *self =
563       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
564   GstPad *opad;
565   GstFlowReturn ret = GST_FLOW_ERROR;
566   GstStream *stream;
567   GstClockTime timestamp = GST_CLOCK_TIME_NONE;
568   GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
569
570   if (passthrough) {
571     opad = gst_stream_get_other_pad_from_pad (pad);
572     if (opad) {
573       ret = gst_pad_push (opad, buffer);
574       gst_object_unref (opad);
575     }
576     goto done;
577   }
578
579   GST_LOG_OBJECT (pad, "Handling buffer %p: size=%u, timestamp=%"
580       GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
581       " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
582       buffer, gst_buffer_get_size (buffer),
583       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
584       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
585       GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));
586
587   timestamp = GST_BUFFER_TIMESTAMP (buffer);
588   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)
589       && GST_BUFFER_DURATION_IS_VALID (buffer))
590     timestamp_end = timestamp + GST_BUFFER_DURATION (buffer);
591
592   GST_STREAM_SYNCHRONIZER_LOCK (self);
593   stream = gst_pad_get_element_private (pad);
594
595   stream->seen_data = TRUE;
596   if (stream && stream->drop_discont) {
597     buffer = gst_buffer_make_writable (buffer);
598     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
599     stream->drop_discont = FALSE;
600   }
601
602   if (stream && stream->segment.format == GST_FORMAT_TIME
603       && GST_CLOCK_TIME_IS_VALID (timestamp)) {
604     GST_LOG_OBJECT (pad,
605         "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
606         GST_TIME_ARGS (stream->segment.last_stop), GST_TIME_ARGS (timestamp));
607     gst_segment_set_last_stop (&stream->segment, GST_FORMAT_TIME, timestamp);
608   }
609   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
610
611   opad = gst_stream_get_other_pad_from_pad (pad);
612   if (opad) {
613     ret = gst_pad_push (opad, buffer);
614     gst_object_unref (opad);
615   }
616
617   GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
618   if (ret == GST_FLOW_OK) {
619     GList *l;
620
621     GST_STREAM_SYNCHRONIZER_LOCK (self);
622     stream = gst_pad_get_element_private (pad);
623     if (stream && stream->segment.format == GST_FORMAT_TIME
624         && GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
625       GST_LOG_OBJECT (pad,
626           "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
627           GST_TIME_ARGS (stream->segment.last_stop),
628           GST_TIME_ARGS (timestamp_end));
629       gst_segment_set_last_stop (&stream->segment, GST_FORMAT_TIME,
630           timestamp_end);
631     }
632
633     /* Advance EOS streams if necessary. For non-EOS
634      * streams the demuxers should already do this! */
635     for (l = self->streams; l; l = l->next) {
636       GstStream *ostream = l->data;
637       gint64 last_stop;
638
639       if (!ostream->is_eos || ostream->segment.format != GST_FORMAT_TIME)
640         continue;
641
642       if (ostream->segment.last_stop != -1)
643         last_stop = ostream->segment.last_stop;
644       else
645         last_stop = ostream->segment.start;
646
647       /* Is there a 1 second lag? */
648       if (last_stop != -1 && last_stop + GST_SECOND < timestamp_end) {
649         gint64 new_start, new_stop;
650
651         new_start = timestamp_end - GST_SECOND;
652         if (ostream->segment.stop == -1)
653           new_stop = -1;
654         else
655           new_stop = MAX (new_start, ostream->segment.stop);
656
657         GST_DEBUG_OBJECT (ostream->sinkpad,
658             "Advancing stream %u from %" GST_TIME_FORMAT " to %"
659             GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (last_stop),
660             GST_TIME_ARGS (new_start));
661
662         gst_pad_push_event (ostream->srcpad,
663             gst_event_new_new_segment (TRUE, ostream->segment.rate,
664                 ostream->segment.applied_rate, ostream->segment.format,
665                 new_start, new_stop, new_start));
666         gst_segment_set_newsegment (&ostream->segment, TRUE,
667             ostream->segment.rate, ostream->segment.applied_rate,
668             ostream->segment.format, new_start, new_stop, new_start);
669         gst_segment_set_last_stop (&ostream->segment, GST_FORMAT_TIME,
670             new_start);
671       }
672     }
673     GST_STREAM_SYNCHRONIZER_UNLOCK (self);
674   }
675
676 done:
677
678   gst_object_unref (self);
679
680   return ret;
681 }
682
683 /* GstElement vfuncs */
684 static GstPad *
685 gst_stream_synchronizer_request_new_pad (GstElement * element,
686     GstPadTemplate * temp, const gchar * name)
687 {
688   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
689   GstStream *stream;
690   gchar *tmp;
691
692   GST_STREAM_SYNCHRONIZER_LOCK (self);
693   GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
694       self->current_stream_number);
695
696   stream = g_slice_new0 (GstStream);
697   stream->transform = self;
698   stream->stream_number = self->current_stream_number;
699
700   tmp = g_strdup_printf ("sink_%d", self->current_stream_number);
701   stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
702   g_free (tmp);
703   gst_pad_set_element_private (stream->sinkpad, stream);
704   gst_pad_set_iterate_internal_links_function (stream->sinkpad,
705       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
706   gst_pad_set_query_function (stream->sinkpad,
707       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
708   gst_pad_set_getcaps_function (stream->sinkpad,
709       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
710   gst_pad_set_acceptcaps_function (stream->sinkpad,
711       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
712   gst_pad_set_event_function (stream->sinkpad,
713       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
714   gst_pad_set_chain_function (stream->sinkpad,
715       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
716
717   tmp = g_strdup_printf ("src_%d", self->current_stream_number);
718   stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
719   g_free (tmp);
720   gst_pad_set_element_private (stream->srcpad, stream);
721   gst_pad_set_iterate_internal_links_function (stream->srcpad,
722       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
723   gst_pad_set_query_function (stream->srcpad,
724       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
725   gst_pad_set_getcaps_function (stream->srcpad,
726       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
727   gst_pad_set_acceptcaps_function (stream->srcpad,
728       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
729   gst_pad_set_event_function (stream->srcpad,
730       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
731
732   gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
733
734   self->streams = g_list_prepend (self->streams, stream);
735   self->current_stream_number++;
736   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
737
738   /* Add pads and activate unless we're going to NULL */
739   g_static_rec_mutex_lock (GST_STATE_GET_LOCK (self));
740   if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
741     gst_pad_set_active (stream->srcpad, TRUE);
742     gst_pad_set_active (stream->sinkpad, TRUE);
743   }
744   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
745   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
746   g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
747
748   return stream->sinkpad;
749 }
750
751 /* Must be called with lock! */
752 static void
753 gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
754     GstStream * stream)
755 {
756   GList *l;
757
758   GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
759
760   for (l = self->streams; l; l = l->next) {
761     if (l->data == stream) {
762       self->streams = g_list_delete_link (self->streams, l);
763       break;
764     }
765   }
766   g_assert (l != NULL);
767
768   /* we can drop the lock, since stream exists now only local.
769    * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
770    * (due to reverse lock order) when deactivating pads */
771   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
772
773   gst_pad_set_element_private (stream->srcpad, NULL);
774   gst_pad_set_element_private (stream->sinkpad, NULL);
775   gst_pad_set_active (stream->srcpad, FALSE);
776   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
777   gst_pad_set_active (stream->sinkpad, FALSE);
778   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
779
780   if (stream->segment.format == GST_FORMAT_TIME) {
781     gint64 stop_running_time;
782     gint64 last_stop_running_time;
783
784     stop_running_time =
785         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
786         stream->segment.stop);
787     last_stop_running_time =
788         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
789         stream->segment.last_stop);
790     stop_running_time = MAX (stop_running_time, last_stop_running_time);
791
792     GST_DEBUG_OBJECT (stream->sinkpad,
793         "Stop running time was: %" GST_TIME_FORMAT,
794         GST_TIME_ARGS (stop_running_time));
795
796     self->group_start_time = MAX (self->group_start_time, stop_running_time);
797   }
798
799   g_slice_free (GstStream, stream);
800
801   /* NOTE: In theory we have to check here if all streams
802    * are EOS but the one that was removed wasn't and then
803    * send EOS downstream. But due to the way how playsink
804    * works this is not necessary and will only cause problems
805    * for gapless playback. playsink will only add/remove pads
806    * when it's reconfigured, which happens when the streams
807    * change
808    */
809
810   /* lock for good measure, since the caller had it */
811   GST_STREAM_SYNCHRONIZER_LOCK (self);
812 }
813
814 static void
815 gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
816 {
817   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
818   GstStream *stream;
819
820   GST_STREAM_SYNCHRONIZER_LOCK (self);
821   stream = gst_pad_get_element_private (pad);
822   if (stream) {
823     g_assert (stream->sinkpad == pad);
824
825     gst_stream_synchronizer_release_stream (self, stream);
826   }
827   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
828 }
829
830 static GstStateChangeReturn
831 gst_stream_synchronizer_change_state (GstElement * element,
832     GstStateChange transition)
833 {
834   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
835   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
836
837   switch (transition) {
838     case GST_STATE_CHANGE_NULL_TO_READY:
839       GST_DEBUG_OBJECT (self, "State change NULL->READY");
840       self->shutdown = FALSE;
841       break;
842     case GST_STATE_CHANGE_READY_TO_PAUSED:
843       GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
844       self->group_start_time = 0;
845       self->shutdown = FALSE;
846       break;
847     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
848       GST_DEBUG_OBJECT (self, "State change PAUSED->PLAYING");
849       break;
850     case GST_STATE_CHANGE_PAUSED_TO_READY:
851       GST_DEBUG_OBJECT (self, "State change READY->NULL");
852
853       GST_STREAM_SYNCHRONIZER_LOCK (self);
854       g_cond_broadcast (self->stream_finish_cond);
855       self->shutdown = TRUE;
856       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
857     default:
858       break;
859   }
860
861   {
862     GstStateChangeReturn bret;
863
864     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
865     GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", bret);
866     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
867       return ret;
868   }
869
870   switch (transition) {
871     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
872       GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
873       break;
874     case GST_STATE_CHANGE_PAUSED_TO_READY:{
875       GList *l;
876
877       GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
878       self->group_start_time = 0;
879
880       GST_STREAM_SYNCHRONIZER_LOCK (self);
881       for (l = self->streams; l; l = l->next) {
882         GstStream *stream = l->data;
883
884         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
885         stream->wait = FALSE;
886         stream->new_stream = FALSE;
887         stream->drop_discont = FALSE;
888         stream->is_eos = FALSE;
889       }
890       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
891       break;
892     }
893     case GST_STATE_CHANGE_READY_TO_NULL:{
894       GST_DEBUG_OBJECT (self, "State change READY->NULL");
895
896       GST_STREAM_SYNCHRONIZER_LOCK (self);
897       while (self->streams)
898         gst_stream_synchronizer_release_stream (self, self->streams->data);
899       self->current_stream_number = 0;
900       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
901       break;
902     }
903     default:
904       break;
905   }
906
907   return ret;
908 }
909
910 /* GObject vfuncs */
911 static void
912 gst_stream_synchronizer_finalize (GObject * object)
913 {
914   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
915
916   if (self->lock) {
917     g_mutex_free (self->lock);
918     self->lock = NULL;
919   }
920
921   if (self->stream_finish_cond) {
922     g_cond_free (self->stream_finish_cond);
923     self->stream_finish_cond = NULL;
924   }
925
926   G_OBJECT_CLASS (parent_class)->finalize (object);
927 }
928
929 /* GObject type initialization */
930 static void
931 gst_stream_synchronizer_init (GstStreamSynchronizer * self)
932 {
933   self->lock = g_mutex_new ();
934   self->stream_finish_cond = g_cond_new ();
935 }
936
937 static void
938 gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
939 {
940   GObjectClass *gobject_class = (GObjectClass *) klass;
941   GstElementClass *element_class = (GstElementClass *) klass;
942
943   GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
944       "streamsynchronizer", 0, "Stream Synchronizer");
945
946   gobject_class->finalize = gst_stream_synchronizer_finalize;
947
948   gst_element_class_add_pad_template (element_class,
949       gst_static_pad_template_get (&srctemplate));
950   gst_element_class_add_pad_template (element_class,
951       gst_static_pad_template_get (&sinktemplate));
952
953   gst_element_class_set_details_simple (element_class,
954       "Stream Synchronizer", "Generic",
955       "Synchronizes a group of streams to have equal durations and starting points",
956       "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
957
958   element_class->change_state =
959       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
960   element_class->request_new_pad =
961       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
962   element_class->release_pad =
963       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);
964 }