uridecodebin: remove commented code
[platform/upstream/gstreamer.git] / gst / playback / gststreamsynchronizer.c
1 /* GStreamer
2  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 #ifdef HAVE_CONFIG_H
21 #include "config.h"
22 #endif
23
24 #include "gststreamsynchronizer.h"
25
26 GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
27 #define GST_CAT_DEFAULT stream_synchronizer_debug
28
29 #define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START {                \
30     GST_TRACE_OBJECT (obj,                                              \
31                     "locking from thread %p",                           \
32                     g_thread_self ());                                  \
33     g_mutex_lock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);            \
34     GST_TRACE_OBJECT (obj,                                              \
35                     "locked from thread %p",                            \
36                     g_thread_self ());                                  \
37 } G_STMT_END
38
39 #define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START {              \
40     GST_TRACE_OBJECT (obj,                                              \
41                     "unlocking from thread %p",                         \
42                     g_thread_self ());                                  \
43     g_mutex_unlock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);              \
44 } G_STMT_END
45
46 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
47     GST_PAD_SRC,
48     GST_PAD_SOMETIMES,
49     GST_STATIC_CAPS_ANY);
50 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
51     GST_PAD_SINK,
52     GST_PAD_REQUEST,
53     GST_STATIC_CAPS_ANY);
54
55 #define gst_stream_synchronizer_parent_class parent_class
56 G_DEFINE_TYPE (GstStreamSynchronizer, gst_stream_synchronizer,
57     GST_TYPE_ELEMENT);
58
59 typedef struct
60 {
61   GstStreamSynchronizer *transform;
62   guint stream_number;
63   GstPad *srcpad;
64   GstPad *sinkpad;
65   GstSegment segment;
66
67   gboolean wait;                /* TRUE if waiting/blocking */
68   gboolean new_stream;
69   gboolean drop_discont;
70   gboolean is_eos;              /* TRUE if EOS was received */
71   gboolean seen_data;
72
73   GCond stream_finish_cond;
74
75   /* seqnum of the previously received STREAM_START
76    * default: G_MAXUINT32 */
77   guint32 stream_start_seqnum;
78   guint32 segment_seqnum;
79 } GstStream;
80
81 /* Must be called with lock! */
82 static inline GstPad *
83 gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
84 {
85   if (stream->sinkpad == pad)
86     return gst_object_ref (stream->srcpad);
87   if (stream->srcpad == pad)
88     return gst_object_ref (stream->sinkpad);
89
90   return NULL;
91 }
92
93 static GstPad *
94 gst_stream_get_other_pad_from_pad (GstStreamSynchronizer * self, GstPad * pad)
95 {
96   GstStream *stream;
97   GstPad *opad = NULL;
98
99   GST_STREAM_SYNCHRONIZER_LOCK (self);
100   stream = gst_pad_get_element_private (pad);
101   if (!stream)
102     goto out;
103
104   opad = gst_stream_get_other_pad (stream, pad);
105
106 out:
107   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
108
109   if (!opad)
110     GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
111
112   return opad;
113 }
114
115 /* Generic pad functions */
116 static GstIterator *
117 gst_stream_synchronizer_iterate_internal_links (GstPad * pad,
118     GstObject * parent)
119 {
120   GstIterator *it = NULL;
121   GstPad *opad;
122
123   opad =
124       gst_stream_get_other_pad_from_pad (GST_STREAM_SYNCHRONIZER (parent), pad);
125   if (opad) {
126     GValue value = { 0, };
127
128     g_value_init (&value, GST_TYPE_PAD);
129     g_value_set_object (&value, opad);
130     it = gst_iterator_new_single (GST_TYPE_PAD, &value);
131     g_value_unset (&value);
132     gst_object_unref (opad);
133   }
134
135   return it;
136 }
137
138 static gboolean
139 gst_stream_synchronizer_query (GstPad * pad, GstObject * parent,
140     GstQuery * query)
141 {
142   GstPad *opad;
143   gboolean ret = FALSE;
144
145   GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));
146
147   opad =
148       gst_stream_get_other_pad_from_pad (GST_STREAM_SYNCHRONIZER (parent), pad);
149   if (opad) {
150     ret = gst_pad_peer_query (opad, query);
151     gst_object_unref (opad);
152   }
153
154   return ret;
155 }
156
157 /* srcpad functions */
158 static gboolean
159 gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent,
160     GstEvent * event)
161 {
162   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
163   GstPad *opad;
164   gboolean ret = FALSE;
165
166   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
167       GST_EVENT_TYPE_NAME (event), event);
168
169   switch (GST_EVENT_TYPE (event)) {
170     case GST_EVENT_QOS:{
171       gdouble proportion;
172       GstClockTimeDiff diff;
173       GstClockTime timestamp;
174       gint64 running_time_diff = -1;
175       GstStream *stream;
176
177       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
178       gst_event_unref (event);
179
180       GST_STREAM_SYNCHRONIZER_LOCK (self);
181       stream = gst_pad_get_element_private (pad);
182       if (stream)
183         running_time_diff = stream->segment.base;
184       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
185
186       if (running_time_diff == -1) {
187         GST_WARNING_OBJECT (pad, "QOS event before group start");
188         goto out;
189       }
190       if (timestamp < running_time_diff) {
191         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
192         goto out;
193       }
194
195       GST_LOG_OBJECT (pad,
196           "Adjusting QOS event: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT " = %"
197           GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
198           GST_TIME_ARGS (running_time_diff),
199           GST_TIME_ARGS (timestamp - running_time_diff));
200
201       timestamp -= running_time_diff;
202
203       /* That case is invalid for QoS events */
204       if (diff < 0 && -diff > timestamp) {
205         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
206         ret = TRUE;
207         goto out;
208       }
209
210       event =
211           gst_event_new_qos (GST_QOS_TYPE_UNDERFLOW, proportion, diff,
212           timestamp);
213       break;
214     }
215     default:
216       break;
217   }
218
219   opad = gst_stream_get_other_pad_from_pad (self, pad);
220   if (opad) {
221     ret = gst_pad_push_event (opad, event);
222     gst_object_unref (opad);
223   }
224
225 out:
226   return ret;
227 }
228
229 /* sinkpad functions */
230 static gboolean
231 gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
232     GstEvent * event)
233 {
234   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
235   GstPad *opad;
236   gboolean ret = FALSE;
237
238   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
239       GST_EVENT_TYPE_NAME (event), event);
240
241   switch (GST_EVENT_TYPE (event)) {
242     case GST_EVENT_STREAM_START:
243     {
244       GstStream *stream;
245       guint32 seqnum = gst_event_get_seqnum (event);
246       GList *l;
247       gboolean all_wait = TRUE;
248
249       GST_STREAM_SYNCHRONIZER_LOCK (self);
250       stream = gst_pad_get_element_private (pad);
251       if (stream && stream->stream_start_seqnum != seqnum) {
252
253         GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
254
255         stream->is_eos = FALSE;
256         stream->wait = TRUE;
257         stream->new_stream = TRUE;
258
259         for (l = self->streams; l; l = l->next) {
260           GstStream *ostream = l->data;
261
262           all_wait = all_wait && ostream->wait;
263           if (!all_wait)
264             break;
265         }
266         if (all_wait) {
267           gint64 position = 0;
268
269           GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
270
271           for (l = self->streams; l; l = l->next) {
272             GstStream *ostream = l->data;
273             gint64 stop_running_time;
274             gint64 position_running_time;
275
276             ostream->wait = FALSE;
277
278             if (ostream->segment.format == GST_FORMAT_TIME) {
279               stop_running_time =
280                   gst_segment_to_running_time (&ostream->segment,
281                   GST_FORMAT_TIME, ostream->segment.stop);
282               position_running_time =
283                   gst_segment_to_running_time (&ostream->segment,
284                   GST_FORMAT_TIME, ostream->segment.position);
285               position =
286                   MAX (position, MAX (stop_running_time,
287                       position_running_time));
288             }
289           }
290           position = MAX (0, position);
291           self->group_start_time = MAX (self->group_start_time, position);
292
293           GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
294               GST_TIME_ARGS (self->group_start_time));
295
296           for (l = self->streams; l; l = l->next) {
297             GstStream *ostream = l->data;
298             g_cond_broadcast (&ostream->stream_finish_cond);
299           }
300         }
301       } else
302         GST_DEBUG_OBJECT (self, "No stream or STREAM_START from same source");
303       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
304     }
305       break;
306     case GST_EVENT_SEGMENT:{
307       GstStream *stream;
308       GstSegment segment;
309
310       gst_event_copy_segment (event, &segment);
311
312       GST_STREAM_SYNCHRONIZER_LOCK (self);
313       stream = gst_pad_get_element_private (pad);
314       if (stream) {
315         if (stream->wait) {
316           GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
317           g_cond_wait (&stream->stream_finish_cond, &self->lock);
318           stream = gst_pad_get_element_private (pad);
319           if (stream)
320             stream->wait = FALSE;
321         }
322       }
323
324       if (self->shutdown) {
325         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
326         gst_event_unref (event);
327         goto done;
328       }
329
330       if (stream && segment.format == GST_FORMAT_TIME) {
331         if (stream->new_stream) {
332           stream->new_stream = FALSE;
333           stream->drop_discont = TRUE;
334           segment.base = self->group_start_time;
335         }
336
337         GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
338             &stream->segment);
339         gst_segment_copy_into (&segment, &stream->segment);
340         GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
341             &stream->segment);
342         stream->segment_seqnum = gst_event_get_seqnum (event);
343
344         GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
345             GST_TIME_ARGS (stream->segment.base));
346         {
347           GstEvent *tmpev;
348
349           tmpev = gst_event_new_segment (&stream->segment);
350           gst_event_set_seqnum (tmpev, stream->segment_seqnum);
351           gst_event_unref (event);
352           event = tmpev;
353         }
354
355       } else if (stream) {
356         GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
357             gst_format_get_name (segment.format));
358         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
359         /* Since this stream is not time-based, we mark it so that
360          * other streams don't wait forever on it */
361         stream->wait = TRUE;
362       }
363       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
364       break;
365     }
366     case GST_EVENT_FLUSH_START:{
367       GstStream *stream;
368
369       GST_STREAM_SYNCHRONIZER_LOCK (self);
370       stream = gst_pad_get_element_private (pad);
371       if (stream) {
372         GST_DEBUG_OBJECT (pad, "Flushing streams");
373         g_cond_broadcast (&stream->stream_finish_cond);
374       }
375       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
376       break;
377     }
378     case GST_EVENT_FLUSH_STOP:{
379       GstStream *stream;
380
381       GST_STREAM_SYNCHRONIZER_LOCK (self);
382       stream = gst_pad_get_element_private (pad);
383       if (stream) {
384         GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
385             stream->stream_number);
386         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
387
388         stream->is_eos = FALSE;
389         stream->wait = FALSE;
390         stream->new_stream = FALSE;
391         stream->drop_discont = FALSE;
392         stream->seen_data = FALSE;
393         g_cond_broadcast (&stream->stream_finish_cond);
394       }
395       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
396       break;
397     }
398     case GST_EVENT_EOS:{
399       GstStream *stream;
400       GList *l;
401       gboolean all_eos = TRUE;
402       gboolean seen_data;
403       GSList *pads = NULL;
404       GstPad *srcpad;
405       GstClockTime timestamp;
406
407       GST_STREAM_SYNCHRONIZER_LOCK (self);
408       stream = gst_pad_get_element_private (pad);
409       if (!stream) {
410         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
411         GST_WARNING_OBJECT (pad, "EOS for unknown stream");
412         break;
413       }
414
415       GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
416       stream->is_eos = TRUE;
417
418       seen_data = stream->seen_data;
419       srcpad = gst_object_ref (stream->srcpad);
420
421       if (seen_data && stream->segment.position != -1)
422         timestamp = stream->segment.position;
423       else if (stream->segment.rate < 0.0 || stream->segment.stop == -1)
424         timestamp = stream->segment.start;
425       else
426         timestamp = stream->segment.stop;
427
428       for (l = self->streams; l; l = l->next) {
429         GstStream *ostream = l->data;
430
431         all_eos = all_eos && ostream->is_eos;
432         if (!all_eos)
433           break;
434       }
435
436       if (all_eos) {
437         GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
438         for (l = self->streams; l; l = l->next) {
439           GstStream *ostream = l->data;
440           /* local snapshot of current pads */
441           gst_object_ref (ostream->srcpad);
442           pads = g_slist_prepend (pads, ostream->srcpad);
443         }
444       }
445       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
446       /* drop lock when sending eos, which may block in e.g. preroll */
447       if (pads) {
448         GstPad *pad;
449         GSList *epad;
450
451         ret = TRUE;
452         epad = pads;
453         while (epad) {
454           pad = epad->data;
455           GST_DEBUG_OBJECT (pad, "Pushing EOS");
456           ret = ret && gst_pad_push_event (pad, gst_event_new_eos ());
457           gst_object_unref (pad);
458           epad = g_slist_next (epad);
459         }
460         g_slist_free (pads);
461       } else {
462         /* if EOS, but no data has passed, then send something to replace EOS
463          * for preroll purposes */
464         if (!seen_data) {
465           GstEvent *gap_event;
466
467           gap_event = gst_event_new_gap (timestamp, GST_CLOCK_TIME_NONE);
468           ret = gst_pad_push_event (srcpad, gap_event);
469         } else {
470           GstEvent *gap_event;
471
472           /* FIXME: Also send a GAP event to let audio sinks start their
473            * clock in case they did not have enough data yet */
474           gap_event = gst_event_new_gap (timestamp, GST_CLOCK_TIME_NONE);
475           ret = gst_pad_push_event (srcpad, gap_event);
476         }
477       }
478       gst_object_unref (srcpad);
479       gst_event_unref (event);
480       goto done;
481     }
482     default:
483       break;
484   }
485
486   opad = gst_stream_get_other_pad_from_pad (self, pad);
487   if (opad) {
488     ret = gst_pad_push_event (opad, event);
489     gst_object_unref (opad);
490   }
491
492 done:
493
494   return ret;
495 }
496
497 static GstFlowReturn
498 gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
499     GstBuffer * buffer)
500 {
501   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
502   GstPad *opad;
503   GstFlowReturn ret = GST_FLOW_ERROR;
504   GstStream *stream;
505   GstClockTime timestamp = GST_CLOCK_TIME_NONE;
506   GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
507
508   GST_LOG_OBJECT (pad, "Handling buffer %p: size=%" G_GSIZE_FORMAT
509       ", timestamp=%" GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
510       " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
511       buffer, gst_buffer_get_size (buffer),
512       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
513       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
514       GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));
515
516   timestamp = GST_BUFFER_TIMESTAMP (buffer);
517   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)
518       && GST_BUFFER_DURATION_IS_VALID (buffer))
519     timestamp_end = timestamp + GST_BUFFER_DURATION (buffer);
520
521   GST_STREAM_SYNCHRONIZER_LOCK (self);
522   stream = gst_pad_get_element_private (pad);
523
524   if (stream) {
525     stream->seen_data = TRUE;
526     if (stream->drop_discont) {
527       if (GST_BUFFER_IS_DISCONT (buffer)) {
528         GST_DEBUG_OBJECT (pad, "removing DISCONT from buffer %p", buffer);
529         buffer = gst_buffer_make_writable (buffer);
530         GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
531       }
532       stream->drop_discont = FALSE;
533     }
534
535     if (stream->segment.format == GST_FORMAT_TIME
536         && GST_CLOCK_TIME_IS_VALID (timestamp)) {
537       GST_LOG_OBJECT (pad,
538           "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
539           GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
540       stream->segment.position = timestamp;
541     }
542   }
543   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
544
545   opad = gst_stream_get_other_pad_from_pad (self, pad);
546   if (opad) {
547     ret = gst_pad_push (opad, buffer);
548     gst_object_unref (opad);
549   }
550
551   GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
552   if (ret == GST_FLOW_OK) {
553     GList *l;
554
555     GST_STREAM_SYNCHRONIZER_LOCK (self);
556     stream = gst_pad_get_element_private (pad);
557     if (stream && stream->segment.format == GST_FORMAT_TIME
558         && GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
559       GST_LOG_OBJECT (pad,
560           "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
561           GST_TIME_ARGS (stream->segment.position),
562           GST_TIME_ARGS (timestamp_end));
563       stream->segment.position = timestamp_end;
564     }
565
566     /* Advance EOS streams if necessary. For non-EOS
567      * streams the demuxers should already do this! */
568     if (!GST_CLOCK_TIME_IS_VALID (timestamp_end) &&
569         GST_CLOCK_TIME_IS_VALID (timestamp)) {
570       timestamp_end = timestamp + GST_SECOND;
571     }
572
573     for (l = self->streams; l; l = l->next) {
574       GstStream *ostream = l->data;
575       gint64 position;
576
577       if (!ostream->is_eos || ostream->segment.format != GST_FORMAT_TIME)
578         continue;
579
580       if (ostream->segment.position != -1)
581         position = ostream->segment.position;
582       else
583         position = ostream->segment.start;
584
585       /* Is there a 1 second lag? */
586       if (position != -1 && GST_CLOCK_TIME_IS_VALID (timestamp_end) &&
587           position + GST_SECOND < timestamp_end) {
588         gint64 new_start;
589
590         new_start = timestamp_end - GST_SECOND;
591
592         GST_DEBUG_OBJECT (ostream->sinkpad,
593             "Advancing stream %u from %" GST_TIME_FORMAT " to %"
594             GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (position),
595             GST_TIME_ARGS (new_start));
596
597         ostream->segment.position = new_start;
598
599         gst_pad_push_event (ostream->srcpad,
600             gst_event_new_gap (position, new_start - position));
601       }
602     }
603     GST_STREAM_SYNCHRONIZER_UNLOCK (self);
604   }
605
606   return ret;
607 }
608
609 /* GstElement vfuncs */
610 static GstPad *
611 gst_stream_synchronizer_request_new_pad (GstElement * element,
612     GstPadTemplate * temp, const gchar * name, const GstCaps * caps)
613 {
614   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
615   GstStream *stream;
616   gchar *tmp;
617
618   GST_STREAM_SYNCHRONIZER_LOCK (self);
619   GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
620       self->current_stream_number);
621
622   stream = g_slice_new0 (GstStream);
623   stream->transform = self;
624   stream->stream_number = self->current_stream_number;
625   g_cond_init (&stream->stream_finish_cond);
626   stream->stream_start_seqnum = G_MAXUINT32;
627   stream->segment_seqnum = G_MAXUINT32;
628
629   tmp = g_strdup_printf ("sink_%u", self->current_stream_number);
630   stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
631   g_free (tmp);
632   gst_pad_set_element_private (stream->sinkpad, stream);
633   gst_pad_set_iterate_internal_links_function (stream->sinkpad,
634       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
635   gst_pad_set_query_function (stream->sinkpad,
636       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
637   gst_pad_set_event_function (stream->sinkpad,
638       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
639   gst_pad_set_chain_function (stream->sinkpad,
640       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
641
642   tmp = g_strdup_printf ("src_%u", self->current_stream_number);
643   stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
644   g_free (tmp);
645   gst_pad_set_element_private (stream->srcpad, stream);
646   gst_pad_set_iterate_internal_links_function (stream->srcpad,
647       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
648   gst_pad_set_query_function (stream->srcpad,
649       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
650   gst_pad_set_event_function (stream->srcpad,
651       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
652
653   gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
654
655   self->streams = g_list_prepend (self->streams, stream);
656   self->current_stream_number++;
657   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
658
659   /* Add pads and activate unless we're going to NULL */
660   g_rec_mutex_lock (GST_STATE_GET_LOCK (self));
661   if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
662     gst_pad_set_active (stream->srcpad, TRUE);
663     gst_pad_set_active (stream->sinkpad, TRUE);
664   }
665   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
666   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
667   g_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
668
669   return stream->sinkpad;
670 }
671
672 /* Must be called with lock! */
673 static void
674 gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
675     GstStream * stream)
676 {
677   GList *l;
678
679   GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
680
681   for (l = self->streams; l; l = l->next) {
682     if (l->data == stream) {
683       self->streams = g_list_delete_link (self->streams, l);
684       break;
685     }
686   }
687   g_assert (l != NULL);
688
689   /* we can drop the lock, since stream exists now only local.
690    * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
691    * (due to reverse lock order) when deactivating pads */
692   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
693
694   gst_pad_set_element_private (stream->srcpad, NULL);
695   gst_pad_set_element_private (stream->sinkpad, NULL);
696   gst_pad_set_active (stream->srcpad, FALSE);
697   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
698   gst_pad_set_active (stream->sinkpad, FALSE);
699   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
700
701   if (stream->segment.format == GST_FORMAT_TIME) {
702     gint64 stop_running_time;
703     gint64 position_running_time;
704
705     stop_running_time =
706         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
707         stream->segment.stop);
708     position_running_time =
709         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
710         stream->segment.position);
711     stop_running_time = MAX (stop_running_time, position_running_time);
712
713     if (stop_running_time > self->group_start_time) {
714       GST_DEBUG_OBJECT (stream->sinkpad,
715           "Updating global start running time from %" GST_TIME_FORMAT " to %"
716           GST_TIME_FORMAT, GST_TIME_ARGS (self->group_start_time),
717           GST_TIME_ARGS (stop_running_time));
718
719       self->group_start_time = stop_running_time;
720     }
721   }
722
723   g_cond_clear (&stream->stream_finish_cond);
724   g_slice_free (GstStream, stream);
725
726   /* NOTE: In theory we have to check here if all streams
727    * are EOS but the one that was removed wasn't and then
728    * send EOS downstream. But due to the way how playsink
729    * works this is not necessary and will only cause problems
730    * for gapless playback. playsink will only add/remove pads
731    * when it's reconfigured, which happens when the streams
732    * change
733    */
734
735   /* lock for good measure, since the caller had it */
736   GST_STREAM_SYNCHRONIZER_LOCK (self);
737 }
738
739 static void
740 gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
741 {
742   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
743   GstStream *stream;
744
745   GST_STREAM_SYNCHRONIZER_LOCK (self);
746   stream = gst_pad_get_element_private (pad);
747   if (stream) {
748     g_assert (stream->sinkpad == pad);
749
750     gst_stream_synchronizer_release_stream (self, stream);
751   }
752   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
753 }
754
755 static GstStateChangeReturn
756 gst_stream_synchronizer_change_state (GstElement * element,
757     GstStateChange transition)
758 {
759   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
760   GstStateChangeReturn ret;
761
762   switch (transition) {
763     case GST_STATE_CHANGE_NULL_TO_READY:
764       GST_DEBUG_OBJECT (self, "State change NULL->READY");
765       self->shutdown = FALSE;
766       break;
767     case GST_STATE_CHANGE_READY_TO_PAUSED:
768       GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
769       self->group_start_time = 0;
770       self->shutdown = FALSE;
771       break;
772     case GST_STATE_CHANGE_PAUSED_TO_READY:{
773       GList *l;
774
775       GST_DEBUG_OBJECT (self, "State change READY->NULL");
776
777       GST_STREAM_SYNCHRONIZER_LOCK (self);
778       for (l = self->streams; l; l = l->next) {
779         GstStream *ostream = l->data;
780         g_cond_broadcast (&ostream->stream_finish_cond);
781       }
782       self->shutdown = TRUE;
783       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
784     }
785     default:
786       break;
787   }
788
789   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
790   GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", ret);
791   if (G_UNLIKELY (ret != GST_STATE_CHANGE_SUCCESS))
792     return ret;
793
794   switch (transition) {
795     case GST_STATE_CHANGE_PAUSED_TO_READY:{
796       GList *l;
797
798       GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
799       self->group_start_time = 0;
800
801       GST_STREAM_SYNCHRONIZER_LOCK (self);
802       for (l = self->streams; l; l = l->next) {
803         GstStream *stream = l->data;
804
805         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
806         stream->wait = FALSE;
807         stream->new_stream = FALSE;
808         stream->drop_discont = FALSE;
809         stream->is_eos = FALSE;
810       }
811       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
812       break;
813     }
814     case GST_STATE_CHANGE_READY_TO_NULL:{
815       GST_DEBUG_OBJECT (self, "State change READY->NULL");
816
817       GST_STREAM_SYNCHRONIZER_LOCK (self);
818       self->current_stream_number = 0;
819       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
820       break;
821     }
822     default:
823       break;
824   }
825
826   return ret;
827 }
828
829 /* GObject vfuncs */
830 static void
831 gst_stream_synchronizer_finalize (GObject * object)
832 {
833   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
834
835   g_mutex_clear (&self->lock);
836
837   G_OBJECT_CLASS (parent_class)->finalize (object);
838 }
839
840 /* GObject type initialization */
841 static void
842 gst_stream_synchronizer_init (GstStreamSynchronizer * self)
843 {
844   g_mutex_init (&self->lock);
845 }
846
847 static void
848 gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
849 {
850   GObjectClass *gobject_class = (GObjectClass *) klass;
851   GstElementClass *element_class = (GstElementClass *) klass;
852
853   gobject_class->finalize = gst_stream_synchronizer_finalize;
854
855   gst_element_class_add_pad_template (element_class,
856       gst_static_pad_template_get (&srctemplate));
857   gst_element_class_add_pad_template (element_class,
858       gst_static_pad_template_get (&sinktemplate));
859
860   gst_element_class_set_static_metadata (element_class,
861       "Stream Synchronizer", "Generic",
862       "Synchronizes a group of streams to have equal durations and starting points",
863       "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
864
865   element_class->change_state =
866       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
867   element_class->request_new_pad =
868       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
869   element_class->release_pad =
870       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);
871 }
872
873 gboolean
874 gst_stream_synchronizer_plugin_init (GstPlugin * plugin)
875 {
876   GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
877       "streamsynchronizer", 0, "Stream Synchronizer");
878
879   return gst_element_register (plugin, "streamsynchronizer", GST_RANK_NONE,
880       GST_TYPE_STREAM_SYNCHRONIZER);
881 }