Merge branch 'master' into 0.11
[platform/upstream/gstreamer.git] / gst / playback / gststreamsynchronizer.c
1 /* GStreamer
2  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #ifdef HAVE_CONFIG_H
21 #include "config.h"
22 #endif
23
24 #include "gststreamsynchronizer.h"
25
26 GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
27 #define GST_CAT_DEFAULT stream_synchronizer_debug
28
29 #define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START {                   \
30     GST_LOG_OBJECT (obj,                                                \
31                     "locking from thread %p",                           \
32                     g_thread_self ());                                  \
33     g_mutex_lock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);                \
34     GST_LOG_OBJECT (obj,                                                \
35                     "locked from thread %p",                            \
36                     g_thread_self ());                                  \
37 } G_STMT_END
38
39 #define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START {                 \
40     GST_LOG_OBJECT (obj,                                                \
41                     "unlocking from thread %p",                         \
42                     g_thread_self ());                                  \
43     g_mutex_unlock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);              \
44 } G_STMT_END
45
46 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%d",
47     GST_PAD_SRC,
48     GST_PAD_SOMETIMES,
49     GST_STATIC_CAPS_ANY);
50 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%d",
51     GST_PAD_SINK,
52     GST_PAD_REQUEST,
53     GST_STATIC_CAPS_ANY);
54
55 static const gboolean passthrough = TRUE;
56
57 #define gst_stream_synchronizer_parent_class parent_class
58 G_DEFINE_TYPE (GstStreamSynchronizer, gst_stream_synchronizer,
59     GST_TYPE_ELEMENT);
60
61 typedef struct
62 {
63   GstStreamSynchronizer *transform;
64   guint stream_number;
65   GstPad *srcpad;
66   GstPad *sinkpad;
67   GstSegment segment;
68
69   gboolean wait;
70   gboolean new_stream;
71   gboolean drop_discont;
72   gboolean is_eos;
73   gboolean seen_data;
74
75   gint64 running_time_diff;
76 } GstStream;
77
78 /* Must be called with lock! */
79 static GstPad *
80 gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
81 {
82   if (stream->sinkpad == pad)
83     return gst_object_ref (stream->srcpad);
84   else if (stream->srcpad == pad)
85     return gst_object_ref (stream->sinkpad);
86
87   return NULL;
88 }
89
90 static GstPad *
91 gst_stream_get_other_pad_from_pad (GstPad * pad)
92 {
93   GstStreamSynchronizer *self =
94       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
95   GstStream *stream;
96   GstPad *opad = NULL;
97
98   GST_STREAM_SYNCHRONIZER_LOCK (self);
99   stream = gst_pad_get_element_private (pad);
100   if (!stream)
101     goto out;
102
103   opad = gst_stream_get_other_pad (stream, pad);
104
105 out:
106   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
107   gst_object_unref (self);
108
109   if (!opad)
110     GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
111
112   return opad;
113 }
114
115 /* Generic pad functions */
116 static GstIterator *
117 gst_stream_synchronizer_iterate_internal_links (GstPad * pad)
118 {
119   GstIterator *it = NULL;
120   GstPad *opad;
121
122   opad = gst_stream_get_other_pad_from_pad (pad);
123   if (opad) {
124     GValue value = { 0, };
125
126     g_value_init (&value, GST_TYPE_PAD);
127     g_value_set_object (&value, opad);
128     it = gst_iterator_new_single (GST_TYPE_PAD, &value);
129     g_value_unset (&value);
130     gst_object_unref (opad);
131   }
132
133   return it;
134 }
135
136 static gboolean
137 gst_stream_synchronizer_query (GstPad * pad, GstQuery * query)
138 {
139   GstPad *opad;
140   gboolean ret = FALSE;
141
142   GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));
143
144   opad = gst_stream_get_other_pad_from_pad (pad);
145   if (opad) {
146     ret = gst_pad_peer_query (opad, query);
147     gst_object_unref (opad);
148   }
149
150   return ret;
151 }
152
153 static GstCaps *
154 gst_stream_synchronizer_getcaps (GstPad * pad, GstCaps * filter)
155 {
156   GstPad *opad;
157   GstCaps *ret = NULL;
158
159   opad = gst_stream_get_other_pad_from_pad (pad);
160   if (opad) {
161     ret = gst_pad_peer_get_caps (opad, filter);
162     gst_object_unref (opad);
163   }
164
165   if (ret == NULL)
166     ret = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
167
168   GST_LOG_OBJECT (pad, "Returning caps: %" GST_PTR_FORMAT, ret);
169
170   return ret;
171 }
172
173 static gboolean
174 gst_stream_synchronizer_acceptcaps (GstPad * pad, GstCaps * caps)
175 {
176   GstPad *opad;
177   gboolean ret = FALSE;
178
179   opad = gst_stream_get_other_pad_from_pad (pad);
180   if (opad) {
181     ret = gst_pad_peer_accept_caps (opad, caps);
182     gst_object_unref (opad);
183   }
184
185   GST_LOG_OBJECT (pad, "Caps%s accepted: %" GST_PTR_FORMAT, (ret ? "" : " not"),
186       caps);
187
188   return ret;
189 }
190
191 /* srcpad functions */
192 static gboolean
193 gst_stream_synchronizer_src_event (GstPad * pad, GstEvent * event)
194 {
195   GstStreamSynchronizer *self =
196       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
197   GstPad *opad;
198   gboolean ret = FALSE;
199
200   if (passthrough)
201     goto skip_adjustments;
202
203   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
204       GST_EVENT_TYPE_NAME (event), event);
205
206   switch (GST_EVENT_TYPE (event)) {
207     case GST_EVENT_QOS:{
208       gdouble proportion;
209       GstClockTimeDiff diff;
210       GstClockTime timestamp;
211       gint64 running_time_diff;
212       GstStream *stream;
213
214       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
215       gst_event_unref (event);
216
217       GST_STREAM_SYNCHRONIZER_LOCK (self);
218       stream = gst_pad_get_element_private (pad);
219       if (stream)
220         running_time_diff = stream->running_time_diff;
221       else
222         running_time_diff = -1;
223       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
224
225       if (running_time_diff == -1) {
226         GST_WARNING_OBJECT (pad, "QOS event before group start");
227         goto out;
228       } else if (timestamp < running_time_diff) {
229         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
230         goto out;
231       }
232
233       GST_LOG_OBJECT (pad,
234           "Adjusting QOS event: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT " = %"
235           GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
236           GST_TIME_ARGS (running_time_diff),
237           GST_TIME_ARGS (timestamp - running_time_diff));
238
239       timestamp -= running_time_diff;
240
241       /* That case is invalid for QoS events */
242       if (diff < 0 && -diff > timestamp) {
243         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
244         ret = TRUE;
245         goto out;
246       }
247
248       event =
249           gst_event_new_qos (GST_QOS_TYPE_UNDERFLOW, proportion, diff,
250           timestamp);
251       break;
252     }
253     default:
254       break;
255   }
256
257 skip_adjustments:
258
259   opad = gst_stream_get_other_pad_from_pad (pad);
260   if (opad) {
261     ret = gst_pad_push_event (opad, event);
262     gst_object_unref (opad);
263   }
264
265 out:
266   gst_object_unref (self);
267
268   return ret;
269 }
270
271 /* sinkpad functions */
272 static gboolean
273 gst_stream_synchronizer_sink_event (GstPad * pad, GstEvent * event)
274 {
275   GstStreamSynchronizer *self =
276       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
277   GstPad *opad;
278   gboolean ret = FALSE;
279
280   if (passthrough)
281     goto skip_adjustments;
282
283   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
284       GST_EVENT_TYPE_NAME (event), event);
285
286   switch (GST_EVENT_TYPE (event)) {
287     case GST_EVENT_SINK_MESSAGE:{
288       GstMessage *message;
289
290       gst_event_parse_sink_message (event, &message);
291       if (gst_message_has_name (message, "playbin-stream-changed")) {
292         GstStream *stream;
293
294         GST_STREAM_SYNCHRONIZER_LOCK (self);
295         stream = gst_pad_get_element_private (pad);
296         if (stream) {
297           GList *l;
298           gboolean all_wait = TRUE;
299
300           GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
301
302           stream->is_eos = FALSE;
303           stream->wait = TRUE;
304           stream->new_stream = TRUE;
305
306           for (l = self->streams; l; l = l->next) {
307             GstStream *ostream = l->data;
308
309             all_wait = all_wait && ostream->wait;
310             if (!all_wait)
311               break;
312           }
313           if (all_wait) {
314             gint64 position = 0;
315
316             GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
317
318             for (l = self->streams; l; l = l->next) {
319               GstStream *ostream = l->data;
320               gint64 stop_running_time;
321               gint64 position_running_time;
322
323               ostream->wait = FALSE;
324
325               stop_running_time =
326                   gst_segment_to_running_time (&ostream->segment,
327                   GST_FORMAT_TIME, ostream->segment.stop);
328               position_running_time =
329                   gst_segment_to_running_time (&ostream->segment,
330                   GST_FORMAT_TIME, ostream->segment.position);
331               position =
332                   MAX (position, MAX (stop_running_time,
333                       position_running_time));
334             }
335             position = MAX (0, position);
336             self->group_start_time = MAX (self->group_start_time, position);
337
338             GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
339                 GST_TIME_ARGS (self->group_start_time));
340
341             g_cond_broadcast (self->stream_finish_cond);
342           }
343         }
344         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
345       }
346       gst_message_unref (message);
347       break;
348     }
349     case GST_EVENT_SEGMENT:{
350       GstStream *stream;
351       GstSegment segment;
352
353       gst_event_copy_segment (event, &segment);
354
355       GST_STREAM_SYNCHRONIZER_LOCK (self);
356       stream = gst_pad_get_element_private (pad);
357       if (stream) {
358         if (stream->wait) {
359           GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
360           g_cond_wait (self->stream_finish_cond, self->lock);
361           stream = gst_pad_get_element_private (pad);
362           if (stream)
363             stream->wait = FALSE;
364         }
365       }
366
367       if (self->shutdown) {
368         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
369         gst_event_unref (event);
370         goto done;
371       }
372
373       if (stream && segment.format == GST_FORMAT_TIME) {
374         if (stream->new_stream) {
375           gint64 position_running_time = 0;
376           gint64 stop_running_time = 0;
377
378           if (stream->segment.format == GST_FORMAT_TIME) {
379             position_running_time =
380                 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
381                 stream->segment.position);
382             position_running_time = MAX (position_running_time, 0);
383             stop_running_time =
384                 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
385                 stream->segment.stop);
386             stop_running_time = MAX (position_running_time, 0);
387
388             if (stop_running_time != position_running_time) {
389               GST_WARNING_OBJECT (pad,
390                   "Gap between position and segment stop: %" GST_TIME_FORMAT
391                   " != %" GST_TIME_FORMAT, GST_TIME_ARGS (stop_running_time),
392                   GST_TIME_ARGS (position_running_time));
393             }
394
395             if (stop_running_time < position_running_time) {
396               GST_DEBUG_OBJECT (pad, "Updating stop position");
397               stream->segment.stop = stream->segment.position;
398               gst_pad_push_event (stream->srcpad,
399                   gst_event_new_segment (&stream->segment));
400             }
401             stop_running_time = MAX (stop_running_time, position_running_time);
402             GST_DEBUG_OBJECT (pad,
403                 "Stop running time of last group: %" GST_TIME_FORMAT,
404                 GST_TIME_ARGS (stop_running_time));
405           }
406           stream->new_stream = FALSE;
407           stream->drop_discont = TRUE;
408
409           if (stop_running_time < self->group_start_time) {
410             gint64 diff = self->group_start_time - stop_running_time;
411
412             GST_DEBUG_OBJECT (pad,
413                 "Advancing running time for other streams by: %"
414                 GST_TIME_FORMAT, GST_TIME_ARGS (diff));
415
416             segment.base += diff;
417           }
418         }
419
420         GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
421             &stream->segment);
422         gst_segment_copy_into (&segment, &stream->segment);
423         GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
424             &stream->segment);
425
426         GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
427             GST_TIME_ARGS (stream->segment.base));
428         stream->running_time_diff = stream->segment.base;
429       } else if (stream) {
430         GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
431             gst_format_get_name (segment.format));
432         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
433       }
434       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
435       break;
436     }
437     case GST_EVENT_FLUSH_STOP:{
438       GstStream *stream;
439
440       GST_STREAM_SYNCHRONIZER_LOCK (self);
441       stream = gst_pad_get_element_private (pad);
442       if (stream) {
443         GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
444             stream->stream_number);
445         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
446
447         stream->is_eos = FALSE;
448         stream->wait = FALSE;
449         stream->new_stream = FALSE;
450         stream->drop_discont = FALSE;
451         stream->seen_data = FALSE;
452       }
453       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
454       break;
455     }
456     case GST_EVENT_EOS:{
457       GstStream *stream;
458       GList *l;
459       gboolean all_eos = TRUE;
460       gboolean seen_data;
461       GSList *pads = NULL;
462       GstPad *srcpad;
463
464       GST_STREAM_SYNCHRONIZER_LOCK (self);
465       stream = gst_pad_get_element_private (pad);
466       if (!stream) {
467         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
468         GST_WARNING_OBJECT (pad, "EOS for unknown stream");
469         break;
470       }
471
472       GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
473       stream->is_eos = TRUE;
474
475       seen_data = stream->seen_data;
476       srcpad = gst_object_ref (stream->srcpad);
477
478       for (l = self->streams; l; l = l->next) {
479         GstStream *ostream = l->data;
480
481         all_eos = all_eos && ostream->is_eos;
482         if (!all_eos)
483           break;
484       }
485
486       if (all_eos) {
487         GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
488         for (l = self->streams; l; l = l->next) {
489           GstStream *ostream = l->data;
490           /* local snapshot of current pads */
491           gst_object_ref (ostream->srcpad);
492           pads = g_slist_prepend (pads, ostream->srcpad);
493         }
494       }
495       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
496       /* drop lock when sending eos, which may block in e.g. preroll */
497       if (pads) {
498         GstPad *pad;
499         GSList *epad;
500
501         ret = TRUE;
502         epad = pads;
503         while (epad) {
504           pad = epad->data;
505           GST_DEBUG_OBJECT (pad, "Pushing EOS");
506           ret = ret && gst_pad_push_event (pad, gst_event_new_eos ());
507           gst_object_unref (pad);
508           epad = g_slist_next (epad);
509         }
510         g_slist_free (pads);
511       } else {
512         /* if EOS, but no data has passed, then send something to replace EOS
513          * for preroll purposes */
514         if (!seen_data) {
515           GstBuffer *buf = gst_buffer_new ();
516
517           gst_pad_push (srcpad, buf);
518         }
519       }
520       gst_object_unref (srcpad);
521       goto done;
522       break;
523     }
524     default:
525       break;
526   }
527
528 skip_adjustments:
529
530   opad = gst_stream_get_other_pad_from_pad (pad);
531   if (opad) {
532     ret = gst_pad_push_event (opad, event);
533     gst_object_unref (opad);
534   }
535
536 done:
537   gst_object_unref (self);
538
539   return ret;
540 }
541
542 static GstFlowReturn
543 gst_stream_synchronizer_sink_chain (GstPad * pad, GstBuffer * buffer)
544 {
545   GstStreamSynchronizer *self =
546       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
547   GstPad *opad;
548   GstFlowReturn ret = GST_FLOW_ERROR;
549   GstStream *stream;
550   GstClockTime timestamp = GST_CLOCK_TIME_NONE;
551   GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
552
553   if (passthrough) {
554     opad = gst_stream_get_other_pad_from_pad (pad);
555     if (opad) {
556       ret = gst_pad_push (opad, buffer);
557       gst_object_unref (opad);
558     }
559     goto done;
560   }
561
562   GST_LOG_OBJECT (pad, "Handling buffer %p: size=%" G_GSIZE_FORMAT
563       ", timestamp=%" GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
564       " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
565       buffer, gst_buffer_get_size (buffer),
566       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
567       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
568       GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));
569
570   timestamp = GST_BUFFER_TIMESTAMP (buffer);
571   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)
572       && GST_BUFFER_DURATION_IS_VALID (buffer))
573     timestamp_end = timestamp + GST_BUFFER_DURATION (buffer);
574
575   GST_STREAM_SYNCHRONIZER_LOCK (self);
576   stream = gst_pad_get_element_private (pad);
577
578   stream->seen_data = TRUE;
579   if (stream && stream->drop_discont) {
580     buffer = gst_buffer_make_writable (buffer);
581     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
582     stream->drop_discont = FALSE;
583   }
584
585   if (stream && stream->segment.format == GST_FORMAT_TIME
586       && GST_CLOCK_TIME_IS_VALID (timestamp)) {
587     GST_LOG_OBJECT (pad,
588         "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
589         GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
590     stream->segment.position = timestamp;
591   }
592   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
593
594   opad = gst_stream_get_other_pad_from_pad (pad);
595   if (opad) {
596     ret = gst_pad_push (opad, buffer);
597     gst_object_unref (opad);
598   }
599
600   GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
601   if (ret == GST_FLOW_OK) {
602     GList *l;
603
604     GST_STREAM_SYNCHRONIZER_LOCK (self);
605     stream = gst_pad_get_element_private (pad);
606     if (stream && stream->segment.format == GST_FORMAT_TIME
607         && GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
608       GST_LOG_OBJECT (pad,
609           "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
610           GST_TIME_ARGS (stream->segment.position),
611           GST_TIME_ARGS (timestamp_end));
612       stream->segment.position = timestamp_end;
613     }
614
615     /* Advance EOS streams if necessary. For non-EOS
616      * streams the demuxers should already do this! */
617     for (l = self->streams; l; l = l->next) {
618       GstStream *ostream = l->data;
619       gint64 position;
620
621       if (!ostream->is_eos || ostream->segment.format != GST_FORMAT_TIME)
622         continue;
623
624       if (ostream->segment.position != -1)
625         position = ostream->segment.position;
626       else
627         position = ostream->segment.start;
628
629       /* Is there a 1 second lag? */
630       if (position != -1 && position + GST_SECOND < timestamp_end) {
631         gint64 new_start, new_stop;
632
633         new_start = timestamp_end - GST_SECOND;
634         if (ostream->segment.stop == -1)
635           new_stop = -1;
636         else
637           new_stop = MAX (new_start, ostream->segment.stop);
638
639         GST_DEBUG_OBJECT (ostream->sinkpad,
640             "Advancing stream %u from %" GST_TIME_FORMAT " to %"
641             GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (position),
642             GST_TIME_ARGS (new_start));
643
644         ostream->segment.start = new_start;
645         ostream->segment.stop = new_stop;
646         ostream->segment.time = new_start;
647         ostream->segment.position = new_start;
648
649         gst_pad_push_event (ostream->srcpad,
650             gst_event_new_segment (&ostream->segment));
651       }
652     }
653     GST_STREAM_SYNCHRONIZER_UNLOCK (self);
654   }
655
656 done:
657
658   gst_object_unref (self);
659
660   return ret;
661 }
662
663 /* GstElement vfuncs */
664 static GstPad *
665 gst_stream_synchronizer_request_new_pad (GstElement * element,
666     GstPadTemplate * temp, const gchar * name, const GstCaps * caps)
667 {
668   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
669   GstStream *stream;
670   gchar *tmp;
671
672   GST_STREAM_SYNCHRONIZER_LOCK (self);
673   GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
674       self->current_stream_number);
675
676   stream = g_slice_new0 (GstStream);
677   stream->transform = self;
678   stream->stream_number = self->current_stream_number;
679
680   tmp = g_strdup_printf ("sink_%d", self->current_stream_number);
681   stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
682   g_free (tmp);
683   gst_pad_set_element_private (stream->sinkpad, stream);
684   gst_pad_set_iterate_internal_links_function (stream->sinkpad,
685       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
686   gst_pad_set_query_function (stream->sinkpad,
687       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
688   gst_pad_set_getcaps_function (stream->sinkpad,
689       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
690   gst_pad_set_acceptcaps_function (stream->sinkpad,
691       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
692   gst_pad_set_event_function (stream->sinkpad,
693       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
694   gst_pad_set_chain_function (stream->sinkpad,
695       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
696
697   tmp = g_strdup_printf ("src_%d", self->current_stream_number);
698   stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
699   g_free (tmp);
700   gst_pad_set_element_private (stream->srcpad, stream);
701   gst_pad_set_iterate_internal_links_function (stream->srcpad,
702       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
703   gst_pad_set_query_function (stream->srcpad,
704       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
705   gst_pad_set_getcaps_function (stream->srcpad,
706       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
707   gst_pad_set_acceptcaps_function (stream->srcpad,
708       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
709   gst_pad_set_event_function (stream->srcpad,
710       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
711
712   gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
713
714   self->streams = g_list_prepend (self->streams, stream);
715   self->current_stream_number++;
716   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
717
718   /* Add pads and activate unless we're going to NULL */
719   g_static_rec_mutex_lock (GST_STATE_GET_LOCK (self));
720   if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
721     gst_pad_set_active (stream->srcpad, TRUE);
722     gst_pad_set_active (stream->sinkpad, TRUE);
723   }
724   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
725   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
726   g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
727
728   return stream->sinkpad;
729 }
730
731 /* Must be called with lock! */
732 static void
733 gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
734     GstStream * stream)
735 {
736   GList *l;
737
738   GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
739
740   for (l = self->streams; l; l = l->next) {
741     if (l->data == stream) {
742       self->streams = g_list_delete_link (self->streams, l);
743       break;
744     }
745   }
746   g_assert (l != NULL);
747
748   /* we can drop the lock, since stream exists now only local.
749    * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
750    * (due to reverse lock order) when deactivating pads */
751   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
752
753   gst_pad_set_element_private (stream->srcpad, NULL);
754   gst_pad_set_element_private (stream->sinkpad, NULL);
755   gst_pad_set_active (stream->srcpad, FALSE);
756   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
757   gst_pad_set_active (stream->sinkpad, FALSE);
758   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
759
760   if (stream->segment.format == GST_FORMAT_TIME) {
761     gint64 stop_running_time;
762     gint64 position_running_time;
763
764     stop_running_time =
765         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
766         stream->segment.stop);
767     position_running_time =
768         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
769         stream->segment.position);
770     stop_running_time = MAX (stop_running_time, position_running_time);
771
772     GST_DEBUG_OBJECT (stream->sinkpad,
773         "Stop running time was: %" GST_TIME_FORMAT,
774         GST_TIME_ARGS (stop_running_time));
775
776     self->group_start_time = MAX (self->group_start_time, stop_running_time);
777   }
778
779   g_slice_free (GstStream, stream);
780
781   /* NOTE: In theory we have to check here if all streams
782    * are EOS but the one that was removed wasn't and then
783    * send EOS downstream. But due to the way how playsink
784    * works this is not necessary and will only cause problems
785    * for gapless playback. playsink will only add/remove pads
786    * when it's reconfigured, which happens when the streams
787    * change
788    */
789
790   /* lock for good measure, since the caller had it */
791   GST_STREAM_SYNCHRONIZER_LOCK (self);
792 }
793
794 static void
795 gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
796 {
797   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
798   GstStream *stream;
799
800   GST_STREAM_SYNCHRONIZER_LOCK (self);
801   stream = gst_pad_get_element_private (pad);
802   if (stream) {
803     g_assert (stream->sinkpad == pad);
804
805     gst_stream_synchronizer_release_stream (self, stream);
806   }
807   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
808 }
809
810 static GstStateChangeReturn
811 gst_stream_synchronizer_change_state (GstElement * element,
812     GstStateChange transition)
813 {
814   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
815   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
816
817   switch (transition) {
818     case GST_STATE_CHANGE_NULL_TO_READY:
819       GST_DEBUG_OBJECT (self, "State change NULL->READY");
820       self->shutdown = FALSE;
821       break;
822     case GST_STATE_CHANGE_READY_TO_PAUSED:
823       GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
824       self->group_start_time = 0;
825       self->shutdown = FALSE;
826       break;
827     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
828       GST_DEBUG_OBJECT (self, "State change PAUSED->PLAYING");
829       break;
830     case GST_STATE_CHANGE_PAUSED_TO_READY:
831       GST_DEBUG_OBJECT (self, "State change READY->NULL");
832
833       GST_STREAM_SYNCHRONIZER_LOCK (self);
834       g_cond_broadcast (self->stream_finish_cond);
835       self->shutdown = TRUE;
836       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
837     default:
838       break;
839   }
840
841   {
842     GstStateChangeReturn bret;
843
844     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
845     GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", bret);
846     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
847       return ret;
848   }
849
850   switch (transition) {
851     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
852       GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
853       break;
854     case GST_STATE_CHANGE_PAUSED_TO_READY:{
855       GList *l;
856
857       GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
858       self->group_start_time = 0;
859
860       GST_STREAM_SYNCHRONIZER_LOCK (self);
861       for (l = self->streams; l; l = l->next) {
862         GstStream *stream = l->data;
863
864         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
865         stream->wait = FALSE;
866         stream->new_stream = FALSE;
867         stream->drop_discont = FALSE;
868         stream->is_eos = FALSE;
869       }
870       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
871       break;
872     }
873     case GST_STATE_CHANGE_READY_TO_NULL:{
874       GST_DEBUG_OBJECT (self, "State change READY->NULL");
875
876       GST_STREAM_SYNCHRONIZER_LOCK (self);
877       while (self->streams)
878         gst_stream_synchronizer_release_stream (self, self->streams->data);
879       self->current_stream_number = 0;
880       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
881       break;
882     }
883     default:
884       break;
885   }
886
887   return ret;
888 }
889
890 /* GObject vfuncs */
891 static void
892 gst_stream_synchronizer_finalize (GObject * object)
893 {
894   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
895
896   if (self->lock) {
897     g_mutex_free (self->lock);
898     self->lock = NULL;
899   }
900
901   if (self->stream_finish_cond) {
902     g_cond_free (self->stream_finish_cond);
903     self->stream_finish_cond = NULL;
904   }
905
906   G_OBJECT_CLASS (parent_class)->finalize (object);
907 }
908
909 /* GObject type initialization */
910 static void
911 gst_stream_synchronizer_init (GstStreamSynchronizer * self)
912 {
913   self->lock = g_mutex_new ();
914   self->stream_finish_cond = g_cond_new ();
915 }
916
917 static void
918 gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
919 {
920   GObjectClass *gobject_class = (GObjectClass *) klass;
921   GstElementClass *element_class = (GstElementClass *) klass;
922
923   GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
924       "streamsynchronizer", 0, "Stream Synchronizer");
925
926   gobject_class->finalize = gst_stream_synchronizer_finalize;
927
928   gst_element_class_add_pad_template (element_class,
929       gst_static_pad_template_get (&srctemplate));
930   gst_element_class_add_pad_template (element_class,
931       gst_static_pad_template_get (&sinktemplate));
932
933   gst_element_class_set_details_simple (element_class,
934       "Stream Synchronizer", "Generic",
935       "Synchronizes a group of streams to have equal durations and starting points",
936       "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
937
938   element_class->change_state =
939       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
940   element_class->request_new_pad =
941       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
942   element_class->release_pad =
943       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);
944 }