d98a98426116f7713492d1523b41ef7341d3477a
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * can be muxed, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * If a video stream is available, the splitting process is driven by the video
37  * stream contents, and the video stream must contain closed GOPs for the output
38  * file parts to be played individually correctly. In the absence of a video
39  * stream, the first available stream is used as reference for synchronization.
40  *
41  * <refsect2>
42  * <title>Example pipelines</title>
43  * |[
44  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
45  * ]|
46  * Records a video stream captured from a v4l2 device and muxes it into
47  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48  * and 1MB maximum size.
49  * </refsect2>
50  */
51
52 #ifdef HAVE_CONFIG_H
53 #include "config.h"
54 #endif
55
56 #include <string.h>
57 #include <glib/gstdio.h>
58 #include <gst/video/video.h>
59 #include "gstsplitmuxsink.h"
60
61 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
62 #define GST_CAT_DEFAULT splitmux_debug
63
64 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
65 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
66 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
67 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
68
69 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
70 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
71
72 enum
73 {
74   PROP_0,
75   PROP_LOCATION,
76   PROP_MAX_SIZE_TIME,
77   PROP_MAX_SIZE_BYTES,
78   PROP_MAX_SIZE_TIMECODE,
79   PROP_SEND_KEYFRAME_REQUESTS,
80   PROP_MAX_FILES,
81   PROP_MUXER_OVERHEAD,
82   PROP_MUXER,
83   PROP_SINK
84 };
85
86 #define DEFAULT_MAX_SIZE_TIME       0
87 #define DEFAULT_MAX_SIZE_BYTES      0
88 #define DEFAULT_MAX_FILES           0
89 #define DEFAULT_MUXER_OVERHEAD      0.02
90 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
91 #define DEFAULT_MUXER "mp4mux"
92 #define DEFAULT_SINK "filesink"
93
94 enum
95 {
96   SIGNAL_FORMAT_LOCATION,
97   SIGNAL_FORMAT_LOCATION_FULL,
98   SIGNAL_LAST
99 };
100
101 static guint signals[SIGNAL_LAST];
102
103 static GstStaticPadTemplate video_sink_template =
104 GST_STATIC_PAD_TEMPLATE ("video",
105     GST_PAD_SINK,
106     GST_PAD_REQUEST,
107     GST_STATIC_CAPS_ANY);
108 static GstStaticPadTemplate audio_sink_template =
109 GST_STATIC_PAD_TEMPLATE ("audio_%u",
110     GST_PAD_SINK,
111     GST_PAD_REQUEST,
112     GST_STATIC_CAPS_ANY);
113 static GstStaticPadTemplate subtitle_sink_template =
114 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
115     GST_PAD_SINK,
116     GST_PAD_REQUEST,
117     GST_STATIC_CAPS_ANY);
118
119 static GQuark PAD_CONTEXT;
120
121 static void
122 _do_init (void)
123 {
124   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
125 }
126
127 #define gst_splitmux_sink_parent_class parent_class
128 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
129     _do_init ());
130
131 static gboolean create_muxer (GstSplitMuxSink * splitmux);
132 static gboolean create_sink (GstSplitMuxSink * splitmux);
133 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
134     const GValue * value, GParamSpec * pspec);
135 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
136     GValue * value, GParamSpec * pspec);
137 static void gst_splitmux_sink_dispose (GObject * object);
138 static void gst_splitmux_sink_finalize (GObject * object);
139
140 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
141     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
142 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
143
144 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
145     element, GstStateChange transition);
146
147 static void bus_handler (GstBin * bin, GstMessage * msg);
148 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
149 static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
150 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
151 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
152
153 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
154 static GstElement *create_element (GstSplitMuxSink * splitmux,
155     const gchar * factory, const gchar * name, gboolean locked);
156
157 static void do_async_done (GstSplitMuxSink * splitmux);
158
159 static MqStreamBuf *
160 mq_stream_buf_new (void)
161 {
162   return g_slice_new0 (MqStreamBuf);
163 }
164
165 static void
166 mq_stream_buf_free (MqStreamBuf * data)
167 {
168   g_slice_free (MqStreamBuf, data);
169 }
170
171 static SplitMuxOutputCommand *
172 out_cmd_buf_new (void)
173 {
174   return g_slice_new0 (SplitMuxOutputCommand);
175 }
176
177 static void
178 out_cmd_buf_free (SplitMuxOutputCommand * data)
179 {
180   g_slice_free (SplitMuxOutputCommand, data);
181 }
182
183 static void
184 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
185 {
186   GObjectClass *gobject_class = (GObjectClass *) klass;
187   GstElementClass *gstelement_class = (GstElementClass *) klass;
188   GstBinClass *gstbin_class = (GstBinClass *) klass;
189
190   gobject_class->set_property = gst_splitmux_sink_set_property;
191   gobject_class->get_property = gst_splitmux_sink_get_property;
192   gobject_class->dispose = gst_splitmux_sink_dispose;
193   gobject_class->finalize = gst_splitmux_sink_finalize;
194
195   gst_element_class_set_static_metadata (gstelement_class,
196       "Split Muxing Bin", "Generic/Bin/Muxer",
197       "Convenience bin that muxes incoming streams into multiple time/size limited files",
198       "Jan Schmidt <jan@centricular.com>");
199
200   gst_element_class_add_static_pad_template (gstelement_class,
201       &video_sink_template);
202   gst_element_class_add_static_pad_template (gstelement_class,
203       &audio_sink_template);
204   gst_element_class_add_static_pad_template (gstelement_class,
205       &subtitle_sink_template);
206
207   gstelement_class->change_state =
208       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
209   gstelement_class->request_new_pad =
210       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
211   gstelement_class->release_pad =
212       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
213
214   gstbin_class->handle_message = bus_handler;
215
216   g_object_class_install_property (gobject_class, PROP_LOCATION,
217       g_param_spec_string ("location", "File Output Pattern",
218           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
219           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
220   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
221       g_param_spec_double ("mux-overhead", "Muxing Overhead",
222           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
223           DEFAULT_MUXER_OVERHEAD,
224           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
225
226   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
227       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
228           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
229           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
230   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
231       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
232           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
233           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
234   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
235       g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
236           "Maximum difference in timecode between first and last frame. "
237           "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
238           "Will only be effective if a timecode track is present.",
239           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
240   g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
241       g_param_spec_boolean ("send-keyframe-requests",
242           "Request keyframes at max-size-time",
243           "Request a keyframe every max-size-time ns to try splitting at that point. "
244           "Needs max-size-bytes to be 0 in order to be effective.",
245           DEFAULT_SEND_KEYFRAME_REQUESTS,
246           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
247   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
248       g_param_spec_uint ("max-files", "Max files",
249           "Maximum number of files to keep on disk. Once the maximum is reached,"
250           "old files start to be deleted to make room for new ones.", 0,
251           G_MAXUINT, DEFAULT_MAX_FILES,
252           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
253
254
255   g_object_class_install_property (gobject_class, PROP_MUXER,
256       g_param_spec_object ("muxer", "Muxer",
257           "The muxer element to use (NULL = default mp4mux)",
258           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
259   g_object_class_install_property (gobject_class, PROP_SINK,
260       g_param_spec_object ("sink", "Sink",
261           "The sink element (or element chain) to use (NULL = default filesink)",
262           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
263
264   /**
265    * GstSplitMuxSink::format-location:
266    * @splitmux: the #GstSplitMuxSink
267    * @fragment_id: the sequence number of the file to be created
268    *
269    * Returns: the location to be used for the next output file
270    */
271   signals[SIGNAL_FORMAT_LOCATION] =
272       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
273       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
274
275   /**
276    * GstSplitMuxSink::format-location-full:
277    * @splitmux: the #GstSplitMuxSink
278    * @fragment_id: the sequence number of the file to be created
279    * @first_sample: A #GstSample containing the first buffer
280    *   from the reference stream in the new file
281    *
282    * Returns: the location to be used for the next output file
283    */
284   signals[SIGNAL_FORMAT_LOCATION_FULL] =
285       g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
286       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
287       GST_TYPE_SAMPLE);
288 }
289
290 static void
291 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
292 {
293   g_mutex_init (&splitmux->lock);
294   g_cond_init (&splitmux->input_cond);
295   g_cond_init (&splitmux->output_cond);
296   g_queue_init (&splitmux->out_cmd_q);
297
298   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
299   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
300   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
301   splitmux->max_files = DEFAULT_MAX_FILES;
302   splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
303   splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
304
305   splitmux->threshold_timecode_str = NULL;
306
307   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
308 }
309
310 static void
311 gst_splitmux_reset (GstSplitMuxSink * splitmux)
312 {
313   if (splitmux->muxer) {
314     gst_element_set_locked_state (splitmux->muxer, TRUE);
315     gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
316     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
317   }
318   if (splitmux->active_sink) {
319     gst_element_set_locked_state (splitmux->active_sink, TRUE);
320     gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
321     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
322   }
323
324   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
325 }
326
327 static void
328 gst_splitmux_sink_dispose (GObject * object)
329 {
330   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
331
332   /* Calling parent dispose invalidates all child pointers */
333   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
334
335   G_OBJECT_CLASS (parent_class)->dispose (object);
336 }
337
338 static void
339 gst_splitmux_sink_finalize (GObject * object)
340 {
341   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
342   g_cond_clear (&splitmux->input_cond);
343   g_cond_clear (&splitmux->output_cond);
344   g_mutex_clear (&splitmux->lock);
345   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
346   g_queue_clear (&splitmux->out_cmd_q);
347
348   if (splitmux->provided_sink)
349     gst_object_unref (splitmux->provided_sink);
350   if (splitmux->provided_muxer)
351     gst_object_unref (splitmux->provided_muxer);
352
353   if (splitmux->threshold_timecode_str)
354     g_free (splitmux->threshold_timecode_str);
355
356   g_free (splitmux->location);
357
358   /* Make sure to free any un-released contexts */
359   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
360   g_list_free (splitmux->contexts);
361
362   G_OBJECT_CLASS (parent_class)->finalize (object);
363 }
364
365 static void
366 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
367     const GValue * value, GParamSpec * pspec)
368 {
369   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
370
371   switch (prop_id) {
372     case PROP_LOCATION:{
373       GST_OBJECT_LOCK (splitmux);
374       g_free (splitmux->location);
375       splitmux->location = g_value_dup_string (value);
376       GST_OBJECT_UNLOCK (splitmux);
377       break;
378     }
379     case PROP_MAX_SIZE_BYTES:
380       GST_OBJECT_LOCK (splitmux);
381       splitmux->threshold_bytes = g_value_get_uint64 (value);
382       GST_OBJECT_UNLOCK (splitmux);
383       break;
384     case PROP_MAX_SIZE_TIME:
385       GST_OBJECT_LOCK (splitmux);
386       splitmux->threshold_time = g_value_get_uint64 (value);
387       GST_OBJECT_UNLOCK (splitmux);
388       break;
389     case PROP_MAX_SIZE_TIMECODE:
390       GST_OBJECT_LOCK (splitmux);
391       splitmux->threshold_timecode_str = g_value_dup_string (value);
392       GST_OBJECT_UNLOCK (splitmux);
393       break;
394     case PROP_SEND_KEYFRAME_REQUESTS:
395       GST_OBJECT_LOCK (splitmux);
396       splitmux->send_keyframe_requests = g_value_get_boolean (value);
397       GST_OBJECT_UNLOCK (splitmux);
398       break;
399     case PROP_MAX_FILES:
400       GST_OBJECT_LOCK (splitmux);
401       splitmux->max_files = g_value_get_uint (value);
402       GST_OBJECT_UNLOCK (splitmux);
403       break;
404     case PROP_MUXER_OVERHEAD:
405       GST_OBJECT_LOCK (splitmux);
406       splitmux->mux_overhead = g_value_get_double (value);
407       GST_OBJECT_UNLOCK (splitmux);
408       break;
409     case PROP_SINK:
410       GST_OBJECT_LOCK (splitmux);
411       if (splitmux->provided_sink)
412         gst_object_unref (splitmux->provided_sink);
413       splitmux->provided_sink = g_value_get_object (value);
414       gst_object_ref_sink (splitmux->provided_sink);
415       GST_OBJECT_UNLOCK (splitmux);
416       break;
417     case PROP_MUXER:
418       GST_OBJECT_LOCK (splitmux);
419       if (splitmux->provided_muxer)
420         gst_object_unref (splitmux->provided_muxer);
421       splitmux->provided_muxer = g_value_get_object (value);
422       gst_object_ref_sink (splitmux->provided_muxer);
423       GST_OBJECT_UNLOCK (splitmux);
424       break;
425     default:
426       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
427       break;
428   }
429 }
430
431 static void
432 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
433     GValue * value, GParamSpec * pspec)
434 {
435   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
436
437   switch (prop_id) {
438     case PROP_LOCATION:
439       GST_OBJECT_LOCK (splitmux);
440       g_value_set_string (value, splitmux->location);
441       GST_OBJECT_UNLOCK (splitmux);
442       break;
443     case PROP_MAX_SIZE_BYTES:
444       GST_OBJECT_LOCK (splitmux);
445       g_value_set_uint64 (value, splitmux->threshold_bytes);
446       GST_OBJECT_UNLOCK (splitmux);
447       break;
448     case PROP_MAX_SIZE_TIME:
449       GST_OBJECT_LOCK (splitmux);
450       g_value_set_uint64 (value, splitmux->threshold_time);
451       GST_OBJECT_UNLOCK (splitmux);
452       break;
453     case PROP_MAX_SIZE_TIMECODE:
454       GST_OBJECT_LOCK (splitmux);
455       g_value_set_string (value, splitmux->threshold_timecode_str);
456       GST_OBJECT_UNLOCK (splitmux);
457       break;
458     case PROP_SEND_KEYFRAME_REQUESTS:
459       GST_OBJECT_LOCK (splitmux);
460       g_value_set_boolean (value, splitmux->send_keyframe_requests);
461       GST_OBJECT_UNLOCK (splitmux);
462       break;
463     case PROP_MAX_FILES:
464       GST_OBJECT_LOCK (splitmux);
465       g_value_set_uint (value, splitmux->max_files);
466       GST_OBJECT_UNLOCK (splitmux);
467       break;
468     case PROP_MUXER_OVERHEAD:
469       GST_OBJECT_LOCK (splitmux);
470       g_value_set_double (value, splitmux->mux_overhead);
471       GST_OBJECT_UNLOCK (splitmux);
472       break;
473     case PROP_SINK:
474       GST_OBJECT_LOCK (splitmux);
475       g_value_set_object (value, splitmux->provided_sink);
476       GST_OBJECT_UNLOCK (splitmux);
477       break;
478     case PROP_MUXER:
479       GST_OBJECT_LOCK (splitmux);
480       g_value_set_object (value, splitmux->provided_muxer);
481       GST_OBJECT_UNLOCK (splitmux);
482       break;
483     default:
484       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
485       break;
486   }
487 }
488
489 /* Convenience function */
490 static inline GstClockTimeDiff
491 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
492 {
493   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
494
495   if (GST_CLOCK_TIME_IS_VALID (val)) {
496     gboolean sign =
497         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
498     if (sign > 0)
499       res = val;
500     else if (sign < 0)
501       res = -val;
502   }
503   return res;
504 }
505
506 static MqStreamCtx *
507 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
508 {
509   MqStreamCtx *ctx;
510
511   ctx = g_new0 (MqStreamCtx, 1);
512   g_atomic_int_set (&ctx->refcount, 1);
513   ctx->splitmux = splitmux;
514   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
515   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
516   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
517   g_queue_init (&ctx->queued_bufs);
518   return ctx;
519 }
520
521 static void
522 mq_stream_ctx_free (MqStreamCtx * ctx)
523 {
524   if (ctx->q) {
525     g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
526     gst_element_set_locked_state (ctx->q, TRUE);
527     gst_element_set_state (ctx->q, GST_STATE_NULL);
528     gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
529     gst_object_unref (ctx->q);
530   }
531   gst_buffer_replace (&ctx->prev_in_keyframe, NULL);
532   gst_object_unref (ctx->sinkpad);
533   gst_object_unref (ctx->srcpad);
534   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
535   g_queue_clear (&ctx->queued_bufs);
536   g_free (ctx);
537 }
538
539 static void
540 mq_stream_ctx_unref (MqStreamCtx * ctx)
541 {
542   if (g_atomic_int_dec_and_test (&ctx->refcount))
543     mq_stream_ctx_free (ctx);
544 }
545
546 static void
547 mq_stream_ctx_ref (MqStreamCtx * ctx)
548 {
549   g_atomic_int_inc (&ctx->refcount);
550 }
551
552 static void
553 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
554 {
555   ctx->sink_pad_block_id = 0;
556   mq_stream_ctx_unref (ctx);
557 }
558
559 static void
560 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
561 {
562   ctx->src_pad_block_id = 0;
563   mq_stream_ctx_unref (ctx);
564 }
565
566 static void
567 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
568 {
569   gchar *location = NULL;
570   GstMessage *msg;
571   const gchar *msg_name = opened ?
572       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
573
574   g_object_get (splitmux->sink, "location", &location, NULL);
575
576   msg = gst_message_new_element (GST_OBJECT (splitmux),
577       gst_structure_new (msg_name,
578           "location", G_TYPE_STRING, location,
579           "running-time", GST_TYPE_CLOCK_TIME,
580           splitmux->reference_ctx->out_running_time, NULL));
581   gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
582
583   g_free (location);
584 }
585
586 /* Called with lock held, drops the lock to send EOS to the
587  * pad
588  */
589 static void
590 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
591 {
592   GstEvent *eos;
593   GstPad *pad;
594
595   eos = gst_event_new_eos ();
596   pad = gst_pad_get_peer (ctx->srcpad);
597
598   ctx->out_eos = TRUE;
599
600   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
601   GST_SPLITMUX_UNLOCK (splitmux);
602   gst_pad_send_event (pad, eos);
603   GST_SPLITMUX_LOCK (splitmux);
604
605   gst_object_unref (pad);
606 }
607
608 /* Called with splitmux lock held to check if this output
609  * context needs to sleep to wait for the release of the
610  * next GOP, or to send EOS to close out the current file
611  */
612 static void
613 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
614 {
615   if (ctx->caps_change)
616     return;
617
618   do {
619     /* When first starting up, the reference stream has to output
620      * the first buffer to prepare the muxer and sink */
621     gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
622
623     if (ctx->flushing
624         || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
625       return;
626
627     GST_LOG_OBJECT (ctx->srcpad,
628         "Checking running time %" GST_STIME_FORMAT " against max %"
629         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
630         GST_STIME_ARGS (splitmux->max_out_running_time));
631
632     if (can_output) {
633       if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
634           ctx->out_running_time < splitmux->max_out_running_time) {
635         return;
636       }
637
638       switch (splitmux->output_state) {
639         case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
640           /* We only get here if we've finished outputting a GOP and need to know
641            * what to do next */
642           splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
643           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
644           continue;
645
646         case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
647           /* We've reached the max out running_time to get here, so end this file now */
648           if (ctx->out_eos == FALSE) {
649             send_eos (splitmux, ctx);
650             continue;
651           }
652           break;
653         case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
654           if (ctx->is_reference) {
655             /* Special handling on the reference ctx to start new fragments
656              * and collect commands from the command queue */
657             /* drops the splitmux lock briefly: */
658             start_next_fragment (splitmux, ctx);
659             continue;
660           }
661           break;
662
663         case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
664           do {
665             SplitMuxOutputCommand *cmd =
666                 g_queue_pop_tail (&splitmux->out_cmd_q);
667             if (cmd != NULL) {
668               /* If we pop the last command, we need to make our queues bigger */
669               if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
670                 grow_blocked_queues (splitmux);
671
672               if (cmd->start_new_fragment) {
673                 GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
674                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
675               } else {
676                 GST_DEBUG_OBJECT (splitmux,
677                     "Got new output cmd for time %" GST_STIME_FORMAT,
678                     GST_STIME_ARGS (cmd->max_output_ts));
679
680                 /* Extend the output range immediately */
681                 splitmux->max_out_running_time = cmd->max_output_ts;
682                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
683               }
684               GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
685
686               out_cmd_buf_free (cmd);
687               break;
688             } else {
689               GST_SPLITMUX_WAIT_OUTPUT (splitmux);
690             }
691           } while (splitmux->output_state ==
692               SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
693           /* loop and re-check the state */
694           continue;
695         }
696         case SPLITMUX_OUTPUT_STATE_STOPPED:
697           return;
698       }
699     }
700
701     GST_INFO_OBJECT (ctx->srcpad,
702         "Sleeping for running time %"
703         GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
704         GST_STIME_ARGS (ctx->out_running_time),
705         GST_STIME_ARGS (splitmux->max_out_running_time));
706     GST_SPLITMUX_WAIT_OUTPUT (splitmux);
707     GST_INFO_OBJECT (ctx->srcpad,
708         "Woken for new max running time %" GST_STIME_FORMAT,
709         GST_STIME_ARGS (splitmux->max_out_running_time));
710   }
711   while (1);
712 }
713
714 static GstClockTime
715 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
716     const GstVideoTimeCode * cur_tc)
717 {
718   GstVideoTimeCode *target_tc;
719   GstVideoTimeCodeInterval *tc_inter;
720   GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
721
722   if (cur_tc == NULL || splitmux->threshold_timecode_str == NULL)
723     return GST_CLOCK_TIME_NONE;
724
725   tc_inter =
726       gst_video_time_code_interval_new_from_string
727       (splitmux->threshold_timecode_str);
728   target_tc = gst_video_time_code_add_interval (cur_tc, tc_inter);
729   gst_video_time_code_interval_free (tc_inter);
730
731   /* Convert to ns */
732   target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
733   cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
734
735   /* Add fragment_start_time, accounting for wraparound */
736   if (target_tc_time >= cur_tc_time) {
737     next_max_tc_time =
738         target_tc_time - cur_tc_time + splitmux->fragment_start_time;
739   } else {
740     GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
741
742     next_max_tc_time =
743         day_in_ns - cur_tc_time + target_tc_time +
744         splitmux->fragment_start_time;
745   }
746   GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
747       " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
748       GST_TIME_ARGS (cur_tc_time));
749   gst_video_time_code_free (target_tc);
750
751   return next_max_tc_time;
752 }
753
754 static gboolean
755 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer)
756 {
757   GstEvent *ev;
758   GstClockTime target_time;
759   gboolean timecode_based = FALSE;
760
761   splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
762   if (splitmux->threshold_timecode_str) {
763     GstVideoTimeCodeMeta *tc_meta;
764
765     if (buffer != NULL) {
766       tc_meta = gst_buffer_get_video_time_code_meta (buffer);
767       if (tc_meta) {
768         splitmux->next_max_tc_time =
769             calculate_next_max_timecode (splitmux, &tc_meta->tc);
770         timecode_based = (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE);
771       }
772     } else {
773       /* This can happen in the presence of GAP events that trigger
774        * a new fragment start */
775       GST_WARNING_OBJECT (splitmux,
776           "No buffer available to calculate next timecode");
777     }
778   }
779
780   if (splitmux->send_keyframe_requests == FALSE
781       || (splitmux->threshold_time == 0 && !timecode_based)
782       || splitmux->threshold_bytes != 0)
783     return TRUE;
784
785   if (timecode_based) {
786     /* We might have rounding errors: aim slightly earlier */
787     target_time = splitmux->next_max_tc_time - 5 * GST_USECOND;
788   } else {
789     target_time = splitmux->fragment_start_time + splitmux->threshold_time;
790   }
791   ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
792   GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
793       GST_TIME_ARGS (target_time));
794   return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
795 }
796
797 static GstPadProbeReturn
798 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
799 {
800   GstSplitMuxSink *splitmux = ctx->splitmux;
801   MqStreamBuf *buf_info = NULL;
802
803   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
804
805   /* FIXME: Handle buffer lists, until then make it clear they won't work */
806   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
807     g_warning ("Buffer list handling not implemented");
808     return GST_PAD_PROBE_DROP;
809   }
810   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
811       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
812     GstEvent *event = gst_pad_probe_info_get_event (info);
813     gboolean locked = FALSE;
814
815     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
816
817     switch (GST_EVENT_TYPE (event)) {
818       case GST_EVENT_SEGMENT:
819         gst_event_copy_segment (event, &ctx->out_segment);
820         break;
821       case GST_EVENT_FLUSH_STOP:
822         GST_SPLITMUX_LOCK (splitmux);
823         locked = TRUE;
824         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
825         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
826         g_queue_clear (&ctx->queued_bufs);
827         ctx->flushing = FALSE;
828         break;
829       case GST_EVENT_FLUSH_START:
830         GST_SPLITMUX_LOCK (splitmux);
831         locked = TRUE;
832         GST_LOG_OBJECT (pad, "Flush start");
833         ctx->flushing = TRUE;
834         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
835         break;
836       case GST_EVENT_EOS:
837         GST_SPLITMUX_LOCK (splitmux);
838         locked = TRUE;
839         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
840           goto beach;
841         ctx->out_eos = TRUE;
842         break;
843       case GST_EVENT_GAP:{
844         GstClockTime gap_ts;
845         GstClockTimeDiff rtime;
846
847         gst_event_parse_gap (event, &gap_ts, NULL);
848         if (gap_ts == GST_CLOCK_TIME_NONE)
849           break;
850
851         GST_SPLITMUX_LOCK (splitmux);
852         locked = TRUE;
853
854         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
855           goto beach;
856
857         /* When we get a gap event on the
858          * reference stream and we're trying to open a
859          * new file, we need to store it until we get
860          * the buffer afterwards
861          */
862         if (ctx->is_reference &&
863             (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
864           GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
865           gst_event_replace (&ctx->pending_gap, event);
866           GST_SPLITMUX_UNLOCK (splitmux);
867           return GST_PAD_PROBE_HANDLED;
868         }
869
870         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
871
872         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
873             GST_STIME_ARGS (rtime));
874
875         if (rtime != GST_CLOCK_STIME_NONE) {
876           ctx->out_running_time = rtime;
877           complete_or_wait_on_out (splitmux, ctx);
878         }
879         break;
880       }
881       case GST_EVENT_CUSTOM_DOWNSTREAM:{
882         const GstStructure *s;
883         GstClockTimeDiff ts = 0;
884
885         s = gst_event_get_structure (event);
886         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
887           break;
888
889         gst_structure_get_int64 (s, "timestamp", &ts);
890
891         GST_SPLITMUX_LOCK (splitmux);
892         locked = TRUE;
893
894         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
895           goto beach;
896         ctx->out_running_time = ts;
897         if (!ctx->is_reference)
898           complete_or_wait_on_out (splitmux, ctx);
899         GST_SPLITMUX_UNLOCK (splitmux);
900         return GST_PAD_PROBE_DROP;
901       }
902       case GST_EVENT_CAPS:{
903         GstPad *peer;
904
905         if (!ctx->is_reference)
906           break;
907
908         peer = gst_pad_get_peer (pad);
909         if (peer) {
910           gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
911
912           gst_object_unref (peer);
913
914           if (ok)
915             break;
916         } else {
917           break;
918         }
919         /* This is in the case the muxer doesn't allow this change of caps */
920
921         GST_SPLITMUX_LOCK (splitmux);
922         locked = TRUE;
923         ctx->caps_change = TRUE;
924         splitmux->ready_for_output = FALSE;
925
926         if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
927
928           if (ctx->out_eos == FALSE) {
929             send_eos (splitmux, ctx);
930           }
931           splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
932         }
933
934         /* Lets it fall through, if it fails again, then the muxer just can't
935          * support this format, but at least we have a closed file.
936          */
937         break;
938       }
939       default:
940         break;
941     }
942
943     /* We need to make sure events aren't passed
944      * until the muxer / sink are ready for it */
945     if (!locked)
946       GST_SPLITMUX_LOCK (splitmux);
947     if (!ctx->is_reference)
948       complete_or_wait_on_out (splitmux, ctx);
949     GST_SPLITMUX_UNLOCK (splitmux);
950
951     /* Don't try to forward sticky events before the next buffer is there
952      * because it would cause a new file to be created without the first
953      * buffer being available.
954      */
955     if (ctx->caps_change && GST_EVENT_IS_STICKY (event))
956       return GST_PAD_PROBE_DROP;
957     else
958       return GST_PAD_PROBE_PASS;
959   }
960
961   /* Allow everything through until the configured next stopping point */
962   GST_SPLITMUX_LOCK (splitmux);
963
964   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
965   if (buf_info == NULL)
966     /* Can only happen due to a poorly timed flush */
967     goto beach;
968
969   /* If we have popped a keyframe, decrement the queued_gop count */
970   if (buf_info->keyframe && splitmux->queued_keyframes > 0)
971     splitmux->queued_keyframes--;
972
973   ctx->out_running_time = buf_info->run_ts;
974   ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
975
976   GST_LOG_OBJECT (splitmux,
977       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
978       " size %" G_GUINT64_FORMAT,
979       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
980
981   ctx->caps_change = FALSE;
982
983   complete_or_wait_on_out (splitmux, ctx);
984
985   splitmux->muxed_out_bytes += buf_info->buf_size;
986
987 #ifndef GST_DISABLE_GST_DEBUG
988   {
989     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
990     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
991         " run ts %" GST_STIME_FORMAT, buf,
992         GST_STIME_ARGS (ctx->out_running_time));
993   }
994 #endif
995
996   ctx->cur_out_buffer = NULL;
997   GST_SPLITMUX_UNLOCK (splitmux);
998
999   /* pending_gap is protected by the STREAM lock */
1000   if (ctx->pending_gap) {
1001     /* If we previously stored a gap event, send it now */
1002     GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1003
1004     GST_DEBUG_OBJECT (splitmux,
1005         "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1006
1007     gst_pad_send_event (peer, ctx->pending_gap);
1008     ctx->pending_gap = NULL;
1009
1010     gst_object_unref (peer);
1011   }
1012
1013   mq_stream_buf_free (buf_info);
1014
1015   return GST_PAD_PROBE_PASS;
1016
1017 beach:
1018   GST_SPLITMUX_UNLOCK (splitmux);
1019   return GST_PAD_PROBE_DROP;
1020 }
1021
1022 static gboolean
1023 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1024 {
1025   return gst_pad_send_event (peer, gst_event_ref (*event));
1026 }
1027
1028 static void
1029 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1030 {
1031   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1032
1033   gst_pad_sticky_events_foreach (ctx->srcpad,
1034       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1035
1036   /* Clear EOS flag if not actually EOS */
1037   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1038
1039   gst_object_unref (peer);
1040 }
1041
1042 /* Called with lock held when a fragment
1043  * reaches EOS and it is time to restart
1044  * a new fragment
1045  */
1046 static void
1047 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1048 {
1049   GstElement *muxer, *sink;
1050
1051   /* 1 change to new file */
1052   splitmux->switching_fragment = TRUE;
1053
1054   /* We need to drop the splitmux lock to acquire the state lock
1055    * here and ensure there's no racy state change going on elsewhere */
1056   muxer = gst_object_ref (splitmux->muxer);
1057   sink = gst_object_ref (splitmux->active_sink);
1058
1059   GST_SPLITMUX_UNLOCK (splitmux);
1060   GST_STATE_LOCK (splitmux);
1061
1062   gst_element_set_locked_state (muxer, TRUE);
1063   gst_element_set_locked_state (sink, TRUE);
1064   gst_element_set_state (muxer, GST_STATE_NULL);
1065   gst_element_set_state (sink, GST_STATE_NULL);
1066
1067   GST_SPLITMUX_LOCK (splitmux);
1068   if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id == 0)
1069     set_next_filename (splitmux, ctx);
1070   splitmux->muxed_out_bytes = 0;
1071   GST_SPLITMUX_UNLOCK (splitmux);
1072
1073   gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
1074   gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
1075   gst_element_set_locked_state (muxer, FALSE);
1076   gst_element_set_locked_state (sink, FALSE);
1077
1078   gst_object_unref (sink);
1079   gst_object_unref (muxer);
1080
1081   GST_SPLITMUX_LOCK (splitmux);
1082   GST_STATE_UNLOCK (splitmux);
1083   splitmux->switching_fragment = FALSE;
1084   do_async_done (splitmux);
1085
1086   splitmux->ready_for_output = TRUE;
1087
1088   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
1089
1090   send_fragment_opened_closed_msg (splitmux, TRUE);
1091
1092   /* FIXME: Is this always the correct next state? */
1093   splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1094   GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1095 }
1096
1097 static void
1098 bus_handler (GstBin * bin, GstMessage * message)
1099 {
1100   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
1101
1102   switch (GST_MESSAGE_TYPE (message)) {
1103     case GST_MESSAGE_EOS:
1104       /* If the state is draining out the current file, drop this EOS */
1105       GST_SPLITMUX_LOCK (splitmux);
1106
1107       send_fragment_opened_closed_msg (splitmux, FALSE);
1108
1109       if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
1110         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
1111         splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1112         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1113
1114         gst_message_unref (message);
1115         GST_SPLITMUX_UNLOCK (splitmux);
1116         return;
1117       } else {
1118         GST_DEBUG_OBJECT (splitmux,
1119             "Passing EOS message. Output state %d max_out_running_time %"
1120             GST_STIME_FORMAT, splitmux->output_state,
1121             GST_STIME_ARGS (splitmux->max_out_running_time));
1122       }
1123       GST_SPLITMUX_UNLOCK (splitmux);
1124       break;
1125     case GST_MESSAGE_ASYNC_START:
1126     case GST_MESSAGE_ASYNC_DONE:
1127       /* Ignore state changes from our children while switching */
1128       if (splitmux->switching_fragment) {
1129         if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
1130             || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
1131           GST_LOG_OBJECT (splitmux,
1132               "Ignoring state change from child %" GST_PTR_FORMAT
1133               " while switching", GST_MESSAGE_SRC (message));
1134           gst_message_unref (message);
1135           return;
1136         }
1137       }
1138       break;
1139     default:
1140       break;
1141   }
1142
1143   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1144 }
1145
1146 static void
1147 ctx_set_unblock (MqStreamCtx * ctx)
1148 {
1149   ctx->need_unblock = TRUE;
1150 }
1151
1152 /* Called with splitmux lock held */
1153 /* Called when entering ProcessingCompleteGop state
1154  * Assess if mq contents overflowed the current file
1155  *   -> If yes, need to switch to new file
1156  *   -> if no, set max_out_running_time to let this GOP in and
1157  *      go to COLLECTING_GOP_START state
1158  */
1159 static void
1160 handle_gathered_gop (GstSplitMuxSink * splitmux)
1161 {
1162   guint64 queued_bytes;
1163   GstClockTimeDiff queued_time = 0;
1164   GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
1165   SplitMuxOutputCommand *cmd;
1166
1167   /* Assess if the multiqueue contents overflowed the current file */
1168   /* When considering if a newly gathered GOP overflows
1169    * the time limit for the file, only consider the running time of the
1170    * reference stream. Other streams might have run ahead a little bit,
1171    * but extra pieces won't be released to the muxer beyond the reference
1172    * stream cut-off anyway - so it forms the limit. */
1173   queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
1174   queued_time = splitmux->reference_ctx->in_running_time;
1175
1176   GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
1177
1178   g_assert (queued_time >= splitmux->fragment_start_time);
1179
1180   queued_time -= splitmux->fragment_start_time;
1181
1182   /* Expand queued bytes estimate by muxer overhead */
1183   queued_bytes += (queued_bytes * splitmux->mux_overhead);
1184
1185   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
1186       " bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
1187   if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE) {
1188     GST_LOG_OBJECT (splitmux,
1189         "timecode mq TS %" GST_TIME_FORMAT " vs target %" GST_TIME_FORMAT,
1190         GST_TIME_ARGS (splitmux->reference_ctx->in_running_time),
1191         GST_TIME_ARGS (splitmux->next_max_tc_time + 5 * GST_USECOND));
1192   }
1193
1194   /* Check for overrun - have we output at least one byte and overrun
1195    * either threshold? */
1196   /* Timecode-based threshold accounts for possible rounding errors:
1197    * 5us should be bigger than all possible rounding errors but nowhere near
1198    * big enough to skip to another frame */
1199   if ((splitmux->fragment_total_bytes > 0 &&
1200           ((splitmux->threshold_bytes > 0 &&
1201                   queued_bytes > splitmux->threshold_bytes) ||
1202               (splitmux->threshold_time > 0 &&
1203                   queued_time > splitmux->threshold_time) ||
1204               (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE &&
1205                   splitmux->reference_ctx->in_running_time >
1206                   splitmux->next_max_tc_time + 5 * GST_USECOND)))) {
1207
1208     /* Tell the output side to start a new fragment */
1209     GST_INFO_OBJECT (splitmux,
1210         "This GOP (dur %" GST_STIME_FORMAT
1211         ") would overflow the fragment, Sending start_new_fragment cmd",
1212         GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
1213             splitmux->gop_start_time));
1214     cmd = out_cmd_buf_new ();
1215     cmd->start_new_fragment = TRUE;
1216     g_queue_push_head (&splitmux->out_cmd_q, cmd);
1217     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1218
1219     new_out_ts = splitmux->reference_ctx->in_running_time;
1220     splitmux->fragment_start_time = splitmux->gop_start_time;
1221     splitmux->fragment_total_bytes = 0;
1222
1223     if (request_next_keyframe (splitmux,
1224             splitmux->reference_ctx->prev_in_keyframe) == FALSE) {
1225       GST_WARNING_OBJECT (splitmux,
1226           "Could not request a keyframe. Files may not split at the exact location they should");
1227     }
1228     gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
1229   }
1230
1231   /* And set up to collect the next GOP */
1232   if (!splitmux->reference_ctx->in_eos) {
1233     splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
1234     splitmux->gop_start_time = new_out_ts;
1235   } else {
1236     /* This is probably already the current state, but just in case: */
1237     splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
1238     new_out_ts = GST_CLOCK_STIME_NONE;  /* EOS runs until forever */
1239   }
1240
1241   /* And wake all input contexts to send a wake-up event */
1242   g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
1243   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1244
1245   /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
1246   splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
1247
1248   if (splitmux->gop_total_bytes > 0) {
1249     GST_LOG_OBJECT (splitmux,
1250         "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
1251         " time %" GST_STIME_FORMAT,
1252         splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
1253
1254     /* Send this GOP to the output command queue */
1255     cmd = out_cmd_buf_new ();
1256     cmd->start_new_fragment = FALSE;
1257     cmd->max_output_ts = new_out_ts;
1258     GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
1259         GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
1260     g_queue_push_head (&splitmux->out_cmd_q, cmd);
1261
1262     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1263   }
1264
1265   splitmux->gop_total_bytes = 0;
1266 }
1267
1268 /* Called with splitmux lock held */
1269 /* Called from each input pad when it is has all the pieces
1270  * for a GOP or EOS, starting with the reference pad which has set the
1271  * splitmux->max_in_running_time
1272  */
1273 static void
1274 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1275 {
1276   GList *cur;
1277   GstEvent *event;
1278
1279   /* On ENDING_FILE, the reference stream sends a command to start a new
1280    * fragment, then releases the GOP for output in the new fragment.
1281    *  If somes streams received no buffer during the last GOP that overran,
1282    * because its next buffer has a timestamp bigger than
1283    * ctx->max_in_running_time, its queue is empty. In that case the only
1284    * way to wakeup the output thread is by injecting an event in the
1285    * queue. This usually happen with subtitle streams.
1286    * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
1287   if (ctx->need_unblock) {
1288     GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
1289     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
1290         GST_EVENT_TYPE_SERIALIZED,
1291         gst_structure_new ("splitmuxsink-unblock", "timestamp",
1292             G_TYPE_INT64, splitmux->max_in_running_time, NULL));
1293
1294     GST_SPLITMUX_UNLOCK (splitmux);
1295     gst_pad_send_event (ctx->sinkpad, event);
1296     GST_SPLITMUX_LOCK (splitmux);
1297
1298     ctx->need_unblock = FALSE;
1299     GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1300     /* state may have changed while we were unlocked. Loop again if so */
1301     if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
1302       return;
1303   }
1304
1305   if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
1306     gboolean ready = TRUE;
1307
1308     /* Iterate each pad, and check that the input running time is at least
1309      * up to the reference running time, and if so handle the collected GOP */
1310     GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
1311         GST_STIME_FORMAT " ctx %p",
1312         GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
1313     for (cur = g_list_first (splitmux->contexts); cur != NULL;
1314         cur = g_list_next (cur)) {
1315       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1316
1317       GST_LOG_OBJECT (splitmux,
1318           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
1319           " EOS %d", tmpctx, tmpctx->srcpad,
1320           GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
1321
1322       if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
1323           tmpctx->in_running_time < splitmux->max_in_running_time &&
1324           !tmpctx->in_eos) {
1325         GST_LOG_OBJECT (splitmux,
1326             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
1327             tmpctx, tmpctx->srcpad);
1328         ready = FALSE;
1329         break;
1330       }
1331     }
1332     if (ready) {
1333       GST_DEBUG_OBJECT (splitmux,
1334           "Collected GOP is complete. Processing (ctx %p)", ctx);
1335       /* All pads have a complete GOP, release it into the multiqueue */
1336       handle_gathered_gop (splitmux);
1337     }
1338   }
1339
1340   /* If upstream reached EOS we are not expecting more data, no need to wait
1341    * here. */
1342   if (ctx->in_eos)
1343     return;
1344
1345   /* Some pad is not yet ready, or GOP is being pushed
1346    * either way, sleep and wait to get woken */
1347   while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
1348       !ctx->flushing &&
1349       (ctx->in_running_time >= splitmux->max_in_running_time) &&
1350       (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
1351
1352     GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
1353     GST_SPLITMUX_WAIT_INPUT (splitmux);
1354     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
1355   }
1356 }
1357
1358 static GstPadProbeReturn
1359 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1360 {
1361   GstSplitMuxSink *splitmux = ctx->splitmux;
1362   GstBuffer *buf;
1363   MqStreamBuf *buf_info = NULL;
1364   GstClockTime ts;
1365   gboolean loop_again;
1366   gboolean keyframe = FALSE;
1367
1368   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1369
1370   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1371   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1372     g_warning ("Buffer list handling not implemented");
1373     return GST_PAD_PROBE_DROP;
1374   }
1375   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1376       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1377     GstEvent *event = gst_pad_probe_info_get_event (info);
1378
1379     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1380
1381     switch (GST_EVENT_TYPE (event)) {
1382       case GST_EVENT_SEGMENT:
1383         gst_event_copy_segment (event, &ctx->in_segment);
1384         break;
1385       case GST_EVENT_FLUSH_STOP:
1386         GST_SPLITMUX_LOCK (splitmux);
1387         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1388         ctx->in_eos = FALSE;
1389         ctx->in_running_time = GST_CLOCK_STIME_NONE;
1390         GST_SPLITMUX_UNLOCK (splitmux);
1391         break;
1392       case GST_EVENT_EOS:
1393         GST_SPLITMUX_LOCK (splitmux);
1394         ctx->in_eos = TRUE;
1395
1396         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
1397           goto beach;
1398
1399         if (ctx->is_reference) {
1400           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
1401           /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
1402           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
1403           /* Wake up other input pads to collect this GOP */
1404           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1405           check_completed_gop (splitmux, ctx);
1406         } else if (splitmux->input_state ==
1407             SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
1408           /* If we are waiting for a GOP to be completed (ie, for aux
1409            * pads to catch up), then this pad is complete, so check
1410            * if the whole GOP is.
1411            */
1412           check_completed_gop (splitmux, ctx);
1413         }
1414         GST_SPLITMUX_UNLOCK (splitmux);
1415         break;
1416       case GST_EVENT_GAP:{
1417         GstClockTime gap_ts;
1418         GstClockTimeDiff rtime;
1419
1420         gst_event_parse_gap (event, &gap_ts, NULL);
1421         if (gap_ts == GST_CLOCK_TIME_NONE)
1422           break;
1423
1424         GST_SPLITMUX_LOCK (splitmux);
1425
1426         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
1427           goto beach;
1428         rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
1429
1430         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1431             GST_STIME_ARGS (rtime));
1432
1433         if (ctx->is_reference
1434             && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
1435           splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
1436           GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
1437               GST_STIME_ARGS (splitmux->fragment_start_time));
1438           /* Also take this as the first start time when starting up,
1439            * so that we start counting overflow from the first frame */
1440           if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
1441             splitmux->max_in_running_time = splitmux->fragment_start_time;
1442         }
1443
1444         GST_SPLITMUX_UNLOCK (splitmux);
1445         break;
1446       }
1447       default:
1448         break;
1449     }
1450     return GST_PAD_PROBE_PASS;
1451   }
1452
1453   buf = gst_pad_probe_info_get_buffer (info);
1454   buf_info = mq_stream_buf_new ();
1455
1456   if (GST_BUFFER_PTS_IS_VALID (buf))
1457     ts = GST_BUFFER_PTS (buf);
1458   else
1459     ts = GST_BUFFER_DTS (buf);
1460
1461   GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
1462
1463   GST_SPLITMUX_LOCK (splitmux);
1464
1465   if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
1466     goto beach;
1467
1468   /* If this buffer has a timestamp, advance the input timestamp of the
1469    * stream */
1470   if (GST_CLOCK_TIME_IS_VALID (ts)) {
1471     GstClockTimeDiff running_time =
1472         my_segment_to_running_time (&ctx->in_segment, ts);
1473
1474     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
1475         GST_STIME_ARGS (running_time));
1476
1477     if (GST_CLOCK_STIME_IS_VALID (running_time)
1478         && running_time > ctx->in_running_time)
1479       ctx->in_running_time = running_time;
1480   }
1481
1482   /* Try to make sure we have a valid running time */
1483   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
1484     ctx->in_running_time =
1485         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
1486   }
1487
1488   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
1489       GST_STIME_ARGS (ctx->in_running_time));
1490
1491   buf_info->run_ts = ctx->in_running_time;
1492   buf_info->buf_size = gst_buffer_get_size (buf);
1493   buf_info->duration = GST_BUFFER_DURATION (buf);
1494
1495   /* initialize fragment_start_time */
1496   if (ctx->is_reference
1497       && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
1498     splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts;
1499     GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
1500         GST_STIME_ARGS (splitmux->fragment_start_time));
1501     gst_buffer_replace (&ctx->prev_in_keyframe, buf);
1502
1503     /* Also take this as the first start time when starting up,
1504      * so that we start counting overflow from the first frame */
1505     if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
1506       splitmux->max_in_running_time = splitmux->fragment_start_time;
1507     if (request_next_keyframe (splitmux, ctx->prev_in_keyframe) == FALSE) {
1508       GST_WARNING_OBJECT (splitmux,
1509           "Could not request a keyframe. Files may not split at the exact location they should");
1510     }
1511     gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
1512   }
1513
1514   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
1515       " total GOP bytes %" G_GUINT64_FORMAT,
1516       GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
1517
1518   loop_again = TRUE;
1519   do {
1520     if (ctx->flushing)
1521       break;
1522
1523     switch (splitmux->input_state) {
1524       case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
1525         if (ctx->is_reference) {
1526           /* If a keyframe, we have a complete GOP */
1527           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1528               !GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) ||
1529               splitmux->max_in_running_time >= ctx->in_running_time) {
1530             /* Pass this buffer through */
1531             loop_again = FALSE;
1532             /* Allow other input pads to catch up to here too */
1533             splitmux->max_in_running_time = ctx->in_running_time;
1534             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1535             break;
1536           }
1537           GST_INFO_OBJECT (pad,
1538               "Have keyframe with running time %" GST_STIME_FORMAT,
1539               GST_STIME_ARGS (ctx->in_running_time));
1540           keyframe = TRUE;
1541           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
1542           splitmux->max_in_running_time = ctx->in_running_time;
1543           /* Wake up other input pads to collect this GOP */
1544           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1545           check_completed_gop (splitmux, ctx);
1546           /* Store this new keyframe to remember the start of GOP */
1547           gst_buffer_replace (&ctx->prev_in_keyframe, buf);
1548         } else {
1549           /* Pass this buffer if the reference ctx is far enough ahead */
1550           if (ctx->in_running_time < splitmux->max_in_running_time) {
1551             loop_again = FALSE;
1552             break;
1553           }
1554
1555           /* We're still waiting for a keyframe on the reference pad, sleep */
1556           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1557           GST_SPLITMUX_WAIT_INPUT (splitmux);
1558           GST_LOG_OBJECT (pad,
1559               "Done sleeping for GOP start input state now %d",
1560               splitmux->input_state);
1561         }
1562         break;
1563       case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
1564         /* We're collecting a GOP. If this is the reference context,
1565          * we need to check if this is a keyframe that marks the start
1566          * of the next GOP. If it is, it marks the end of the GOP we're
1567          * collecting, so sleep and wait until all the other pads also
1568          * reach that timestamp - at which point, we have an entire GOP
1569          * and either go to ENDING_FILE or release this GOP to the muxer and
1570          * go back to COLLECT_GOP_START. */
1571
1572         /* If we overran the target timestamp, it might be time to process
1573          * the GOP, otherwise bail out for more data
1574          */
1575         GST_LOG_OBJECT (pad,
1576             "Checking TS %" GST_STIME_FORMAT " against max %"
1577             GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
1578             GST_STIME_ARGS (splitmux->max_in_running_time));
1579
1580         if (ctx->in_running_time < splitmux->max_in_running_time) {
1581           loop_again = FALSE;
1582           break;
1583         }
1584
1585         GST_LOG_OBJECT (pad,
1586             "Collected last packet of GOP. Checking other pads");
1587         check_completed_gop (splitmux, ctx);
1588         break;
1589       }
1590       case SPLITMUX_INPUT_STATE_FINISHING_UP:
1591         loop_again = FALSE;
1592         break;
1593       default:
1594         loop_again = FALSE;
1595         break;
1596     }
1597   }
1598   while (loop_again);
1599
1600   if (keyframe) {
1601     splitmux->queued_keyframes++;
1602     buf_info->keyframe = TRUE;
1603   }
1604
1605   /* Update total input byte counter for overflow detect */
1606   splitmux->gop_total_bytes += buf_info->buf_size;
1607
1608   /* Now add this buffer to the queue just before returning */
1609   g_queue_push_head (&ctx->queued_bufs, buf_info);
1610
1611   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1612       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
1613
1614   GST_SPLITMUX_UNLOCK (splitmux);
1615   return GST_PAD_PROBE_PASS;
1616
1617 beach:
1618   GST_SPLITMUX_UNLOCK (splitmux);
1619   if (buf_info)
1620     mq_stream_buf_free (buf_info);
1621   return GST_PAD_PROBE_PASS;
1622 }
1623
1624 static void
1625 grow_blocked_queues (GstSplitMuxSink * splitmux)
1626 {
1627   GList *cur;
1628
1629   /* Scan other queues for full-ness and grow them */
1630   for (cur = g_list_first (splitmux->contexts);
1631       cur != NULL; cur = g_list_next (cur)) {
1632     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1633     guint cur_limit;
1634     guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
1635
1636     g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
1637     GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
1638
1639     if (cur_len >= cur_limit) {
1640       cur_limit = cur_len + 1;
1641       GST_DEBUG_OBJECT (tmpctx->q,
1642           "Queue overflowed and needs enlarging. Growing to %u buffers",
1643           cur_limit);
1644       g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
1645     }
1646   }
1647 }
1648
1649 static void
1650 handle_q_underrun (GstElement * q, gpointer user_data)
1651 {
1652   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
1653   GstSplitMuxSink *splitmux = ctx->splitmux;
1654
1655   GST_SPLITMUX_LOCK (splitmux);
1656   GST_DEBUG_OBJECT (q,
1657       "Queue reported underrun with %d keyframes and %d cmds enqueued",
1658       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
1659   grow_blocked_queues (splitmux);
1660   GST_SPLITMUX_UNLOCK (splitmux);
1661 }
1662
1663 static void
1664 handle_q_overrun (GstElement * q, gpointer user_data)
1665 {
1666   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
1667   GstSplitMuxSink *splitmux = ctx->splitmux;
1668   gboolean allow_grow = FALSE;
1669
1670   GST_SPLITMUX_LOCK (splitmux);
1671   GST_DEBUG_OBJECT (q,
1672       "Queue reported overrun with %d keyframes and %d cmds enqueued",
1673       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
1674
1675   if (splitmux->queued_keyframes < 2) {
1676     /* Less than a full GOP queued, grow the queue */
1677     allow_grow = TRUE;
1678   } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
1679     allow_grow = TRUE;
1680   } else {
1681     /* If another queue is starved, grow */
1682     GList *cur;
1683     for (cur = g_list_first (splitmux->contexts);
1684         cur != NULL; cur = g_list_next (cur)) {
1685       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1686       if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
1687         allow_grow = TRUE;
1688       }
1689     }
1690   }
1691   GST_SPLITMUX_UNLOCK (splitmux);
1692
1693   if (allow_grow) {
1694     guint cur_limit;
1695
1696     g_object_get (q, "max-size-buffers", &cur_limit, NULL);
1697     cur_limit++;
1698
1699     GST_DEBUG_OBJECT (q,
1700         "Queue overflowed and needs enlarging. Growing to %u buffers",
1701         cur_limit);
1702
1703     g_object_set (q, "max-size-buffers", cur_limit, NULL);
1704   }
1705 }
1706
1707 static GstPad *
1708 gst_splitmux_sink_request_new_pad (GstElement * element,
1709     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1710 {
1711   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1712   GstPadTemplate *mux_template = NULL;
1713   GstPad *res = NULL;
1714   GstElement *q;
1715   GstPad *q_sink = NULL, *q_src = NULL;
1716   gchar *gname;
1717   gboolean is_video = FALSE;
1718   MqStreamCtx *ctx;
1719
1720   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1721
1722   GST_SPLITMUX_LOCK (splitmux);
1723   if (!create_muxer (splitmux))
1724     goto fail;
1725
1726   if (templ->name_template) {
1727     if (g_str_equal (templ->name_template, "video")) {
1728       if (splitmux->have_video)
1729         goto already_have_video;
1730
1731       /* FIXME: Look for a pad template with matching caps, rather than by name */
1732       mux_template =
1733           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1734           (splitmux->muxer), "video_%u");
1735
1736       /* Fallback to find sink pad templates named 'video' (flvmux) */
1737       if (!mux_template) {
1738         mux_template =
1739             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1740             (splitmux->muxer), "video");
1741       }
1742       is_video = TRUE;
1743       name = NULL;
1744     } else {
1745       mux_template =
1746           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1747           (splitmux->muxer), templ->name_template);
1748
1749       /* Fallback to find sink pad templates named 'audio' (flvmux) */
1750       if (!mux_template) {
1751         mux_template =
1752             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1753             (splitmux->muxer), "audio");
1754       }
1755     }
1756     if (mux_template == NULL) {
1757       /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1758       mux_template =
1759           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1760           (splitmux->muxer), "sink_%d");
1761     }
1762   }
1763
1764   res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1765   if (res == NULL)
1766     goto fail;
1767
1768   if (is_video)
1769     gname = g_strdup ("video");
1770   else if (name == NULL)
1771     gname = gst_pad_get_name (res);
1772   else
1773     gname = g_strdup (name);
1774
1775   if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL)
1776     goto fail;
1777
1778   gst_element_set_state (q, GST_STATE_TARGET (splitmux));
1779
1780   g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
1781       "max-size-buffers", 5, NULL);
1782
1783   q_sink = gst_element_get_static_pad (q, "sink");
1784   q_src = gst_element_get_static_pad (q, "src");
1785
1786   if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) {
1787     gst_element_release_request_pad (splitmux->muxer, res);
1788     gst_object_unref (GST_OBJECT (res));
1789     goto fail;
1790   }
1791
1792   gst_object_unref (GST_OBJECT (res));
1793
1794   ctx = mq_stream_ctx_new (splitmux);
1795   /* Context holds a ref: */
1796   ctx->q = gst_object_ref (q);
1797   ctx->srcpad = q_src;
1798   ctx->sinkpad = q_sink;
1799   ctx->q_overrun_id =
1800       g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
1801   g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
1802
1803   mq_stream_ctx_ref (ctx);
1804   ctx->src_pad_block_id =
1805       gst_pad_add_probe (q_src,
1806       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
1807       (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1808       _pad_block_destroy_src_notify);
1809   if (is_video && splitmux->reference_ctx != NULL) {
1810     splitmux->reference_ctx->is_reference = FALSE;
1811     splitmux->reference_ctx = NULL;
1812   }
1813   if (splitmux->reference_ctx == NULL) {
1814     splitmux->reference_ctx = ctx;
1815     ctx->is_reference = TRUE;
1816   }
1817
1818   res = gst_ghost_pad_new_from_template (gname, q_sink, templ);
1819   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1820
1821   mq_stream_ctx_ref (ctx);
1822   ctx->sink_pad_block_id =
1823       gst_pad_add_probe (q_sink,
1824       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
1825       (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1826       _pad_block_destroy_sink_notify);
1827
1828   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1829       " feeds queue pad %" GST_PTR_FORMAT, res, q_sink);
1830
1831   splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1832
1833   g_free (gname);
1834
1835   if (is_video)
1836     splitmux->have_video = TRUE;
1837
1838   gst_pad_set_active (res, TRUE);
1839   gst_element_add_pad (element, res);
1840
1841   GST_SPLITMUX_UNLOCK (splitmux);
1842
1843   return res;
1844 fail:
1845   GST_SPLITMUX_UNLOCK (splitmux);
1846
1847   if (q_sink)
1848     gst_object_unref (q_sink);
1849   if (q_src)
1850     gst_object_unref (q_src);
1851   return NULL;
1852 already_have_video:
1853   GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
1854   GST_SPLITMUX_UNLOCK (splitmux);
1855   return NULL;
1856 }
1857
1858 static void
1859 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1860 {
1861   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1862   GstPad *muxpad = NULL;
1863   MqStreamCtx *ctx =
1864       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1865
1866   GST_SPLITMUX_LOCK (splitmux);
1867
1868   if (splitmux->muxer == NULL)
1869     goto fail;                  /* Elements don't exist yet - nothing to release */
1870
1871   GST_INFO_OBJECT (pad, "releasing request pad");
1872
1873   muxpad = gst_pad_get_peer (ctx->srcpad);
1874
1875   /* Remove the context from our consideration */
1876   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1877
1878   if (ctx->sink_pad_block_id)
1879     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1880
1881   if (ctx->src_pad_block_id)
1882     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1883
1884   /* Can release the context now */
1885   mq_stream_ctx_unref (ctx);
1886   if (ctx == splitmux->reference_ctx)
1887     splitmux->reference_ctx = NULL;
1888
1889   /* Release and free the muxer input */
1890   if (muxpad) {
1891     gst_element_release_request_pad (splitmux->muxer, muxpad);
1892     gst_object_unref (muxpad);
1893   }
1894
1895   if (GST_PAD_PAD_TEMPLATE (pad) &&
1896       g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
1897               (pad)), "video"))
1898     splitmux->have_video = FALSE;
1899
1900   gst_element_remove_pad (element, pad);
1901
1902   /* Reset the internal elements only after all request pads are released */
1903   if (splitmux->contexts == NULL)
1904     gst_splitmux_reset (splitmux);
1905
1906 fail:
1907   GST_SPLITMUX_UNLOCK (splitmux);
1908 }
1909
1910 static GstElement *
1911 create_element (GstSplitMuxSink * splitmux,
1912     const gchar * factory, const gchar * name, gboolean locked)
1913 {
1914   GstElement *ret = gst_element_factory_make (factory, name);
1915   if (ret == NULL) {
1916     g_warning ("Failed to create %s - splitmuxsink will not work", name);
1917     return NULL;
1918   }
1919
1920   if (locked) {
1921     /* Ensure the sink starts in locked state and NULL - it will be changed
1922      * by the filename setting code */
1923     gst_element_set_locked_state (ret, TRUE);
1924     gst_element_set_state (ret, GST_STATE_NULL);
1925   }
1926
1927   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1928     g_warning ("Could not add %s element - splitmuxsink will not work", name);
1929     gst_object_unref (ret);
1930     return NULL;
1931   }
1932
1933   return ret;
1934 }
1935
1936 static gboolean
1937 create_muxer (GstSplitMuxSink * splitmux)
1938 {
1939   /* Create internal elements */
1940   if (splitmux->muxer == NULL) {
1941     GstElement *provided_muxer = NULL;
1942
1943     GST_OBJECT_LOCK (splitmux);
1944     if (splitmux->provided_muxer != NULL)
1945       provided_muxer = gst_object_ref (splitmux->provided_muxer);
1946     GST_OBJECT_UNLOCK (splitmux);
1947
1948     if (provided_muxer == NULL) {
1949       if ((splitmux->muxer =
1950               create_element (splitmux, "mp4mux", "muxer", FALSE)) == NULL)
1951         goto fail;
1952     } else {
1953       /* Ensure it's not in locked state (we might be reusing an old element) */
1954       gst_element_set_locked_state (provided_muxer, FALSE);
1955       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1956         g_warning ("Could not add muxer element - splitmuxsink will not work");
1957         gst_object_unref (provided_muxer);
1958         goto fail;
1959       }
1960
1961       splitmux->muxer = provided_muxer;
1962       gst_object_unref (provided_muxer);
1963     }
1964   }
1965
1966   return TRUE;
1967 fail:
1968   return FALSE;
1969 }
1970
1971 static GstElement *
1972 find_sink (GstElement * e)
1973 {
1974   GstElement *res = NULL;
1975   GstIterator *iter;
1976   gboolean done = FALSE;
1977   GValue data = { 0, };
1978
1979   if (!GST_IS_BIN (e))
1980     return e;
1981
1982   if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
1983     return e;
1984
1985   iter = gst_bin_iterate_sinks (GST_BIN (e));
1986   while (!done) {
1987     switch (gst_iterator_next (iter, &data)) {
1988       case GST_ITERATOR_OK:
1989       {
1990         GstElement *child = g_value_get_object (&data);
1991         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1992                 "location") != NULL) {
1993           res = child;
1994           done = TRUE;
1995         }
1996         g_value_reset (&data);
1997         break;
1998       }
1999       case GST_ITERATOR_RESYNC:
2000         gst_iterator_resync (iter);
2001         break;
2002       case GST_ITERATOR_DONE:
2003         done = TRUE;
2004         break;
2005       case GST_ITERATOR_ERROR:
2006         g_assert_not_reached ();
2007         break;
2008     }
2009   }
2010   g_value_unset (&data);
2011   gst_iterator_free (iter);
2012
2013   return res;
2014 }
2015
2016 static gboolean
2017 create_sink (GstSplitMuxSink * splitmux)
2018 {
2019   GstElement *provided_sink = NULL;
2020
2021   if (splitmux->active_sink == NULL) {
2022
2023     GST_OBJECT_LOCK (splitmux);
2024     if (splitmux->provided_sink != NULL)
2025       provided_sink = gst_object_ref (splitmux->provided_sink);
2026     GST_OBJECT_UNLOCK (splitmux);
2027
2028     if (provided_sink == NULL) {
2029       if ((splitmux->sink =
2030               create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
2031         goto fail;
2032       splitmux->active_sink = splitmux->sink;
2033     } else {
2034       /* Ensure the sink starts in locked state and NULL - it will be changed
2035        * by the filename setting code */
2036       gst_element_set_locked_state (provided_sink, TRUE);
2037       gst_element_set_state (provided_sink, GST_STATE_NULL);
2038       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
2039         g_warning ("Could not add sink elements - splitmuxsink will not work");
2040         gst_object_unref (provided_sink);
2041         goto fail;
2042       }
2043
2044       splitmux->active_sink = provided_sink;
2045
2046       /* The bin holds a ref now, we can drop our tmp ref */
2047       gst_object_unref (provided_sink);
2048
2049       /* Find the sink element */
2050       splitmux->sink = find_sink (splitmux->active_sink);
2051       if (splitmux->sink == NULL) {
2052         g_warning
2053             ("Could not locate sink element in provided sink - splitmuxsink will not work");
2054         goto fail;
2055       }
2056     }
2057
2058 #if 1
2059     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2060             "async") != NULL) {
2061       /* async child elements are causing state change races and weird
2062        * failures, so let's try and turn that off */
2063       g_object_set (splitmux->sink, "async", FALSE, NULL);
2064     }
2065 #endif
2066
2067     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
2068       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
2069       goto fail;
2070     }
2071   }
2072
2073   return TRUE;
2074 fail:
2075   return FALSE;
2076 }
2077
2078 #ifdef __GNUC__
2079 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
2080 #endif
2081 static void
2082 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2083 {
2084   gchar *fname = NULL;
2085   GstSample *sample;
2086   GstCaps *caps;
2087
2088   gst_splitmux_sink_ensure_max_files (splitmux);
2089
2090   if (ctx->cur_out_buffer == NULL) {
2091     GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
2092   }
2093
2094   caps = gst_pad_get_current_caps (ctx->srcpad);
2095   sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
2096   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
2097       splitmux->fragment_id, sample, &fname);
2098   gst_sample_unref (sample);
2099   if (caps)
2100     gst_caps_unref (caps);
2101
2102   if (fname == NULL) {
2103     /* Fallback to the old signal if the new one returned nothing */
2104     g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
2105         splitmux->fragment_id, &fname);
2106   }
2107
2108   if (!fname)
2109     fname = splitmux->location ?
2110         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
2111
2112   if (fname) {
2113     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
2114     g_object_set (splitmux->sink, "location", fname, NULL);
2115     g_free (fname);
2116
2117     splitmux->fragment_id++;
2118   }
2119 }
2120
2121 static void
2122 do_async_start (GstSplitMuxSink * splitmux)
2123 {
2124   GstMessage *message;
2125
2126   if (!splitmux->need_async_start) {
2127     GST_INFO_OBJECT (splitmux, "no async_start needed");
2128     return;
2129   }
2130
2131   splitmux->async_pending = TRUE;
2132
2133   GST_INFO_OBJECT (splitmux, "Sending async_start message");
2134   message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
2135   GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
2136       (splitmux), message);
2137 }
2138
2139 static void
2140 do_async_done (GstSplitMuxSink * splitmux)
2141 {
2142   GstMessage *message;
2143
2144   if (splitmux->async_pending) {
2145     GST_INFO_OBJECT (splitmux, "Sending async_done message");
2146     message =
2147         gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
2148         GST_CLOCK_TIME_NONE);
2149     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
2150         (splitmux), message);
2151
2152     splitmux->async_pending = FALSE;
2153   }
2154
2155   splitmux->need_async_start = FALSE;
2156 }
2157
2158 static GstStateChangeReturn
2159 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
2160 {
2161   GstStateChangeReturn ret;
2162   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2163
2164   switch (transition) {
2165     case GST_STATE_CHANGE_NULL_TO_READY:{
2166       GST_SPLITMUX_LOCK (splitmux);
2167       if (!create_muxer (splitmux) || !create_sink (splitmux)) {
2168         ret = GST_STATE_CHANGE_FAILURE;
2169         GST_SPLITMUX_UNLOCK (splitmux);
2170         goto beach;
2171       }
2172       GST_SPLITMUX_UNLOCK (splitmux);
2173       splitmux->fragment_id = 0;
2174       break;
2175     }
2176     case GST_STATE_CHANGE_READY_TO_PAUSED:{
2177       GST_SPLITMUX_LOCK (splitmux);
2178       /* Start by collecting one input on each pad */
2179       splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2180       splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2181       splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
2182       splitmux->gop_start_time = splitmux->fragment_start_time =
2183           GST_CLOCK_STIME_NONE;
2184       splitmux->muxed_out_bytes = 0;
2185       splitmux->ready_for_output = FALSE;
2186       GST_SPLITMUX_UNLOCK (splitmux);
2187       break;
2188     }
2189     case GST_STATE_CHANGE_PAUSED_TO_READY:
2190     case GST_STATE_CHANGE_READY_TO_NULL:
2191       GST_SPLITMUX_LOCK (splitmux);
2192       splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
2193       splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
2194       /* Wake up any blocked threads */
2195       GST_LOG_OBJECT (splitmux,
2196           "State change -> NULL or READY. Waking threads");
2197       GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2198       GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2199       GST_SPLITMUX_UNLOCK (splitmux);
2200       break;
2201     default:
2202       break;
2203   }
2204
2205   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2206   if (ret == GST_STATE_CHANGE_FAILURE)
2207     goto beach;
2208
2209   switch (transition) {
2210     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2211       splitmux->need_async_start = TRUE;
2212       break;
2213     case GST_STATE_CHANGE_READY_TO_PAUSED:{
2214       /* Change state async, because our child sink might not
2215        * be ready to do that for us yet if it's state is still locked */
2216
2217       splitmux->need_async_start = TRUE;
2218       /* we want to go async to PAUSED until we managed to configure and add the
2219        * sink */
2220       GST_SPLITMUX_LOCK (splitmux);
2221       do_async_start (splitmux);
2222       GST_SPLITMUX_UNLOCK (splitmux);
2223       ret = GST_STATE_CHANGE_ASYNC;
2224       break;
2225     }
2226     case GST_STATE_CHANGE_READY_TO_NULL:
2227       GST_SPLITMUX_LOCK (splitmux);
2228       splitmux->fragment_id = 0;
2229       /* Reset internal elements only if no pad contexts are using them */
2230       if (splitmux->contexts == NULL)
2231         gst_splitmux_reset (splitmux);
2232       do_async_done (splitmux);
2233       GST_SPLITMUX_UNLOCK (splitmux);
2234       break;
2235     default:
2236       break;
2237   }
2238
2239 beach:
2240   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
2241       ret == GST_STATE_CHANGE_FAILURE) {
2242     /* Cleanup elements on failed transition out of NULL */
2243     gst_splitmux_reset (splitmux);
2244     GST_SPLITMUX_LOCK (splitmux);
2245     do_async_done (splitmux);
2246     GST_SPLITMUX_UNLOCK (splitmux);
2247   }
2248   return ret;
2249 }
2250
2251 gboolean
2252 register_splitmuxsink (GstPlugin * plugin)
2253 {
2254   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
2255       "Split File Muxing Sink");
2256
2257   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
2258       GST_TYPE_SPLITMUX_SINK);
2259 }
2260
2261 static void
2262 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
2263 {
2264   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
2265     splitmux->fragment_id = 0;
2266   }
2267 }