playback: rename playbin2 to playbin
[platform/upstream/gstreamer.git] / gst / playback / gststreamsynchronizer.c
1 /* GStreamer
2  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #ifdef HAVE_CONFIG_H
21 #include "config.h"
22 #endif
23
24 #include "gststreamsynchronizer.h"
25
26 GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
27 #define GST_CAT_DEFAULT stream_synchronizer_debug
28
29 #define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START {                   \
30     GST_LOG_OBJECT (obj,                                                \
31                     "locking from thread %p",                           \
32                     g_thread_self ());                                  \
33     g_mutex_lock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);                \
34     GST_LOG_OBJECT (obj,                                                \
35                     "locked from thread %p",                            \
36                     g_thread_self ());                                  \
37 } G_STMT_END
38
39 #define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START {                 \
40     GST_LOG_OBJECT (obj,                                                \
41                     "unlocking from thread %p",                         \
42                     g_thread_self ());                                  \
43     g_mutex_unlock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);              \
44 } G_STMT_END
45
46 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%d",
47     GST_PAD_SRC,
48     GST_PAD_SOMETIMES,
49     GST_STATIC_CAPS_ANY);
50 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%d",
51     GST_PAD_SINK,
52     GST_PAD_REQUEST,
53     GST_STATIC_CAPS_ANY);
54
55 static const gboolean passthrough = TRUE;
56
57 #define gst_stream_synchronizer_parent_class parent_class
58 G_DEFINE_TYPE (GstStreamSynchronizer, gst_stream_synchronizer,
59     GST_TYPE_ELEMENT);
60
61 typedef struct
62 {
63   GstStreamSynchronizer *transform;
64   guint stream_number;
65   GstPad *srcpad;
66   GstPad *sinkpad;
67   GstSegment segment;
68
69   gboolean wait;
70   gboolean new_stream;
71   gboolean drop_discont;
72   gboolean is_eos;
73   gboolean seen_data;
74
75   gint64 running_time_diff;
76 } GstStream;
77
78 /* Must be called with lock! */
79 static GstPad *
80 gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
81 {
82   if (stream->sinkpad == pad)
83     return gst_object_ref (stream->srcpad);
84   else if (stream->srcpad == pad)
85     return gst_object_ref (stream->sinkpad);
86
87   return NULL;
88 }
89
90 static GstPad *
91 gst_stream_get_other_pad_from_pad (GstPad * pad)
92 {
93   GstStreamSynchronizer *self =
94       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
95   GstStream *stream;
96   GstPad *opad = NULL;
97
98   GST_STREAM_SYNCHRONIZER_LOCK (self);
99   stream = gst_pad_get_element_private (pad);
100   if (!stream)
101     goto out;
102
103   opad = gst_stream_get_other_pad (stream, pad);
104
105 out:
106   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
107   gst_object_unref (self);
108
109   if (!opad)
110     GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
111
112   return opad;
113 }
114
115 /* Generic pad functions */
116 static GstIterator *
117 gst_stream_synchronizer_iterate_internal_links (GstPad * pad)
118 {
119   GstIterator *it = NULL;
120   GstPad *opad;
121
122   opad = gst_stream_get_other_pad_from_pad (pad);
123   if (opad) {
124     GValue value = { 0, };
125
126     g_value_init (&value, GST_TYPE_PAD);
127     g_value_set_object (&value, opad);
128     it = gst_iterator_new_single (GST_TYPE_PAD, &value);
129     g_value_unset (&value);
130     gst_object_unref (opad);
131   }
132
133   return it;
134 }
135
136 static gboolean
137 gst_stream_synchronizer_query (GstPad * pad, GstQuery * query)
138 {
139   GstPad *opad;
140   gboolean ret = FALSE;
141
142   GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));
143
144   opad = gst_stream_get_other_pad_from_pad (pad);
145   if (opad) {
146     ret = gst_pad_peer_query (opad, query);
147     gst_object_unref (opad);
148   }
149
150   return ret;
151 }
152
153 static GstCaps *
154 gst_stream_synchronizer_getcaps (GstPad * pad, GstCaps * filter)
155 {
156   GstPad *opad;
157   GstCaps *ret = NULL;
158
159   opad = gst_stream_get_other_pad_from_pad (pad);
160   if (opad) {
161     ret = gst_pad_peer_get_caps (opad, filter);
162     gst_object_unref (opad);
163   }
164
165   if (ret == NULL)
166     ret = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
167
168   GST_LOG_OBJECT (pad, "Returning caps: %" GST_PTR_FORMAT, ret);
169
170   return ret;
171 }
172
173 static gboolean
174 gst_stream_synchronizer_acceptcaps (GstPad * pad, GstCaps * caps)
175 {
176   GstPad *opad;
177   gboolean ret = FALSE;
178
179   opad = gst_stream_get_other_pad_from_pad (pad);
180   if (opad) {
181     ret = gst_pad_peer_accept_caps (opad, caps);
182     gst_object_unref (opad);
183   }
184
185   GST_LOG_OBJECT (pad, "Caps%s accepted: %" GST_PTR_FORMAT, (ret ? "" : " not"),
186       caps);
187
188   return ret;
189 }
190
191 /* srcpad functions */
192 static gboolean
193 gst_stream_synchronizer_src_event (GstPad * pad, GstEvent * event)
194 {
195   GstStreamSynchronizer *self =
196       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
197   GstPad *opad;
198   gboolean ret = FALSE;
199
200   if (passthrough)
201     goto skip_adjustments;
202
203   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
204       GST_EVENT_TYPE_NAME (event), event);
205
206   switch (GST_EVENT_TYPE (event)) {
207     case GST_EVENT_QOS:{
208       gdouble proportion;
209       GstClockTimeDiff diff;
210       GstClockTime timestamp;
211       gint64 running_time_diff;
212       GstStream *stream;
213
214       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
215       gst_event_unref (event);
216
217       GST_STREAM_SYNCHRONIZER_LOCK (self);
218       stream = gst_pad_get_element_private (pad);
219       if (stream)
220         running_time_diff = stream->running_time_diff;
221       else
222         running_time_diff = -1;
223       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
224
225       if (running_time_diff == -1) {
226         GST_WARNING_OBJECT (pad, "QOS event before group start");
227         goto out;
228       } else if (timestamp < running_time_diff) {
229         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
230         goto out;
231       }
232
233       GST_LOG_OBJECT (pad,
234           "Adjusting QOS event: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT " = %"
235           GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
236           GST_TIME_ARGS (running_time_diff),
237           GST_TIME_ARGS (timestamp - running_time_diff));
238
239       timestamp -= running_time_diff;
240
241       /* That case is invalid for QoS events */
242       if (diff < 0 && -diff > timestamp) {
243         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
244         ret = TRUE;
245         goto out;
246       }
247
248       event =
249           gst_event_new_qos (GST_QOS_TYPE_UNDERFLOW, proportion, diff,
250           timestamp);
251       break;
252     }
253     default:
254       break;
255   }
256
257 skip_adjustments:
258
259   opad = gst_stream_get_other_pad_from_pad (pad);
260   if (opad) {
261     ret = gst_pad_push_event (opad, event);
262     gst_object_unref (opad);
263   }
264
265 out:
266   gst_object_unref (self);
267
268   return ret;
269 }
270
271 /* sinkpad functions */
272 static gboolean
273 gst_stream_synchronizer_sink_event (GstPad * pad, GstEvent * event)
274 {
275   GstStreamSynchronizer *self =
276       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
277   GstPad *opad;
278   gboolean ret = FALSE;
279
280   if (passthrough)
281     goto skip_adjustments;
282
283   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
284       GST_EVENT_TYPE_NAME (event), event);
285
286   switch (GST_EVENT_TYPE (event)) {
287     case GST_EVENT_SINK_MESSAGE:{
288       GstMessage *message;
289
290       gst_event_parse_sink_message (event, &message);
291       if (gst_message_has_name (message, "playbin-stream-changed")) {
292         GstStream *stream;
293
294         GST_STREAM_SYNCHRONIZER_LOCK (self);
295         stream = gst_pad_get_element_private (pad);
296         if (stream) {
297           GList *l;
298           gboolean all_wait = TRUE;
299
300           GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
301
302           stream->is_eos = FALSE;
303           stream->wait = TRUE;
304           stream->new_stream = TRUE;
305
306           for (l = self->streams; l; l = l->next) {
307             GstStream *ostream = l->data;
308
309             all_wait = all_wait && ostream->wait;
310             if (!all_wait)
311               break;
312           }
313           if (all_wait) {
314             gint64 position = 0;
315
316             GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
317
318             for (l = self->streams; l; l = l->next) {
319               GstStream *ostream = l->data;
320               gint64 stop_running_time;
321               gint64 position_running_time;
322
323               ostream->wait = FALSE;
324
325               stop_running_time =
326                   gst_segment_to_running_time (&ostream->segment,
327                   GST_FORMAT_TIME, ostream->segment.stop);
328               position_running_time =
329                   gst_segment_to_running_time (&ostream->segment,
330                   GST_FORMAT_TIME, ostream->segment.position);
331               position =
332                   MAX (position, MAX (stop_running_time,
333                       position_running_time));
334             }
335             position = MAX (0, position);
336             self->group_start_time = MAX (self->group_start_time, position);
337
338             GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
339                 GST_TIME_ARGS (self->group_start_time));
340
341             g_cond_broadcast (self->stream_finish_cond);
342           }
343         }
344         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
345       }
346       gst_message_unref (message);
347       break;
348     }
349     case GST_EVENT_SEGMENT:{
350       GstStream *stream;
351       GstSegment segment;
352
353       gst_event_copy_segment (event, &segment);
354
355       GST_STREAM_SYNCHRONIZER_LOCK (self);
356       stream = gst_pad_get_element_private (pad);
357       if (stream) {
358         if (stream->wait) {
359           GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
360           g_cond_wait (self->stream_finish_cond, self->lock);
361           stream = gst_pad_get_element_private (pad);
362           if (stream)
363             stream->wait = FALSE;
364         }
365       }
366
367       if (self->shutdown) {
368         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
369         gst_event_unref (event);
370         goto done;
371       }
372
373       if (stream && segment.format == GST_FORMAT_TIME) {
374         if (stream->new_stream) {
375           gint64 position_running_time = 0;
376           gint64 stop_running_time = 0;
377
378           if (stream->segment.format == GST_FORMAT_TIME) {
379             position_running_time =
380                 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
381                 stream->segment.position);
382             position_running_time = MAX (position_running_time, 0);
383             stop_running_time =
384                 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
385                 stream->segment.stop);
386             stop_running_time = MAX (position_running_time, 0);
387
388             if (stop_running_time != position_running_time) {
389               GST_WARNING_OBJECT (pad,
390                   "Gap between position and segment stop: %" GST_TIME_FORMAT
391                   " != %" GST_TIME_FORMAT, GST_TIME_ARGS (stop_running_time),
392                   GST_TIME_ARGS (position_running_time));
393             }
394
395             if (stop_running_time < position_running_time) {
396               GST_DEBUG_OBJECT (pad, "Updating stop position");
397               stream->segment.stop = stream->segment.position;
398               gst_pad_push_event (stream->srcpad,
399                   gst_event_new_segment (&stream->segment));
400             }
401             stop_running_time = MAX (stop_running_time, position_running_time);
402             GST_DEBUG_OBJECT (pad,
403                 "Stop running time of last group: %" GST_TIME_FORMAT,
404                 GST_TIME_ARGS (stop_running_time));
405           }
406           stream->new_stream = FALSE;
407           stream->drop_discont = TRUE;
408
409           if (stop_running_time < self->group_start_time) {
410             gint64 diff = self->group_start_time - stop_running_time;
411
412             GST_DEBUG_OBJECT (pad,
413                 "Advancing running time for other streams by: %"
414                 GST_TIME_FORMAT, GST_TIME_ARGS (diff));
415
416             segment.base += diff;
417           }
418         }
419
420         GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
421             &stream->segment);
422         gst_segment_copy_into (&segment, &stream->segment);
423         GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
424             &stream->segment);
425
426         GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
427             GST_TIME_ARGS (stream->segment.base));
428         stream->running_time_diff = stream->segment.base;
429       } else if (stream) {
430         GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
431             gst_format_get_name (segment.format));
432         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
433       }
434       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
435       break;
436     }
437     case GST_EVENT_FLUSH_STOP:{
438       GstStream *stream;
439
440       GST_STREAM_SYNCHRONIZER_LOCK (self);
441       stream = gst_pad_get_element_private (pad);
442       if (stream) {
443         GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
444             stream->stream_number);
445         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
446
447         stream->is_eos = FALSE;
448         stream->wait = FALSE;
449         stream->new_stream = FALSE;
450         stream->drop_discont = FALSE;
451         stream->seen_data = FALSE;
452       }
453       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
454       break;
455     }
456     case GST_EVENT_EOS:{
457       GstStream *stream;
458       GList *l;
459       gboolean all_eos = TRUE;
460       gboolean seen_data;
461       GSList *pads = NULL;
462       GstPad *srcpad;
463
464       GST_STREAM_SYNCHRONIZER_LOCK (self);
465       stream = gst_pad_get_element_private (pad);
466       if (!stream) {
467         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
468         GST_WARNING_OBJECT (pad, "EOS for unknown stream");
469         break;
470       }
471
472       GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
473       stream->is_eos = TRUE;
474
475       seen_data = stream->seen_data;
476       srcpad = gst_object_ref (stream->srcpad);
477
478       for (l = self->streams; l; l = l->next) {
479         GstStream *ostream = l->data;
480
481         all_eos = all_eos && ostream->is_eos;
482         if (!all_eos)
483           break;
484       }
485
486       if (all_eos) {
487         GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
488         for (l = self->streams; l; l = l->next) {
489           GstStream *ostream = l->data;
490           /* local snapshot of current pads */
491           gst_object_ref (ostream->srcpad);
492           pads = g_slist_prepend (pads, ostream->srcpad);
493         }
494       }
495       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
496       /* drop lock when sending eos, which may block in e.g. preroll */
497       if (pads) {
498         GstPad *pad;
499         GSList *epad;
500
501         ret = TRUE;
502         epad = pads;
503         while (epad) {
504           pad = epad->data;
505           GST_DEBUG_OBJECT (pad, "Pushing EOS");
506           ret = ret && gst_pad_push_event (pad, gst_event_new_eos ());
507           gst_object_unref (pad);
508           epad = g_slist_next (epad);
509         }
510         g_slist_free (pads);
511       } else {
512         /* if EOS, but no data has passed, then send something to replace EOS
513          * for preroll purposes */
514         if (!seen_data) {
515           GstBuffer *buf = gst_buffer_new ();
516
517           GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_PREROLL);
518           gst_pad_push (srcpad, buf);
519         }
520       }
521       gst_object_unref (srcpad);
522       goto done;
523       break;
524     }
525     default:
526       break;
527   }
528
529 skip_adjustments:
530
531   opad = gst_stream_get_other_pad_from_pad (pad);
532   if (opad) {
533     ret = gst_pad_push_event (opad, event);
534     gst_object_unref (opad);
535   }
536
537 done:
538   gst_object_unref (self);
539
540   return ret;
541 }
542
543 static GstFlowReturn
544 gst_stream_synchronizer_sink_chain (GstPad * pad, GstBuffer * buffer)
545 {
546   GstStreamSynchronizer *self =
547       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
548   GstPad *opad;
549   GstFlowReturn ret = GST_FLOW_ERROR;
550   GstStream *stream;
551   GstClockTime timestamp = GST_CLOCK_TIME_NONE;
552   GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
553
554   if (passthrough) {
555     opad = gst_stream_get_other_pad_from_pad (pad);
556     if (opad) {
557       ret = gst_pad_push (opad, buffer);
558       gst_object_unref (opad);
559     }
560     goto done;
561   }
562
563   GST_LOG_OBJECT (pad, "Handling buffer %p: size=%u, timestamp=%"
564       GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
565       " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
566       buffer, gst_buffer_get_size (buffer),
567       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
568       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
569       GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));
570
571   timestamp = GST_BUFFER_TIMESTAMP (buffer);
572   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)
573       && GST_BUFFER_DURATION_IS_VALID (buffer))
574     timestamp_end = timestamp + GST_BUFFER_DURATION (buffer);
575
576   GST_STREAM_SYNCHRONIZER_LOCK (self);
577   stream = gst_pad_get_element_private (pad);
578
579   stream->seen_data = TRUE;
580   if (stream && stream->drop_discont) {
581     buffer = gst_buffer_make_writable (buffer);
582     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
583     stream->drop_discont = FALSE;
584   }
585
586   if (stream && stream->segment.format == GST_FORMAT_TIME
587       && GST_CLOCK_TIME_IS_VALID (timestamp)) {
588     GST_LOG_OBJECT (pad,
589         "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
590         GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
591     stream->segment.position = timestamp;
592   }
593   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
594
595   opad = gst_stream_get_other_pad_from_pad (pad);
596   if (opad) {
597     ret = gst_pad_push (opad, buffer);
598     gst_object_unref (opad);
599   }
600
601   GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
602   if (ret == GST_FLOW_OK) {
603     GList *l;
604
605     GST_STREAM_SYNCHRONIZER_LOCK (self);
606     stream = gst_pad_get_element_private (pad);
607     if (stream && stream->segment.format == GST_FORMAT_TIME
608         && GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
609       GST_LOG_OBJECT (pad,
610           "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
611           GST_TIME_ARGS (stream->segment.position),
612           GST_TIME_ARGS (timestamp_end));
613       stream->segment.position = timestamp_end;
614     }
615
616     /* Advance EOS streams if necessary. For non-EOS
617      * streams the demuxers should already do this! */
618     for (l = self->streams; l; l = l->next) {
619       GstStream *ostream = l->data;
620       gint64 position;
621
622       if (!ostream->is_eos || ostream->segment.format != GST_FORMAT_TIME)
623         continue;
624
625       if (ostream->segment.position != -1)
626         position = ostream->segment.position;
627       else
628         position = ostream->segment.start;
629
630       /* Is there a 1 second lag? */
631       if (position != -1 && position + GST_SECOND < timestamp_end) {
632         gint64 new_start, new_stop;
633
634         new_start = timestamp_end - GST_SECOND;
635         if (ostream->segment.stop == -1)
636           new_stop = -1;
637         else
638           new_stop = MAX (new_start, ostream->segment.stop);
639
640         GST_DEBUG_OBJECT (ostream->sinkpad,
641             "Advancing stream %u from %" GST_TIME_FORMAT " to %"
642             GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (position),
643             GST_TIME_ARGS (new_start));
644
645         ostream->segment.start = new_start;
646         ostream->segment.stop = new_stop;
647         ostream->segment.time = new_start;
648         ostream->segment.position = new_start;
649
650         gst_pad_push_event (ostream->srcpad,
651             gst_event_new_segment (&ostream->segment));
652       }
653     }
654     GST_STREAM_SYNCHRONIZER_UNLOCK (self);
655   }
656
657 done:
658
659   gst_object_unref (self);
660
661   return ret;
662 }
663
664 /* GstElement vfuncs */
665 static GstPad *
666 gst_stream_synchronizer_request_new_pad (GstElement * element,
667     GstPadTemplate * temp, const gchar * name, const GstCaps * caps)
668 {
669   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
670   GstStream *stream;
671   gchar *tmp;
672
673   GST_STREAM_SYNCHRONIZER_LOCK (self);
674   GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
675       self->current_stream_number);
676
677   stream = g_slice_new0 (GstStream);
678   stream->transform = self;
679   stream->stream_number = self->current_stream_number;
680
681   tmp = g_strdup_printf ("sink_%d", self->current_stream_number);
682   stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
683   g_free (tmp);
684   gst_pad_set_element_private (stream->sinkpad, stream);
685   gst_pad_set_iterate_internal_links_function (stream->sinkpad,
686       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
687   gst_pad_set_query_function (stream->sinkpad,
688       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
689   gst_pad_set_getcaps_function (stream->sinkpad,
690       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
691   gst_pad_set_acceptcaps_function (stream->sinkpad,
692       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
693   gst_pad_set_event_function (stream->sinkpad,
694       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
695   gst_pad_set_chain_function (stream->sinkpad,
696       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
697
698   tmp = g_strdup_printf ("src_%d", self->current_stream_number);
699   stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
700   g_free (tmp);
701   gst_pad_set_element_private (stream->srcpad, stream);
702   gst_pad_set_iterate_internal_links_function (stream->srcpad,
703       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
704   gst_pad_set_query_function (stream->srcpad,
705       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
706   gst_pad_set_getcaps_function (stream->srcpad,
707       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
708   gst_pad_set_acceptcaps_function (stream->srcpad,
709       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
710   gst_pad_set_event_function (stream->srcpad,
711       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
712
713   gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
714
715   self->streams = g_list_prepend (self->streams, stream);
716   self->current_stream_number++;
717   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
718
719   /* Add pads and activate unless we're going to NULL */
720   g_static_rec_mutex_lock (GST_STATE_GET_LOCK (self));
721   if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
722     gst_pad_set_active (stream->srcpad, TRUE);
723     gst_pad_set_active (stream->sinkpad, TRUE);
724   }
725   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
726   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
727   g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
728
729   return stream->sinkpad;
730 }
731
732 /* Must be called with lock! */
733 static void
734 gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
735     GstStream * stream)
736 {
737   GList *l;
738
739   GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
740
741   for (l = self->streams; l; l = l->next) {
742     if (l->data == stream) {
743       self->streams = g_list_delete_link (self->streams, l);
744       break;
745     }
746   }
747   g_assert (l != NULL);
748
749   /* we can drop the lock, since stream exists now only local.
750    * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
751    * (due to reverse lock order) when deactivating pads */
752   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
753
754   gst_pad_set_element_private (stream->srcpad, NULL);
755   gst_pad_set_element_private (stream->sinkpad, NULL);
756   gst_pad_set_active (stream->srcpad, FALSE);
757   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
758   gst_pad_set_active (stream->sinkpad, FALSE);
759   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
760
761   if (stream->segment.format == GST_FORMAT_TIME) {
762     gint64 stop_running_time;
763     gint64 position_running_time;
764
765     stop_running_time =
766         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
767         stream->segment.stop);
768     position_running_time =
769         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
770         stream->segment.position);
771     stop_running_time = MAX (stop_running_time, position_running_time);
772
773     GST_DEBUG_OBJECT (stream->sinkpad,
774         "Stop running time was: %" GST_TIME_FORMAT,
775         GST_TIME_ARGS (stop_running_time));
776
777     self->group_start_time = MAX (self->group_start_time, stop_running_time);
778   }
779
780   g_slice_free (GstStream, stream);
781
782   /* NOTE: In theory we have to check here if all streams
783    * are EOS but the one that was removed wasn't and then
784    * send EOS downstream. But due to the way how playsink
785    * works this is not necessary and will only cause problems
786    * for gapless playback. playsink will only add/remove pads
787    * when it's reconfigured, which happens when the streams
788    * change
789    */
790
791   /* lock for good measure, since the caller had it */
792   GST_STREAM_SYNCHRONIZER_LOCK (self);
793 }
794
795 static void
796 gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
797 {
798   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
799   GstStream *stream;
800
801   GST_STREAM_SYNCHRONIZER_LOCK (self);
802   stream = gst_pad_get_element_private (pad);
803   if (stream) {
804     g_assert (stream->sinkpad == pad);
805
806     gst_stream_synchronizer_release_stream (self, stream);
807   }
808   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
809 }
810
811 static GstStateChangeReturn
812 gst_stream_synchronizer_change_state (GstElement * element,
813     GstStateChange transition)
814 {
815   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
816   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
817
818   switch (transition) {
819     case GST_STATE_CHANGE_NULL_TO_READY:
820       GST_DEBUG_OBJECT (self, "State change NULL->READY");
821       self->shutdown = FALSE;
822       break;
823     case GST_STATE_CHANGE_READY_TO_PAUSED:
824       GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
825       self->group_start_time = 0;
826       self->shutdown = FALSE;
827       break;
828     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
829       GST_DEBUG_OBJECT (self, "State change PAUSED->PLAYING");
830       break;
831     case GST_STATE_CHANGE_PAUSED_TO_READY:
832       GST_DEBUG_OBJECT (self, "State change READY->NULL");
833
834       GST_STREAM_SYNCHRONIZER_LOCK (self);
835       g_cond_broadcast (self->stream_finish_cond);
836       self->shutdown = TRUE;
837       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
838     default:
839       break;
840   }
841
842   {
843     GstStateChangeReturn bret;
844
845     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
846     GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", bret);
847     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
848       return ret;
849   }
850
851   switch (transition) {
852     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
853       GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
854       break;
855     case GST_STATE_CHANGE_PAUSED_TO_READY:{
856       GList *l;
857
858       GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
859       self->group_start_time = 0;
860
861       GST_STREAM_SYNCHRONIZER_LOCK (self);
862       for (l = self->streams; l; l = l->next) {
863         GstStream *stream = l->data;
864
865         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
866         stream->wait = FALSE;
867         stream->new_stream = FALSE;
868         stream->drop_discont = FALSE;
869         stream->is_eos = FALSE;
870       }
871       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
872       break;
873     }
874     case GST_STATE_CHANGE_READY_TO_NULL:{
875       GST_DEBUG_OBJECT (self, "State change READY->NULL");
876
877       GST_STREAM_SYNCHRONIZER_LOCK (self);
878       while (self->streams)
879         gst_stream_synchronizer_release_stream (self, self->streams->data);
880       self->current_stream_number = 0;
881       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
882       break;
883     }
884     default:
885       break;
886   }
887
888   return ret;
889 }
890
891 /* GObject vfuncs */
892 static void
893 gst_stream_synchronizer_finalize (GObject * object)
894 {
895   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
896
897   if (self->lock) {
898     g_mutex_free (self->lock);
899     self->lock = NULL;
900   }
901
902   if (self->stream_finish_cond) {
903     g_cond_free (self->stream_finish_cond);
904     self->stream_finish_cond = NULL;
905   }
906
907   G_OBJECT_CLASS (parent_class)->finalize (object);
908 }
909
910 /* GObject type initialization */
911 static void
912 gst_stream_synchronizer_init (GstStreamSynchronizer * self)
913 {
914   self->lock = g_mutex_new ();
915   self->stream_finish_cond = g_cond_new ();
916 }
917
918 static void
919 gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
920 {
921   GObjectClass *gobject_class = (GObjectClass *) klass;
922   GstElementClass *element_class = (GstElementClass *) klass;
923
924   GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
925       "streamsynchronizer", 0, "Stream Synchronizer");
926
927   gobject_class->finalize = gst_stream_synchronizer_finalize;
928
929   gst_element_class_add_pad_template (element_class,
930       gst_static_pad_template_get (&srctemplate));
931   gst_element_class_add_pad_template (element_class,
932       gst_static_pad_template_get (&sinktemplate));
933
934   gst_element_class_set_details_simple (element_class,
935       "Stream Synchronizer", "Generic",
936       "Synchronizes a group of streams to have equal durations and starting points",
937       "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
938
939   element_class->change_state =
940       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
941   element_class->request_new_pad =
942       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
943   element_class->release_pad =
944       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);
945 }