Merge branch 'master' into 0.11
[platform/upstream/gstreamer.git] / gst / playback / gststreamsynchronizer.c
1 /* GStreamer
2  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #ifdef HAVE_CONFIG_H
21 #include "config.h"
22 #endif
23
24 #include "gststreamsynchronizer.h"
25 #include "gst/glib-compat-private.h"
26
27 GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
28 #define GST_CAT_DEFAULT stream_synchronizer_debug
29
30 #define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START {                   \
31     GST_LOG_OBJECT (obj,                                                \
32                     "locking from thread %p",                           \
33                     g_thread_self ());                                  \
34     g_mutex_lock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);                \
35     GST_LOG_OBJECT (obj,                                                \
36                     "locked from thread %p",                            \
37                     g_thread_self ());                                  \
38 } G_STMT_END
39
40 #define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START {                 \
41     GST_LOG_OBJECT (obj,                                                \
42                     "unlocking from thread %p",                         \
43                     g_thread_self ());                                  \
44     g_mutex_unlock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);              \
45 } G_STMT_END
46
47 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
48     GST_PAD_SRC,
49     GST_PAD_SOMETIMES,
50     GST_STATIC_CAPS_ANY);
51 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
52     GST_PAD_SINK,
53     GST_PAD_REQUEST,
54     GST_STATIC_CAPS_ANY);
55
56 static const gboolean passthrough = TRUE;
57
58 #define gst_stream_synchronizer_parent_class parent_class
59 G_DEFINE_TYPE (GstStreamSynchronizer, gst_stream_synchronizer,
60     GST_TYPE_ELEMENT);
61
62 typedef struct
63 {
64   GstStreamSynchronizer *transform;
65   guint stream_number;
66   GstPad *srcpad;
67   GstPad *sinkpad;
68   GstSegment segment;
69
70   gboolean wait;
71   gboolean new_stream;
72   gboolean drop_discont;
73   gboolean is_eos;
74   gboolean seen_data;
75
76   gint64 running_time_diff;
77 } GstStream;
78
79 /* Must be called with lock! */
80 static GstPad *
81 gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
82 {
83   if (stream->sinkpad == pad)
84     return gst_object_ref (stream->srcpad);
85   else if (stream->srcpad == pad)
86     return gst_object_ref (stream->sinkpad);
87
88   return NULL;
89 }
90
91 static GstPad *
92 gst_stream_get_other_pad_from_pad (GstPad * pad)
93 {
94   GstObject *parent = gst_pad_get_parent (pad);
95   GstStreamSynchronizer *self;
96   GstStream *stream;
97   GstPad *opad = NULL;
98
99   /* released pad does not have parent anymore */
100   if (!G_LIKELY (parent))
101     goto exit;
102
103   self = GST_STREAM_SYNCHRONIZER (parent);
104   GST_STREAM_SYNCHRONIZER_LOCK (self);
105   stream = gst_pad_get_element_private (pad);
106   if (!stream)
107     goto out;
108
109   opad = gst_stream_get_other_pad (stream, pad);
110
111 out:
112   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
113   gst_object_unref (self);
114
115 exit:
116   if (!opad)
117     GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
118
119   return opad;
120 }
121
122 /* Generic pad functions */
123 static GstIterator *
124 gst_stream_synchronizer_iterate_internal_links (GstPad * pad,
125     GstObject * parent)
126 {
127   GstIterator *it = NULL;
128   GstPad *opad;
129
130   opad = gst_stream_get_other_pad_from_pad (pad);
131   if (opad) {
132     GValue value = { 0, };
133
134     g_value_init (&value, GST_TYPE_PAD);
135     g_value_set_object (&value, opad);
136     it = gst_iterator_new_single (GST_TYPE_PAD, &value);
137     g_value_unset (&value);
138     gst_object_unref (opad);
139   }
140
141   return it;
142 }
143
144 static gboolean
145 gst_stream_synchronizer_query (GstPad * pad, GstObject * parent,
146     GstQuery * query)
147 {
148   GstPad *opad;
149   gboolean ret = FALSE;
150
151   GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));
152
153   opad = gst_stream_get_other_pad_from_pad (pad);
154   if (opad) {
155     ret = gst_pad_peer_query (opad, query);
156     gst_object_unref (opad);
157   }
158
159   return ret;
160 }
161
162 /* srcpad functions */
163 static gboolean
164 gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent,
165     GstEvent * event)
166 {
167   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
168   GstPad *opad;
169   gboolean ret = FALSE;
170
171   if (passthrough)
172     goto skip_adjustments;
173
174   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
175       GST_EVENT_TYPE_NAME (event), event);
176
177   switch (GST_EVENT_TYPE (event)) {
178     case GST_EVENT_QOS:{
179       gdouble proportion;
180       GstClockTimeDiff diff;
181       GstClockTime timestamp;
182       gint64 running_time_diff;
183       GstStream *stream;
184
185       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
186       gst_event_unref (event);
187
188       GST_STREAM_SYNCHRONIZER_LOCK (self);
189       stream = gst_pad_get_element_private (pad);
190       if (stream)
191         running_time_diff = stream->running_time_diff;
192       else
193         running_time_diff = -1;
194       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
195
196       if (running_time_diff == -1) {
197         GST_WARNING_OBJECT (pad, "QOS event before group start");
198         goto out;
199       } else if (timestamp < running_time_diff) {
200         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
201         goto out;
202       }
203
204       GST_LOG_OBJECT (pad,
205           "Adjusting QOS event: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT " = %"
206           GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
207           GST_TIME_ARGS (running_time_diff),
208           GST_TIME_ARGS (timestamp - running_time_diff));
209
210       timestamp -= running_time_diff;
211
212       /* That case is invalid for QoS events */
213       if (diff < 0 && -diff > timestamp) {
214         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
215         ret = TRUE;
216         goto out;
217       }
218
219       event =
220           gst_event_new_qos (GST_QOS_TYPE_UNDERFLOW, proportion, diff,
221           timestamp);
222       break;
223     }
224     default:
225       break;
226   }
227
228 skip_adjustments:
229
230   opad = gst_stream_get_other_pad_from_pad (pad);
231   if (opad) {
232     ret = gst_pad_push_event (opad, event);
233     gst_object_unref (opad);
234   }
235
236 out:
237   return ret;
238 }
239
240 /* sinkpad functions */
241 static gboolean
242 gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
243     GstEvent * event)
244 {
245   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
246   GstPad *opad;
247   gboolean ret = FALSE;
248
249   if (passthrough)
250     goto skip_adjustments;
251
252   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
253       GST_EVENT_TYPE_NAME (event), event);
254
255   switch (GST_EVENT_TYPE (event)) {
256     case GST_EVENT_SINK_MESSAGE:{
257       GstMessage *message;
258
259       gst_event_parse_sink_message (event, &message);
260       if (gst_message_has_name (message, "playbin-stream-changed")) {
261         GstStream *stream;
262
263         GST_STREAM_SYNCHRONIZER_LOCK (self);
264         stream = gst_pad_get_element_private (pad);
265         if (stream) {
266           GList *l;
267           gboolean all_wait = TRUE;
268
269           GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
270
271           stream->is_eos = FALSE;
272           stream->wait = TRUE;
273           stream->new_stream = TRUE;
274
275           for (l = self->streams; l; l = l->next) {
276             GstStream *ostream = l->data;
277
278             all_wait = all_wait && ostream->wait;
279             if (!all_wait)
280               break;
281           }
282           if (all_wait) {
283             gint64 position = 0;
284
285             GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
286
287             for (l = self->streams; l; l = l->next) {
288               GstStream *ostream = l->data;
289               gint64 stop_running_time;
290               gint64 position_running_time;
291
292               ostream->wait = FALSE;
293
294               stop_running_time =
295                   gst_segment_to_running_time (&ostream->segment,
296                   GST_FORMAT_TIME, ostream->segment.stop);
297               position_running_time =
298                   gst_segment_to_running_time (&ostream->segment,
299                   GST_FORMAT_TIME, ostream->segment.position);
300               position =
301                   MAX (position, MAX (stop_running_time,
302                       position_running_time));
303             }
304             position = MAX (0, position);
305             self->group_start_time = MAX (self->group_start_time, position);
306
307             GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
308                 GST_TIME_ARGS (self->group_start_time));
309
310             g_cond_broadcast (self->stream_finish_cond);
311           }
312         }
313         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
314       }
315       gst_message_unref (message);
316       break;
317     }
318     case GST_EVENT_SEGMENT:{
319       GstStream *stream;
320       GstSegment segment;
321
322       gst_event_copy_segment (event, &segment);
323
324       GST_STREAM_SYNCHRONIZER_LOCK (self);
325       stream = gst_pad_get_element_private (pad);
326       if (stream) {
327         if (stream->wait) {
328           GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
329           g_cond_wait (self->stream_finish_cond, self->lock);
330           stream = gst_pad_get_element_private (pad);
331           if (stream)
332             stream->wait = FALSE;
333         }
334       }
335
336       if (self->shutdown) {
337         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
338         gst_event_unref (event);
339         goto done;
340       }
341
342       if (stream && segment.format == GST_FORMAT_TIME) {
343         if (stream->new_stream) {
344           gint64 position_running_time = 0;
345           gint64 stop_running_time = 0;
346
347           if (stream->segment.format == GST_FORMAT_TIME) {
348             position_running_time =
349                 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
350                 stream->segment.position);
351             position_running_time = MAX (position_running_time, 0);
352             stop_running_time =
353                 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
354                 stream->segment.stop);
355             stop_running_time = MAX (position_running_time, 0);
356
357             if (stop_running_time != position_running_time) {
358               GST_WARNING_OBJECT (pad,
359                   "Gap between position and segment stop: %" GST_TIME_FORMAT
360                   " != %" GST_TIME_FORMAT, GST_TIME_ARGS (stop_running_time),
361                   GST_TIME_ARGS (position_running_time));
362             }
363
364             if (stop_running_time < position_running_time) {
365               GST_DEBUG_OBJECT (pad, "Updating stop position");
366               stream->segment.stop = stream->segment.position;
367               gst_pad_push_event (stream->srcpad,
368                   gst_event_new_segment (&stream->segment));
369             }
370             stop_running_time = MAX (stop_running_time, position_running_time);
371             GST_DEBUG_OBJECT (pad,
372                 "Stop running time of last group: %" GST_TIME_FORMAT,
373                 GST_TIME_ARGS (stop_running_time));
374           }
375           stream->new_stream = FALSE;
376           stream->drop_discont = TRUE;
377
378           if (stop_running_time < self->group_start_time) {
379             gint64 diff = self->group_start_time - stop_running_time;
380
381             GST_DEBUG_OBJECT (pad,
382                 "Advancing running time for other streams by: %"
383                 GST_TIME_FORMAT, GST_TIME_ARGS (diff));
384
385             segment.base += diff;
386           }
387         }
388
389         GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
390             &stream->segment);
391         gst_segment_copy_into (&segment, &stream->segment);
392         GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
393             &stream->segment);
394
395         GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
396             GST_TIME_ARGS (stream->segment.base));
397         stream->running_time_diff = stream->segment.base;
398       } else if (stream) {
399         GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
400             gst_format_get_name (segment.format));
401         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
402       }
403       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
404       break;
405     }
406     case GST_EVENT_FLUSH_STOP:{
407       GstStream *stream;
408
409       GST_STREAM_SYNCHRONIZER_LOCK (self);
410       stream = gst_pad_get_element_private (pad);
411       if (stream) {
412         GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
413             stream->stream_number);
414         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
415
416         stream->is_eos = FALSE;
417         stream->wait = FALSE;
418         stream->new_stream = FALSE;
419         stream->drop_discont = FALSE;
420         stream->seen_data = FALSE;
421       }
422       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
423       break;
424     }
425     case GST_EVENT_EOS:{
426       GstStream *stream;
427       GList *l;
428       gboolean all_eos = TRUE;
429       gboolean seen_data;
430       GSList *pads = NULL;
431       GstPad *srcpad;
432
433       GST_STREAM_SYNCHRONIZER_LOCK (self);
434       stream = gst_pad_get_element_private (pad);
435       if (!stream) {
436         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
437         GST_WARNING_OBJECT (pad, "EOS for unknown stream");
438         break;
439       }
440
441       GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
442       stream->is_eos = TRUE;
443
444       seen_data = stream->seen_data;
445       srcpad = gst_object_ref (stream->srcpad);
446
447       for (l = self->streams; l; l = l->next) {
448         GstStream *ostream = l->data;
449
450         all_eos = all_eos && ostream->is_eos;
451         if (!all_eos)
452           break;
453       }
454
455       if (all_eos) {
456         GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
457         for (l = self->streams; l; l = l->next) {
458           GstStream *ostream = l->data;
459           /* local snapshot of current pads */
460           gst_object_ref (ostream->srcpad);
461           pads = g_slist_prepend (pads, ostream->srcpad);
462         }
463       }
464       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
465       /* drop lock when sending eos, which may block in e.g. preroll */
466       if (pads) {
467         GstPad *pad;
468         GSList *epad;
469
470         ret = TRUE;
471         epad = pads;
472         while (epad) {
473           pad = epad->data;
474           GST_DEBUG_OBJECT (pad, "Pushing EOS");
475           ret = ret && gst_pad_push_event (pad, gst_event_new_eos ());
476           gst_object_unref (pad);
477           epad = g_slist_next (epad);
478         }
479         g_slist_free (pads);
480       } else {
481         /* if EOS, but no data has passed, then send something to replace EOS
482          * for preroll purposes */
483         if (!seen_data) {
484           GstBuffer *buf = gst_buffer_new ();
485
486           gst_pad_push (srcpad, buf);
487         }
488       }
489       gst_object_unref (srcpad);
490       goto done;
491       break;
492     }
493     default:
494       break;
495   }
496
497 skip_adjustments:
498
499   opad = gst_stream_get_other_pad_from_pad (pad);
500   if (opad) {
501     ret = gst_pad_push_event (opad, event);
502     gst_object_unref (opad);
503   }
504
505 done:
506
507   return ret;
508 }
509
510 static GstFlowReturn
511 gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
512     GstBuffer * buffer)
513 {
514   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
515   GstPad *opad;
516   GstFlowReturn ret = GST_FLOW_ERROR;
517   GstStream *stream;
518   GstClockTime timestamp = GST_CLOCK_TIME_NONE;
519   GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
520
521   if (passthrough) {
522     opad = gst_stream_get_other_pad_from_pad (pad);
523     if (opad) {
524       ret = gst_pad_push (opad, buffer);
525       gst_object_unref (opad);
526     }
527     goto done;
528   }
529
530   GST_LOG_OBJECT (pad, "Handling buffer %p: size=%" G_GSIZE_FORMAT
531       ", timestamp=%" GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
532       " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
533       buffer, gst_buffer_get_size (buffer),
534       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
535       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
536       GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));
537
538   timestamp = GST_BUFFER_TIMESTAMP (buffer);
539   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)
540       && GST_BUFFER_DURATION_IS_VALID (buffer))
541     timestamp_end = timestamp + GST_BUFFER_DURATION (buffer);
542
543   GST_STREAM_SYNCHRONIZER_LOCK (self);
544   stream = gst_pad_get_element_private (pad);
545
546   if (stream)
547     stream->seen_data = TRUE;
548   if (stream && stream->drop_discont) {
549     buffer = gst_buffer_make_writable (buffer);
550     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
551     stream->drop_discont = FALSE;
552   }
553
554   if (stream && stream->segment.format == GST_FORMAT_TIME
555       && GST_CLOCK_TIME_IS_VALID (timestamp)) {
556     GST_LOG_OBJECT (pad,
557         "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
558         GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
559     stream->segment.position = timestamp;
560   }
561   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
562
563   opad = gst_stream_get_other_pad_from_pad (pad);
564   if (opad) {
565     ret = gst_pad_push (opad, buffer);
566     gst_object_unref (opad);
567   }
568
569   GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
570   if (ret == GST_FLOW_OK) {
571     GList *l;
572
573     GST_STREAM_SYNCHRONIZER_LOCK (self);
574     stream = gst_pad_get_element_private (pad);
575     if (stream && stream->segment.format == GST_FORMAT_TIME
576         && GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
577       GST_LOG_OBJECT (pad,
578           "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
579           GST_TIME_ARGS (stream->segment.position),
580           GST_TIME_ARGS (timestamp_end));
581       stream->segment.position = timestamp_end;
582     }
583
584     /* Advance EOS streams if necessary. For non-EOS
585      * streams the demuxers should already do this! */
586     for (l = self->streams; l; l = l->next) {
587       GstStream *ostream = l->data;
588       gint64 position;
589
590       if (!ostream->is_eos || ostream->segment.format != GST_FORMAT_TIME)
591         continue;
592
593       if (ostream->segment.position != -1)
594         position = ostream->segment.position;
595       else
596         position = ostream->segment.start;
597
598       /* Is there a 1 second lag? */
599       if (position != -1 && position + GST_SECOND < timestamp_end) {
600         gint64 new_start, new_stop;
601
602         new_start = timestamp_end - GST_SECOND;
603         if (ostream->segment.stop == -1)
604           new_stop = -1;
605         else
606           new_stop = MAX (new_start, ostream->segment.stop);
607
608         GST_DEBUG_OBJECT (ostream->sinkpad,
609             "Advancing stream %u from %" GST_TIME_FORMAT " to %"
610             GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (position),
611             GST_TIME_ARGS (new_start));
612
613         ostream->segment.start = new_start;
614         ostream->segment.stop = new_stop;
615         ostream->segment.time = new_start;
616         ostream->segment.position = new_start;
617
618         gst_pad_push_event (ostream->srcpad,
619             gst_event_new_segment (&ostream->segment));
620       }
621     }
622     GST_STREAM_SYNCHRONIZER_UNLOCK (self);
623   }
624
625 done:
626   return ret;
627 }
628
629 /* GstElement vfuncs */
630 static GstPad *
631 gst_stream_synchronizer_request_new_pad (GstElement * element,
632     GstPadTemplate * temp, const gchar * name, const GstCaps * caps)
633 {
634   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
635   GstStream *stream;
636   gchar *tmp;
637
638   GST_STREAM_SYNCHRONIZER_LOCK (self);
639   GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
640       self->current_stream_number);
641
642   stream = g_slice_new0 (GstStream);
643   stream->transform = self;
644   stream->stream_number = self->current_stream_number;
645
646   tmp = g_strdup_printf ("sink_%u", self->current_stream_number);
647   stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
648   g_free (tmp);
649   gst_pad_set_element_private (stream->sinkpad, stream);
650   gst_pad_set_iterate_internal_links_function (stream->sinkpad,
651       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
652   gst_pad_set_query_function (stream->sinkpad,
653       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
654   gst_pad_set_event_function (stream->sinkpad,
655       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
656   gst_pad_set_chain_function (stream->sinkpad,
657       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
658
659   tmp = g_strdup_printf ("src_%u", self->current_stream_number);
660   stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
661   g_free (tmp);
662   gst_pad_set_element_private (stream->srcpad, stream);
663   gst_pad_set_iterate_internal_links_function (stream->srcpad,
664       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
665   gst_pad_set_query_function (stream->srcpad,
666       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
667   gst_pad_set_event_function (stream->srcpad,
668       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
669
670   gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
671
672   self->streams = g_list_prepend (self->streams, stream);
673   self->current_stream_number++;
674   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
675
676   /* Add pads and activate unless we're going to NULL */
677   g_rec_mutex_lock (GST_STATE_GET_LOCK (self));
678   if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
679     gst_pad_set_active (stream->srcpad, TRUE);
680     gst_pad_set_active (stream->sinkpad, TRUE);
681   }
682   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
683   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
684   g_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
685
686   return stream->sinkpad;
687 }
688
689 /* Must be called with lock! */
690 static void
691 gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
692     GstStream * stream)
693 {
694   GList *l;
695
696   GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
697
698   for (l = self->streams; l; l = l->next) {
699     if (l->data == stream) {
700       self->streams = g_list_delete_link (self->streams, l);
701       break;
702     }
703   }
704   g_assert (l != NULL);
705
706   /* we can drop the lock, since stream exists now only local.
707    * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
708    * (due to reverse lock order) when deactivating pads */
709   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
710
711   gst_pad_set_element_private (stream->srcpad, NULL);
712   gst_pad_set_element_private (stream->sinkpad, NULL);
713   gst_pad_set_active (stream->srcpad, FALSE);
714   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
715   gst_pad_set_active (stream->sinkpad, FALSE);
716   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
717
718   if (stream->segment.format == GST_FORMAT_TIME) {
719     gint64 stop_running_time;
720     gint64 position_running_time;
721
722     stop_running_time =
723         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
724         stream->segment.stop);
725     position_running_time =
726         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
727         stream->segment.position);
728     stop_running_time = MAX (stop_running_time, position_running_time);
729
730     GST_DEBUG_OBJECT (stream->sinkpad,
731         "Stop running time was: %" GST_TIME_FORMAT,
732         GST_TIME_ARGS (stop_running_time));
733
734     self->group_start_time = MAX (self->group_start_time, stop_running_time);
735   }
736
737   g_slice_free (GstStream, stream);
738
739   /* NOTE: In theory we have to check here if all streams
740    * are EOS but the one that was removed wasn't and then
741    * send EOS downstream. But due to the way how playsink
742    * works this is not necessary and will only cause problems
743    * for gapless playback. playsink will only add/remove pads
744    * when it's reconfigured, which happens when the streams
745    * change
746    */
747
748   /* lock for good measure, since the caller had it */
749   GST_STREAM_SYNCHRONIZER_LOCK (self);
750 }
751
752 static void
753 gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
754 {
755   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
756   GstStream *stream;
757
758   GST_STREAM_SYNCHRONIZER_LOCK (self);
759   stream = gst_pad_get_element_private (pad);
760   if (stream) {
761     g_assert (stream->sinkpad == pad);
762
763     gst_stream_synchronizer_release_stream (self, stream);
764   }
765   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
766 }
767
768 static GstStateChangeReturn
769 gst_stream_synchronizer_change_state (GstElement * element,
770     GstStateChange transition)
771 {
772   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
773   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
774
775   switch (transition) {
776     case GST_STATE_CHANGE_NULL_TO_READY:
777       GST_DEBUG_OBJECT (self, "State change NULL->READY");
778       self->shutdown = FALSE;
779       break;
780     case GST_STATE_CHANGE_READY_TO_PAUSED:
781       GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
782       self->group_start_time = 0;
783       self->shutdown = FALSE;
784       break;
785     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
786       GST_DEBUG_OBJECT (self, "State change PAUSED->PLAYING");
787       break;
788     case GST_STATE_CHANGE_PAUSED_TO_READY:
789       GST_DEBUG_OBJECT (self, "State change READY->NULL");
790
791       GST_STREAM_SYNCHRONIZER_LOCK (self);
792       g_cond_broadcast (self->stream_finish_cond);
793       self->shutdown = TRUE;
794       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
795     default:
796       break;
797   }
798
799   {
800     GstStateChangeReturn bret;
801
802     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
803     GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", bret);
804     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
805       return ret;
806   }
807
808   switch (transition) {
809     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
810       GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
811       break;
812     case GST_STATE_CHANGE_PAUSED_TO_READY:{
813       GList *l;
814
815       GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
816       self->group_start_time = 0;
817
818       GST_STREAM_SYNCHRONIZER_LOCK (self);
819       for (l = self->streams; l; l = l->next) {
820         GstStream *stream = l->data;
821
822         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
823         stream->wait = FALSE;
824         stream->new_stream = FALSE;
825         stream->drop_discont = FALSE;
826         stream->is_eos = FALSE;
827       }
828       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
829       break;
830     }
831     case GST_STATE_CHANGE_READY_TO_NULL:{
832       GST_DEBUG_OBJECT (self, "State change READY->NULL");
833
834       GST_STREAM_SYNCHRONIZER_LOCK (self);
835       while (self->streams)
836         gst_stream_synchronizer_release_stream (self, self->streams->data);
837       self->current_stream_number = 0;
838       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
839       break;
840     }
841     default:
842       break;
843   }
844
845   return ret;
846 }
847
848 /* GObject vfuncs */
849 static void
850 gst_stream_synchronizer_finalize (GObject * object)
851 {
852   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
853
854   if (self->lock) {
855     g_mutex_free (self->lock);
856     self->lock = NULL;
857   }
858
859   if (self->stream_finish_cond) {
860     g_cond_free (self->stream_finish_cond);
861     self->stream_finish_cond = NULL;
862   }
863
864   G_OBJECT_CLASS (parent_class)->finalize (object);
865 }
866
867 /* GObject type initialization */
868 static void
869 gst_stream_synchronizer_init (GstStreamSynchronizer * self)
870 {
871   self->lock = g_mutex_new ();
872   self->stream_finish_cond = g_cond_new ();
873 }
874
875 static void
876 gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
877 {
878   GObjectClass *gobject_class = (GObjectClass *) klass;
879   GstElementClass *element_class = (GstElementClass *) klass;
880
881   GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
882       "streamsynchronizer", 0, "Stream Synchronizer");
883
884   gobject_class->finalize = gst_stream_synchronizer_finalize;
885
886   gst_element_class_add_pad_template (element_class,
887       gst_static_pad_template_get (&srctemplate));
888   gst_element_class_add_pad_template (element_class,
889       gst_static_pad_template_get (&sinktemplate));
890
891   gst_element_class_set_details_simple (element_class,
892       "Stream Synchronizer", "Generic",
893       "Synchronizes a group of streams to have equal durations and starting points",
894       "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
895
896   element_class->change_state =
897       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
898   element_class->request_new_pad =
899       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
900   element_class->release_pad =
901       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);
902 }