9a48ab06d4bdf311056a36fa9afc925788af6fef
[framework/multimedia/gst-plugins-base0.10.git] / gst / playback / gststreamsynchronizer.c
1 /* GStreamer
2  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #ifdef HAVE_CONFIG_H
21 #include "config.h"
22 #endif
23
24 #include "gststreamsynchronizer.h"
25
26 GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
27 #define GST_CAT_DEFAULT stream_synchronizer_debug
28
29 #define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START {                   \
30     GST_LOG_OBJECT (obj,                                                \
31                     "locking from thread %p",                           \
32                     g_thread_self ());                                  \
33     g_mutex_lock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);                \
34     GST_LOG_OBJECT (obj,                                                \
35                     "locked from thread %p",                            \
36                     g_thread_self ());                                  \
37 } G_STMT_END
38
39 #define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START {                 \
40     GST_LOG_OBJECT (obj,                                                \
41                     "unlocking from thread %p",                         \
42                     g_thread_self ());                                  \
43     g_mutex_unlock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);              \
44 } G_STMT_END
45
46 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%d",
47     GST_PAD_SRC,
48     GST_PAD_SOMETIMES,
49     GST_STATIC_CAPS_ANY);
50 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%d",
51     GST_PAD_SINK,
52     GST_PAD_REQUEST,
53     GST_STATIC_CAPS_ANY);
54
55 static const gboolean passthrough = TRUE;
56
57 GST_BOILERPLATE (GstStreamSynchronizer, gst_stream_synchronizer,
58     GstElement, GST_TYPE_ELEMENT);
59
60 typedef struct
61 {
62   GstStreamSynchronizer *transform;
63   guint stream_number;
64   GstPad *srcpad;
65   GstPad *sinkpad;
66   GstSegment segment;
67
68   gboolean wait;
69   gboolean new_stream;
70   gboolean drop_discont;
71   gboolean is_eos;
72   gboolean seen_data;
73
74   gint64 running_time_diff;
75 } GstStream;
76
77 /* Must be called with lock! */
78 static GstPad *
79 gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
80 {
81   if (stream->sinkpad == pad)
82     return gst_object_ref (stream->srcpad);
83   else if (stream->srcpad == pad)
84     return gst_object_ref (stream->sinkpad);
85
86   return NULL;
87 }
88
89 static GstPad *
90 gst_stream_get_other_pad_from_pad (GstPad * pad)
91 {
92   GstStreamSynchronizer *self =
93       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
94   GstStream *stream;
95   GstPad *opad = NULL;
96
97   GST_STREAM_SYNCHRONIZER_LOCK (self);
98   stream = gst_pad_get_element_private (pad);
99   if (!stream)
100     goto out;
101
102   opad = gst_stream_get_other_pad (stream, pad);
103
104 out:
105   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
106   gst_object_unref (self);
107
108   if (!opad)
109     GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
110
111   return opad;
112 }
113
114 /* Generic pad functions */
115 static GstIterator *
116 gst_stream_synchronizer_iterate_internal_links (GstPad * pad)
117 {
118   GstIterator *it = NULL;
119   GstPad *opad;
120
121   opad = gst_stream_get_other_pad_from_pad (pad);
122   if (opad) {
123     it = gst_iterator_new_single (GST_TYPE_PAD, opad,
124         (GstCopyFunction) gst_object_ref, (GFreeFunc) gst_object_unref);
125     gst_object_unref (opad);
126   }
127
128   return it;
129 }
130
131 static gboolean
132 gst_stream_synchronizer_query (GstPad * pad, GstQuery * query)
133 {
134   GstPad *opad;
135   gboolean ret = FALSE;
136
137   GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));
138
139   opad = gst_stream_get_other_pad_from_pad (pad);
140   if (opad) {
141     ret = gst_pad_peer_query (opad, query);
142     gst_object_unref (opad);
143   }
144
145   return ret;
146 }
147
148 static GstCaps *
149 gst_stream_synchronizer_getcaps (GstPad * pad)
150 {
151   GstPad *opad;
152   GstCaps *ret = NULL;
153
154   opad = gst_stream_get_other_pad_from_pad (pad);
155   if (opad) {
156     ret = gst_pad_peer_get_caps (opad);
157     gst_object_unref (opad);
158   }
159
160   if (ret == NULL)
161     ret = gst_caps_new_any ();
162
163   GST_LOG_OBJECT (pad, "Returning caps: %" GST_PTR_FORMAT, ret);
164
165   return ret;
166 }
167
168 static gboolean
169 gst_stream_synchronizer_acceptcaps (GstPad * pad, GstCaps * caps)
170 {
171   GstPad *opad;
172   gboolean ret = FALSE;
173
174   opad = gst_stream_get_other_pad_from_pad (pad);
175   if (opad) {
176     ret = gst_pad_peer_accept_caps (opad, caps);
177     gst_object_unref (opad);
178   }
179
180   GST_LOG_OBJECT (pad, "Caps%s accepted: %" GST_PTR_FORMAT, (ret ? "" : " not"),
181       caps);
182
183   return ret;
184 }
185
186 /* srcpad functions */
187 static gboolean
188 gst_stream_synchronizer_src_event (GstPad * pad, GstEvent * event)
189 {
190   GstStreamSynchronizer *self =
191       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
192   GstPad *opad;
193   gboolean ret = FALSE;
194
195   if (passthrough)
196     goto skip_adjustments;
197
198   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
199       GST_EVENT_TYPE_NAME (event), event->structure);
200
201   switch (GST_EVENT_TYPE (event)) {
202     case GST_EVENT_QOS:{
203       gdouble proportion;
204       GstClockTimeDiff diff;
205       GstClockTime timestamp;
206       gint64 running_time_diff;
207       GstStream *stream;
208
209       gst_event_parse_qos (event, &proportion, &diff, &timestamp);
210       gst_event_unref (event);
211
212       GST_STREAM_SYNCHRONIZER_LOCK (self);
213       stream = gst_pad_get_element_private (pad);
214       if (stream)
215         running_time_diff = stream->running_time_diff;
216       else
217         running_time_diff = -1;
218       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
219
220       if (running_time_diff == -1) {
221         GST_WARNING_OBJECT (pad, "QOS event before group start");
222         goto out;
223       } else if (timestamp < running_time_diff) {
224         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
225         goto out;
226       }
227
228       GST_LOG_OBJECT (pad,
229           "Adjusting QOS event: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT " = %"
230           GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
231           GST_TIME_ARGS (running_time_diff),
232           GST_TIME_ARGS (timestamp - running_time_diff));
233
234       timestamp -= running_time_diff;
235
236       /* That case is invalid for QoS events */
237       if (diff < 0 && -diff > timestamp) {
238         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
239         ret = TRUE;
240         goto out;
241       }
242
243       event = gst_event_new_qos (proportion, diff, timestamp);
244       break;
245     }
246     default:
247       break;
248   }
249
250 skip_adjustments:
251
252   opad = gst_stream_get_other_pad_from_pad (pad);
253   if (opad) {
254     ret = gst_pad_push_event (opad, event);
255     gst_object_unref (opad);
256   }
257
258 out:
259   gst_object_unref (self);
260
261   return ret;
262 }
263
264 /* sinkpad functions */
265 static gboolean
266 gst_stream_synchronizer_sink_event (GstPad * pad, GstEvent * event)
267 {
268   GstStreamSynchronizer *self =
269       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
270   GstPad *opad;
271   gboolean ret = FALSE;
272
273   if (passthrough)
274     goto skip_adjustments;
275
276   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
277       GST_EVENT_TYPE_NAME (event), event->structure);
278
279   switch (GST_EVENT_TYPE (event)) {
280     case GST_EVENT_SINK_MESSAGE:{
281       GstMessage *message;
282
283       gst_event_parse_sink_message (event, &message);
284       if (message->structure
285           && gst_structure_has_name (message->structure,
286               "playbin2-stream-changed")) {
287         GstStream *stream;
288
289         GST_STREAM_SYNCHRONIZER_LOCK (self);
290         stream = gst_pad_get_element_private (pad);
291         if (stream) {
292           GList *l;
293           gboolean all_wait = TRUE;
294
295           GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
296
297           stream->is_eos = FALSE;
298           stream->wait = TRUE;
299           stream->new_stream = TRUE;
300
301           for (l = self->streams; l; l = l->next) {
302             GstStream *ostream = l->data;
303
304             all_wait = all_wait && ostream->wait;
305             if (!all_wait)
306               break;
307           }
308           if (all_wait) {
309             gint64 last_stop = 0;
310
311             GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
312
313             for (l = self->streams; l; l = l->next) {
314               GstStream *ostream = l->data;
315               gint64 stop_running_time;
316               gint64 last_stop_running_time;
317
318               ostream->wait = FALSE;
319
320               stop_running_time =
321                   gst_segment_to_running_time (&ostream->segment,
322                   GST_FORMAT_TIME, ostream->segment.stop);
323               last_stop_running_time =
324                   gst_segment_to_running_time (&ostream->segment,
325                   GST_FORMAT_TIME, ostream->segment.last_stop);
326               last_stop =
327                   MAX (last_stop, MAX (stop_running_time,
328                       last_stop_running_time));
329             }
330             last_stop = MAX (0, last_stop);
331             self->group_start_time = MAX (self->group_start_time, last_stop);
332
333             GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
334                 GST_TIME_ARGS (self->group_start_time));
335
336             g_cond_broadcast (self->stream_finish_cond);
337           }
338         }
339         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
340       }
341       gst_message_unref (message);
342       break;
343     }
344     case GST_EVENT_NEWSEGMENT:{
345       GstStream *stream;
346       gboolean update;
347       gdouble rate, applied_rate;
348       GstFormat format;
349       gint64 start, stop, position;
350
351       gst_event_parse_new_segment_full (event,
352           &update, &rate, &applied_rate, &format, &start, &stop, &position);
353
354       GST_STREAM_SYNCHRONIZER_LOCK (self);
355       stream = gst_pad_get_element_private (pad);
356       if (stream) {
357         if (stream->wait) {
358           GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
359           g_cond_wait (self->stream_finish_cond, self->lock);
360           stream = gst_pad_get_element_private (pad);
361           if (stream)
362             stream->wait = FALSE;
363         }
364       }
365
366       if (self->shutdown) {
367         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
368         gst_event_unref (event);
369         goto done;
370       }
371
372       if (stream && format == GST_FORMAT_TIME) {
373         if (stream->new_stream) {
374           gint64 last_stop_running_time = 0;
375           gint64 stop_running_time = 0;
376
377           if (stream->segment.format == GST_FORMAT_TIME) {
378             last_stop_running_time =
379                 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
380                 stream->segment.last_stop);
381             last_stop_running_time = MAX (last_stop_running_time, 0);
382             stop_running_time =
383                 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
384                 stream->segment.stop);
385             stop_running_time = MAX (last_stop_running_time, 0);
386
387             if (stop_running_time != last_stop_running_time) {
388               GST_WARNING_OBJECT (pad,
389                   "Gap between last_stop and segment stop: %" GST_TIME_FORMAT
390                   " != %" GST_TIME_FORMAT, GST_TIME_ARGS (stop_running_time),
391                   GST_TIME_ARGS (last_stop_running_time));
392             }
393
394             if (stop_running_time < last_stop_running_time) {
395               GST_DEBUG_OBJECT (pad, "Updating stop position");
396               gst_pad_push_event (stream->srcpad,
397                   gst_event_new_new_segment_full (TRUE, stream->segment.rate,
398                       stream->segment.applied_rate, GST_FORMAT_TIME,
399                       stream->segment.start, stream->segment.last_stop,
400                       stream->segment.time));
401               gst_segment_set_newsegment_full (&stream->segment, TRUE,
402                   stream->segment.rate, stream->segment.applied_rate,
403                   GST_FORMAT_TIME, stream->segment.start,
404                   stream->segment.last_stop, stream->segment.time);
405             }
406             stop_running_time = MAX (stop_running_time, last_stop_running_time);
407             GST_DEBUG_OBJECT (pad,
408                 "Stop running time of last group: %" GST_TIME_FORMAT,
409                 GST_TIME_ARGS (stop_running_time));
410           }
411           stream->new_stream = FALSE;
412           stream->drop_discont = TRUE;
413
414           if (stop_running_time < self->group_start_time) {
415             gint64 diff = self->group_start_time - stop_running_time;
416
417             GST_DEBUG_OBJECT (pad,
418                 "Advancing running time for other streams by: %"
419                 GST_TIME_FORMAT, GST_TIME_ARGS (diff));
420             gst_pad_push_event (stream->srcpad,
421                 gst_event_new_new_segment_full (FALSE, 1.0, 1.0,
422                     GST_FORMAT_TIME, 0, diff, 0));
423             gst_segment_set_newsegment_full (&stream->segment, FALSE, 1.0, 1.0,
424                 GST_FORMAT_TIME, 0, diff, 0);
425           }
426         }
427
428         GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
429             &stream->segment);
430         gst_segment_set_newsegment_full (&stream->segment, update, rate,
431             applied_rate, format, start, stop, position);
432         GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
433             &stream->segment);
434
435         GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
436             GST_TIME_ARGS (stream->segment.accum));
437         stream->running_time_diff = stream->segment.accum;
438       } else if (stream) {
439         GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
440             gst_format_get_name (format));
441         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
442       }
443       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
444       break;
445     }
446     case GST_EVENT_FLUSH_STOP:{
447       GstStream *stream;
448
449       GST_STREAM_SYNCHRONIZER_LOCK (self);
450       stream = gst_pad_get_element_private (pad);
451       if (stream) {
452         GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
453             stream->stream_number);
454         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
455
456         stream->is_eos = FALSE;
457         stream->wait = FALSE;
458         stream->new_stream = FALSE;
459         stream->drop_discont = FALSE;
460         stream->seen_data = FALSE;
461       }
462       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
463       break;
464     }
465     case GST_EVENT_EOS:{
466       GstStream *stream;
467       GList *l;
468       gboolean all_eos = TRUE;
469       gboolean seen_data;
470       GSList *pads = NULL;
471       GstPad *srcpad;
472
473       GST_STREAM_SYNCHRONIZER_LOCK (self);
474       stream = gst_pad_get_element_private (pad);
475       if (!stream) {
476         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
477         GST_WARNING_OBJECT (pad, "EOS for unknown stream");
478         break;
479       }
480
481       GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
482       stream->is_eos = TRUE;
483
484       seen_data = stream->seen_data;
485       srcpad = gst_object_ref (stream->srcpad);
486
487       for (l = self->streams; l; l = l->next) {
488         GstStream *ostream = l->data;
489
490         all_eos = all_eos && ostream->is_eos;
491         if (!all_eos)
492           break;
493       }
494
495       if (all_eos) {
496         GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
497         for (l = self->streams; l; l = l->next) {
498           GstStream *ostream = l->data;
499           /* local snapshot of current pads */
500           gst_object_ref (ostream->srcpad);
501           pads = g_slist_prepend (pads, ostream->srcpad);
502         }
503       }
504       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
505       /* drop lock when sending eos, which may block in e.g. preroll */
506       if (pads) {
507         GstPad *pad;
508         GSList *epad;
509
510         ret = TRUE;
511         epad = pads;
512         while (epad) {
513           pad = epad->data;
514           GST_DEBUG_OBJECT (pad, "Pushing EOS");
515           ret = ret && gst_pad_push_event (pad, gst_event_new_eos ());
516           gst_object_unref (pad);
517           epad = g_slist_next (epad);
518         }
519         g_slist_free (pads);
520       } else {
521         /* if EOS, but no data has passed, then send something to replace EOS
522          * for preroll purposes */
523         if (!seen_data) {
524           GstBuffer *buf = gst_buffer_new ();
525
526           GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_PREROLL);
527           gst_pad_push (srcpad, buf);
528         }
529       }
530       gst_object_unref (srcpad);
531       goto done;
532       break;
533     }
534     default:
535       break;
536   }
537
538 skip_adjustments:
539
540   opad = gst_stream_get_other_pad_from_pad (pad);
541   if (opad) {
542     ret = gst_pad_push_event (opad, event);
543     gst_object_unref (opad);
544   }
545
546 done:
547   gst_object_unref (self);
548
549   return ret;
550 }
551
552 static GstFlowReturn
553 gst_stream_synchronizer_sink_bufferalloc (GstPad * pad, guint64 offset,
554     guint size, GstCaps * caps, GstBuffer ** buf)
555 {
556   GstPad *opad;
557   GstFlowReturn ret = GST_FLOW_ERROR;
558
559   GST_LOG_OBJECT (pad, "Allocating buffer: size=%u", size);
560
561   opad = gst_stream_get_other_pad_from_pad (pad);
562   if (opad) {
563     ret = gst_pad_alloc_buffer (opad, offset, size, caps, buf);
564     gst_object_unref (opad);
565   }
566
567   GST_LOG_OBJECT (pad, "Allocation: %s", gst_flow_get_name (ret));
568
569   return ret;
570 }
571
572 static GstFlowReturn
573 gst_stream_synchronizer_sink_chain (GstPad * pad, GstBuffer * buffer)
574 {
575   GstStreamSynchronizer *self =
576       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
577   GstPad *opad;
578   GstFlowReturn ret = GST_FLOW_ERROR;
579   GstStream *stream;
580   GstClockTime timestamp = GST_CLOCK_TIME_NONE;
581   GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
582
583   if (passthrough) {
584     opad = gst_stream_get_other_pad_from_pad (pad);
585     if (opad) {
586       ret = gst_pad_push (opad, buffer);
587       gst_object_unref (opad);
588     }
589     goto done;
590   }
591
592   GST_LOG_OBJECT (pad, "Handling buffer %p: size=%u, timestamp=%"
593       GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
594       " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
595       buffer, GST_BUFFER_SIZE (buffer),
596       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
597       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
598       GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));
599
600   timestamp = GST_BUFFER_TIMESTAMP (buffer);
601   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)
602       && GST_BUFFER_DURATION_IS_VALID (buffer))
603     timestamp_end = timestamp + GST_BUFFER_DURATION (buffer);
604
605   GST_STREAM_SYNCHRONIZER_LOCK (self);
606   stream = gst_pad_get_element_private (pad);
607
608   stream->seen_data = TRUE;
609   if (stream && stream->drop_discont) {
610     buffer = gst_buffer_make_metadata_writable (buffer);
611     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
612     stream->drop_discont = FALSE;
613   }
614
615   if (stream && stream->segment.format == GST_FORMAT_TIME
616       && GST_CLOCK_TIME_IS_VALID (timestamp)) {
617     GST_LOG_OBJECT (pad,
618         "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
619         GST_TIME_ARGS (stream->segment.last_stop), GST_TIME_ARGS (timestamp));
620     gst_segment_set_last_stop (&stream->segment, GST_FORMAT_TIME, timestamp);
621   }
622   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
623
624   opad = gst_stream_get_other_pad_from_pad (pad);
625   if (opad) {
626     ret = gst_pad_push (opad, buffer);
627     gst_object_unref (opad);
628   }
629
630   GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
631   if (ret == GST_FLOW_OK) {
632     GList *l;
633
634     GST_STREAM_SYNCHRONIZER_LOCK (self);
635     stream = gst_pad_get_element_private (pad);
636     if (stream && stream->segment.format == GST_FORMAT_TIME
637         && GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
638       GST_LOG_OBJECT (pad,
639           "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
640           GST_TIME_ARGS (stream->segment.last_stop),
641           GST_TIME_ARGS (timestamp_end));
642       gst_segment_set_last_stop (&stream->segment, GST_FORMAT_TIME,
643           timestamp_end);
644     }
645
646     /* Advance EOS streams if necessary. For non-EOS
647      * streams the demuxers should already do this! */
648     for (l = self->streams; l; l = l->next) {
649       GstStream *ostream = l->data;
650       gint64 last_stop;
651
652       if (!ostream->is_eos || ostream->segment.format != GST_FORMAT_TIME)
653         continue;
654
655       if (ostream->segment.last_stop != -1)
656         last_stop = ostream->segment.last_stop;
657       else
658         last_stop = ostream->segment.start;
659
660       /* Is there a 1 second lag? */
661       if (last_stop != -1 && last_stop + GST_SECOND < timestamp_end) {
662         gint64 new_start, new_stop;
663
664         new_start = timestamp_end - GST_SECOND;
665         if (ostream->segment.stop == -1)
666           new_stop = -1;
667         else
668           new_stop = MAX (new_start, ostream->segment.stop);
669
670         GST_DEBUG_OBJECT (ostream->sinkpad,
671             "Advancing stream %u from %" GST_TIME_FORMAT " to %"
672             GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (last_stop),
673             GST_TIME_ARGS (new_start));
674
675         gst_pad_push_event (ostream->srcpad,
676             gst_event_new_new_segment_full (TRUE, ostream->segment.rate,
677                 ostream->segment.applied_rate, ostream->segment.format,
678                 new_start, new_stop, new_start));
679         gst_segment_set_newsegment_full (&ostream->segment, TRUE,
680             ostream->segment.rate, ostream->segment.applied_rate,
681             ostream->segment.format, new_start, new_stop, new_start);
682         gst_segment_set_last_stop (&ostream->segment, GST_FORMAT_TIME,
683             new_start);
684       }
685     }
686     GST_STREAM_SYNCHRONIZER_UNLOCK (self);
687   }
688
689 done:
690
691   gst_object_unref (self);
692
693   return ret;
694 }
695
696 /* GstElement vfuncs */
697 static GstPad *
698 gst_stream_synchronizer_request_new_pad (GstElement * element,
699     GstPadTemplate * temp, const gchar * name)
700 {
701   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
702   GstStream *stream;
703   gchar *tmp;
704
705   GST_STREAM_SYNCHRONIZER_LOCK (self);
706   GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
707       self->current_stream_number);
708
709   stream = g_slice_new0 (GstStream);
710   stream->transform = self;
711   stream->stream_number = self->current_stream_number;
712
713   tmp = g_strdup_printf ("sink_%d", self->current_stream_number);
714   stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
715   g_free (tmp);
716   gst_pad_set_element_private (stream->sinkpad, stream);
717   gst_pad_set_iterate_internal_links_function (stream->sinkpad,
718       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
719   gst_pad_set_query_function (stream->sinkpad,
720       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
721   gst_pad_set_getcaps_function (stream->sinkpad,
722       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
723   gst_pad_set_acceptcaps_function (stream->sinkpad,
724       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
725   gst_pad_set_event_function (stream->sinkpad,
726       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
727   gst_pad_set_chain_function (stream->sinkpad,
728       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
729   gst_pad_set_bufferalloc_function (stream->sinkpad,
730       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_bufferalloc));
731
732   tmp = g_strdup_printf ("src_%d", self->current_stream_number);
733   stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
734   g_free (tmp);
735   gst_pad_set_element_private (stream->srcpad, stream);
736   gst_pad_set_iterate_internal_links_function (stream->srcpad,
737       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
738   gst_pad_set_query_function (stream->srcpad,
739       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
740   gst_pad_set_getcaps_function (stream->srcpad,
741       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
742   gst_pad_set_acceptcaps_function (stream->srcpad,
743       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
744   gst_pad_set_event_function (stream->srcpad,
745       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
746
747   gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
748
749   self->streams = g_list_prepend (self->streams, stream);
750   self->current_stream_number++;
751   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
752
753   /* Add pads and activate unless we're going to NULL */
754   g_static_rec_mutex_lock (GST_STATE_GET_LOCK (self));
755   if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
756     gst_pad_set_active (stream->srcpad, TRUE);
757     gst_pad_set_active (stream->sinkpad, TRUE);
758   }
759   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
760   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
761   g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
762
763   return stream->sinkpad;
764 }
765
766 /* Must be called with lock! */
767 static void
768 gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
769     GstStream * stream)
770 {
771   GList *l;
772
773   GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
774
775   for (l = self->streams; l; l = l->next) {
776     if (l->data == stream) {
777       self->streams = g_list_delete_link (self->streams, l);
778       break;
779     }
780   }
781   g_assert (l != NULL);
782
783   /* we can drop the lock, since stream exists now only local.
784    * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
785    * (due to reverse lock order) when deactivating pads */
786   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
787
788   gst_pad_set_element_private (stream->srcpad, NULL);
789   gst_pad_set_element_private (stream->sinkpad, NULL);
790   gst_pad_set_active (stream->srcpad, FALSE);
791   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
792   gst_pad_set_active (stream->sinkpad, FALSE);
793   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
794
795   if (stream->segment.format == GST_FORMAT_TIME) {
796     gint64 stop_running_time;
797     gint64 last_stop_running_time;
798
799     stop_running_time =
800         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
801         stream->segment.stop);
802     last_stop_running_time =
803         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
804         stream->segment.last_stop);
805     stop_running_time = MAX (stop_running_time, last_stop_running_time);
806
807     GST_DEBUG_OBJECT (stream->sinkpad,
808         "Stop running time was: %" GST_TIME_FORMAT,
809         GST_TIME_ARGS (stop_running_time));
810
811     self->group_start_time = MAX (self->group_start_time, stop_running_time);
812   }
813
814   g_slice_free (GstStream, stream);
815
816   /* NOTE: In theory we have to check here if all streams
817    * are EOS but the one that was removed wasn't and then
818    * send EOS downstream. But due to the way how playsink
819    * works this is not necessary and will only cause problems
820    * for gapless playback. playsink will only add/remove pads
821    * when it's reconfigured, which happens when the streams
822    * change
823    */
824
825   /* lock for good measure, since the caller had it */
826   GST_STREAM_SYNCHRONIZER_LOCK (self);
827 }
828
829 static void
830 gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
831 {
832   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
833   GstStream *stream;
834
835   GST_STREAM_SYNCHRONIZER_LOCK (self);
836   stream = gst_pad_get_element_private (pad);
837   if (stream) {
838     g_assert (stream->sinkpad == pad);
839
840     gst_stream_synchronizer_release_stream (self, stream);
841   }
842   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
843 }
844
845 static GstStateChangeReturn
846 gst_stream_synchronizer_change_state (GstElement * element,
847     GstStateChange transition)
848 {
849   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
850   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
851
852   switch (transition) {
853     case GST_STATE_CHANGE_NULL_TO_READY:
854       GST_DEBUG_OBJECT (self, "State change NULL->READY");
855       self->shutdown = FALSE;
856       break;
857     case GST_STATE_CHANGE_READY_TO_PAUSED:
858       GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
859       self->group_start_time = 0;
860       self->shutdown = FALSE;
861       break;
862     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
863       GST_DEBUG_OBJECT (self, "State change PAUSED->PLAYING");
864       break;
865     case GST_STATE_CHANGE_PAUSED_TO_READY:
866       GST_DEBUG_OBJECT (self, "State change READY->NULL");
867
868       GST_STREAM_SYNCHRONIZER_LOCK (self);
869       g_cond_broadcast (self->stream_finish_cond);
870       self->shutdown = TRUE;
871       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
872     default:
873       break;
874   }
875
876   {
877     GstStateChangeReturn bret;
878
879     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
880     GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", bret);
881     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
882       return ret;
883   }
884
885   switch (transition) {
886     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
887       GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
888       break;
889     case GST_STATE_CHANGE_PAUSED_TO_READY:{
890       GList *l;
891
892       GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
893       self->group_start_time = 0;
894
895       GST_STREAM_SYNCHRONIZER_LOCK (self);
896       for (l = self->streams; l; l = l->next) {
897         GstStream *stream = l->data;
898
899         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
900         stream->wait = FALSE;
901         stream->new_stream = FALSE;
902         stream->drop_discont = FALSE;
903         stream->is_eos = FALSE;
904       }
905       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
906       break;
907     }
908     case GST_STATE_CHANGE_READY_TO_NULL:{
909       GST_DEBUG_OBJECT (self, "State change READY->NULL");
910
911       GST_STREAM_SYNCHRONIZER_LOCK (self);
912       while (self->streams)
913         gst_stream_synchronizer_release_stream (self, self->streams->data);
914       self->current_stream_number = 0;
915       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
916       break;
917     }
918     default:
919       break;
920   }
921
922   return ret;
923 }
924
925 /* GObject vfuncs */
926 static void
927 gst_stream_synchronizer_finalize (GObject * object)
928 {
929   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
930
931   if (self->lock) {
932     g_mutex_free (self->lock);
933     self->lock = NULL;
934   }
935
936   if (self->stream_finish_cond) {
937     g_cond_free (self->stream_finish_cond);
938     self->stream_finish_cond = NULL;
939   }
940
941   G_OBJECT_CLASS (parent_class)->finalize (object);
942 }
943
944 /* GObject type initialization */
945 static void
946 gst_stream_synchronizer_init (GstStreamSynchronizer * self,
947     GstStreamSynchronizerClass * klass)
948 {
949   self->lock = g_mutex_new ();
950   self->stream_finish_cond = g_cond_new ();
951 }
952
953 static void
954 gst_stream_synchronizer_base_init (gpointer g_class)
955 {
956   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
957
958   gst_element_class_add_pad_template (gstelement_class,
959       gst_static_pad_template_get (&srctemplate));
960   gst_element_class_add_pad_template (gstelement_class,
961       gst_static_pad_template_get (&sinktemplate));
962
963   gst_element_class_set_details_simple (gstelement_class,
964       "Stream Synchronizer", "Generic",
965       "Synchronizes a group of streams to have equal durations and starting points",
966       "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
967 }
968
969 static void
970 gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
971 {
972   GObjectClass *gobject_class = (GObjectClass *) klass;
973   GstElementClass *element_class = (GstElementClass *) klass;
974
975   GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
976       "streamsynchronizer", 0, "Stream Synchronizer");
977
978   gobject_class->finalize = gst_stream_synchronizer_finalize;
979
980   element_class->change_state =
981       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
982   element_class->request_new_pad =
983       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
984   element_class->release_pad =
985       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);
986 }