tizen 2.3 release
[framework/multimedia/gst-plugins-base0.10.git] / gst / playback / gststreamsynchronizer.c
1 /* GStreamer
2  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #ifdef HAVE_CONFIG_H
21 #include "config.h"
22 #endif
23
24 /* FIXME 0.11: suppress warnings for deprecated API such as GStaticRecMutex
25  * with newer GLib versions (>= 2.31.0) */
26 #define GLIB_DISABLE_DEPRECATION_WARNINGS
27
28 #include "gststreamsynchronizer.h"
29 #include "gst/glib-compat-private.h"
30
31 GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
32 #define GST_CAT_DEFAULT stream_synchronizer_debug
33
34 #define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START {                   \
35     GST_LOG_OBJECT (obj,                                                \
36                     "locking from thread %p",                           \
37                     g_thread_self ());                                  \
38     g_mutex_lock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);                \
39     GST_LOG_OBJECT (obj,                                                \
40                     "locked from thread %p",                            \
41                     g_thread_self ());                                  \
42 } G_STMT_END
43
44 #define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START {                 \
45     GST_LOG_OBJECT (obj,                                                \
46                     "unlocking from thread %p",                         \
47                     g_thread_self ());                                  \
48     g_mutex_unlock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);              \
49 } G_STMT_END
50
51 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%d",
52     GST_PAD_SRC,
53     GST_PAD_SOMETIMES,
54     GST_STATIC_CAPS_ANY);
55 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%d",
56     GST_PAD_SINK,
57     GST_PAD_REQUEST,
58     GST_STATIC_CAPS_ANY);
59
60 static const gboolean passthrough = TRUE;
61
62 GST_BOILERPLATE (GstStreamSynchronizer, gst_stream_synchronizer,
63     GstElement, GST_TYPE_ELEMENT);
64
65 typedef struct
66 {
67   GstStreamSynchronizer *transform;
68   guint stream_number;
69   GstPad *srcpad;
70   GstPad *sinkpad;
71   GstSegment segment;
72
73   gboolean wait;
74   gboolean new_stream;
75   gboolean drop_discont;
76   gboolean is_eos;
77   gboolean seen_data;
78
79   gint64 running_time_diff;
80 } GstStream;
81
82 /* Must be called with lock! */
83 static GstPad *
84 gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
85 {
86   if (stream->sinkpad == pad)
87     return gst_object_ref (stream->srcpad);
88   else if (stream->srcpad == pad)
89     return gst_object_ref (stream->sinkpad);
90
91   return NULL;
92 }
93
94 static GstPad *
95 gst_stream_get_other_pad_from_pad (GstPad * pad)
96 {
97   GstObject *parent = gst_pad_get_parent (pad);
98   GstStreamSynchronizer *self;
99   GstStream *stream;
100   GstPad *opad = NULL;
101
102   /* released pad does not have parent anymore */
103   if (!G_LIKELY (parent))
104     goto exit;
105
106   self = GST_STREAM_SYNCHRONIZER (parent);
107   GST_STREAM_SYNCHRONIZER_LOCK (self);
108   stream = gst_pad_get_element_private (pad);
109   if (!stream)
110     goto out;
111
112   opad = gst_stream_get_other_pad (stream, pad);
113
114 out:
115   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
116   gst_object_unref (self);
117
118 exit:
119   if (!opad)
120     GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
121
122   return opad;
123 }
124
125 /* Generic pad functions */
126 static GstIterator *
127 gst_stream_synchronizer_iterate_internal_links (GstPad * pad)
128 {
129   GstIterator *it = NULL;
130   GstPad *opad;
131
132   opad = gst_stream_get_other_pad_from_pad (pad);
133   if (opad) {
134     it = gst_iterator_new_single (GST_TYPE_PAD, opad,
135         (GstCopyFunction) gst_object_ref, (GFreeFunc) gst_object_unref);
136     gst_object_unref (opad);
137   }
138
139   return it;
140 }
141
142 static gboolean
143 gst_stream_synchronizer_query (GstPad * pad, GstQuery * query)
144 {
145   GstPad *opad;
146   gboolean ret = FALSE;
147
148   GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));
149
150   opad = gst_stream_get_other_pad_from_pad (pad);
151   if (opad) {
152     ret = gst_pad_peer_query (opad, query);
153     gst_object_unref (opad);
154   }
155
156   return ret;
157 }
158
159 static GstCaps *
160 gst_stream_synchronizer_getcaps (GstPad * pad)
161 {
162   GstPad *opad;
163   GstCaps *ret = NULL;
164
165   opad = gst_stream_get_other_pad_from_pad (pad);
166   if (opad) {
167     ret = gst_pad_peer_get_caps (opad);
168     gst_object_unref (opad);
169   }
170
171   if (ret == NULL)
172     ret = gst_caps_new_any ();
173
174   GST_LOG_OBJECT (pad, "Returning caps: %" GST_PTR_FORMAT, ret);
175
176   return ret;
177 }
178
179 static gboolean
180 gst_stream_synchronizer_acceptcaps (GstPad * pad, GstCaps * caps)
181 {
182   GstPad *opad;
183   gboolean ret = FALSE;
184
185   opad = gst_stream_get_other_pad_from_pad (pad);
186   if (opad) {
187     ret = gst_pad_peer_accept_caps (opad, caps);
188     gst_object_unref (opad);
189   }
190
191   GST_LOG_OBJECT (pad, "Caps%s accepted: %" GST_PTR_FORMAT, (ret ? "" : " not"),
192       caps);
193
194   return ret;
195 }
196
197 /* srcpad functions */
198 static gboolean
199 gst_stream_synchronizer_src_event (GstPad * pad, GstEvent * event)
200 {
201   GstStreamSynchronizer *self =
202       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
203   GstPad *opad;
204   gboolean ret = FALSE;
205
206   if (passthrough)
207     goto skip_adjustments;
208
209   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
210       GST_EVENT_TYPE_NAME (event), event->structure);
211
212   switch (GST_EVENT_TYPE (event)) {
213     case GST_EVENT_QOS:{
214       gdouble proportion;
215       GstClockTimeDiff diff;
216       GstClockTime timestamp;
217       gint64 running_time_diff;
218       GstStream *stream;
219
220       gst_event_parse_qos (event, &proportion, &diff, &timestamp);
221       gst_event_unref (event);
222
223       GST_STREAM_SYNCHRONIZER_LOCK (self);
224       stream = gst_pad_get_element_private (pad);
225       if (stream)
226         running_time_diff = stream->running_time_diff;
227       else
228         running_time_diff = -1;
229       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
230
231       if (running_time_diff == -1) {
232         GST_WARNING_OBJECT (pad, "QOS event before group start");
233         goto out;
234       } else if (timestamp < running_time_diff) {
235         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
236         goto out;
237       }
238
239       GST_LOG_OBJECT (pad,
240           "Adjusting QOS event: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT " = %"
241           GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
242           GST_TIME_ARGS (running_time_diff),
243           GST_TIME_ARGS (timestamp - running_time_diff));
244
245       timestamp -= running_time_diff;
246
247       /* That case is invalid for QoS events */
248       if (diff < 0 && -diff > timestamp) {
249         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
250         ret = TRUE;
251         goto out;
252       }
253
254       event = gst_event_new_qos (proportion, diff, timestamp);
255       break;
256     }
257     default:
258       break;
259   }
260
261 skip_adjustments:
262
263   opad = gst_stream_get_other_pad_from_pad (pad);
264   if (opad) {
265     ret = gst_pad_push_event (opad, event);
266     gst_object_unref (opad);
267   }
268
269 out:
270   gst_object_unref (self);
271
272   return ret;
273 }
274
275 /* sinkpad functions */
276 static gboolean
277 gst_stream_synchronizer_sink_event (GstPad * pad, GstEvent * event)
278 {
279   GstStreamSynchronizer *self =
280       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
281   GstPad *opad;
282   gboolean ret = FALSE;
283
284   if (passthrough)
285     goto skip_adjustments;
286
287   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
288       GST_EVENT_TYPE_NAME (event), event->structure);
289
290   switch (GST_EVENT_TYPE (event)) {
291     case GST_EVENT_SINK_MESSAGE:{
292       GstMessage *message;
293
294       gst_event_parse_sink_message (event, &message);
295       if (message->structure
296           && gst_structure_has_name (message->structure,
297               "playbin2-stream-changed")) {
298         GstStream *stream;
299
300         GST_STREAM_SYNCHRONIZER_LOCK (self);
301         stream = gst_pad_get_element_private (pad);
302         if (stream) {
303           GList *l;
304           gboolean all_wait = TRUE;
305
306           GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
307
308           stream->is_eos = FALSE;
309           stream->wait = TRUE;
310           stream->new_stream = TRUE;
311
312           for (l = self->streams; l; l = l->next) {
313             GstStream *ostream = l->data;
314
315             all_wait = all_wait && ostream->wait;
316             if (!all_wait)
317               break;
318           }
319           if (all_wait) {
320             gint64 last_stop = 0;
321
322             GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
323
324             for (l = self->streams; l; l = l->next) {
325               GstStream *ostream = l->data;
326               gint64 stop_running_time;
327               gint64 last_stop_running_time;
328
329               ostream->wait = FALSE;
330
331               stop_running_time =
332                   gst_segment_to_running_time (&ostream->segment,
333                   GST_FORMAT_TIME, ostream->segment.stop);
334               last_stop_running_time =
335                   gst_segment_to_running_time (&ostream->segment,
336                   GST_FORMAT_TIME, ostream->segment.last_stop);
337               last_stop =
338                   MAX (last_stop, MAX (stop_running_time,
339                       last_stop_running_time));
340             }
341             last_stop = MAX (0, last_stop);
342             self->group_start_time = MAX (self->group_start_time, last_stop);
343
344             GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
345                 GST_TIME_ARGS (self->group_start_time));
346
347             g_cond_broadcast (self->stream_finish_cond);
348           }
349         }
350         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
351       }
352       gst_message_unref (message);
353       break;
354     }
355     case GST_EVENT_NEWSEGMENT:{
356       GstStream *stream;
357       gboolean update;
358       gdouble rate, applied_rate;
359       GstFormat format;
360       gint64 start, stop, position;
361
362       gst_event_parse_new_segment_full (event,
363           &update, &rate, &applied_rate, &format, &start, &stop, &position);
364
365       GST_STREAM_SYNCHRONIZER_LOCK (self);
366       stream = gst_pad_get_element_private (pad);
367       if (stream) {
368         if (stream->wait) {
369           GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
370           g_cond_wait (self->stream_finish_cond, self->lock);
371           stream = gst_pad_get_element_private (pad);
372           if (stream)
373             stream->wait = FALSE;
374         }
375       }
376
377       if (self->shutdown) {
378         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
379         gst_event_unref (event);
380         goto done;
381       }
382
383       if (stream && format == GST_FORMAT_TIME) {
384         if (stream->new_stream) {
385           gint64 last_stop_running_time = 0;
386           gint64 stop_running_time = 0;
387
388           if (stream->segment.format == GST_FORMAT_TIME) {
389             last_stop_running_time =
390                 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
391                 stream->segment.last_stop);
392             last_stop_running_time = MAX (last_stop_running_time, 0);
393             stop_running_time =
394                 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
395                 stream->segment.stop);
396             stop_running_time = MAX (last_stop_running_time, 0);
397
398             if (stop_running_time != last_stop_running_time) {
399               GST_WARNING_OBJECT (pad,
400                   "Gap between last_stop and segment stop: %" GST_TIME_FORMAT
401                   " != %" GST_TIME_FORMAT, GST_TIME_ARGS (stop_running_time),
402                   GST_TIME_ARGS (last_stop_running_time));
403             }
404
405             if (stop_running_time < last_stop_running_time) {
406               GST_DEBUG_OBJECT (pad, "Updating stop position");
407               gst_pad_push_event (stream->srcpad,
408                   gst_event_new_new_segment_full (TRUE, stream->segment.rate,
409                       stream->segment.applied_rate, GST_FORMAT_TIME,
410                       stream->segment.start, stream->segment.last_stop,
411                       stream->segment.time));
412               gst_segment_set_newsegment_full (&stream->segment, TRUE,
413                   stream->segment.rate, stream->segment.applied_rate,
414                   GST_FORMAT_TIME, stream->segment.start,
415                   stream->segment.last_stop, stream->segment.time);
416             }
417             stop_running_time = MAX (stop_running_time, last_stop_running_time);
418             GST_DEBUG_OBJECT (pad,
419                 "Stop running time of last group: %" GST_TIME_FORMAT,
420                 GST_TIME_ARGS (stop_running_time));
421           }
422           stream->new_stream = FALSE;
423           stream->drop_discont = TRUE;
424
425           if (stop_running_time < self->group_start_time) {
426             gint64 diff = self->group_start_time - stop_running_time;
427
428             GST_DEBUG_OBJECT (pad,
429                 "Advancing running time for other streams by: %"
430                 GST_TIME_FORMAT, GST_TIME_ARGS (diff));
431             gst_pad_push_event (stream->srcpad,
432                 gst_event_new_new_segment_full (FALSE, 1.0, 1.0,
433                     GST_FORMAT_TIME, 0, diff, 0));
434             gst_segment_set_newsegment_full (&stream->segment, FALSE, 1.0, 1.0,
435                 GST_FORMAT_TIME, 0, diff, 0);
436           }
437         }
438
439         GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
440             &stream->segment);
441         gst_segment_set_newsegment_full (&stream->segment, update, rate,
442             applied_rate, format, start, stop, position);
443         GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
444             &stream->segment);
445
446         GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
447             GST_TIME_ARGS (stream->segment.accum));
448         stream->running_time_diff = stream->segment.accum;
449       } else if (stream) {
450         GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
451             gst_format_get_name (format));
452         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
453       }
454       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
455       break;
456     }
457     case GST_EVENT_FLUSH_STOP:{
458       GstStream *stream;
459
460       GST_STREAM_SYNCHRONIZER_LOCK (self);
461       stream = gst_pad_get_element_private (pad);
462       if (stream) {
463         GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
464             stream->stream_number);
465         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
466
467         stream->is_eos = FALSE;
468         stream->wait = FALSE;
469         stream->new_stream = FALSE;
470         stream->drop_discont = FALSE;
471         stream->seen_data = FALSE;
472       }
473       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
474       break;
475     }
476     case GST_EVENT_EOS:{
477       GstStream *stream;
478       GList *l;
479       gboolean all_eos = TRUE;
480       gboolean seen_data;
481       GSList *pads = NULL;
482       GstPad *srcpad;
483
484       GST_STREAM_SYNCHRONIZER_LOCK (self);
485       stream = gst_pad_get_element_private (pad);
486       if (!stream) {
487         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
488         GST_WARNING_OBJECT (pad, "EOS for unknown stream");
489         break;
490       }
491
492       GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
493       stream->is_eos = TRUE;
494
495       seen_data = stream->seen_data;
496       srcpad = gst_object_ref (stream->srcpad);
497
498       for (l = self->streams; l; l = l->next) {
499         GstStream *ostream = l->data;
500
501         all_eos = all_eos && ostream->is_eos;
502         if (!all_eos)
503           break;
504       }
505
506       if (all_eos) {
507         GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
508         for (l = self->streams; l; l = l->next) {
509           GstStream *ostream = l->data;
510           /* local snapshot of current pads */
511           gst_object_ref (ostream->srcpad);
512           pads = g_slist_prepend (pads, ostream->srcpad);
513         }
514       }
515       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
516       /* drop lock when sending eos, which may block in e.g. preroll */
517       if (pads) {
518         GstPad *pad;
519         GSList *epad;
520
521         ret = TRUE;
522         epad = pads;
523         while (epad) {
524           pad = epad->data;
525           GST_DEBUG_OBJECT (pad, "Pushing EOS");
526           ret = ret && gst_pad_push_event (pad, gst_event_new_eos ());
527           gst_object_unref (pad);
528           epad = g_slist_next (epad);
529         }
530         g_slist_free (pads);
531       } else {
532         /* if EOS, but no data has passed, then send something to replace EOS
533          * for preroll purposes */
534         if (!seen_data) {
535           GstBuffer *buf = gst_buffer_new ();
536
537           GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_PREROLL);
538           gst_pad_push (srcpad, buf);
539         }
540       }
541       gst_object_unref (srcpad);
542       goto done;
543       break;
544     }
545     default:
546       break;
547   }
548
549 skip_adjustments:
550
551   opad = gst_stream_get_other_pad_from_pad (pad);
552   if (opad) {
553     ret = gst_pad_push_event (opad, event);
554     gst_object_unref (opad);
555   }
556
557 done:
558   gst_object_unref (self);
559
560   return ret;
561 }
562
563 static GstFlowReturn
564 gst_stream_synchronizer_sink_bufferalloc (GstPad * pad, guint64 offset,
565     guint size, GstCaps * caps, GstBuffer ** buf)
566 {
567   GstPad *opad;
568   GstFlowReturn ret = GST_FLOW_OK;
569
570   GST_LOG_OBJECT (pad, "Allocating buffer: size=%u", size);
571
572   opad = gst_stream_get_other_pad_from_pad (pad);
573   if (opad) {
574     ret = gst_pad_alloc_buffer (opad, offset, size, caps, buf);
575     gst_object_unref (opad);
576   } else {
577     /* may have been released during shutdown;
578      * silently trigger fallback */
579     *buf = NULL;
580   }
581
582   GST_LOG_OBJECT (pad, "Allocation: %s", gst_flow_get_name (ret));
583
584   return ret;
585 }
586
587 static GstFlowReturn
588 gst_stream_synchronizer_sink_chain (GstPad * pad, GstBuffer * buffer)
589 {
590   GstStreamSynchronizer *self =
591       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
592   GstPad *opad;
593   GstFlowReturn ret = GST_FLOW_ERROR;
594   GstStream *stream;
595   GstClockTime timestamp = GST_CLOCK_TIME_NONE;
596   GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
597
598   if (passthrough) {
599     opad = gst_stream_get_other_pad_from_pad (pad);
600     if (opad) {
601       ret = gst_pad_push (opad, buffer);
602       gst_object_unref (opad);
603     }
604     goto done;
605   }
606
607   GST_LOG_OBJECT (pad, "Handling buffer %p: size=%u, timestamp=%"
608       GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
609       " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
610       buffer, GST_BUFFER_SIZE (buffer),
611       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
612       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
613       GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));
614
615   timestamp = GST_BUFFER_TIMESTAMP (buffer);
616   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)
617       && GST_BUFFER_DURATION_IS_VALID (buffer))
618     timestamp_end = timestamp + GST_BUFFER_DURATION (buffer);
619
620   GST_STREAM_SYNCHRONIZER_LOCK (self);
621   stream = gst_pad_get_element_private (pad);
622
623   if (stream)
624     stream->seen_data = TRUE;
625   if (stream && stream->drop_discont) {
626     buffer = gst_buffer_make_metadata_writable (buffer);
627     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
628     stream->drop_discont = FALSE;
629   }
630
631   if (stream && stream->segment.format == GST_FORMAT_TIME
632       && GST_CLOCK_TIME_IS_VALID (timestamp)) {
633     GST_LOG_OBJECT (pad,
634         "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
635         GST_TIME_ARGS (stream->segment.last_stop), GST_TIME_ARGS (timestamp));
636     gst_segment_set_last_stop (&stream->segment, GST_FORMAT_TIME, timestamp);
637   }
638   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
639
640   opad = gst_stream_get_other_pad_from_pad (pad);
641   if (opad) {
642     ret = gst_pad_push (opad, buffer);
643     gst_object_unref (opad);
644   }
645
646   GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
647   if (ret == GST_FLOW_OK) {
648     GList *l;
649
650     GST_STREAM_SYNCHRONIZER_LOCK (self);
651     stream = gst_pad_get_element_private (pad);
652     if (stream && stream->segment.format == GST_FORMAT_TIME
653         && GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
654       GST_LOG_OBJECT (pad,
655           "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
656           GST_TIME_ARGS (stream->segment.last_stop),
657           GST_TIME_ARGS (timestamp_end));
658       gst_segment_set_last_stop (&stream->segment, GST_FORMAT_TIME,
659           timestamp_end);
660     }
661
662     /* Advance EOS streams if necessary. For non-EOS
663      * streams the demuxers should already do this! */
664     for (l = self->streams; l; l = l->next) {
665       GstStream *ostream = l->data;
666       gint64 last_stop;
667
668       if (!ostream->is_eos || ostream->segment.format != GST_FORMAT_TIME)
669         continue;
670
671       if (ostream->segment.last_stop != -1)
672         last_stop = ostream->segment.last_stop;
673       else
674         last_stop = ostream->segment.start;
675
676       /* Is there a 1 second lag? */
677       if (last_stop != -1 && last_stop + GST_SECOND < timestamp_end) {
678         gint64 new_start, new_stop;
679
680         new_start = timestamp_end - GST_SECOND;
681         if (ostream->segment.stop == -1)
682           new_stop = -1;
683         else
684           new_stop = MAX (new_start, ostream->segment.stop);
685
686         GST_DEBUG_OBJECT (ostream->sinkpad,
687             "Advancing stream %u from %" GST_TIME_FORMAT " to %"
688             GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (last_stop),
689             GST_TIME_ARGS (new_start));
690
691         gst_pad_push_event (ostream->srcpad,
692             gst_event_new_new_segment_full (TRUE, ostream->segment.rate,
693                 ostream->segment.applied_rate, ostream->segment.format,
694                 new_start, new_stop, new_start));
695         gst_segment_set_newsegment_full (&ostream->segment, TRUE,
696             ostream->segment.rate, ostream->segment.applied_rate,
697             ostream->segment.format, new_start, new_stop, new_start);
698         gst_segment_set_last_stop (&ostream->segment, GST_FORMAT_TIME,
699             new_start);
700       }
701     }
702     GST_STREAM_SYNCHRONIZER_UNLOCK (self);
703   }
704
705 done:
706
707   gst_object_unref (self);
708
709   return ret;
710 }
711
712 /* GstElement vfuncs */
713 static GstPad *
714 gst_stream_synchronizer_request_new_pad (GstElement * element,
715     GstPadTemplate * temp, const gchar * name)
716 {
717   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
718   GstStream *stream;
719   gchar *tmp;
720
721   GST_STREAM_SYNCHRONIZER_LOCK (self);
722   GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
723       self->current_stream_number);
724
725   stream = g_slice_new0 (GstStream);
726   stream->transform = self;
727   stream->stream_number = self->current_stream_number;
728
729   tmp = g_strdup_printf ("sink_%d", self->current_stream_number);
730   stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
731   g_free (tmp);
732   gst_pad_set_element_private (stream->sinkpad, stream);
733   gst_pad_set_iterate_internal_links_function (stream->sinkpad,
734       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
735   gst_pad_set_query_function (stream->sinkpad,
736       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
737   gst_pad_set_getcaps_function (stream->sinkpad,
738       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
739   gst_pad_set_acceptcaps_function (stream->sinkpad,
740       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
741   gst_pad_set_event_function (stream->sinkpad,
742       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
743   gst_pad_set_chain_function (stream->sinkpad,
744       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
745   gst_pad_set_bufferalloc_function (stream->sinkpad,
746       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_bufferalloc));
747
748   tmp = g_strdup_printf ("src_%d", self->current_stream_number);
749   stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
750   g_free (tmp);
751   gst_pad_set_element_private (stream->srcpad, stream);
752   gst_pad_set_iterate_internal_links_function (stream->srcpad,
753       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
754   gst_pad_set_query_function (stream->srcpad,
755       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
756   gst_pad_set_getcaps_function (stream->srcpad,
757       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
758   gst_pad_set_acceptcaps_function (stream->srcpad,
759       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
760   gst_pad_set_event_function (stream->srcpad,
761       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
762
763   gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
764
765   self->streams = g_list_prepend (self->streams, stream);
766   self->current_stream_number++;
767   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
768
769   /* Add pads and activate unless we're going to NULL */
770   g_static_rec_mutex_lock (GST_STATE_GET_LOCK (self));
771   if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
772     gst_pad_set_active (stream->srcpad, TRUE);
773     gst_pad_set_active (stream->sinkpad, TRUE);
774   }
775   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
776   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
777   g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
778
779   return stream->sinkpad;
780 }
781
782 /* Must be called with lock! */
783 static void
784 gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
785     GstStream * stream)
786 {
787   GList *l;
788
789   GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
790
791   for (l = self->streams; l; l = l->next) {
792     if (l->data == stream) {
793       self->streams = g_list_delete_link (self->streams, l);
794       break;
795     }
796   }
797   g_assert (l != NULL);
798
799   /* we can drop the lock, since stream exists now only local.
800    * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
801    * (due to reverse lock order) when deactivating pads */
802   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
803
804   gst_pad_set_element_private (stream->srcpad, NULL);
805   gst_pad_set_element_private (stream->sinkpad, NULL);
806   gst_pad_set_active (stream->srcpad, FALSE);
807   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
808   gst_pad_set_active (stream->sinkpad, FALSE);
809   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
810
811   if (stream->segment.format == GST_FORMAT_TIME) {
812     gint64 stop_running_time;
813     gint64 last_stop_running_time;
814
815     stop_running_time =
816         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
817         stream->segment.stop);
818     last_stop_running_time =
819         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
820         stream->segment.last_stop);
821     stop_running_time = MAX (stop_running_time, last_stop_running_time);
822
823     GST_DEBUG_OBJECT (stream->sinkpad,
824         "Stop running time was: %" GST_TIME_FORMAT,
825         GST_TIME_ARGS (stop_running_time));
826
827     self->group_start_time = MAX (self->group_start_time, stop_running_time);
828   }
829
830   g_slice_free (GstStream, stream);
831
832   /* NOTE: In theory we have to check here if all streams
833    * are EOS but the one that was removed wasn't and then
834    * send EOS downstream. But due to the way how playsink
835    * works this is not necessary and will only cause problems
836    * for gapless playback. playsink will only add/remove pads
837    * when it's reconfigured, which happens when the streams
838    * change
839    */
840
841   /* lock for good measure, since the caller had it */
842   GST_STREAM_SYNCHRONIZER_LOCK (self);
843 }
844
845 static void
846 gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
847 {
848   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
849   GstStream *stream;
850
851   GST_STREAM_SYNCHRONIZER_LOCK (self);
852   stream = gst_pad_get_element_private (pad);
853   if (stream) {
854     g_assert (stream->sinkpad == pad);
855
856     gst_stream_synchronizer_release_stream (self, stream);
857   }
858   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
859 }
860
861 static GstStateChangeReturn
862 gst_stream_synchronizer_change_state (GstElement * element,
863     GstStateChange transition)
864 {
865   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
866   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
867
868   switch (transition) {
869     case GST_STATE_CHANGE_NULL_TO_READY:
870       GST_DEBUG_OBJECT (self, "State change NULL->READY");
871       self->shutdown = FALSE;
872       break;
873     case GST_STATE_CHANGE_READY_TO_PAUSED:
874       GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
875       self->group_start_time = 0;
876       self->shutdown = FALSE;
877       break;
878     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
879       GST_DEBUG_OBJECT (self, "State change PAUSED->PLAYING");
880       break;
881     case GST_STATE_CHANGE_PAUSED_TO_READY:
882       GST_DEBUG_OBJECT (self, "State change READY->NULL");
883
884       GST_STREAM_SYNCHRONIZER_LOCK (self);
885       g_cond_broadcast (self->stream_finish_cond);
886       self->shutdown = TRUE;
887       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
888     default:
889       break;
890   }
891
892   {
893     GstStateChangeReturn bret;
894
895     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
896     GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", bret);
897     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
898       return ret;
899   }
900
901   switch (transition) {
902     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
903       GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
904       break;
905     case GST_STATE_CHANGE_PAUSED_TO_READY:{
906       GList *l;
907
908       GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
909       self->group_start_time = 0;
910
911       GST_STREAM_SYNCHRONIZER_LOCK (self);
912       for (l = self->streams; l; l = l->next) {
913         GstStream *stream = l->data;
914
915         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
916         stream->wait = FALSE;
917         stream->new_stream = FALSE;
918         stream->drop_discont = FALSE;
919         stream->is_eos = FALSE;
920       }
921       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
922       break;
923     }
924     case GST_STATE_CHANGE_READY_TO_NULL:{
925       GST_DEBUG_OBJECT (self, "State change READY->NULL");
926
927       GST_STREAM_SYNCHRONIZER_LOCK (self);
928       while (self->streams)
929         gst_stream_synchronizer_release_stream (self, self->streams->data);
930       self->current_stream_number = 0;
931       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
932       break;
933     }
934     default:
935       break;
936   }
937
938   return ret;
939 }
940
941 /* GObject vfuncs */
942 static void
943 gst_stream_synchronizer_finalize (GObject * object)
944 {
945   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
946
947   if (self->lock) {
948     g_mutex_free (self->lock);
949     self->lock = NULL;
950   }
951
952   if (self->stream_finish_cond) {
953     g_cond_free (self->stream_finish_cond);
954     self->stream_finish_cond = NULL;
955   }
956
957   G_OBJECT_CLASS (parent_class)->finalize (object);
958 }
959
960 /* GObject type initialization */
961 static void
962 gst_stream_synchronizer_init (GstStreamSynchronizer * self,
963     GstStreamSynchronizerClass * klass)
964 {
965   self->lock = g_mutex_new ();
966   self->stream_finish_cond = g_cond_new ();
967 }
968
969 static void
970 gst_stream_synchronizer_base_init (gpointer g_class)
971 {
972   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
973
974   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
975   gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
976
977   gst_element_class_set_details_simple (gstelement_class,
978       "Stream Synchronizer", "Generic",
979       "Synchronizes a group of streams to have equal durations and starting points",
980       "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
981 }
982
983 static void
984 gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
985 {
986   GObjectClass *gobject_class = (GObjectClass *) klass;
987   GstElementClass *element_class = (GstElementClass *) klass;
988
989   GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
990       "streamsynchronizer", 0, "Stream Synchronizer");
991
992   gobject_class->finalize = gst_stream_synchronizer_finalize;
993
994   element_class->change_state =
995       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
996   element_class->request_new_pad =
997       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
998   element_class->release_pad =
999       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);
1000 }