1e0c91ecb86919f2216b1a7b871f93bd9477b2de
[platform/upstream/gstreamer.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * can be muxed, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * If a video stream is available, the splitting process is driven by the video
37  * stream contents, and the video stream must contain closed GOPs for the output
38  * file parts to be played individually correctly. In the absence of a video
39  * stream, the first available stream is used as reference for synchronization.
40  *
41  * <refsect2>
42  * <title>Example pipelines</title>
43  * |[
44  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
45  * ]|
46  * Records a video stream captured from a v4l2 device and muxes it into
47  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48  * and 1MB maximum size.
49  * </refsect2>
50  */
51
52 #ifdef HAVE_CONFIG_H
53 #include "config.h"
54 #endif
55
56 #include <string.h>
57 #include <glib/gstdio.h>
58 #include <gst/video/video.h>
59 #include "gstsplitmuxsink.h"
60
61 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
62 #define GST_CAT_DEFAULT splitmux_debug
63
64 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
65 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
66 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
67 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
68
69 enum
70 {
71   PROP_0,
72   PROP_LOCATION,
73   PROP_MAX_SIZE_TIME,
74   PROP_MAX_SIZE_BYTES,
75   PROP_SEND_KEYFRAME_REQUESTS,
76   PROP_MAX_FILES,
77   PROP_MUXER_OVERHEAD,
78   PROP_MUXER,
79   PROP_SINK
80 };
81
82 #define DEFAULT_MAX_SIZE_TIME       0
83 #define DEFAULT_MAX_SIZE_BYTES      0
84 #define DEFAULT_MAX_FILES           0
85 #define DEFAULT_MUXER_OVERHEAD      0.02
86 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
87 #define DEFAULT_MUXER "mp4mux"
88 #define DEFAULT_SINK "filesink"
89
90 enum
91 {
92   SIGNAL_FORMAT_LOCATION,
93   SIGNAL_LAST
94 };
95
96 static guint signals[SIGNAL_LAST];
97
98 static GstStaticPadTemplate video_sink_template =
99 GST_STATIC_PAD_TEMPLATE ("video",
100     GST_PAD_SINK,
101     GST_PAD_REQUEST,
102     GST_STATIC_CAPS_ANY);
103 static GstStaticPadTemplate audio_sink_template =
104 GST_STATIC_PAD_TEMPLATE ("audio_%u",
105     GST_PAD_SINK,
106     GST_PAD_REQUEST,
107     GST_STATIC_CAPS_ANY);
108 static GstStaticPadTemplate subtitle_sink_template =
109 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
110     GST_PAD_SINK,
111     GST_PAD_REQUEST,
112     GST_STATIC_CAPS_ANY);
113
114 static GQuark PAD_CONTEXT;
115
116 static void
117 _do_init (void)
118 {
119   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
120 }
121
122 #define gst_splitmux_sink_parent_class parent_class
123 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
124     _do_init ());
125
126 static gboolean create_elements (GstSplitMuxSink * splitmux);
127 static gboolean create_sink (GstSplitMuxSink * splitmux);
128 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
129     const GValue * value, GParamSpec * pspec);
130 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
131     GValue * value, GParamSpec * pspec);
132 static void gst_splitmux_sink_dispose (GObject * object);
133 static void gst_splitmux_sink_finalize (GObject * object);
134
135 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
136     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
137 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
138
139 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
140     element, GstStateChange transition);
141
142 static void bus_handler (GstBin * bin, GstMessage * msg);
143 static void set_next_filename (GstSplitMuxSink * splitmux);
144 static void start_next_fragment (GstSplitMuxSink * splitmux);
145 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
146 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
147
148 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
149
150 static MqStreamBuf *
151 mq_stream_buf_new (void)
152 {
153   return g_slice_new0 (MqStreamBuf);
154 }
155
156 static void
157 mq_stream_buf_free (MqStreamBuf * data)
158 {
159   g_slice_free (MqStreamBuf, data);
160 }
161
162 static void
163 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
164 {
165   GObjectClass *gobject_class = (GObjectClass *) klass;
166   GstElementClass *gstelement_class = (GstElementClass *) klass;
167   GstBinClass *gstbin_class = (GstBinClass *) klass;
168
169   gobject_class->set_property = gst_splitmux_sink_set_property;
170   gobject_class->get_property = gst_splitmux_sink_get_property;
171   gobject_class->dispose = gst_splitmux_sink_dispose;
172   gobject_class->finalize = gst_splitmux_sink_finalize;
173
174   gst_element_class_set_static_metadata (gstelement_class,
175       "Split Muxing Bin", "Generic/Bin/Muxer",
176       "Convenience bin that muxes incoming streams into multiple time/size limited files",
177       "Jan Schmidt <jan@centricular.com>");
178
179   gst_element_class_add_static_pad_template (gstelement_class,
180       &video_sink_template);
181   gst_element_class_add_static_pad_template (gstelement_class,
182       &audio_sink_template);
183   gst_element_class_add_static_pad_template (gstelement_class,
184       &subtitle_sink_template);
185
186   gstelement_class->change_state =
187       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
188   gstelement_class->request_new_pad =
189       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
190   gstelement_class->release_pad =
191       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
192
193   gstbin_class->handle_message = bus_handler;
194
195   g_object_class_install_property (gobject_class, PROP_LOCATION,
196       g_param_spec_string ("location", "File Output Pattern",
197           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
198           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
199   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
200       g_param_spec_double ("mux-overhead", "Muxing Overhead",
201           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
202           DEFAULT_MUXER_OVERHEAD,
203           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
204
205   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
206       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
207           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
208           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
209   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
210       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
211           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
212           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
213   g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
214       g_param_spec_boolean ("send-keyframe-requests",
215           "Request keyframes at max-size-time",
216           "Request a keyframe every max-size-time ns to try splitting at that point. "
217           "Needs max-size-bytes to be 0 in order to be effective.",
218           DEFAULT_SEND_KEYFRAME_REQUESTS,
219           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
220   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
221       g_param_spec_uint ("max-files", "Max files",
222           "Maximum number of files to keep on disk. Once the maximum is reached,"
223           "old files start to be deleted to make room for new ones.", 0,
224           G_MAXUINT, DEFAULT_MAX_FILES,
225           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
226
227
228   g_object_class_install_property (gobject_class, PROP_MUXER,
229       g_param_spec_object ("muxer", "Muxer",
230           "The muxer element to use (NULL = default mp4mux)",
231           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
232   g_object_class_install_property (gobject_class, PROP_SINK,
233       g_param_spec_object ("sink", "Sink",
234           "The sink element (or element chain) to use (NULL = default filesink)",
235           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
236
237   /**
238    * GstSplitMuxSink::format-location:
239    * @splitmux: the #GstSplitMuxSink
240    * @fragment_id: the sequence number of the file to be created
241    *
242    * Returns: the location to be used for the next output file
243    */
244   signals[SIGNAL_FORMAT_LOCATION] =
245       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
246       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
247 }
248
249 static void
250 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
251 {
252   g_mutex_init (&splitmux->lock);
253   g_cond_init (&splitmux->data_cond);
254
255   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
256   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
257   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
258   splitmux->max_files = DEFAULT_MAX_FILES;
259   splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
260
261   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
262 }
263
264 static void
265 gst_splitmux_reset (GstSplitMuxSink * splitmux)
266 {
267   if (splitmux->mq)
268     gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
269   if (splitmux->muxer)
270     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
271   if (splitmux->active_sink)
272     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
273
274   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
275       NULL;
276 }
277
278 static void
279 gst_splitmux_sink_dispose (GObject * object)
280 {
281   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
282
283   G_OBJECT_CLASS (parent_class)->dispose (object);
284
285   /* Calling parent dispose invalidates all child pointers */
286   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
287       NULL;
288 }
289
290 static void
291 gst_splitmux_sink_finalize (GObject * object)
292 {
293   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
294   g_cond_clear (&splitmux->data_cond);
295   g_mutex_clear (&splitmux->lock);
296   if (splitmux->provided_sink)
297     gst_object_unref (splitmux->provided_sink);
298   if (splitmux->provided_muxer)
299     gst_object_unref (splitmux->provided_muxer);
300
301   g_free (splitmux->location);
302
303   /* Make sure to free any un-released contexts */
304   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
305   g_list_free (splitmux->contexts);
306
307   G_OBJECT_CLASS (parent_class)->finalize (object);
308 }
309
310 static void
311 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
312     const GValue * value, GParamSpec * pspec)
313 {
314   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
315
316   switch (prop_id) {
317     case PROP_LOCATION:{
318       GST_OBJECT_LOCK (splitmux);
319       g_free (splitmux->location);
320       splitmux->location = g_value_dup_string (value);
321       GST_OBJECT_UNLOCK (splitmux);
322       break;
323     }
324     case PROP_MAX_SIZE_BYTES:
325       GST_OBJECT_LOCK (splitmux);
326       splitmux->threshold_bytes = g_value_get_uint64 (value);
327       GST_OBJECT_UNLOCK (splitmux);
328       break;
329     case PROP_MAX_SIZE_TIME:
330       GST_OBJECT_LOCK (splitmux);
331       splitmux->threshold_time = g_value_get_uint64 (value);
332       GST_OBJECT_UNLOCK (splitmux);
333       break;
334     case PROP_SEND_KEYFRAME_REQUESTS:
335       GST_OBJECT_LOCK (splitmux);
336       splitmux->send_keyframe_requests = g_value_get_boolean (value);
337       GST_OBJECT_UNLOCK (splitmux);
338       break;
339     case PROP_MAX_FILES:
340       GST_OBJECT_LOCK (splitmux);
341       splitmux->max_files = g_value_get_uint (value);
342       GST_OBJECT_UNLOCK (splitmux);
343       break;
344     case PROP_MUXER_OVERHEAD:
345       GST_OBJECT_LOCK (splitmux);
346       splitmux->mux_overhead = g_value_get_double (value);
347       GST_OBJECT_UNLOCK (splitmux);
348       break;
349     case PROP_SINK:
350       GST_OBJECT_LOCK (splitmux);
351       if (splitmux->provided_sink)
352         gst_object_unref (splitmux->provided_sink);
353       splitmux->provided_sink = g_value_dup_object (value);
354       GST_OBJECT_UNLOCK (splitmux);
355       break;
356     case PROP_MUXER:
357       GST_OBJECT_LOCK (splitmux);
358       if (splitmux->provided_muxer)
359         gst_object_unref (splitmux->provided_muxer);
360       splitmux->provided_muxer = g_value_dup_object (value);
361       GST_OBJECT_UNLOCK (splitmux);
362       break;
363     default:
364       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
365       break;
366   }
367 }
368
369 static void
370 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
371     GValue * value, GParamSpec * pspec)
372 {
373   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
374
375   switch (prop_id) {
376     case PROP_LOCATION:
377       GST_OBJECT_LOCK (splitmux);
378       g_value_set_string (value, splitmux->location);
379       GST_OBJECT_UNLOCK (splitmux);
380       break;
381     case PROP_MAX_SIZE_BYTES:
382       GST_OBJECT_LOCK (splitmux);
383       g_value_set_uint64 (value, splitmux->threshold_bytes);
384       GST_OBJECT_UNLOCK (splitmux);
385       break;
386     case PROP_MAX_SIZE_TIME:
387       GST_OBJECT_LOCK (splitmux);
388       g_value_set_uint64 (value, splitmux->threshold_time);
389       GST_OBJECT_UNLOCK (splitmux);
390       break;
391     case PROP_SEND_KEYFRAME_REQUESTS:
392       GST_OBJECT_LOCK (splitmux);
393       g_value_set_boolean (value, splitmux->send_keyframe_requests);
394       GST_OBJECT_UNLOCK (splitmux);
395       break;
396     case PROP_MAX_FILES:
397       GST_OBJECT_LOCK (splitmux);
398       g_value_set_uint (value, splitmux->max_files);
399       GST_OBJECT_UNLOCK (splitmux);
400       break;
401     case PROP_MUXER_OVERHEAD:
402       GST_OBJECT_LOCK (splitmux);
403       g_value_set_double (value, splitmux->mux_overhead);
404       GST_OBJECT_UNLOCK (splitmux);
405       break;
406     case PROP_SINK:
407       GST_OBJECT_LOCK (splitmux);
408       g_value_set_object (value, splitmux->provided_sink);
409       GST_OBJECT_UNLOCK (splitmux);
410       break;
411     case PROP_MUXER:
412       GST_OBJECT_LOCK (splitmux);
413       g_value_set_object (value, splitmux->provided_muxer);
414       GST_OBJECT_UNLOCK (splitmux);
415       break;
416     default:
417       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
418       break;
419   }
420 }
421
422 /* Convenience function */
423 static inline GstClockTimeDiff
424 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
425 {
426   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
427
428   if (GST_CLOCK_TIME_IS_VALID (val)) {
429     gboolean sign =
430         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
431     if (sign > 0)
432       res = val;
433     else if (sign < 0)
434       res = -val;
435   }
436   return res;
437 }
438
439 static GstPad *
440 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
441 {
442   gchar *tmp, *sinkname, *srcname;
443   GstPad *mq_src;
444
445   sinkname = gst_pad_get_name (sink_pad);
446   tmp = sinkname + 5;
447   srcname = g_strdup_printf ("src_%s", tmp);
448
449   mq_src = gst_element_get_static_pad (mq, srcname);
450
451   g_free (sinkname);
452   g_free (srcname);
453
454   return mq_src;
455 }
456
457 static gboolean
458 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
459     GstPad ** src_pad)
460 {
461   GstPad *mq_sink;
462   GstPad *mq_src;
463
464   /* Request a pad from multiqueue, then connect this one, then
465    * discover the corresponding output pad and return both */
466   mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
467   if (mq_sink == NULL)
468     return FALSE;
469
470   mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
471   if (mq_src == NULL)
472     goto fail;
473
474   *sink_pad = mq_sink;
475   *src_pad = mq_src;
476
477   return TRUE;
478
479 fail:
480   gst_element_release_request_pad (splitmux->mq, mq_sink);
481   return FALSE;
482 }
483
484 static MqStreamCtx *
485 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
486 {
487   MqStreamCtx *ctx;
488
489   ctx = g_new0 (MqStreamCtx, 1);
490   g_atomic_int_set (&ctx->refcount, 1);
491   ctx->splitmux = splitmux;
492   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
493   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
494   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
495   g_queue_init (&ctx->queued_bufs);
496   return ctx;
497 }
498
499 static void
500 mq_stream_ctx_free (MqStreamCtx * ctx)
501 {
502   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
503   g_queue_clear (&ctx->queued_bufs);
504   g_free (ctx);
505 }
506
507 static void
508 mq_stream_ctx_unref (MqStreamCtx * ctx)
509 {
510   if (g_atomic_int_dec_and_test (&ctx->refcount))
511     mq_stream_ctx_free (ctx);
512 }
513
514 static void
515 mq_stream_ctx_ref (MqStreamCtx * ctx)
516 {
517   g_atomic_int_inc (&ctx->refcount);
518 }
519
520 static void
521 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
522 {
523   ctx->sink_pad_block_id = 0;
524   mq_stream_ctx_unref (ctx);
525 }
526
527 static void
528 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
529 {
530   ctx->src_pad_block_id = 0;
531   mq_stream_ctx_unref (ctx);
532 }
533
534 static void
535 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
536 {
537   gchar *location = NULL;
538   GstMessage *msg;
539   const gchar *msg_name = opened ?
540       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
541
542   g_object_get (splitmux->sink, "location", &location, NULL);
543
544   msg = gst_message_new_element (GST_OBJECT (splitmux),
545       gst_structure_new (msg_name,
546           "location", G_TYPE_STRING, location,
547           "running-time", GST_TYPE_CLOCK_TIME,
548           splitmux->reference_ctx->out_running_time, NULL));
549   gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
550
551   g_free (location);
552 }
553
554 /* Called with lock held, drops the lock to send EOS to the
555  * pad
556  */
557 static void
558 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
559 {
560   GstEvent *eos;
561   GstPad *pad;
562
563   eos = gst_event_new_eos ();
564   pad = gst_pad_get_peer (ctx->srcpad);
565
566   ctx->out_eos = TRUE;
567
568   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
569   GST_SPLITMUX_UNLOCK (splitmux);
570   gst_pad_send_event (pad, eos);
571   GST_SPLITMUX_LOCK (splitmux);
572
573   gst_object_unref (pad);
574 }
575
576 /* Called with splitmux lock held to check if this output
577  * context needs to sleep to wait for the release of the
578  * next GOP, or to send EOS to close out the current file
579  */
580 static void
581 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
582 {
583   do {
584
585     GST_LOG_OBJECT (ctx->srcpad,
586         "Checking running time %" GST_STIME_FORMAT " against max %"
587         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
588         GST_STIME_ARGS (splitmux->max_out_running_time));
589
590     if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
591         ctx->out_running_time < splitmux->max_out_running_time) {
592       splitmux->have_muxed_something = TRUE;
593       return;
594     }
595
596     if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
597       return;
598
599     if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
600       if (ctx->out_eos == FALSE) {
601         send_eos (splitmux, ctx);
602         continue;
603       }
604     } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
605       start_next_fragment (splitmux);
606       continue;
607     }
608
609     GST_INFO_OBJECT (ctx->srcpad,
610         "Sleeping for running time %"
611         GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ")",
612         GST_STIME_ARGS (ctx->out_running_time),
613         GST_STIME_ARGS (splitmux->max_out_running_time));
614     ctx->out_blocked = TRUE;
615     /* Expand the mq if needed before sleeping */
616     check_queue_length (splitmux, ctx);
617     GST_SPLITMUX_WAIT (splitmux);
618     ctx->out_blocked = FALSE;
619     GST_INFO_OBJECT (ctx->srcpad,
620         "Woken for new max running time %" GST_STIME_FORMAT,
621         GST_STIME_ARGS (splitmux->max_out_running_time));
622   } while (1);
623 }
624
625 static gboolean
626 request_next_keyframe (GstSplitMuxSink * splitmux)
627 {
628   GstEvent *ev;
629
630   if (splitmux->send_keyframe_requests == FALSE || splitmux->threshold_time == 0
631       || splitmux->threshold_bytes != 0)
632     return TRUE;
633
634   ev = gst_video_event_new_upstream_force_key_unit (splitmux->fragment_id *
635       splitmux->threshold_time, TRUE, 0);
636   GST_DEBUG_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
637       GST_TIME_ARGS (splitmux->fragment_id * splitmux->threshold_time));
638   return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
639 }
640
641 static GstPadProbeReturn
642 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
643 {
644   GstSplitMuxSink *splitmux = ctx->splitmux;
645   MqStreamBuf *buf_info = NULL;
646
647   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
648
649   /* FIXME: Handle buffer lists, until then make it clear they won't work */
650   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
651     g_warning ("Buffer list handling not implemented");
652     return GST_PAD_PROBE_DROP;
653   }
654   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
655     GstEvent *event = gst_pad_probe_info_get_event (info);
656
657     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
658
659     switch (GST_EVENT_TYPE (event)) {
660       case GST_EVENT_SEGMENT:
661         gst_event_copy_segment (event, &ctx->out_segment);
662         break;
663       case GST_EVENT_FLUSH_STOP:
664         GST_SPLITMUX_LOCK (splitmux);
665         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
666         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
667         g_queue_clear (&ctx->queued_bufs);
668         ctx->flushing = FALSE;
669         GST_SPLITMUX_UNLOCK (splitmux);
670         break;
671       case GST_EVENT_FLUSH_START:
672         GST_SPLITMUX_LOCK (splitmux);
673         GST_LOG_OBJECT (pad, "Flush start");
674         ctx->flushing = TRUE;
675         GST_SPLITMUX_BROADCAST (splitmux);
676         GST_SPLITMUX_UNLOCK (splitmux);
677         break;
678       case GST_EVENT_EOS:
679         GST_SPLITMUX_LOCK (splitmux);
680         if (splitmux->state == SPLITMUX_STATE_STOPPED)
681           goto beach;
682         ctx->out_eos = TRUE;
683         GST_SPLITMUX_UNLOCK (splitmux);
684         break;
685       case GST_EVENT_GAP:{
686         GstClockTime gap_ts;
687         GstClockTimeDiff rtime;
688
689         gst_event_parse_gap (event, &gap_ts, NULL);
690         if (gap_ts == GST_CLOCK_TIME_NONE)
691           break;
692
693         GST_SPLITMUX_LOCK (splitmux);
694
695         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
696
697         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
698             GST_STIME_ARGS (rtime));
699
700         if (splitmux->state == SPLITMUX_STATE_STOPPED)
701           goto beach;
702
703         if (rtime != GST_CLOCK_STIME_NONE) {
704           ctx->out_running_time = rtime;
705           complete_or_wait_on_out (splitmux, ctx);
706         }
707         GST_SPLITMUX_UNLOCK (splitmux);
708         break;
709       }
710       case GST_EVENT_CUSTOM_DOWNSTREAM:{
711         const GstStructure *s;
712         GstClockTimeDiff ts = 0;
713
714         s = gst_event_get_structure (event);
715         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
716           break;
717
718         gst_structure_get_int64 (s, "timestamp", &ts);
719
720         GST_SPLITMUX_LOCK (splitmux);
721
722         if (splitmux->state == SPLITMUX_STATE_STOPPED)
723           goto beach;
724         ctx->out_running_time = ts;
725         complete_or_wait_on_out (splitmux, ctx);
726         GST_SPLITMUX_UNLOCK (splitmux);
727         return GST_PAD_PROBE_DROP;
728       }
729       default:
730         break;
731     }
732     return GST_PAD_PROBE_PASS;
733   }
734
735   /* Allow everything through until the configured next stopping point */
736   GST_SPLITMUX_LOCK (splitmux);
737
738   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
739   if (buf_info == NULL)
740     /* Can only happen due to a poorly timed flush */
741     goto beach;
742
743   /* If we have popped a keyframe, decrement the queued_gop count */
744   if (buf_info->keyframe && splitmux->queued_gops > 0)
745     splitmux->queued_gops--;
746
747   ctx->out_running_time = buf_info->run_ts;
748
749   GST_LOG_OBJECT (splitmux,
750       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
751       " size %" G_GUINT64_FORMAT,
752       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
753
754   if (splitmux->opening_first_fragment) {
755     if (request_next_keyframe (splitmux) == FALSE)
756       GST_WARNING_OBJECT (splitmux,
757           "Could not request a keyframe. Files may not split at the exact location they should");
758     send_fragment_opened_closed_msg (splitmux, TRUE);
759     splitmux->opening_first_fragment = FALSE;
760   }
761
762   complete_or_wait_on_out (splitmux, ctx);
763
764   if (splitmux->muxed_out_time == GST_CLOCK_STIME_NONE ||
765       splitmux->muxed_out_time < buf_info->run_ts)
766     splitmux->muxed_out_time = buf_info->run_ts;
767
768   splitmux->muxed_out_bytes += buf_info->buf_size;
769   splitmux->last_frame_duration = buf_info->duration;
770
771 #ifndef GST_DISABLE_GST_DEBUG
772   {
773     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
774     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
775         " run ts %" GST_STIME_FORMAT, buf,
776         GST_STIME_ARGS (ctx->out_running_time));
777   }
778 #endif
779
780   GST_SPLITMUX_UNLOCK (splitmux);
781
782   mq_stream_buf_free (buf_info);
783
784   return GST_PAD_PROBE_PASS;
785
786 beach:
787   GST_SPLITMUX_UNLOCK (splitmux);
788   return GST_PAD_PROBE_DROP;
789 }
790
791 static gboolean
792 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
793 {
794   return gst_pad_send_event (peer, gst_event_ref (*event));
795 }
796
797 static void
798 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
799 {
800   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
801
802   gst_pad_sticky_events_foreach (ctx->srcpad,
803       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
804
805   /* Clear EOS flag if not actually EOS */
806   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
807
808   gst_object_unref (peer);
809 }
810
811 /* Called with lock held when a fragment
812  * reaches EOS and it is time to restart
813  * a new fragment
814  */
815 static void
816 start_next_fragment (GstSplitMuxSink * splitmux)
817 {
818   /* 1 change to new file */
819   splitmux->switching_fragment = TRUE;
820
821   gst_element_set_locked_state (splitmux->muxer, TRUE);
822   gst_element_set_locked_state (splitmux->active_sink, TRUE);
823   gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
824   gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
825
826   set_next_filename (splitmux);
827
828   gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux));
829   gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux));
830   gst_element_set_locked_state (splitmux->muxer, FALSE);
831   gst_element_set_locked_state (splitmux->active_sink, FALSE);
832
833   splitmux->switching_fragment = FALSE;
834
835   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
836
837   /* Switch state and go back to processing */
838   if (!splitmux->reference_ctx->in_eos) {
839     splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
840     splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
841   } else {
842     splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
843     splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
844     splitmux->have_muxed_something = FALSE;
845   }
846   splitmux->have_muxed_something =
847       (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
848
849   /* Store the overflow parameters as the basis for the next fragment */
850   splitmux->mux_start_time = splitmux->muxed_out_time;
851   if (splitmux->last_frame_duration != GST_CLOCK_STIME_NONE)
852     splitmux->mux_start_time += splitmux->last_frame_duration;
853   splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
854
855   GST_DEBUG_OBJECT (splitmux,
856       "Restarting flow for new fragment. New running time %" GST_STIME_FORMAT,
857       GST_STIME_ARGS (splitmux->max_out_running_time));
858
859   send_fragment_opened_closed_msg (splitmux, TRUE);
860   if (request_next_keyframe (splitmux) == FALSE)
861     GST_WARNING_OBJECT (splitmux,
862         "Could not request a keyframe. Files may not split at the exact location they should");
863
864   GST_SPLITMUX_BROADCAST (splitmux);
865 }
866
867 static void
868 bus_handler (GstBin * bin, GstMessage * message)
869 {
870   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
871
872   switch (GST_MESSAGE_TYPE (message)) {
873     case GST_MESSAGE_EOS:
874       /* If the state is draining out the current file, drop this EOS */
875       GST_SPLITMUX_LOCK (splitmux);
876
877       send_fragment_opened_closed_msg (splitmux, FALSE);
878
879       if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
880           splitmux->max_out_running_time != GST_CLOCK_STIME_NONE) {
881         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
882         splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
883         GST_SPLITMUX_BROADCAST (splitmux);
884
885         gst_message_unref (message);
886         GST_SPLITMUX_UNLOCK (splitmux);
887         return;
888       }
889       GST_SPLITMUX_UNLOCK (splitmux);
890       break;
891     case GST_MESSAGE_ASYNC_START:
892     case GST_MESSAGE_ASYNC_DONE:
893       /* Ignore state changes from our children while switching */
894       if (splitmux->switching_fragment) {
895         if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink ||
896             GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
897           GST_LOG_OBJECT (splitmux,
898               "Ignoring state change from child %" GST_PTR_FORMAT
899               " while switching", GST_MESSAGE_SRC (message));
900           gst_message_unref (message);
901           return;
902         }
903       }
904       break;
905     default:
906       break;
907   }
908
909   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
910 }
911
912 /* Called with splitmux lock held */
913 /* Called when entering ProcessingCompleteGop state
914  * Assess if mq contents overflowed the current file
915  *   -> If yes, need to switch to new file
916  *   -> if no, set max_out_running_time to let this GOP in and
917  *      go to COLLECTING_GOP_START state
918  */
919 static void
920 handle_gathered_gop (GstSplitMuxSink * splitmux)
921 {
922   GList *cur;
923   guint64 queued_bytes = 0;
924   GstClockTimeDiff queued_time = 0;
925
926   /* Assess if the multiqueue contents overflowed the current file */
927   for (cur = g_list_first (splitmux->contexts);
928       cur != NULL; cur = g_list_next (cur)) {
929     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
930     if (tmpctx->in_running_time > queued_time)
931       queued_time = tmpctx->in_running_time;
932     queued_bytes += tmpctx->in_bytes;
933   }
934
935   GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT
936       " splitmuxsink->mux_start_bytes %" G_GUINT64_FORMAT, queued_bytes,
937       splitmux->mux_start_bytes);
938   g_assert (queued_bytes >= splitmux->mux_start_bytes);
939   g_assert (queued_time >= splitmux->mux_start_time);
940
941   queued_bytes -= splitmux->mux_start_bytes;
942   queued_time -= splitmux->mux_start_time;
943
944   /* Expand queued bytes estimate by muxer overhead */
945   queued_bytes += (queued_bytes * splitmux->mux_overhead);
946
947   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
948       " bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
949
950   /* Check for overrun - have we output at least one byte and overrun
951    * either threshold? */
952   if ((splitmux->have_muxed_something &&
953           ((splitmux->threshold_bytes > 0 &&
954                   queued_bytes > splitmux->threshold_bytes) ||
955               (splitmux->threshold_time > 0 &&
956                   queued_time > splitmux->threshold_time)))) {
957
958     splitmux->state = SPLITMUX_STATE_ENDING_FILE;
959     GST_INFO_OBJECT (splitmux,
960         "mq overflowed since last, draining out. max out TS is %"
961         GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
962     GST_SPLITMUX_BROADCAST (splitmux);
963
964   } else {
965     /* No overflow */
966     GST_LOG_OBJECT (splitmux,
967         "This GOP didn't overflow the fragment. Bytes sent %" G_GUINT64_FORMAT
968         " queued %" G_GSIZE_FORMAT " time %" GST_STIME_FORMAT " Continuing.",
969         splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
970         queued_bytes, GST_STIME_ARGS (queued_time));
971
972     /* Wake everyone up to push this one GOP, then sleep */
973     splitmux->have_muxed_something = TRUE;
974
975     if (!splitmux->reference_ctx->in_eos) {
976       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
977       splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
978     } else {
979       splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
980       splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
981     }
982
983     GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
984         GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
985     GST_SPLITMUX_BROADCAST (splitmux);
986   }
987
988 }
989
990 /* Called with splitmux lock held */
991 /* Called from each input pad when it is has all the pieces
992  * for a GOP or EOS, starting with the reference pad which has set the
993  * splitmux->max_in_running_time
994  */
995 static void
996 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
997 {
998   GList *cur;
999   gboolean ready = TRUE;
1000   GstClockTimeDiff current_max_in_running_time;
1001
1002   if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
1003     /* Iterate each pad, and check that the input running time is at least
1004      * up to the reference running time, and if so handle the collected GOP */
1005     GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
1006         GST_STIME_FORMAT " ctx %p",
1007         GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
1008     for (cur = g_list_first (splitmux->contexts); cur != NULL;
1009         cur = g_list_next (cur)) {
1010       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1011
1012       GST_LOG_OBJECT (splitmux,
1013           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
1014           " EOS %d", tmpctx, tmpctx->srcpad,
1015           GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
1016
1017       if (splitmux->max_in_running_time != G_MAXINT64 &&
1018           tmpctx->in_running_time < splitmux->max_in_running_time &&
1019           !tmpctx->in_eos) {
1020         GST_LOG_OBJECT (splitmux,
1021             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
1022             tmpctx, tmpctx->srcpad);
1023         ready = FALSE;
1024         break;
1025       }
1026     }
1027     if (ready) {
1028       GST_DEBUG_OBJECT (splitmux,
1029           "Collected GOP is complete. Processing (ctx %p)", ctx);
1030       /* All pads have a complete GOP, release it into the multiqueue */
1031       handle_gathered_gop (splitmux);
1032     }
1033   }
1034
1035   /* If upstream reached EOS we are not expecting more data, no need to wait
1036    * here. */
1037   if (ctx->in_eos)
1038     return;
1039
1040   /* Some pad is not yet ready, or GOP is being pushed
1041    * either way, sleep and wait to get woken */
1042   current_max_in_running_time = splitmux->max_in_running_time;
1043   while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
1044           splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
1045       !ctx->flushing &&
1046       (current_max_in_running_time == splitmux->max_in_running_time)) {
1047
1048     GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
1049         splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
1050         "GOP complete" : "EOF draining", ctx);
1051     GST_SPLITMUX_WAIT (splitmux);
1052
1053     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
1054   }
1055 }
1056
1057 static void
1058 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1059 {
1060   GList *cur;
1061   guint cur_len = g_queue_get_length (&ctx->queued_bufs);
1062
1063   GST_DEBUG_OBJECT (ctx->sinkpad,
1064       "Checking queue length len %u cur_max %u queued gops %u",
1065       cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
1066
1067   if (cur_len >= splitmux->mq_max_buffers) {
1068     gboolean allow_grow = FALSE;
1069
1070     /* If collecting a GOP and this pad might block,
1071      * and there isn't already a pending GOP in the queue
1072      * then grow
1073      */
1074     if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
1075         ctx->in_running_time < splitmux->max_in_running_time &&
1076         splitmux->queued_gops <= 1) {
1077       allow_grow = TRUE;
1078     } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
1079         ctx->is_reference && splitmux->queued_gops <= 1) {
1080       allow_grow = TRUE;
1081     }
1082
1083     if (!allow_grow) {
1084       for (cur = g_list_first (splitmux->contexts);
1085           cur != NULL; cur = g_list_next (cur)) {
1086         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1087         GST_DEBUG_OBJECT (tmpctx->sinkpad,
1088             " len %u out_blocked %d",
1089             g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
1090         /* If another stream is starving, grow */
1091         if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
1092           allow_grow = TRUE;
1093         }
1094       }
1095     }
1096
1097     if (allow_grow) {
1098       splitmux->mq_max_buffers = cur_len + 1;
1099
1100       GST_INFO_OBJECT (splitmux,
1101           "Multiqueue overrun - enlarging to %u buffers ctx %p",
1102           splitmux->mq_max_buffers, ctx);
1103
1104       g_object_set (splitmux->mq, "max-size-buffers",
1105           splitmux->mq_max_buffers, NULL);
1106     }
1107   }
1108 }
1109
1110 static GstPadProbeReturn
1111 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1112 {
1113   GstSplitMuxSink *splitmux = ctx->splitmux;
1114   GstBuffer *buf;
1115   MqStreamBuf *buf_info = NULL;
1116   GstClockTime ts;
1117   gboolean loop_again;
1118   gboolean keyframe = FALSE;
1119
1120   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1121
1122   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1123   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1124     g_warning ("Buffer list handling not implemented");
1125     return GST_PAD_PROBE_DROP;
1126   }
1127   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
1128     GstEvent *event = gst_pad_probe_info_get_event (info);
1129     switch (GST_EVENT_TYPE (event)) {
1130       case GST_EVENT_SEGMENT:
1131         gst_event_copy_segment (event, &ctx->in_segment);
1132         break;
1133       case GST_EVENT_FLUSH_STOP:
1134         GST_SPLITMUX_LOCK (splitmux);
1135         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1136         ctx->in_eos = FALSE;
1137         ctx->in_bytes = 0;
1138         ctx->in_running_time = GST_CLOCK_STIME_NONE;
1139         GST_SPLITMUX_UNLOCK (splitmux);
1140         break;
1141       case GST_EVENT_EOS:
1142         GST_SPLITMUX_LOCK (splitmux);
1143         ctx->in_eos = TRUE;
1144
1145         if (splitmux->state == SPLITMUX_STATE_STOPPED)
1146           goto beach;
1147
1148         if (ctx->is_reference) {
1149           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
1150           /* Act as if this is a new keyframe with infinite timestamp */
1151           splitmux->max_in_running_time = G_MAXINT64;
1152           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1153           /* Wake up other input pads to collect this GOP */
1154           GST_SPLITMUX_BROADCAST (splitmux);
1155           check_completed_gop (splitmux, ctx);
1156         } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
1157           /* If we are waiting for a GOP to be completed (ie, for aux
1158            * pads to catch up), then this pad is complete, so check
1159            * if the whole GOP is.
1160            */
1161           check_completed_gop (splitmux, ctx);
1162         }
1163         GST_SPLITMUX_UNLOCK (splitmux);
1164         break;
1165       default:
1166         break;
1167     }
1168     return GST_PAD_PROBE_PASS;
1169   }
1170
1171   buf = gst_pad_probe_info_get_buffer (info);
1172   buf_info = mq_stream_buf_new ();
1173
1174   if (GST_BUFFER_PTS_IS_VALID (buf))
1175     ts = GST_BUFFER_PTS (buf);
1176   else
1177     ts = GST_BUFFER_DTS (buf);
1178
1179   GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
1180
1181   GST_SPLITMUX_LOCK (splitmux);
1182
1183   if (splitmux->state == SPLITMUX_STATE_STOPPED)
1184     goto beach;
1185
1186   /* If this buffer has a timestamp, advance the input timestamp of the
1187    * stream */
1188   if (GST_CLOCK_TIME_IS_VALID (ts)) {
1189     GstClockTimeDiff running_time =
1190         my_segment_to_running_time (&ctx->in_segment, ts);
1191
1192     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
1193         GST_STIME_ARGS (running_time));
1194
1195     if (GST_CLOCK_STIME_IS_VALID (running_time)
1196         && running_time > ctx->in_running_time)
1197       ctx->in_running_time = running_time;
1198   }
1199
1200   /* Try to make sure we have a valid running time */
1201   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
1202     ctx->in_running_time =
1203         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
1204   }
1205
1206   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
1207       GST_STIME_ARGS (ctx->in_running_time));
1208
1209   buf_info->run_ts = ctx->in_running_time;
1210   buf_info->buf_size = gst_buffer_get_size (buf);
1211   buf_info->duration = GST_BUFFER_DURATION (buf);
1212
1213   /* Update total input byte counter for overflow detect */
1214   ctx->in_bytes += buf_info->buf_size;
1215
1216   /* initialize mux_start_time */
1217   if (ctx->is_reference && splitmux->mux_start_time == GST_CLOCK_STIME_NONE) {
1218     splitmux->mux_start_time = buf_info->run_ts;
1219     GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
1220         GST_STIME_ARGS (splitmux->mux_start_time));
1221     /* Also take this as the first start time when starting up,
1222      * so that we start counting overflow from the first frame */
1223     if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
1224       splitmux->max_in_running_time = splitmux->mux_start_time;
1225   }
1226
1227   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
1228       " total in_bytes %" G_GUINT64_FORMAT,
1229       GST_STIME_ARGS (buf_info->run_ts), ctx->in_bytes);
1230
1231   loop_again = TRUE;
1232   do {
1233     if (ctx->flushing)
1234       break;
1235
1236     switch (splitmux->state) {
1237       case SPLITMUX_STATE_COLLECTING_GOP_START:
1238         if (ctx->is_reference) {
1239           /* If a keyframe, we have a complete GOP */
1240           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1241               !GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) ||
1242               splitmux->max_in_running_time >= ctx->in_running_time) {
1243             /* Pass this buffer through */
1244             loop_again = FALSE;
1245             break;
1246           }
1247           GST_INFO_OBJECT (pad,
1248               "Have keyframe with running time %" GST_STIME_FORMAT,
1249               GST_STIME_ARGS (ctx->in_running_time));
1250           keyframe = TRUE;
1251           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1252           splitmux->max_in_running_time = ctx->in_running_time;
1253           /* Wake up other input pads to collect this GOP */
1254           GST_SPLITMUX_BROADCAST (splitmux);
1255           check_completed_gop (splitmux, ctx);
1256         } else {
1257           /* We're still waiting for a keyframe on the reference pad, sleep */
1258           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1259           GST_SPLITMUX_WAIT (splitmux);
1260           GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1261               splitmux->state);
1262         }
1263         break;
1264       case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1265
1266         /* If we overran the target timestamp, it might be time to process
1267          * the GOP, otherwise bail out for more data
1268          */
1269         GST_LOG_OBJECT (pad,
1270             "Checking TS %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT,
1271             GST_STIME_ARGS (ctx->in_running_time),
1272             GST_STIME_ARGS (splitmux->max_in_running_time));
1273
1274         if (ctx->in_running_time < splitmux->max_in_running_time) {
1275           loop_again = FALSE;
1276           break;
1277         }
1278
1279         GST_LOG_OBJECT (pad,
1280             "Collected last packet of GOP. Checking other pads");
1281         check_completed_gop (splitmux, ctx);
1282         break;
1283       case SPLITMUX_STATE_ENDING_FILE:{
1284         GstEvent *event;
1285
1286         /* If somes streams received no buffer during the last GOP that overran,
1287          * because its next buffer has a timestamp bigger than
1288          * ctx->max_in_running_time, its queue is empty. In that case the only
1289          * way to wakeup the output thread is by injecting an event in the
1290          * queue. This usually happen with subtitle streams.
1291          * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
1292         GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event");
1293         event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
1294             GST_EVENT_TYPE_SERIALIZED,
1295             gst_structure_new ("splitmuxsink-unblock", "timestamp",
1296                 G_TYPE_INT64, splitmux->max_in_running_time, NULL));
1297
1298         GST_SPLITMUX_UNLOCK (splitmux);
1299         gst_pad_send_event (ctx->sinkpad, event);
1300         GST_SPLITMUX_LOCK (splitmux);
1301         /* state may have changed while we were unlocked. Loop again if so */
1302         if (splitmux->state != SPLITMUX_STATE_ENDING_FILE)
1303           break;
1304         /* fallthrough */
1305       }
1306       case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1307         /* A fragment is ending, wait until that's done before continuing */
1308         GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1309         GST_SPLITMUX_WAIT (splitmux);
1310         GST_DEBUG_OBJECT (pad,
1311             "Done sleeping for fragment restart state now %d", splitmux->state);
1312         break;
1313       default:
1314         loop_again = FALSE;
1315         break;
1316     }
1317   } while (loop_again);
1318
1319   if (keyframe) {
1320     splitmux->queued_gops++;
1321     buf_info->keyframe = TRUE;
1322   }
1323
1324   /* Now add this buffer to the queue just before returning */
1325   g_queue_push_head (&ctx->queued_bufs, buf_info);
1326
1327   /* Check the buffer will fit in the mq */
1328   check_queue_length (splitmux, ctx);
1329
1330   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1331       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
1332
1333   GST_SPLITMUX_UNLOCK (splitmux);
1334   return GST_PAD_PROBE_PASS;
1335
1336 beach:
1337   GST_SPLITMUX_UNLOCK (splitmux);
1338   if (buf_info)
1339     mq_stream_buf_free (buf_info);
1340   return GST_PAD_PROBE_PASS;
1341 }
1342
1343 static GstPad *
1344 gst_splitmux_sink_request_new_pad (GstElement * element,
1345     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1346 {
1347   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1348   GstPadTemplate *mux_template = NULL;
1349   GstPad *res = NULL;
1350   GstPad *mq_sink, *mq_src;
1351   gchar *gname;
1352   gboolean is_video = FALSE;
1353   MqStreamCtx *ctx;
1354
1355   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1356
1357   GST_SPLITMUX_LOCK (splitmux);
1358   if (!create_elements (splitmux))
1359     goto fail;
1360
1361   if (templ->name_template) {
1362     if (g_str_equal (templ->name_template, "video")) {
1363       /* FIXME: Look for a pad template with matching caps, rather than by name */
1364       mux_template =
1365           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1366           (splitmux->muxer), "video_%u");
1367       is_video = TRUE;
1368       name = NULL;
1369     } else {
1370       mux_template =
1371           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1372           (splitmux->muxer), templ->name_template);
1373     }
1374     if (mux_template == NULL) {
1375       /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1376       mux_template =
1377           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1378           (splitmux->muxer), "sink_%d");
1379     }
1380   }
1381
1382   res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1383   if (res == NULL)
1384     goto fail;
1385
1386   if (is_video)
1387     gname = g_strdup ("video");
1388   else if (name == NULL)
1389     gname = gst_pad_get_name (res);
1390   else
1391     gname = g_strdup (name);
1392
1393   if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1394     gst_element_release_request_pad (splitmux->muxer, res);
1395     gst_object_unref (GST_OBJECT (res));
1396     goto fail;
1397   }
1398
1399   if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1400     gst_element_release_request_pad (splitmux->muxer, res);
1401     gst_object_unref (GST_OBJECT (res));
1402     gst_element_release_request_pad (splitmux->mq, mq_sink);
1403     gst_object_unref (GST_OBJECT (mq_sink));
1404     goto fail;
1405   }
1406
1407   gst_object_unref (GST_OBJECT (res));
1408
1409   ctx = mq_stream_ctx_new (splitmux);
1410   ctx->srcpad = mq_src;
1411   ctx->sinkpad = mq_sink;
1412
1413   mq_stream_ctx_ref (ctx);
1414   ctx->src_pad_block_id =
1415       gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1416       (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1417       _pad_block_destroy_src_notify);
1418   if (is_video && splitmux->reference_ctx != NULL) {
1419     splitmux->reference_ctx->is_reference = FALSE;
1420     splitmux->reference_ctx = NULL;
1421   }
1422   if (splitmux->reference_ctx == NULL) {
1423     splitmux->reference_ctx = ctx;
1424     ctx->is_reference = TRUE;
1425   }
1426
1427   res = gst_ghost_pad_new (gname, mq_sink);
1428   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1429
1430   mq_stream_ctx_ref (ctx);
1431   ctx->sink_pad_block_id =
1432       gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1433       (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1434       _pad_block_destroy_sink_notify);
1435
1436   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1437       " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1438
1439   splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1440
1441   g_free (gname);
1442
1443   gst_object_unref (mq_sink);
1444   gst_object_unref (mq_src);
1445
1446   gst_pad_set_active (res, TRUE);
1447   gst_element_add_pad (element, res);
1448   GST_SPLITMUX_UNLOCK (splitmux);
1449
1450   return res;
1451 fail:
1452   GST_SPLITMUX_UNLOCK (splitmux);
1453   return NULL;
1454 }
1455
1456 static void
1457 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1458 {
1459   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1460   GstPad *mqsink, *mqsrc, *muxpad;
1461   MqStreamCtx *ctx =
1462       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1463
1464   GST_SPLITMUX_LOCK (splitmux);
1465
1466   if (splitmux->muxer == NULL || splitmux->mq == NULL)
1467     goto fail;                  /* Elements don't exist yet - nothing to release */
1468
1469   GST_INFO_OBJECT (pad, "releasing request pad");
1470
1471   mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1472   mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1473   muxpad = gst_pad_get_peer (mqsrc);
1474
1475   /* Remove the context from our consideration */
1476   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1477
1478   if (ctx->sink_pad_block_id)
1479     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1480
1481   if (ctx->src_pad_block_id)
1482     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1483
1484   /* Can release the context now */
1485   mq_stream_ctx_unref (ctx);
1486
1487   /* Release and free the mq input */
1488   gst_element_release_request_pad (splitmux->mq, mqsink);
1489
1490   /* Release and free the muxer input */
1491   gst_element_release_request_pad (splitmux->muxer, muxpad);
1492
1493   gst_object_unref (mqsink);
1494   gst_object_unref (mqsrc);
1495   gst_object_unref (muxpad);
1496
1497   gst_element_remove_pad (element, pad);
1498
1499   /* Reset the internal elements only after all request pads are released */
1500   if (splitmux->contexts == NULL)
1501     gst_splitmux_reset (splitmux);
1502
1503 fail:
1504   GST_SPLITMUX_UNLOCK (splitmux);
1505 }
1506
1507 static GstElement *
1508 create_element (GstSplitMuxSink * splitmux,
1509     const gchar * factory, const gchar * name)
1510 {
1511   GstElement *ret = gst_element_factory_make (factory, name);
1512   if (ret == NULL) {
1513     g_warning ("Failed to create %s - splitmuxsink will not work", name);
1514     return NULL;
1515   }
1516
1517   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1518     g_warning ("Could not add %s element - splitmuxsink will not work", name);
1519     gst_object_unref (ret);
1520     return NULL;
1521   }
1522
1523   return ret;
1524 }
1525
1526 static gboolean
1527 create_elements (GstSplitMuxSink * splitmux)
1528 {
1529   /* Create internal elements */
1530   if (splitmux->mq == NULL) {
1531     if ((splitmux->mq =
1532             create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1533       goto fail;
1534
1535     splitmux->mq_max_buffers = 5;
1536     /* No bytes or time limit, we limit buffers manually */
1537     g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1538         (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1539   }
1540
1541   if (splitmux->muxer == NULL) {
1542     GstElement *provided_muxer = NULL;
1543
1544     GST_OBJECT_LOCK (splitmux);
1545     if (splitmux->provided_muxer != NULL)
1546       provided_muxer = gst_object_ref (splitmux->provided_muxer);
1547     GST_OBJECT_UNLOCK (splitmux);
1548
1549     if (provided_muxer == NULL) {
1550       if ((splitmux->muxer =
1551               create_element (splitmux, "mp4mux", "muxer")) == NULL)
1552         goto fail;
1553     } else {
1554       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1555         g_warning ("Could not add muxer element - splitmuxsink will not work");
1556         gst_object_unref (provided_muxer);
1557         goto fail;
1558       }
1559
1560       splitmux->muxer = provided_muxer;
1561       gst_object_unref (provided_muxer);
1562     }
1563   }
1564
1565   return TRUE;
1566 fail:
1567   return FALSE;
1568 }
1569
1570 static GstElement *
1571 find_sink (GstElement * e)
1572 {
1573   GstElement *res = NULL;
1574   GstIterator *iter;
1575   gboolean done = FALSE;
1576   GValue data = { 0, };
1577
1578   if (!GST_IS_BIN (e))
1579     return e;
1580
1581   iter = gst_bin_iterate_sinks (GST_BIN (e));
1582   while (!done) {
1583     switch (gst_iterator_next (iter, &data)) {
1584       case GST_ITERATOR_OK:
1585       {
1586         GstElement *child = g_value_get_object (&data);
1587         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1588                 "location") != NULL) {
1589           res = child;
1590           done = TRUE;
1591         }
1592         g_value_reset (&data);
1593         break;
1594       }
1595       case GST_ITERATOR_RESYNC:
1596         gst_iterator_resync (iter);
1597         break;
1598       case GST_ITERATOR_DONE:
1599         done = TRUE;
1600         break;
1601       case GST_ITERATOR_ERROR:
1602         g_assert_not_reached ();
1603         break;
1604     }
1605   }
1606   g_value_unset (&data);
1607   gst_iterator_free (iter);
1608
1609   return res;
1610 }
1611
1612 static gboolean
1613 create_sink (GstSplitMuxSink * splitmux)
1614 {
1615   GstElement *provided_sink = NULL;
1616
1617   if (splitmux->active_sink == NULL) {
1618
1619     GST_OBJECT_LOCK (splitmux);
1620     if (splitmux->provided_sink != NULL)
1621       provided_sink = gst_object_ref (splitmux->provided_sink);
1622     GST_OBJECT_UNLOCK (splitmux);
1623
1624     if (provided_sink == NULL) {
1625       if ((splitmux->sink =
1626               create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1627         goto fail;
1628       splitmux->active_sink = splitmux->sink;
1629     } else {
1630       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1631         g_warning ("Could not add sink elements - splitmuxsink will not work");
1632         gst_object_unref (provided_sink);
1633         goto fail;
1634       }
1635
1636       splitmux->active_sink = provided_sink;
1637
1638       /* The bin holds a ref now, we can drop our tmp ref */
1639       gst_object_unref (provided_sink);
1640
1641       /* Find the sink element */
1642       splitmux->sink = find_sink (splitmux->active_sink);
1643       if (splitmux->sink == NULL) {
1644         g_warning
1645             ("Could not locate sink element in provided sink - splitmuxsink will not work");
1646         goto fail;
1647       }
1648     }
1649
1650     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1651       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1652       goto fail;
1653     }
1654   }
1655
1656   return TRUE;
1657 fail:
1658   return FALSE;
1659 }
1660
1661 #ifdef __GNUC__
1662 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
1663 #endif
1664 static void
1665 set_next_filename (GstSplitMuxSink * splitmux)
1666 {
1667   gchar *fname = NULL;
1668   gst_splitmux_sink_ensure_max_files (splitmux);
1669
1670   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
1671       splitmux->fragment_id, &fname);
1672
1673   if (!fname)
1674     fname = splitmux->location ?
1675         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
1676
1677   if (fname) {
1678     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1679     g_object_set (splitmux->sink, "location", fname, NULL);
1680     g_free (fname);
1681
1682     splitmux->fragment_id++;
1683   }
1684 }
1685
1686 static GstStateChangeReturn
1687 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1688 {
1689   GstStateChangeReturn ret;
1690   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1691
1692   switch (transition) {
1693     case GST_STATE_CHANGE_NULL_TO_READY:{
1694       GST_SPLITMUX_LOCK (splitmux);
1695       if (!create_elements (splitmux) || !create_sink (splitmux)) {
1696         ret = GST_STATE_CHANGE_FAILURE;
1697         GST_SPLITMUX_UNLOCK (splitmux);
1698         goto beach;
1699       }
1700       GST_SPLITMUX_UNLOCK (splitmux);
1701       splitmux->fragment_id = 0;
1702       set_next_filename (splitmux);
1703       break;
1704     }
1705     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1706       GST_SPLITMUX_LOCK (splitmux);
1707       /* Start by collecting one input on each pad */
1708       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1709       splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
1710       splitmux->muxed_out_time = splitmux->mux_start_time =
1711           splitmux->last_frame_duration = GST_CLOCK_STIME_NONE;
1712       splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1713       splitmux->opening_first_fragment = TRUE;
1714       GST_SPLITMUX_UNLOCK (splitmux);
1715       break;
1716     }
1717     case GST_STATE_CHANGE_PAUSED_TO_READY:
1718     case GST_STATE_CHANGE_READY_TO_NULL:
1719       GST_SPLITMUX_LOCK (splitmux);
1720       splitmux->state = SPLITMUX_STATE_STOPPED;
1721       /* Wake up any blocked threads */
1722       GST_LOG_OBJECT (splitmux,
1723           "State change -> NULL or READY. Waking threads");
1724       GST_SPLITMUX_BROADCAST (splitmux);
1725       GST_SPLITMUX_UNLOCK (splitmux);
1726       break;
1727     default:
1728       break;
1729   }
1730
1731   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1732   if (ret == GST_STATE_CHANGE_FAILURE)
1733     goto beach;
1734
1735   switch (transition) {
1736     case GST_STATE_CHANGE_READY_TO_NULL:
1737       GST_SPLITMUX_LOCK (splitmux);
1738       splitmux->fragment_id = 0;
1739       /* Reset internal elements only if no pad contexts are using them */
1740       if (splitmux->contexts == NULL)
1741         gst_splitmux_reset (splitmux);
1742       GST_SPLITMUX_UNLOCK (splitmux);
1743       break;
1744     default:
1745       break;
1746   }
1747
1748 beach:
1749
1750   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1751       ret == GST_STATE_CHANGE_FAILURE) {
1752     /* Cleanup elements on failed transition out of NULL */
1753     gst_splitmux_reset (splitmux);
1754   }
1755   return ret;
1756 }
1757
1758 gboolean
1759 register_splitmuxsink (GstPlugin * plugin)
1760 {
1761   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1762       "Split File Muxing Sink");
1763
1764   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1765       GST_TYPE_SPLITMUX_SINK);
1766 }
1767
1768 static void
1769 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
1770 {
1771   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
1772     splitmux->fragment_id = 0;
1773   }
1774 }