Merge remote-tracking branch 'origin/master' into 0.11
[platform/upstream/gstreamer.git] / gst / playback / gststreamsynchronizer.c
1 /* GStreamer
2  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #ifdef HAVE_CONFIG_H
21 #include "config.h"
22 #endif
23
24 #include "gststreamsynchronizer.h"
25
26 GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
27 #define GST_CAT_DEFAULT stream_synchronizer_debug
28
29 #define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START {                   \
30     GST_LOG_OBJECT (obj,                                                \
31                     "locking from thread %p",                           \
32                     g_thread_self ());                                  \
33     g_mutex_lock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);                \
34     GST_LOG_OBJECT (obj,                                                \
35                     "locked from thread %p",                            \
36                     g_thread_self ());                                  \
37 } G_STMT_END
38
39 #define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START {                 \
40     GST_LOG_OBJECT (obj,                                                \
41                     "unlocking from thread %p",                         \
42                     g_thread_self ());                                  \
43     g_mutex_unlock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);              \
44 } G_STMT_END
45
46 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
47     GST_PAD_SRC,
48     GST_PAD_SOMETIMES,
49     GST_STATIC_CAPS_ANY);
50 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
51     GST_PAD_SINK,
52     GST_PAD_REQUEST,
53     GST_STATIC_CAPS_ANY);
54
55 static const gboolean passthrough = TRUE;
56
57 #define gst_stream_synchronizer_parent_class parent_class
58 G_DEFINE_TYPE (GstStreamSynchronizer, gst_stream_synchronizer,
59     GST_TYPE_ELEMENT);
60
61 typedef struct
62 {
63   GstStreamSynchronizer *transform;
64   guint stream_number;
65   GstPad *srcpad;
66   GstPad *sinkpad;
67   GstSegment segment;
68
69   gboolean wait;
70   gboolean new_stream;
71   gboolean drop_discont;
72   gboolean is_eos;
73   gboolean seen_data;
74
75   gint64 running_time_diff;
76 } GstStream;
77
78 /* Must be called with lock! */
79 static GstPad *
80 gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
81 {
82   if (stream->sinkpad == pad)
83     return gst_object_ref (stream->srcpad);
84   else if (stream->srcpad == pad)
85     return gst_object_ref (stream->sinkpad);
86
87   return NULL;
88 }
89
90 static GstPad *
91 gst_stream_get_other_pad_from_pad (GstPad * pad)
92 {
93   GstStreamSynchronizer *self =
94       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
95   GstStream *stream;
96   GstPad *opad = NULL;
97
98   GST_STREAM_SYNCHRONIZER_LOCK (self);
99   stream = gst_pad_get_element_private (pad);
100   if (!stream)
101     goto out;
102
103   opad = gst_stream_get_other_pad (stream, pad);
104
105 out:
106   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
107   gst_object_unref (self);
108
109   if (!opad)
110     GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
111
112   return opad;
113 }
114
115 /* Generic pad functions */
116 static GstIterator *
117 gst_stream_synchronizer_iterate_internal_links (GstPad * pad,
118     GstObject * parent)
119 {
120   GstIterator *it = NULL;
121   GstPad *opad;
122
123   opad = gst_stream_get_other_pad_from_pad (pad);
124   if (opad) {
125     GValue value = { 0, };
126
127     g_value_init (&value, GST_TYPE_PAD);
128     g_value_set_object (&value, opad);
129     it = gst_iterator_new_single (GST_TYPE_PAD, &value);
130     g_value_unset (&value);
131     gst_object_unref (opad);
132   }
133
134   return it;
135 }
136
137 static gboolean
138 gst_stream_synchronizer_query (GstPad * pad, GstObject * parent,
139     GstQuery * query)
140 {
141   GstPad *opad;
142   gboolean ret = FALSE;
143
144   GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));
145
146   opad = gst_stream_get_other_pad_from_pad (pad);
147   if (opad) {
148     ret = gst_pad_peer_query (opad, query);
149     gst_object_unref (opad);
150   }
151
152   return ret;
153 }
154
155 /* srcpad functions */
156 static gboolean
157 gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent,
158     GstEvent * event)
159 {
160   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
161   GstPad *opad;
162   gboolean ret = FALSE;
163
164   if (passthrough)
165     goto skip_adjustments;
166
167   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
168       GST_EVENT_TYPE_NAME (event), event);
169
170   switch (GST_EVENT_TYPE (event)) {
171     case GST_EVENT_QOS:{
172       gdouble proportion;
173       GstClockTimeDiff diff;
174       GstClockTime timestamp;
175       gint64 running_time_diff;
176       GstStream *stream;
177
178       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
179       gst_event_unref (event);
180
181       GST_STREAM_SYNCHRONIZER_LOCK (self);
182       stream = gst_pad_get_element_private (pad);
183       if (stream)
184         running_time_diff = stream->running_time_diff;
185       else
186         running_time_diff = -1;
187       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
188
189       if (running_time_diff == -1) {
190         GST_WARNING_OBJECT (pad, "QOS event before group start");
191         goto out;
192       } else if (timestamp < running_time_diff) {
193         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
194         goto out;
195       }
196
197       GST_LOG_OBJECT (pad,
198           "Adjusting QOS event: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT " = %"
199           GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
200           GST_TIME_ARGS (running_time_diff),
201           GST_TIME_ARGS (timestamp - running_time_diff));
202
203       timestamp -= running_time_diff;
204
205       /* That case is invalid for QoS events */
206       if (diff < 0 && -diff > timestamp) {
207         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
208         ret = TRUE;
209         goto out;
210       }
211
212       event =
213           gst_event_new_qos (GST_QOS_TYPE_UNDERFLOW, proportion, diff,
214           timestamp);
215       break;
216     }
217     default:
218       break;
219   }
220
221 skip_adjustments:
222
223   opad = gst_stream_get_other_pad_from_pad (pad);
224   if (opad) {
225     ret = gst_pad_push_event (opad, event);
226     gst_object_unref (opad);
227   }
228
229 out:
230   return ret;
231 }
232
233 /* sinkpad functions */
234 static gboolean
235 gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
236     GstEvent * event)
237 {
238   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
239   GstPad *opad;
240   gboolean ret = FALSE;
241
242   if (passthrough)
243     goto skip_adjustments;
244
245   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
246       GST_EVENT_TYPE_NAME (event), event);
247
248   switch (GST_EVENT_TYPE (event)) {
249     case GST_EVENT_SINK_MESSAGE:{
250       GstMessage *message;
251
252       gst_event_parse_sink_message (event, &message);
253       if (gst_message_has_name (message, "playbin-stream-changed")) {
254         GstStream *stream;
255
256         GST_STREAM_SYNCHRONIZER_LOCK (self);
257         stream = gst_pad_get_element_private (pad);
258         if (stream) {
259           GList *l;
260           gboolean all_wait = TRUE;
261
262           GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
263
264           stream->is_eos = FALSE;
265           stream->wait = TRUE;
266           stream->new_stream = TRUE;
267
268           for (l = self->streams; l; l = l->next) {
269             GstStream *ostream = l->data;
270
271             all_wait = all_wait && ostream->wait;
272             if (!all_wait)
273               break;
274           }
275           if (all_wait) {
276             gint64 position = 0;
277
278             GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
279
280             for (l = self->streams; l; l = l->next) {
281               GstStream *ostream = l->data;
282               gint64 stop_running_time;
283               gint64 position_running_time;
284
285               ostream->wait = FALSE;
286
287               stop_running_time =
288                   gst_segment_to_running_time (&ostream->segment,
289                   GST_FORMAT_TIME, ostream->segment.stop);
290               position_running_time =
291                   gst_segment_to_running_time (&ostream->segment,
292                   GST_FORMAT_TIME, ostream->segment.position);
293               position =
294                   MAX (position, MAX (stop_running_time,
295                       position_running_time));
296             }
297             position = MAX (0, position);
298             self->group_start_time = MAX (self->group_start_time, position);
299
300             GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
301                 GST_TIME_ARGS (self->group_start_time));
302
303             g_cond_broadcast (self->stream_finish_cond);
304           }
305         }
306         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
307       }
308       gst_message_unref (message);
309       break;
310     }
311     case GST_EVENT_SEGMENT:{
312       GstStream *stream;
313       GstSegment segment;
314
315       gst_event_copy_segment (event, &segment);
316
317       GST_STREAM_SYNCHRONIZER_LOCK (self);
318       stream = gst_pad_get_element_private (pad);
319       if (stream) {
320         if (stream->wait) {
321           GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
322           g_cond_wait (self->stream_finish_cond, self->lock);
323           stream = gst_pad_get_element_private (pad);
324           if (stream)
325             stream->wait = FALSE;
326         }
327       }
328
329       if (self->shutdown) {
330         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
331         gst_event_unref (event);
332         goto done;
333       }
334
335       if (stream && segment.format == GST_FORMAT_TIME) {
336         if (stream->new_stream) {
337           gint64 position_running_time = 0;
338           gint64 stop_running_time = 0;
339
340           if (stream->segment.format == GST_FORMAT_TIME) {
341             position_running_time =
342                 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
343                 stream->segment.position);
344             position_running_time = MAX (position_running_time, 0);
345             stop_running_time =
346                 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
347                 stream->segment.stop);
348             stop_running_time = MAX (position_running_time, 0);
349
350             if (stop_running_time != position_running_time) {
351               GST_WARNING_OBJECT (pad,
352                   "Gap between position and segment stop: %" GST_TIME_FORMAT
353                   " != %" GST_TIME_FORMAT, GST_TIME_ARGS (stop_running_time),
354                   GST_TIME_ARGS (position_running_time));
355             }
356
357             if (stop_running_time < position_running_time) {
358               GST_DEBUG_OBJECT (pad, "Updating stop position");
359               stream->segment.stop = stream->segment.position;
360               gst_pad_push_event (stream->srcpad,
361                   gst_event_new_segment (&stream->segment));
362             }
363             stop_running_time = MAX (stop_running_time, position_running_time);
364             GST_DEBUG_OBJECT (pad,
365                 "Stop running time of last group: %" GST_TIME_FORMAT,
366                 GST_TIME_ARGS (stop_running_time));
367           }
368           stream->new_stream = FALSE;
369           stream->drop_discont = TRUE;
370
371           if (stop_running_time < self->group_start_time) {
372             gint64 diff = self->group_start_time - stop_running_time;
373
374             GST_DEBUG_OBJECT (pad,
375                 "Advancing running time for other streams by: %"
376                 GST_TIME_FORMAT, GST_TIME_ARGS (diff));
377
378             segment.base += diff;
379           }
380         }
381
382         GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
383             &stream->segment);
384         gst_segment_copy_into (&segment, &stream->segment);
385         GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
386             &stream->segment);
387
388         GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
389             GST_TIME_ARGS (stream->segment.base));
390         stream->running_time_diff = stream->segment.base;
391       } else if (stream) {
392         GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
393             gst_format_get_name (segment.format));
394         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
395       }
396       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
397       break;
398     }
399     case GST_EVENT_FLUSH_STOP:{
400       GstStream *stream;
401
402       GST_STREAM_SYNCHRONIZER_LOCK (self);
403       stream = gst_pad_get_element_private (pad);
404       if (stream) {
405         GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
406             stream->stream_number);
407         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
408
409         stream->is_eos = FALSE;
410         stream->wait = FALSE;
411         stream->new_stream = FALSE;
412         stream->drop_discont = FALSE;
413         stream->seen_data = FALSE;
414       }
415       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
416       break;
417     }
418     case GST_EVENT_EOS:{
419       GstStream *stream;
420       GList *l;
421       gboolean all_eos = TRUE;
422       gboolean seen_data;
423       GSList *pads = NULL;
424       GstPad *srcpad;
425
426       GST_STREAM_SYNCHRONIZER_LOCK (self);
427       stream = gst_pad_get_element_private (pad);
428       if (!stream) {
429         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
430         GST_WARNING_OBJECT (pad, "EOS for unknown stream");
431         break;
432       }
433
434       GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
435       stream->is_eos = TRUE;
436
437       seen_data = stream->seen_data;
438       srcpad = gst_object_ref (stream->srcpad);
439
440       for (l = self->streams; l; l = l->next) {
441         GstStream *ostream = l->data;
442
443         all_eos = all_eos && ostream->is_eos;
444         if (!all_eos)
445           break;
446       }
447
448       if (all_eos) {
449         GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
450         for (l = self->streams; l; l = l->next) {
451           GstStream *ostream = l->data;
452           /* local snapshot of current pads */
453           gst_object_ref (ostream->srcpad);
454           pads = g_slist_prepend (pads, ostream->srcpad);
455         }
456       }
457       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
458       /* drop lock when sending eos, which may block in e.g. preroll */
459       if (pads) {
460         GstPad *pad;
461         GSList *epad;
462
463         ret = TRUE;
464         epad = pads;
465         while (epad) {
466           pad = epad->data;
467           GST_DEBUG_OBJECT (pad, "Pushing EOS");
468           ret = ret && gst_pad_push_event (pad, gst_event_new_eos ());
469           gst_object_unref (pad);
470           epad = g_slist_next (epad);
471         }
472         g_slist_free (pads);
473       } else {
474         /* if EOS, but no data has passed, then send something to replace EOS
475          * for preroll purposes */
476         if (!seen_data) {
477           GstBuffer *buf = gst_buffer_new ();
478
479           gst_pad_push (srcpad, buf);
480         }
481       }
482       gst_object_unref (srcpad);
483       goto done;
484       break;
485     }
486     default:
487       break;
488   }
489
490 skip_adjustments:
491
492   opad = gst_stream_get_other_pad_from_pad (pad);
493   if (opad) {
494     ret = gst_pad_push_event (opad, event);
495     gst_object_unref (opad);
496   }
497
498 done:
499   return ret;
500 }
501
502 static GstFlowReturn
503 gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
504     GstBuffer * buffer)
505 {
506   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
507   GstPad *opad;
508   GstFlowReturn ret = GST_FLOW_ERROR;
509   GstStream *stream;
510   GstClockTime timestamp = GST_CLOCK_TIME_NONE;
511   GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
512
513   if (passthrough) {
514     opad = gst_stream_get_other_pad_from_pad (pad);
515     if (opad) {
516       ret = gst_pad_push (opad, buffer);
517       gst_object_unref (opad);
518     }
519     goto done;
520   }
521
522   GST_LOG_OBJECT (pad, "Handling buffer %p: size=%" G_GSIZE_FORMAT
523       ", timestamp=%" GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
524       " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
525       buffer, gst_buffer_get_size (buffer),
526       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
527       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
528       GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));
529
530   timestamp = GST_BUFFER_TIMESTAMP (buffer);
531   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)
532       && GST_BUFFER_DURATION_IS_VALID (buffer))
533     timestamp_end = timestamp + GST_BUFFER_DURATION (buffer);
534
535   GST_STREAM_SYNCHRONIZER_LOCK (self);
536   stream = gst_pad_get_element_private (pad);
537
538   stream->seen_data = TRUE;
539   if (stream && stream->drop_discont) {
540     buffer = gst_buffer_make_writable (buffer);
541     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
542     stream->drop_discont = FALSE;
543   }
544
545   if (stream && stream->segment.format == GST_FORMAT_TIME
546       && GST_CLOCK_TIME_IS_VALID (timestamp)) {
547     GST_LOG_OBJECT (pad,
548         "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
549         GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
550     stream->segment.position = timestamp;
551   }
552   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
553
554   opad = gst_stream_get_other_pad_from_pad (pad);
555   if (opad) {
556     ret = gst_pad_push (opad, buffer);
557     gst_object_unref (opad);
558   }
559
560   GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
561   if (ret == GST_FLOW_OK) {
562     GList *l;
563
564     GST_STREAM_SYNCHRONIZER_LOCK (self);
565     stream = gst_pad_get_element_private (pad);
566     if (stream && stream->segment.format == GST_FORMAT_TIME
567         && GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
568       GST_LOG_OBJECT (pad,
569           "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
570           GST_TIME_ARGS (stream->segment.position),
571           GST_TIME_ARGS (timestamp_end));
572       stream->segment.position = timestamp_end;
573     }
574
575     /* Advance EOS streams if necessary. For non-EOS
576      * streams the demuxers should already do this! */
577     for (l = self->streams; l; l = l->next) {
578       GstStream *ostream = l->data;
579       gint64 position;
580
581       if (!ostream->is_eos || ostream->segment.format != GST_FORMAT_TIME)
582         continue;
583
584       if (ostream->segment.position != -1)
585         position = ostream->segment.position;
586       else
587         position = ostream->segment.start;
588
589       /* Is there a 1 second lag? */
590       if (position != -1 && position + GST_SECOND < timestamp_end) {
591         gint64 new_start, new_stop;
592
593         new_start = timestamp_end - GST_SECOND;
594         if (ostream->segment.stop == -1)
595           new_stop = -1;
596         else
597           new_stop = MAX (new_start, ostream->segment.stop);
598
599         GST_DEBUG_OBJECT (ostream->sinkpad,
600             "Advancing stream %u from %" GST_TIME_FORMAT " to %"
601             GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (position),
602             GST_TIME_ARGS (new_start));
603
604         ostream->segment.start = new_start;
605         ostream->segment.stop = new_stop;
606         ostream->segment.time = new_start;
607         ostream->segment.position = new_start;
608
609         gst_pad_push_event (ostream->srcpad,
610             gst_event_new_segment (&ostream->segment));
611       }
612     }
613     GST_STREAM_SYNCHRONIZER_UNLOCK (self);
614   }
615
616 done:
617   return ret;
618 }
619
620 /* GstElement vfuncs */
621 static GstPad *
622 gst_stream_synchronizer_request_new_pad (GstElement * element,
623     GstPadTemplate * temp, const gchar * name, const GstCaps * caps)
624 {
625   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
626   GstStream *stream;
627   gchar *tmp;
628
629   GST_STREAM_SYNCHRONIZER_LOCK (self);
630   GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
631       self->current_stream_number);
632
633   stream = g_slice_new0 (GstStream);
634   stream->transform = self;
635   stream->stream_number = self->current_stream_number;
636
637   tmp = g_strdup_printf ("sink_%u", self->current_stream_number);
638   stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
639   g_free (tmp);
640   gst_pad_set_element_private (stream->sinkpad, stream);
641   gst_pad_set_iterate_internal_links_function (stream->sinkpad,
642       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
643   gst_pad_set_query_function (stream->sinkpad,
644       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
645   gst_pad_set_event_function (stream->sinkpad,
646       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
647   gst_pad_set_chain_function (stream->sinkpad,
648       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
649
650   tmp = g_strdup_printf ("src_%u", self->current_stream_number);
651   stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
652   g_free (tmp);
653   gst_pad_set_element_private (stream->srcpad, stream);
654   gst_pad_set_iterate_internal_links_function (stream->srcpad,
655       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
656   gst_pad_set_query_function (stream->srcpad,
657       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
658   gst_pad_set_event_function (stream->srcpad,
659       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
660
661   gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
662
663   self->streams = g_list_prepend (self->streams, stream);
664   self->current_stream_number++;
665   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
666
667   /* Add pads and activate unless we're going to NULL */
668   g_static_rec_mutex_lock (GST_STATE_GET_LOCK (self));
669   if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
670     gst_pad_set_active (stream->srcpad, TRUE);
671     gst_pad_set_active (stream->sinkpad, TRUE);
672   }
673   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
674   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
675   g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
676
677   return stream->sinkpad;
678 }
679
680 /* Must be called with lock! */
681 static void
682 gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
683     GstStream * stream)
684 {
685   GList *l;
686
687   GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
688
689   for (l = self->streams; l; l = l->next) {
690     if (l->data == stream) {
691       self->streams = g_list_delete_link (self->streams, l);
692       break;
693     }
694   }
695   g_assert (l != NULL);
696
697   /* we can drop the lock, since stream exists now only local.
698    * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
699    * (due to reverse lock order) when deactivating pads */
700   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
701
702   gst_pad_set_element_private (stream->srcpad, NULL);
703   gst_pad_set_element_private (stream->sinkpad, NULL);
704   gst_pad_set_active (stream->srcpad, FALSE);
705   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
706   gst_pad_set_active (stream->sinkpad, FALSE);
707   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
708
709   if (stream->segment.format == GST_FORMAT_TIME) {
710     gint64 stop_running_time;
711     gint64 position_running_time;
712
713     stop_running_time =
714         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
715         stream->segment.stop);
716     position_running_time =
717         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
718         stream->segment.position);
719     stop_running_time = MAX (stop_running_time, position_running_time);
720
721     GST_DEBUG_OBJECT (stream->sinkpad,
722         "Stop running time was: %" GST_TIME_FORMAT,
723         GST_TIME_ARGS (stop_running_time));
724
725     self->group_start_time = MAX (self->group_start_time, stop_running_time);
726   }
727
728   g_slice_free (GstStream, stream);
729
730   /* NOTE: In theory we have to check here if all streams
731    * are EOS but the one that was removed wasn't and then
732    * send EOS downstream. But due to the way how playsink
733    * works this is not necessary and will only cause problems
734    * for gapless playback. playsink will only add/remove pads
735    * when it's reconfigured, which happens when the streams
736    * change
737    */
738
739   /* lock for good measure, since the caller had it */
740   GST_STREAM_SYNCHRONIZER_LOCK (self);
741 }
742
743 static void
744 gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
745 {
746   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
747   GstStream *stream;
748
749   GST_STREAM_SYNCHRONIZER_LOCK (self);
750   stream = gst_pad_get_element_private (pad);
751   if (stream) {
752     g_assert (stream->sinkpad == pad);
753
754     gst_stream_synchronizer_release_stream (self, stream);
755   }
756   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
757 }
758
759 static GstStateChangeReturn
760 gst_stream_synchronizer_change_state (GstElement * element,
761     GstStateChange transition)
762 {
763   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
764   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
765
766   switch (transition) {
767     case GST_STATE_CHANGE_NULL_TO_READY:
768       GST_DEBUG_OBJECT (self, "State change NULL->READY");
769       self->shutdown = FALSE;
770       break;
771     case GST_STATE_CHANGE_READY_TO_PAUSED:
772       GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
773       self->group_start_time = 0;
774       self->shutdown = FALSE;
775       break;
776     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
777       GST_DEBUG_OBJECT (self, "State change PAUSED->PLAYING");
778       break;
779     case GST_STATE_CHANGE_PAUSED_TO_READY:
780       GST_DEBUG_OBJECT (self, "State change READY->NULL");
781
782       GST_STREAM_SYNCHRONIZER_LOCK (self);
783       g_cond_broadcast (self->stream_finish_cond);
784       self->shutdown = TRUE;
785       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
786     default:
787       break;
788   }
789
790   {
791     GstStateChangeReturn bret;
792
793     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
794     GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", bret);
795     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
796       return ret;
797   }
798
799   switch (transition) {
800     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
801       GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
802       break;
803     case GST_STATE_CHANGE_PAUSED_TO_READY:{
804       GList *l;
805
806       GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
807       self->group_start_time = 0;
808
809       GST_STREAM_SYNCHRONIZER_LOCK (self);
810       for (l = self->streams; l; l = l->next) {
811         GstStream *stream = l->data;
812
813         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
814         stream->wait = FALSE;
815         stream->new_stream = FALSE;
816         stream->drop_discont = FALSE;
817         stream->is_eos = FALSE;
818       }
819       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
820       break;
821     }
822     case GST_STATE_CHANGE_READY_TO_NULL:{
823       GST_DEBUG_OBJECT (self, "State change READY->NULL");
824
825       GST_STREAM_SYNCHRONIZER_LOCK (self);
826       while (self->streams)
827         gst_stream_synchronizer_release_stream (self, self->streams->data);
828       self->current_stream_number = 0;
829       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
830       break;
831     }
832     default:
833       break;
834   }
835
836   return ret;
837 }
838
839 /* GObject vfuncs */
840 static void
841 gst_stream_synchronizer_finalize (GObject * object)
842 {
843   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
844
845   if (self->lock) {
846     g_mutex_free (self->lock);
847     self->lock = NULL;
848   }
849
850   if (self->stream_finish_cond) {
851     g_cond_free (self->stream_finish_cond);
852     self->stream_finish_cond = NULL;
853   }
854
855   G_OBJECT_CLASS (parent_class)->finalize (object);
856 }
857
858 /* GObject type initialization */
859 static void
860 gst_stream_synchronizer_init (GstStreamSynchronizer * self)
861 {
862   self->lock = g_mutex_new ();
863   self->stream_finish_cond = g_cond_new ();
864 }
865
866 static void
867 gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
868 {
869   GObjectClass *gobject_class = (GObjectClass *) klass;
870   GstElementClass *element_class = (GstElementClass *) klass;
871
872   GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
873       "streamsynchronizer", 0, "Stream Synchronizer");
874
875   gobject_class->finalize = gst_stream_synchronizer_finalize;
876
877   gst_element_class_add_pad_template (element_class,
878       gst_static_pad_template_get (&srctemplate));
879   gst_element_class_add_pad_template (element_class,
880       gst_static_pad_template_get (&sinktemplate));
881
882   gst_element_class_set_details_simple (element_class,
883       "Stream Synchronizer", "Generic",
884       "Synchronizes a group of streams to have equal durations and starting points",
885       "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
886
887   element_class->change_state =
888       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
889   element_class->request_new_pad =
890       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
891   element_class->release_pad =
892       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);
893 }