streamsynchronizer: prevent deadlock with _chain when deactivating pad
[platform/upstream/gstreamer.git] / gst / playback / gststreamsynchronizer.c
1 /* GStreamer
2  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #ifdef HAVE_CONFIG_H
21 #include "config.h"
22 #endif
23
24 #include "gststreamsynchronizer.h"
25
26 GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
27 #define GST_CAT_DEFAULT stream_synchronizer_debug
28
29 #define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START {                   \
30     GST_LOG_OBJECT (obj,                                                \
31                     "locking from thread %p",                           \
32                     g_thread_self ());                                  \
33     g_mutex_lock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);                \
34     GST_LOG_OBJECT (obj,                                                \
35                     "locked from thread %p",                            \
36                     g_thread_self ());                                  \
37 } G_STMT_END
38
39 #define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START {                 \
40     GST_LOG_OBJECT (obj,                                                \
41                     "unlocking from thread %p",                         \
42                     g_thread_self ());                                  \
43     g_mutex_unlock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);              \
44 } G_STMT_END
45
46 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%d",
47     GST_PAD_SRC,
48     GST_PAD_SOMETIMES,
49     GST_STATIC_CAPS_ANY);
50 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%d",
51     GST_PAD_SINK,
52     GST_PAD_REQUEST,
53     GST_STATIC_CAPS_ANY);
54
55 GST_BOILERPLATE (GstStreamSynchronizer, gst_stream_synchronizer,
56     GstElement, GST_TYPE_ELEMENT);
57
58 typedef struct
59 {
60   GstStreamSynchronizer *transform;
61   guint stream_number;
62   GstPad *srcpad;
63   GstPad *sinkpad;
64   GstSegment segment;
65
66   gboolean wait;
67   gboolean new_stream;
68   gboolean drop_discont;
69   gboolean is_eos;
70   gboolean seen_data;
71
72   gint64 running_time_diff;
73 } GstStream;
74
75 /* Must be called with lock! */
76 static GstPad *
77 gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
78 {
79   if (stream->sinkpad == pad)
80     return gst_object_ref (stream->srcpad);
81   else if (stream->srcpad == pad)
82     return gst_object_ref (stream->sinkpad);
83
84   return NULL;
85 }
86
87 static GstPad *
88 gst_stream_get_other_pad_from_pad (GstPad * pad)
89 {
90   GstStreamSynchronizer *self =
91       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
92   GstStream *stream;
93   GstPad *opad = NULL;
94
95   GST_STREAM_SYNCHRONIZER_LOCK (self);
96   stream = gst_pad_get_element_private (pad);
97   if (!stream)
98     goto out;
99
100   opad = gst_stream_get_other_pad (stream, pad);
101
102 out:
103   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
104   gst_object_unref (self);
105
106   if (!opad)
107     GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
108
109   return opad;
110 }
111
112 /* Generic pad functions */
113 static GstIterator *
114 gst_stream_synchronizer_iterate_internal_links (GstPad * pad)
115 {
116   GstIterator *it = NULL;
117   GstPad *opad;
118
119   opad = gst_stream_get_other_pad_from_pad (pad);
120   if (opad) {
121     it = gst_iterator_new_single (GST_TYPE_PAD, opad,
122         (GstCopyFunction) gst_object_ref, (GFreeFunc) gst_object_unref);
123     gst_object_unref (opad);
124   }
125
126   return it;
127 }
128
129 static gboolean
130 gst_stream_synchronizer_query (GstPad * pad, GstQuery * query)
131 {
132   GstPad *opad;
133   gboolean ret = FALSE;
134
135   GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));
136
137   opad = gst_stream_get_other_pad_from_pad (pad);
138   if (opad) {
139     ret = gst_pad_peer_query (opad, query);
140     gst_object_unref (opad);
141   }
142
143   return ret;
144 }
145
146 static GstCaps *
147 gst_stream_synchronizer_getcaps (GstPad * pad)
148 {
149   GstPad *opad;
150   GstCaps *ret = NULL;
151
152   opad = gst_stream_get_other_pad_from_pad (pad);
153   if (opad) {
154     ret = gst_pad_peer_get_caps (opad);
155     gst_object_unref (opad);
156   }
157
158   if (ret == NULL)
159     ret = gst_caps_new_any ();
160
161   GST_LOG_OBJECT (pad, "Returning caps: %" GST_PTR_FORMAT, ret);
162
163   return ret;
164 }
165
166 static gboolean
167 gst_stream_synchronizer_acceptcaps (GstPad * pad, GstCaps * caps)
168 {
169   GstPad *opad;
170   gboolean ret = FALSE;
171
172   opad = gst_stream_get_other_pad_from_pad (pad);
173   if (opad) {
174     ret = gst_pad_peer_accept_caps (opad, caps);
175     gst_object_unref (opad);
176   }
177
178   GST_LOG_OBJECT (pad, "Caps%s accepted: %" GST_PTR_FORMAT, (ret ? "" : " not"),
179       caps);
180
181   return ret;
182 }
183
184 /* srcpad functions */
185 static gboolean
186 gst_stream_synchronizer_src_event (GstPad * pad, GstEvent * event)
187 {
188   GstStreamSynchronizer *self =
189       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
190   GstPad *opad;
191   gboolean ret = FALSE;
192
193   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
194       GST_EVENT_TYPE_NAME (event), event->structure);
195
196   switch (GST_EVENT_TYPE (event)) {
197     case GST_EVENT_QOS:{
198       gdouble proportion;
199       GstClockTimeDiff diff;
200       GstClockTime timestamp;
201       gint64 running_time_diff;
202       GstStream *stream;
203
204       gst_event_parse_qos (event, &proportion, &diff, &timestamp);
205       gst_event_unref (event);
206
207       GST_STREAM_SYNCHRONIZER_LOCK (self);
208       stream = gst_pad_get_element_private (pad);
209       if (stream)
210         running_time_diff = stream->running_time_diff;
211       else
212         running_time_diff = -1;
213       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
214
215       if (running_time_diff == -1) {
216         GST_WARNING_OBJECT (pad, "QOS event before group start");
217         goto out;
218       } else if (timestamp < running_time_diff) {
219         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
220         goto out;
221       }
222
223       GST_LOG_OBJECT (pad,
224           "Adjusting QOS event: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT " = %"
225           GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
226           GST_TIME_ARGS (running_time_diff),
227           GST_TIME_ARGS (timestamp - running_time_diff));
228
229       timestamp -= running_time_diff;
230
231       /* That case is invalid for QoS events */
232       if (diff < 0 && -diff > timestamp) {
233         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
234         ret = TRUE;
235         goto out;
236       }
237
238       event = gst_event_new_qos (proportion, diff, timestamp);
239       break;
240     }
241     default:
242       break;
243   }
244
245   opad = gst_stream_get_other_pad_from_pad (pad);
246   if (opad) {
247     ret = gst_pad_push_event (opad, event);
248     gst_object_unref (opad);
249   }
250
251 out:
252   gst_object_unref (self);
253
254   return ret;
255 }
256
257 /* sinkpad functions */
258 static gboolean
259 gst_stream_synchronizer_sink_event (GstPad * pad, GstEvent * event)
260 {
261   GstStreamSynchronizer *self =
262       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
263   GstPad *opad;
264   gboolean ret = FALSE;
265
266   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
267       GST_EVENT_TYPE_NAME (event), event->structure);
268
269   switch (GST_EVENT_TYPE (event)) {
270     case GST_EVENT_SINK_MESSAGE:{
271       GstMessage *message;
272
273       gst_event_parse_sink_message (event, &message);
274       if (message->structure
275           && gst_structure_has_name (message->structure,
276               "playbin2-stream-changed")) {
277         GstStream *stream;
278
279         GST_STREAM_SYNCHRONIZER_LOCK (self);
280         stream = gst_pad_get_element_private (pad);
281         if (stream) {
282           GList *l;
283           gboolean all_wait = TRUE;
284
285           GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
286
287           stream->is_eos = FALSE;
288           stream->wait = TRUE;
289           stream->new_stream = TRUE;
290
291           for (l = self->streams; l; l = l->next) {
292             GstStream *ostream = l->data;
293
294             all_wait = all_wait && ostream->wait;
295             if (!all_wait)
296               break;
297           }
298           if (all_wait) {
299             gint64 last_stop = 0;
300
301             GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
302
303             for (l = self->streams; l; l = l->next) {
304               GstStream *ostream = l->data;
305               gint64 stop_running_time;
306               gint64 last_stop_running_time;
307
308               ostream->wait = FALSE;
309
310               stop_running_time =
311                   gst_segment_to_running_time (&ostream->segment,
312                   GST_FORMAT_TIME, ostream->segment.stop);
313               last_stop_running_time =
314                   gst_segment_to_running_time (&ostream->segment,
315                   GST_FORMAT_TIME, ostream->segment.last_stop);
316               last_stop =
317                   MAX (last_stop, MAX (stop_running_time,
318                       last_stop_running_time));
319             }
320             last_stop = MAX (0, last_stop);
321             self->group_start_time = MAX (self->group_start_time, last_stop);
322
323             GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
324                 GST_TIME_ARGS (self->group_start_time));
325
326             g_cond_broadcast (self->stream_finish_cond);
327           }
328         }
329         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
330       }
331       gst_message_unref (message);
332       break;
333     }
334     case GST_EVENT_NEWSEGMENT:{
335       GstStream *stream;
336       gboolean update;
337       gdouble rate, applied_rate;
338       GstFormat format;
339       gint64 start, stop, position;
340
341       gst_event_parse_new_segment_full (event,
342           &update, &rate, &applied_rate, &format, &start, &stop, &position);
343
344       GST_STREAM_SYNCHRONIZER_LOCK (self);
345       stream = gst_pad_get_element_private (pad);
346       if (stream) {
347         if (stream->wait) {
348           GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
349           g_cond_wait (self->stream_finish_cond, self->lock);
350           stream = gst_pad_get_element_private (pad);
351           if (stream)
352             stream->wait = FALSE;
353         }
354       }
355
356       if (self->shutdown) {
357         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
358         gst_event_unref (event);
359         goto done;
360       }
361
362       if (stream && format == GST_FORMAT_TIME) {
363         if (stream->new_stream) {
364           gint64 last_stop_running_time = 0;
365           gint64 stop_running_time = 0;
366
367           if (stream->segment.format == GST_FORMAT_TIME) {
368             last_stop_running_time =
369                 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
370                 stream->segment.last_stop);
371             last_stop_running_time = MAX (last_stop_running_time, 0);
372             stop_running_time =
373                 gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
374                 stream->segment.stop);
375             stop_running_time = MAX (last_stop_running_time, 0);
376
377             if (stop_running_time != last_stop_running_time) {
378               GST_WARNING_OBJECT (pad,
379                   "Gap between last_stop and segment stop: %" GST_TIME_FORMAT
380                   " != %" GST_TIME_FORMAT, GST_TIME_ARGS (stop_running_time),
381                   GST_TIME_ARGS (last_stop_running_time));
382             }
383
384             if (stop_running_time < last_stop_running_time) {
385               GST_DEBUG_OBJECT (pad, "Updating stop position");
386               gst_pad_push_event (stream->srcpad,
387                   gst_event_new_new_segment_full (TRUE, stream->segment.rate,
388                       stream->segment.applied_rate, GST_FORMAT_TIME,
389                       stream->segment.start, stream->segment.last_stop,
390                       stream->segment.time));
391               gst_segment_set_newsegment_full (&stream->segment, TRUE,
392                   stream->segment.rate, stream->segment.applied_rate,
393                   GST_FORMAT_TIME, stream->segment.start,
394                   stream->segment.last_stop, stream->segment.time);
395             }
396             stop_running_time = MAX (stop_running_time, last_stop_running_time);
397             GST_DEBUG_OBJECT (pad,
398                 "Stop running time of last group: %" GST_TIME_FORMAT,
399                 GST_TIME_ARGS (stop_running_time));
400           }
401           stream->new_stream = FALSE;
402           stream->drop_discont = TRUE;
403
404           if (stop_running_time < self->group_start_time) {
405             gint64 diff = self->group_start_time - stop_running_time;
406
407             GST_DEBUG_OBJECT (pad,
408                 "Advancing running time for other streams by: %"
409                 GST_TIME_FORMAT, GST_TIME_ARGS (diff));
410             gst_pad_push_event (stream->srcpad,
411                 gst_event_new_new_segment_full (FALSE, 1.0, 1.0,
412                     GST_FORMAT_TIME, 0, diff, 0));
413             gst_segment_set_newsegment_full (&stream->segment, FALSE, 1.0, 1.0,
414                 GST_FORMAT_TIME, 0, diff, 0);
415           }
416         }
417
418         GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
419             &stream->segment);
420         gst_segment_set_newsegment_full (&stream->segment, update, rate,
421             applied_rate, format, start, stop, position);
422         GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
423             &stream->segment);
424
425         GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
426             GST_TIME_ARGS (stream->segment.accum));
427         stream->running_time_diff = stream->segment.accum;
428       } else if (stream) {
429         GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
430             gst_format_get_name (format));
431         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
432       }
433       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
434       break;
435     }
436     case GST_EVENT_FLUSH_STOP:{
437       GstStream *stream;
438
439       GST_STREAM_SYNCHRONIZER_LOCK (self);
440       stream = gst_pad_get_element_private (pad);
441       if (stream) {
442         GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
443             stream->stream_number);
444         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
445
446         stream->is_eos = FALSE;
447         stream->wait = FALSE;
448         stream->new_stream = FALSE;
449         stream->drop_discont = FALSE;
450         stream->seen_data = FALSE;
451       }
452       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
453       break;
454     }
455     case GST_EVENT_EOS:{
456       GstStream *stream;
457       GList *l;
458       gboolean all_eos = TRUE;
459       gboolean seen_data;
460       GSList *pads = NULL;
461       GstPad *srcpad;
462
463       GST_STREAM_SYNCHRONIZER_LOCK (self);
464       stream = gst_pad_get_element_private (pad);
465       if (stream) {
466         GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
467         stream->is_eos = TRUE;
468       }
469
470       seen_data = stream->seen_data;
471       srcpad = gst_object_ref (stream->srcpad);
472
473       for (l = self->streams; l; l = l->next) {
474         GstStream *ostream = l->data;
475
476         all_eos = all_eos && ostream->is_eos;
477         if (!all_eos)
478           break;
479       }
480
481       if (all_eos) {
482         GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
483         for (l = self->streams; l; l = l->next) {
484           GstStream *ostream = l->data;
485           /* local snapshot of current pads */
486           gst_object_ref (ostream->srcpad);
487           pads = g_slist_prepend (pads, ostream->srcpad);
488         }
489       }
490       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
491       /* drop lock when sending eos, which may block in e.g. preroll */
492       if (pads) {
493         GstPad *pad;
494         GSList *epad;
495
496         ret = TRUE;
497         epad = pads;
498         while (epad) {
499           pad = epad->data;
500           GST_DEBUG_OBJECT (pad, "Pushing EOS");
501           ret = ret && gst_pad_push_event (pad, gst_event_new_eos ());
502           gst_object_unref (pad);
503           epad = g_slist_next (epad);
504         }
505         g_slist_free (pads);
506       } else {
507         /* if EOS, but no data has passed, then send something to replace EOS
508          * for preroll purposes */
509         if (!seen_data) {
510           GstBuffer *buf = gst_buffer_new ();
511
512           GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_PREROLL);
513           gst_pad_push (srcpad, buf);
514         }
515       }
516       gst_object_unref (srcpad);
517       goto done;
518       break;
519     }
520     default:
521       break;
522   }
523
524   opad = gst_stream_get_other_pad_from_pad (pad);
525   if (opad) {
526     ret = gst_pad_push_event (opad, event);
527     gst_object_unref (opad);
528   }
529
530 done:
531   gst_object_unref (self);
532
533   return ret;
534 }
535
536 static GstFlowReturn
537 gst_stream_synchronizer_sink_bufferalloc (GstPad * pad, guint64 offset,
538     guint size, GstCaps * caps, GstBuffer ** buf)
539 {
540   GstPad *opad;
541   GstFlowReturn ret = GST_FLOW_ERROR;
542
543   GST_LOG_OBJECT (pad, "Allocating buffer: size=%u", size);
544
545   opad = gst_stream_get_other_pad_from_pad (pad);
546   if (opad) {
547     ret = gst_pad_alloc_buffer (opad, offset, size, caps, buf);
548     gst_object_unref (opad);
549   }
550
551   GST_LOG_OBJECT (pad, "Allocation: %s", gst_flow_get_name (ret));
552
553   return ret;
554 }
555
556 static GstFlowReturn
557 gst_stream_synchronizer_sink_chain (GstPad * pad, GstBuffer * buffer)
558 {
559   GstStreamSynchronizer *self =
560       GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
561   GstPad *opad;
562   GstFlowReturn ret = GST_FLOW_ERROR;
563   GstStream *stream;
564   GstClockTime timestamp = GST_CLOCK_TIME_NONE;
565   GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
566
567   GST_LOG_OBJECT (pad, "Handling buffer %p: size=%u, timestamp=%"
568       GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
569       " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
570       buffer, GST_BUFFER_SIZE (buffer),
571       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
572       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
573       GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));
574
575   timestamp = GST_BUFFER_TIMESTAMP (buffer);
576   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)
577       && GST_BUFFER_DURATION_IS_VALID (buffer))
578     timestamp_end = timestamp + GST_BUFFER_DURATION (buffer);
579
580   GST_STREAM_SYNCHRONIZER_LOCK (self);
581   stream = gst_pad_get_element_private (pad);
582
583   stream->seen_data = TRUE;
584   if (stream && stream->drop_discont) {
585     buffer = gst_buffer_make_metadata_writable (buffer);
586     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
587     stream->drop_discont = FALSE;
588   }
589
590   if (stream && stream->segment.format == GST_FORMAT_TIME
591       && GST_CLOCK_TIME_IS_VALID (timestamp)) {
592     GST_LOG_OBJECT (pad,
593         "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
594         GST_TIME_ARGS (stream->segment.last_stop), GST_TIME_ARGS (timestamp));
595     gst_segment_set_last_stop (&stream->segment, GST_FORMAT_TIME, timestamp);
596   }
597   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
598
599   opad = gst_stream_get_other_pad_from_pad (pad);
600   if (opad) {
601     ret = gst_pad_push (opad, buffer);
602     gst_object_unref (opad);
603   }
604
605   GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
606   if (ret == GST_FLOW_OK) {
607     GList *l;
608
609     GST_STREAM_SYNCHRONIZER_LOCK (self);
610     stream = gst_pad_get_element_private (pad);
611     if (stream && stream->segment.format == GST_FORMAT_TIME
612         && GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
613       GST_LOG_OBJECT (pad,
614           "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
615           GST_TIME_ARGS (stream->segment.last_stop),
616           GST_TIME_ARGS (timestamp_end));
617       gst_segment_set_last_stop (&stream->segment, GST_FORMAT_TIME,
618           timestamp_end);
619     }
620
621     /* Advance EOS streams if necessary. For non-EOS
622      * streams the demuxers should already do this! */
623     for (l = self->streams; l; l = l->next) {
624       GstStream *ostream = l->data;
625       gint64 last_stop;
626
627       if (!ostream->is_eos || ostream->segment.format != GST_FORMAT_TIME)
628         continue;
629
630       if (ostream->segment.last_stop != -1)
631         last_stop = ostream->segment.last_stop;
632       else
633         last_stop = ostream->segment.start;
634
635       /* Is there a 1 second lag? */
636       if (last_stop != -1 && last_stop + GST_SECOND < timestamp_end) {
637         gint64 new_start;
638
639         new_start = timestamp_end - GST_SECOND;
640
641         GST_DEBUG_OBJECT (ostream->sinkpad,
642             "Advancing stream %u from %" GST_TIME_FORMAT " to %"
643             GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (last_stop),
644             GST_TIME_ARGS (new_start));
645
646         gst_pad_push_event (ostream->srcpad,
647             gst_event_new_new_segment_full (TRUE, ostream->segment.rate,
648                 ostream->segment.applied_rate, ostream->segment.format,
649                 new_start, ostream->segment.stop, new_start));
650         gst_segment_set_newsegment_full (&ostream->segment, TRUE,
651             ostream->segment.rate, ostream->segment.applied_rate,
652             ostream->segment.format, new_start, ostream->segment.stop,
653             new_start);
654         gst_segment_set_last_stop (&ostream->segment, GST_FORMAT_TIME,
655             new_start);
656       }
657     }
658     GST_STREAM_SYNCHRONIZER_UNLOCK (self);
659   }
660
661   gst_object_unref (self);
662
663   return ret;
664 }
665
666 /* GstElement vfuncs */
667 static GstPad *
668 gst_stream_synchronizer_request_new_pad (GstElement * element,
669     GstPadTemplate * temp, const gchar * name)
670 {
671   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
672   GstStream *stream;
673   gchar *tmp;
674
675   GST_STREAM_SYNCHRONIZER_LOCK (self);
676   GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
677       self->current_stream_number);
678
679   stream = g_slice_new0 (GstStream);
680   stream->transform = self;
681   stream->stream_number = self->current_stream_number;
682
683   tmp = g_strdup_printf ("sink_%d", self->current_stream_number);
684   stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
685   g_free (tmp);
686   gst_pad_set_element_private (stream->sinkpad, stream);
687   gst_pad_set_iterate_internal_links_function (stream->sinkpad,
688       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
689   gst_pad_set_query_function (stream->sinkpad,
690       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
691   gst_pad_set_getcaps_function (stream->sinkpad,
692       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
693   gst_pad_set_acceptcaps_function (stream->sinkpad,
694       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
695   gst_pad_set_event_function (stream->sinkpad,
696       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
697   gst_pad_set_chain_function (stream->sinkpad,
698       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
699   gst_pad_set_bufferalloc_function (stream->sinkpad,
700       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_bufferalloc));
701
702   tmp = g_strdup_printf ("src_%d", self->current_stream_number);
703   stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
704   g_free (tmp);
705   gst_pad_set_element_private (stream->srcpad, stream);
706   gst_pad_set_iterate_internal_links_function (stream->srcpad,
707       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
708   gst_pad_set_query_function (stream->srcpad,
709       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
710   gst_pad_set_getcaps_function (stream->srcpad,
711       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
712   gst_pad_set_acceptcaps_function (stream->srcpad,
713       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
714   gst_pad_set_event_function (stream->srcpad,
715       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
716
717   gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
718
719   self->streams = g_list_prepend (self->streams, stream);
720   self->current_stream_number++;
721   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
722
723   /* Add pads and activate unless we're going to NULL */
724   g_static_rec_mutex_lock (GST_STATE_GET_LOCK (self));
725   if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
726     gst_pad_set_active (stream->srcpad, TRUE);
727     gst_pad_set_active (stream->sinkpad, TRUE);
728   }
729   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
730   gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
731   g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
732
733   return stream->sinkpad;
734 }
735
736 /* Must be called with lock! */
737 static void
738 gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
739     GstStream * stream)
740 {
741   GList *l;
742
743   GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
744
745   for (l = self->streams; l; l = l->next) {
746     if (l->data == stream) {
747       self->streams = g_list_delete_link (self->streams, l);
748       break;
749     }
750   }
751   g_assert (l != NULL);
752
753   /* we can drop the lock, since stream exists now only local.
754    * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
755    * (due to reverse lock order) when deactivating pads */
756   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
757
758   gst_pad_set_element_private (stream->srcpad, NULL);
759   gst_pad_set_element_private (stream->sinkpad, NULL);
760   gst_pad_set_active (stream->srcpad, FALSE);
761   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
762   gst_pad_set_active (stream->sinkpad, FALSE);
763   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
764
765   if (stream->segment.format == GST_FORMAT_TIME) {
766     gint64 stop_running_time;
767     gint64 last_stop_running_time;
768
769     stop_running_time =
770         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
771         stream->segment.stop);
772     last_stop_running_time =
773         gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
774         stream->segment.last_stop);
775     stop_running_time = MAX (stop_running_time, last_stop_running_time);
776
777     GST_DEBUG_OBJECT (stream->sinkpad,
778         "Stop running time was: %" GST_TIME_FORMAT,
779         GST_TIME_ARGS (stop_running_time));
780
781     self->group_start_time = MAX (self->group_start_time, stop_running_time);
782   }
783
784   g_slice_free (GstStream, stream);
785
786   /* NOTE: In theory we have to check here if all streams
787    * are EOS but the one that was removed wasn't and then
788    * send EOS downstream. But due to the way how playsink
789    * works this is not necessary and will only cause problems
790    * for gapless playback. playsink will only add/remove pads
791    * when it's reconfigured, which happens when the streams
792    * change
793    */
794
795   /* lock for good measure, since the caller had it */
796   GST_STREAM_SYNCHRONIZER_LOCK (self);
797 }
798
799 static void
800 gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
801 {
802   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
803   GstStream *stream;
804
805   GST_STREAM_SYNCHRONIZER_LOCK (self);
806   stream = gst_pad_get_element_private (pad);
807   if (stream) {
808     g_assert (stream->sinkpad == pad);
809
810     gst_stream_synchronizer_release_stream (self, stream);
811   }
812   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
813 }
814
815 static GstStateChangeReturn
816 gst_stream_synchronizer_change_state (GstElement * element,
817     GstStateChange transition)
818 {
819   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
820   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
821
822   switch (transition) {
823     case GST_STATE_CHANGE_NULL_TO_READY:
824       GST_DEBUG_OBJECT (self, "State change NULL->READY");
825       self->shutdown = FALSE;
826       break;
827     case GST_STATE_CHANGE_READY_TO_PAUSED:
828       GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
829       self->group_start_time = 0;
830       self->shutdown = FALSE;
831       break;
832     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
833       GST_DEBUG_OBJECT (self, "State change PAUSED->PLAYING");
834       break;
835     case GST_STATE_CHANGE_PAUSED_TO_READY:
836       GST_DEBUG_OBJECT (self, "State change READY->NULL");
837
838       GST_STREAM_SYNCHRONIZER_LOCK (self);
839       g_cond_broadcast (self->stream_finish_cond);
840       self->shutdown = TRUE;
841       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
842     default:
843       break;
844   }
845
846   {
847     GstStateChangeReturn bret;
848
849     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
850     GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", bret);
851     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
852       return ret;
853   }
854
855   switch (transition) {
856     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
857       GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
858       break;
859     case GST_STATE_CHANGE_PAUSED_TO_READY:{
860       GList *l;
861
862       GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
863       self->group_start_time = 0;
864
865       GST_STREAM_SYNCHRONIZER_LOCK (self);
866       for (l = self->streams; l; l = l->next) {
867         GstStream *stream = l->data;
868
869         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
870         stream->wait = FALSE;
871         stream->new_stream = FALSE;
872         stream->drop_discont = FALSE;
873         stream->is_eos = FALSE;
874       }
875       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
876       break;
877     }
878     case GST_STATE_CHANGE_READY_TO_NULL:{
879       GST_DEBUG_OBJECT (self, "State change READY->NULL");
880
881       GST_STREAM_SYNCHRONIZER_LOCK (self);
882       while (self->streams)
883         gst_stream_synchronizer_release_stream (self, self->streams->data);
884       self->current_stream_number = 0;
885       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
886       break;
887     }
888     default:
889       break;
890   }
891
892   return ret;
893 }
894
895 /* GObject vfuncs */
896 static void
897 gst_stream_synchronizer_finalize (GObject * object)
898 {
899   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
900
901   if (self->lock) {
902     g_mutex_free (self->lock);
903     self->lock = NULL;
904   }
905
906   if (self->stream_finish_cond) {
907     g_cond_free (self->stream_finish_cond);
908     self->stream_finish_cond = NULL;
909   }
910
911   G_OBJECT_CLASS (parent_class)->finalize (object);
912 }
913
914 /* GObject type initialization */
915 static void
916 gst_stream_synchronizer_init (GstStreamSynchronizer * self,
917     GstStreamSynchronizerClass * klass)
918 {
919   self->lock = g_mutex_new ();
920   self->stream_finish_cond = g_cond_new ();
921 }
922
923 static void
924 gst_stream_synchronizer_base_init (gpointer g_class)
925 {
926   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
927
928   gst_element_class_add_pad_template (gstelement_class,
929       gst_static_pad_template_get (&srctemplate));
930   gst_element_class_add_pad_template (gstelement_class,
931       gst_static_pad_template_get (&sinktemplate));
932
933   gst_element_class_set_details_simple (gstelement_class,
934       "Stream Synchronizer", "Generic",
935       "Synchronizes a group of streams to have equal durations and starting points",
936       "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
937 }
938
939 static void
940 gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
941 {
942   GObjectClass *gobject_class = (GObjectClass *) klass;
943   GstElementClass *element_class = (GstElementClass *) klass;
944
945   GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
946       "streamsynchronizer", 0, "Stream Synchronizer");
947
948   gobject_class->finalize = gst_stream_synchronizer_finalize;
949
950   element_class->change_state =
951       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
952   element_class->request_new_pad =
953       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
954   element_class->release_pad =
955       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);
956 }