spitmuxsink: Avoid creating small file at EOS
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * can be muxed, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * If a video stream is available, the splitting process is driven by the video
37  * stream contents, and the video stream must contain closed GOPs for the output
38  * file parts to be played individually correctly. In the absence of a video
39  * stream, the first available stream is used as reference for synchronization.
40  *
41  * <refsect2>
42  * <title>Example pipelines</title>
43  * |[
44  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
45  * ]|
46  * Records a video stream captured from a v4l2 device and muxes it into
47  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48  * and 1MB maximum size.
49  * </refsect2>
50  */
51
52 #ifdef HAVE_CONFIG_H
53 #include "config.h"
54 #endif
55
56 #include <string.h>
57 #include <glib/gstdio.h>
58 #include "gstsplitmuxsink.h"
59
60 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
61 #define GST_CAT_DEFAULT splitmux_debug
62
63 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
64 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
65 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
66 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
67
68 enum
69 {
70   PROP_0,
71   PROP_LOCATION,
72   PROP_MAX_SIZE_TIME,
73   PROP_MAX_SIZE_BYTES,
74   PROP_MAX_FILES,
75   PROP_MUXER_OVERHEAD,
76   PROP_MUXER,
77   PROP_SINK
78 };
79
80 #define DEFAULT_MAX_SIZE_TIME       0
81 #define DEFAULT_MAX_SIZE_BYTES      0
82 #define DEFAULT_MAX_FILES           0
83 #define DEFAULT_MUXER_OVERHEAD      0.02
84 #define DEFAULT_MUXER "mp4mux"
85 #define DEFAULT_SINK "filesink"
86
87 enum
88 {
89   SIGNAL_FORMAT_LOCATION,
90   SIGNAL_LAST
91 };
92
93 static guint signals[SIGNAL_LAST];
94
95 static GstStaticPadTemplate video_sink_template =
96 GST_STATIC_PAD_TEMPLATE ("video",
97     GST_PAD_SINK,
98     GST_PAD_REQUEST,
99     GST_STATIC_CAPS_ANY);
100 static GstStaticPadTemplate audio_sink_template =
101 GST_STATIC_PAD_TEMPLATE ("audio_%u",
102     GST_PAD_SINK,
103     GST_PAD_REQUEST,
104     GST_STATIC_CAPS_ANY);
105 static GstStaticPadTemplate subtitle_sink_template =
106 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
107     GST_PAD_SINK,
108     GST_PAD_REQUEST,
109     GST_STATIC_CAPS_ANY);
110
111 static GQuark PAD_CONTEXT;
112
113 static void
114 _do_init (void)
115 {
116   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
117 }
118
119 #define gst_splitmux_sink_parent_class parent_class
120 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
121     _do_init ());
122
123 static gboolean create_elements (GstSplitMuxSink * splitmux);
124 static gboolean create_sink (GstSplitMuxSink * splitmux);
125 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
126     const GValue * value, GParamSpec * pspec);
127 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
128     GValue * value, GParamSpec * pspec);
129 static void gst_splitmux_sink_dispose (GObject * object);
130 static void gst_splitmux_sink_finalize (GObject * object);
131
132 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
133     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
134 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
135
136 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
137     element, GstStateChange transition);
138
139 static void bus_handler (GstBin * bin, GstMessage * msg);
140 static void set_next_filename (GstSplitMuxSink * splitmux);
141 static void start_next_fragment (GstSplitMuxSink * splitmux);
142 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
143 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
144
145 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
146
147 static MqStreamBuf *
148 mq_stream_buf_new (void)
149 {
150   return g_slice_new0 (MqStreamBuf);
151 }
152
153 static void
154 mq_stream_buf_free (MqStreamBuf * data)
155 {
156   g_slice_free (MqStreamBuf, data);
157 }
158
159 static void
160 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
161 {
162   GObjectClass *gobject_class = (GObjectClass *) klass;
163   GstElementClass *gstelement_class = (GstElementClass *) klass;
164   GstBinClass *gstbin_class = (GstBinClass *) klass;
165
166   gobject_class->set_property = gst_splitmux_sink_set_property;
167   gobject_class->get_property = gst_splitmux_sink_get_property;
168   gobject_class->dispose = gst_splitmux_sink_dispose;
169   gobject_class->finalize = gst_splitmux_sink_finalize;
170
171   gst_element_class_set_static_metadata (gstelement_class,
172       "Split Muxing Bin", "Generic/Bin/Muxer",
173       "Convenience bin that muxes incoming streams into multiple time/size limited files",
174       "Jan Schmidt <jan@centricular.com>");
175
176   gst_element_class_add_static_pad_template (gstelement_class,
177       &video_sink_template);
178   gst_element_class_add_static_pad_template (gstelement_class,
179       &audio_sink_template);
180   gst_element_class_add_static_pad_template (gstelement_class,
181       &subtitle_sink_template);
182
183   gstelement_class->change_state =
184       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
185   gstelement_class->request_new_pad =
186       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
187   gstelement_class->release_pad =
188       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
189
190   gstbin_class->handle_message = bus_handler;
191
192   g_object_class_install_property (gobject_class, PROP_LOCATION,
193       g_param_spec_string ("location", "File Output Pattern",
194           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
195           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
197       g_param_spec_double ("mux-overhead", "Muxing Overhead",
198           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
199           DEFAULT_MUXER_OVERHEAD,
200           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
201
202   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
203       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
204           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
205           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
206   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
207       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
208           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
209           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
210   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
211       g_param_spec_uint ("max-files", "Max files",
212           "Maximum number of files to keep on disk. Once the maximum is reached,"
213           "old files start to be deleted to make room for new ones.",
214           0, G_MAXUINT, DEFAULT_MAX_FILES,
215           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
216
217
218   g_object_class_install_property (gobject_class, PROP_MUXER,
219       g_param_spec_object ("muxer", "Muxer",
220           "The muxer element to use (NULL = default mp4mux)",
221           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222   g_object_class_install_property (gobject_class, PROP_SINK,
223       g_param_spec_object ("sink", "Sink",
224           "The sink element (or element chain) to use (NULL = default filesink)",
225           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
226
227   /**
228    * GstSplitMuxSink::format-location:
229    * @splitmux: the #GstSplitMuxSink
230    * @fragment_id: the sequence number of the file to be created
231    *
232    * Returns: the location to be used for the next output file
233    */
234   signals[SIGNAL_FORMAT_LOCATION] =
235       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
236       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
237 }
238
239 static void
240 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
241 {
242   g_mutex_init (&splitmux->lock);
243   g_cond_init (&splitmux->data_cond);
244
245   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
246   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
247   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
248   splitmux->max_files = DEFAULT_MAX_FILES;
249
250   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
251 }
252
253 static void
254 gst_splitmux_reset (GstSplitMuxSink * splitmux)
255 {
256   if (splitmux->mq)
257     gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
258   if (splitmux->muxer)
259     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
260   if (splitmux->active_sink)
261     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
262
263   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
264       NULL;
265 }
266
267 static void
268 gst_splitmux_sink_dispose (GObject * object)
269 {
270   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
271
272   G_OBJECT_CLASS (parent_class)->dispose (object);
273
274   /* Calling parent dispose invalidates all child pointers */
275   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
276       NULL;
277 }
278
279 static void
280 gst_splitmux_sink_finalize (GObject * object)
281 {
282   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
283   g_cond_clear (&splitmux->data_cond);
284   g_mutex_clear (&splitmux->lock);
285   if (splitmux->provided_sink)
286     gst_object_unref (splitmux->provided_sink);
287   if (splitmux->provided_muxer)
288     gst_object_unref (splitmux->provided_muxer);
289
290   g_free (splitmux->location);
291
292   /* Make sure to free any un-released contexts */
293   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
294   g_list_free (splitmux->contexts);
295
296   G_OBJECT_CLASS (parent_class)->finalize (object);
297 }
298
299 static void
300 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
301     const GValue * value, GParamSpec * pspec)
302 {
303   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
304
305   switch (prop_id) {
306     case PROP_LOCATION:{
307       GST_OBJECT_LOCK (splitmux);
308       g_free (splitmux->location);
309       splitmux->location = g_value_dup_string (value);
310       GST_OBJECT_UNLOCK (splitmux);
311       break;
312     }
313     case PROP_MAX_SIZE_BYTES:
314       GST_OBJECT_LOCK (splitmux);
315       splitmux->threshold_bytes = g_value_get_uint64 (value);
316       GST_OBJECT_UNLOCK (splitmux);
317       break;
318     case PROP_MAX_SIZE_TIME:
319       GST_OBJECT_LOCK (splitmux);
320       splitmux->threshold_time = g_value_get_uint64 (value);
321       GST_OBJECT_UNLOCK (splitmux);
322       break;
323     case PROP_MAX_FILES:
324       GST_OBJECT_LOCK (splitmux);
325       splitmux->max_files = g_value_get_uint (value);
326       GST_OBJECT_UNLOCK (splitmux);
327       break;
328     case PROP_MUXER_OVERHEAD:
329       GST_OBJECT_LOCK (splitmux);
330       splitmux->mux_overhead = g_value_get_double (value);
331       GST_OBJECT_UNLOCK (splitmux);
332       break;
333     case PROP_SINK:
334       GST_OBJECT_LOCK (splitmux);
335       if (splitmux->provided_sink)
336         gst_object_unref (splitmux->provided_sink);
337       splitmux->provided_sink = g_value_dup_object (value);
338       GST_OBJECT_UNLOCK (splitmux);
339       break;
340     case PROP_MUXER:
341       GST_OBJECT_LOCK (splitmux);
342       if (splitmux->provided_muxer)
343         gst_object_unref (splitmux->provided_muxer);
344       splitmux->provided_muxer = g_value_dup_object (value);
345       GST_OBJECT_UNLOCK (splitmux);
346       break;
347     default:
348       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
349       break;
350   }
351 }
352
353 static void
354 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
355     GValue * value, GParamSpec * pspec)
356 {
357   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
358
359   switch (prop_id) {
360     case PROP_LOCATION:
361       GST_OBJECT_LOCK (splitmux);
362       g_value_set_string (value, splitmux->location);
363       GST_OBJECT_UNLOCK (splitmux);
364       break;
365     case PROP_MAX_SIZE_BYTES:
366       GST_OBJECT_LOCK (splitmux);
367       g_value_set_uint64 (value, splitmux->threshold_bytes);
368       GST_OBJECT_UNLOCK (splitmux);
369       break;
370     case PROP_MAX_SIZE_TIME:
371       GST_OBJECT_LOCK (splitmux);
372       g_value_set_uint64 (value, splitmux->threshold_time);
373       GST_OBJECT_UNLOCK (splitmux);
374       break;
375     case PROP_MAX_FILES:
376       GST_OBJECT_LOCK (splitmux);
377       g_value_set_uint (value, splitmux->max_files);
378       GST_OBJECT_UNLOCK (splitmux);
379       break;
380     case PROP_MUXER_OVERHEAD:
381       GST_OBJECT_LOCK (splitmux);
382       g_value_set_double (value, splitmux->mux_overhead);
383       GST_OBJECT_UNLOCK (splitmux);
384       break;
385     case PROP_SINK:
386       GST_OBJECT_LOCK (splitmux);
387       g_value_set_object (value, splitmux->provided_sink);
388       GST_OBJECT_UNLOCK (splitmux);
389       break;
390     case PROP_MUXER:
391       GST_OBJECT_LOCK (splitmux);
392       g_value_set_object (value, splitmux->provided_muxer);
393       GST_OBJECT_UNLOCK (splitmux);
394       break;
395     default:
396       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
397       break;
398   }
399 }
400
401 static GstPad *
402 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
403 {
404   gchar *tmp, *sinkname, *srcname;
405   GstPad *mq_src;
406
407   sinkname = gst_pad_get_name (sink_pad);
408   tmp = sinkname + 5;
409   srcname = g_strdup_printf ("src_%s", tmp);
410
411   mq_src = gst_element_get_static_pad (mq, srcname);
412
413   g_free (sinkname);
414   g_free (srcname);
415
416   return mq_src;
417 }
418
419 static gboolean
420 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
421     GstPad ** src_pad)
422 {
423   GstPad *mq_sink;
424   GstPad *mq_src;
425
426   /* Request a pad from multiqueue, then connect this one, then
427    * discover the corresponding output pad and return both */
428   mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
429   if (mq_sink == NULL)
430     return FALSE;
431
432   mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
433   if (mq_src == NULL)
434     goto fail;
435
436   *sink_pad = mq_sink;
437   *src_pad = mq_src;
438
439   return TRUE;
440
441 fail:
442   gst_element_release_request_pad (splitmux->mq, mq_sink);
443   return FALSE;
444 }
445
446 static MqStreamCtx *
447 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
448 {
449   MqStreamCtx *ctx;
450
451   ctx = g_new0 (MqStreamCtx, 1);
452   g_atomic_int_set (&ctx->refcount, 1);
453   ctx->splitmux = splitmux;
454   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
455   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
456   ctx->in_running_time = ctx->out_running_time = 0;
457   g_queue_init (&ctx->queued_bufs);
458   return ctx;
459 }
460
461 static void
462 mq_stream_ctx_free (MqStreamCtx * ctx)
463 {
464   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
465   g_queue_clear (&ctx->queued_bufs);
466   g_free (ctx);
467 }
468
469 static void
470 mq_stream_ctx_unref (MqStreamCtx * ctx)
471 {
472   if (g_atomic_int_dec_and_test (&ctx->refcount))
473     mq_stream_ctx_free (ctx);
474 }
475
476 static void
477 mq_stream_ctx_ref (MqStreamCtx * ctx)
478 {
479   g_atomic_int_inc (&ctx->refcount);
480 }
481
482 static void
483 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
484 {
485   ctx->sink_pad_block_id = 0;
486   mq_stream_ctx_unref (ctx);
487 }
488
489 static void
490 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
491 {
492   ctx->src_pad_block_id = 0;
493   mq_stream_ctx_unref (ctx);
494 }
495
496 static void
497 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
498 {
499   gchar *location = NULL;
500   GstMessage *msg;
501   const gchar *msg_name = opened ?
502       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
503
504   g_object_get (splitmux->sink, "location", &location, NULL);
505
506   msg = gst_message_new_element (GST_OBJECT (splitmux),
507       gst_structure_new (msg_name,
508           "location", G_TYPE_STRING, location,
509           "running-time", GST_TYPE_CLOCK_TIME,
510           splitmux->reference_ctx->out_running_time, NULL));
511   gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
512
513   g_free (location);
514 }
515
516 /* Called with lock held, drops the lock to send EOS to the
517  * pad
518  */
519 static void
520 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
521 {
522   GstEvent *eos;
523   GstPad *pad;
524
525   eos = gst_event_new_eos ();
526   pad = gst_pad_get_peer (ctx->srcpad);
527
528   ctx->out_eos = TRUE;
529
530   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
531   GST_SPLITMUX_UNLOCK (splitmux);
532   gst_pad_send_event (pad, eos);
533   GST_SPLITMUX_LOCK (splitmux);
534
535   gst_object_unref (pad);
536 }
537
538 /* Called with splitmux lock held to check if this output
539  * context needs to sleep to wait for the release of the
540  * next GOP, or to send EOS to close out the current file
541  */
542 static void
543 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
544 {
545   do {
546
547     GST_LOG_OBJECT (ctx->srcpad,
548         "Checking running time %" GST_TIME_FORMAT " against max %"
549         GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time),
550         GST_TIME_ARGS (splitmux->max_out_running_time));
551
552     if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE ||
553         ctx->out_running_time < splitmux->max_out_running_time) {
554       splitmux->have_muxed_something = TRUE;
555       return;
556     }
557
558     if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
559       return;
560
561     if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
562       if (ctx->out_eos == FALSE) {
563         send_eos (splitmux, ctx);
564         continue;
565       }
566     } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
567       start_next_fragment (splitmux);
568       continue;
569     }
570
571     GST_INFO_OBJECT (ctx->srcpad,
572         "Sleeping for running time %"
573         GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")",
574         GST_TIME_ARGS (ctx->out_running_time),
575         GST_TIME_ARGS (splitmux->max_out_running_time));
576     ctx->out_blocked = TRUE;
577     /* Expand the mq if needed before sleeping */
578     check_queue_length (splitmux, ctx);
579     GST_SPLITMUX_WAIT (splitmux);
580     ctx->out_blocked = FALSE;
581     GST_INFO_OBJECT (ctx->srcpad,
582         "Woken for new max running time %" GST_TIME_FORMAT,
583         GST_TIME_ARGS (splitmux->max_out_running_time));
584   } while (1);
585 }
586
587 static GstPadProbeReturn
588 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
589 {
590   GstSplitMuxSink *splitmux = ctx->splitmux;
591   MqStreamBuf *buf_info = NULL;
592
593   GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
594
595   /* FIXME: Handle buffer lists, until then make it clear they won't work */
596   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
597     g_warning ("Buffer list handling not implemented");
598     return GST_PAD_PROBE_DROP;
599   }
600   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
601     GstEvent *event = gst_pad_probe_info_get_event (info);
602
603     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
604
605     switch (GST_EVENT_TYPE (event)) {
606       case GST_EVENT_SEGMENT:
607         gst_event_copy_segment (event, &ctx->out_segment);
608         break;
609       case GST_EVENT_FLUSH_STOP:
610         GST_SPLITMUX_LOCK (splitmux);
611         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
612         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
613         g_queue_clear (&ctx->queued_bufs);
614         ctx->flushing = FALSE;
615         GST_SPLITMUX_UNLOCK (splitmux);
616         break;
617       case GST_EVENT_FLUSH_START:
618         GST_SPLITMUX_LOCK (splitmux);
619         GST_LOG_OBJECT (pad, "Flush start");
620         ctx->flushing = TRUE;
621         GST_SPLITMUX_BROADCAST (splitmux);
622         GST_SPLITMUX_UNLOCK (splitmux);
623         break;
624       case GST_EVENT_EOS:
625         GST_SPLITMUX_LOCK (splitmux);
626         if (splitmux->state == SPLITMUX_STATE_STOPPED)
627           goto beach;
628         ctx->out_eos = TRUE;
629         GST_SPLITMUX_UNLOCK (splitmux);
630         break;
631       case GST_EVENT_GAP:{
632         GstClockTime gap_ts;
633
634         gst_event_parse_gap (event, &gap_ts, NULL);
635         if (gap_ts == GST_CLOCK_TIME_NONE)
636           break;
637
638         GST_SPLITMUX_LOCK (splitmux);
639
640         gap_ts = gst_segment_to_running_time (&ctx->out_segment,
641             GST_FORMAT_TIME, gap_ts);
642
643         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT,
644             GST_TIME_ARGS (gap_ts));
645
646         if (splitmux->state == SPLITMUX_STATE_STOPPED)
647           goto beach;
648         ctx->out_running_time = gap_ts;
649         complete_or_wait_on_out (splitmux, ctx);
650         GST_SPLITMUX_UNLOCK (splitmux);
651         break;
652       }
653       case GST_EVENT_CUSTOM_DOWNSTREAM:{
654         const GstStructure *s;
655         GstClockTime ts = 0;
656
657         s = gst_event_get_structure (event);
658         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
659           break;
660
661         gst_structure_get_uint64 (s, "timestamp", &ts);
662
663         GST_SPLITMUX_LOCK (splitmux);
664
665         if (splitmux->state == SPLITMUX_STATE_STOPPED)
666           goto beach;
667         ctx->out_running_time = ts;
668         complete_or_wait_on_out (splitmux, ctx);
669         GST_SPLITMUX_UNLOCK (splitmux);
670         return GST_PAD_PROBE_DROP;
671       }
672       default:
673         break;
674     }
675     return GST_PAD_PROBE_PASS;
676   }
677
678   /* Allow everything through until the configured next stopping point */
679   GST_SPLITMUX_LOCK (splitmux);
680
681   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
682   if (buf_info == NULL)
683     /* Can only happen due to a poorly timed flush */
684     goto beach;
685
686   /* If we have popped a keyframe, decrement the queued_gop count */
687   if (buf_info->keyframe && splitmux->queued_gops > 0)
688     splitmux->queued_gops--;
689
690   ctx->out_running_time = buf_info->run_ts;
691
692   GST_LOG_OBJECT (splitmux,
693       "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT
694       " size %" G_GSIZE_FORMAT,
695       pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size);
696
697   if (splitmux->opening_first_fragment) {
698     send_fragment_opened_closed_msg (splitmux, TRUE);
699     splitmux->opening_first_fragment = FALSE;
700   }
701
702   complete_or_wait_on_out (splitmux, ctx);
703
704   if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE ||
705       splitmux->muxed_out_time < buf_info->run_ts)
706     splitmux->muxed_out_time = buf_info->run_ts;
707
708   splitmux->muxed_out_bytes += buf_info->buf_size;
709
710 #ifndef GST_DISABLE_GST_DEBUG
711   {
712     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
713     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
714         " run ts %" GST_TIME_FORMAT, buf,
715         GST_TIME_ARGS (ctx->out_running_time));
716   }
717 #endif
718
719   GST_SPLITMUX_UNLOCK (splitmux);
720
721   mq_stream_buf_free (buf_info);
722
723   return GST_PAD_PROBE_PASS;
724
725 beach:
726   GST_SPLITMUX_UNLOCK (splitmux);
727   return GST_PAD_PROBE_DROP;
728 }
729
730 static gboolean
731 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
732 {
733   return gst_pad_send_event (peer, gst_event_ref (*event));
734 }
735
736 static void
737 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
738 {
739   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
740
741   gst_pad_sticky_events_foreach (ctx->srcpad,
742       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
743
744   /* Clear EOS flag if not actually EOS */
745   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
746
747   gst_object_unref (peer);
748 }
749
750 /* Called with lock held when a fragment
751  * reaches EOS and it is time to restart
752  * a new fragment
753  */
754 static void
755 start_next_fragment (GstSplitMuxSink * splitmux)
756 {
757   /* 1 change to new file */
758   gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
759   gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
760
761   set_next_filename (splitmux);
762
763   gst_element_sync_state_with_parent (splitmux->active_sink);
764   gst_element_sync_state_with_parent (splitmux->muxer);
765
766   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
767
768   /* Switch state and go back to processing */
769   if (!splitmux->reference_ctx->in_eos) {
770     splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
771     splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
772   } else {
773     splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
774     splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
775     splitmux->have_muxed_something = FALSE;
776   }
777   splitmux->have_muxed_something =
778       (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
779
780   /* Store the overflow parameters as the basis for the next fragment */
781   splitmux->mux_start_time = splitmux->muxed_out_time;
782   splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
783
784   GST_DEBUG_OBJECT (splitmux,
785       "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT,
786       GST_TIME_ARGS (splitmux->max_out_running_time));
787
788   send_fragment_opened_closed_msg (splitmux, TRUE);
789
790   GST_SPLITMUX_BROADCAST (splitmux);
791 }
792
793 static void
794 bus_handler (GstBin * bin, GstMessage * message)
795 {
796   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
797
798   switch (GST_MESSAGE_TYPE (message)) {
799     case GST_MESSAGE_EOS:
800       /* If the state is draining out the current file, drop this EOS */
801       GST_SPLITMUX_LOCK (splitmux);
802
803       send_fragment_opened_closed_msg (splitmux, FALSE);
804
805       if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
806           splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) {
807         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
808         splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
809         GST_SPLITMUX_BROADCAST (splitmux);
810
811         gst_message_unref (message);
812         GST_SPLITMUX_UNLOCK (splitmux);
813         return;
814       }
815       GST_SPLITMUX_UNLOCK (splitmux);
816       break;
817     default:
818       break;
819   }
820
821   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
822 }
823
824 /* Called with splitmux lock held */
825 /* Called when entering ProcessingCompleteGop state
826  * Assess if mq contents overflowed the current file
827  *   -> If yes, need to switch to new file
828  *   -> if no, set max_out_running_time to let this GOP in and
829  *      go to COLLECTING_GOP_START state
830  */
831 static void
832 handle_gathered_gop (GstSplitMuxSink * splitmux)
833 {
834   GList *cur;
835   gsize queued_bytes = 0;
836   GstClockTime queued_time = 0;
837
838   /* Assess if the multiqueue contents overflowed the current file */
839   for (cur = g_list_first (splitmux->contexts);
840       cur != NULL; cur = g_list_next (cur)) {
841     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
842     if (tmpctx->in_running_time > queued_time)
843       queued_time = tmpctx->in_running_time;
844     queued_bytes += tmpctx->in_bytes;
845   }
846
847   g_assert (queued_bytes >= splitmux->mux_start_bytes);
848   g_assert (queued_time >= splitmux->mux_start_time);
849
850   queued_bytes -= splitmux->mux_start_bytes;
851   queued_time -= splitmux->mux_start_time;
852
853   /* Expand queued bytes estimate by muxer overhead */
854   queued_bytes += (queued_bytes * splitmux->mux_overhead);
855
856   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT
857       " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes);
858
859   /* Check for overrun - have we output at least one byte and overrun
860    * either threshold? */
861   if ((splitmux->have_muxed_something &&
862           ((splitmux->threshold_bytes > 0 &&
863                   queued_bytes >= splitmux->threshold_bytes) ||
864               (splitmux->threshold_time > 0 &&
865                   queued_time >= splitmux->threshold_time)))) {
866
867     splitmux->state = SPLITMUX_STATE_ENDING_FILE;
868
869     GST_INFO_OBJECT (splitmux,
870         "mq overflowed since last, draining out. max out TS is %"
871         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
872     GST_SPLITMUX_BROADCAST (splitmux);
873
874   } else {
875     /* No overflow */
876     GST_LOG_OBJECT (splitmux,
877         "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
878         " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.",
879         splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
880         queued_bytes, GST_TIME_ARGS (queued_time));
881
882     /* Wake everyone up to push this one GOP, then sleep */
883     splitmux->have_muxed_something = TRUE;
884
885     if (!splitmux->reference_ctx->in_eos) {
886       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
887       splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
888     } else {
889       splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
890       splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
891     }
892
893     GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
894         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
895     GST_SPLITMUX_BROADCAST (splitmux);
896   }
897
898 }
899
900 /* Called with splitmux lock held */
901 /* Called from each input pad when it is has all the pieces
902  * for a GOP or EOS, starting with the reference pad which has set the
903  * splitmux->max_in_running_time
904  */
905 static void
906 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
907 {
908   GList *cur;
909   gboolean ready = TRUE;
910   GstClockTime current_max_in_running_time;
911
912   if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
913     /* Iterate each pad, and check that the input running time is at least
914      * up to the reference running time, and if so handle the collected GOP */
915     GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
916         GST_TIME_FORMAT " ctx %p",
917         GST_TIME_ARGS (splitmux->max_in_running_time), ctx);
918     for (cur = g_list_first (splitmux->contexts); cur != NULL;
919         cur = g_list_next (cur)) {
920       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
921
922       GST_LOG_OBJECT (splitmux,
923           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT
924           " EOS %d", tmpctx, tmpctx->srcpad,
925           GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
926
927       if (splitmux->max_in_running_time != GST_CLOCK_TIME_NONE &&
928           tmpctx->in_running_time < splitmux->max_in_running_time &&
929           !tmpctx->in_eos) {
930         GST_LOG_OBJECT (splitmux,
931             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
932             tmpctx, tmpctx->srcpad);
933         ready = FALSE;
934         break;
935       }
936     }
937     if (ready) {
938       GST_DEBUG_OBJECT (splitmux,
939           "Collected GOP is complete. Processing (ctx %p)", ctx);
940       /* All pads have a complete GOP, release it into the multiqueue */
941       handle_gathered_gop (splitmux);
942     }
943   }
944
945   /* Some pad is not yet ready, or GOP is being pushed
946    * either way, sleep and wait to get woken */
947   current_max_in_running_time = splitmux->max_in_running_time;
948   while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
949           splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
950       !ctx->flushing &&
951       (current_max_in_running_time == splitmux->max_in_running_time)) {
952
953     GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
954         splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
955         "GOP complete" : "EOF draining", ctx);
956     GST_SPLITMUX_WAIT (splitmux);
957
958     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
959   }
960 }
961
962 static void
963 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
964 {
965   GList *cur;
966   guint cur_len = g_queue_get_length (&ctx->queued_bufs);
967
968   GST_DEBUG_OBJECT (ctx->sinkpad,
969       "Checking queue length len %u cur_max %u queued gops %u",
970       cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
971
972   if (cur_len >= splitmux->mq_max_buffers) {
973     gboolean allow_grow = FALSE;
974
975     /* If collecting a GOP and this pad might block,
976      * and there isn't already a pending GOP in the queue
977      * then grow
978      */
979     if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
980         ctx->in_running_time < splitmux->max_in_running_time &&
981         splitmux->queued_gops <= 1) {
982       allow_grow = TRUE;
983     } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
984         ctx->is_reference && splitmux->queued_gops <= 1) {
985       allow_grow = TRUE;
986     }
987
988     if (!allow_grow) {
989       for (cur = g_list_first (splitmux->contexts);
990           cur != NULL; cur = g_list_next (cur)) {
991         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
992         GST_DEBUG_OBJECT (tmpctx->sinkpad,
993             " len %u out_blocked %d",
994             g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
995         /* If another stream is starving, grow */
996         if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
997           allow_grow = TRUE;
998         }
999       }
1000     }
1001
1002     if (allow_grow) {
1003       splitmux->mq_max_buffers = cur_len + 1;
1004
1005       GST_INFO_OBJECT (splitmux,
1006           "Multiqueue overrun - enlarging to %u buffers ctx %p",
1007           splitmux->mq_max_buffers, ctx);
1008
1009       g_object_set (splitmux->mq, "max-size-buffers",
1010           splitmux->mq_max_buffers, NULL);
1011     }
1012   }
1013 }
1014
1015 static GstPadProbeReturn
1016 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1017 {
1018   GstSplitMuxSink *splitmux = ctx->splitmux;
1019   GstBuffer *buf;
1020   MqStreamBuf *buf_info = NULL;
1021   GstClockTime ts;
1022   gboolean loop_again;
1023   gboolean keyframe = FALSE;
1024
1025   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1026
1027   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1028   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1029     g_warning ("Buffer list handling not implemented");
1030     return GST_PAD_PROBE_DROP;
1031   }
1032   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
1033     GstEvent *event = gst_pad_probe_info_get_event (info);
1034     switch (GST_EVENT_TYPE (event)) {
1035       case GST_EVENT_SEGMENT:
1036         gst_event_copy_segment (event, &ctx->in_segment);
1037         break;
1038       case GST_EVENT_FLUSH_STOP:
1039         GST_SPLITMUX_LOCK (splitmux);
1040         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1041         ctx->in_eos = FALSE;
1042         ctx->in_bytes = 0;
1043         ctx->in_running_time = 0;
1044         GST_SPLITMUX_UNLOCK (splitmux);
1045         break;
1046       case GST_EVENT_EOS:
1047         GST_SPLITMUX_LOCK (splitmux);
1048         ctx->in_eos = TRUE;
1049
1050         if (splitmux->state == SPLITMUX_STATE_STOPPED)
1051           goto beach;
1052
1053         if (ctx->is_reference) {
1054           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
1055           /* Act as if this is a new keyframe with infinite timestamp */
1056           splitmux->max_in_running_time = GST_CLOCK_TIME_NONE;
1057           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1058           /* Wake up other input pads to collect this GOP */
1059           GST_SPLITMUX_BROADCAST (splitmux);
1060           check_completed_gop (splitmux, ctx);
1061         } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
1062           /* If we are waiting for a GOP to be completed (ie, for aux
1063            * pads to catch up), then this pad is complete, so check
1064            * if the whole GOP is.
1065            */
1066           check_completed_gop (splitmux, ctx);
1067         }
1068         GST_SPLITMUX_UNLOCK (splitmux);
1069         break;
1070       default:
1071         break;
1072     }
1073     return GST_PAD_PROBE_PASS;
1074   }
1075
1076   buf = gst_pad_probe_info_get_buffer (info);
1077   buf_info = mq_stream_buf_new ();
1078
1079   if (GST_BUFFER_PTS_IS_VALID (buf))
1080     ts = GST_BUFFER_PTS (buf);
1081   else
1082     ts = GST_BUFFER_DTS (buf);
1083
1084   GST_SPLITMUX_LOCK (splitmux);
1085
1086   if (splitmux->state == SPLITMUX_STATE_STOPPED)
1087     goto beach;
1088
1089   /* If this buffer has a timestamp, advance the input timestamp of the
1090    * stream */
1091   if (GST_CLOCK_TIME_IS_VALID (ts)) {
1092     GstClockTime running_time =
1093         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1094         GST_BUFFER_TIMESTAMP (buf));
1095
1096     if (GST_CLOCK_TIME_IS_VALID (running_time) &&
1097         (ctx->in_running_time == GST_CLOCK_TIME_NONE
1098             || running_time > ctx->in_running_time))
1099       ctx->in_running_time = running_time;
1100   }
1101
1102   /* Try to make sure we have a valid running time */
1103   if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) {
1104     ctx->in_running_time =
1105         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1106         ctx->in_segment.start);
1107   }
1108
1109   buf_info->run_ts = ctx->in_running_time;
1110   buf_info->buf_size = gst_buffer_get_size (buf);
1111
1112   /* Update total input byte counter for overflow detect */
1113   ctx->in_bytes += buf_info->buf_size;
1114
1115   /* initialize mux_start_time */
1116   if (ctx->is_reference && splitmux->mux_start_time == 0)
1117     splitmux->mux_start_time = buf_info->run_ts;
1118
1119   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT
1120       " total in_bytes %" G_GSIZE_FORMAT,
1121       GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes);
1122
1123   loop_again = TRUE;
1124   do {
1125     if (ctx->flushing)
1126       break;
1127
1128     switch (splitmux->state) {
1129       case SPLITMUX_STATE_COLLECTING_GOP_START:
1130         if (ctx->is_reference) {
1131           /* If a keyframe, we have a complete GOP */
1132           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1133               !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) ||
1134               splitmux->max_in_running_time >= ctx->in_running_time) {
1135             /* Pass this buffer through */
1136             loop_again = FALSE;
1137             break;
1138           }
1139           GST_INFO_OBJECT (pad,
1140               "Have keyframe with running time %" GST_TIME_FORMAT,
1141               GST_TIME_ARGS (ctx->in_running_time));
1142           keyframe = TRUE;
1143           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1144           splitmux->max_in_running_time = ctx->in_running_time;
1145           /* Wake up other input pads to collect this GOP */
1146           GST_SPLITMUX_BROADCAST (splitmux);
1147           check_completed_gop (splitmux, ctx);
1148         } else {
1149           /* We're still waiting for a keyframe on the reference pad, sleep */
1150           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1151           GST_SPLITMUX_WAIT (splitmux);
1152           GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1153               splitmux->state);
1154         }
1155         break;
1156       case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1157         /* After a GOP start is found, this buffer might complete the GOP */
1158         /* If we overran the target timestamp, it might be time to process
1159          * the GOP, otherwise bail out for more data
1160          */
1161         GST_LOG_OBJECT (pad,
1162             "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT,
1163             GST_TIME_ARGS (ctx->in_running_time),
1164             GST_TIME_ARGS (splitmux->max_in_running_time));
1165
1166         if (ctx->in_running_time < splitmux->max_in_running_time) {
1167           loop_again = FALSE;
1168           break;
1169         }
1170
1171         GST_LOG_OBJECT (pad,
1172             "Collected last packet of GOP. Checking other pads");
1173         check_completed_gop (splitmux, ctx);
1174         break;
1175       case SPLITMUX_STATE_ENDING_FILE:{
1176         GstEvent *event;
1177
1178         /* If somes streams received no buffer during the last GOP that overran,
1179          * because its next buffer has a timestamp bigger than
1180          * ctx->max_in_running_time, its queue is empty. In that case the only
1181          * way to wakeup the output thread is by injecting an event in the
1182          * queue. This usually happen with subtitle streams.
1183          * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
1184         GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event");
1185         event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
1186             GST_EVENT_TYPE_SERIALIZED,
1187             gst_structure_new ("splitmuxsink-unblock", "timestamp",
1188                 G_TYPE_UINT64, splitmux->max_in_running_time, NULL));
1189         gst_pad_send_event (ctx->sinkpad, event);
1190         /* fallthrough */
1191       }
1192       case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1193         /* A fragment is ending, wait until that's done before continuing */
1194         GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1195         GST_SPLITMUX_WAIT (splitmux);
1196         GST_DEBUG_OBJECT (pad,
1197             "Done sleeping for fragment restart state now %d", splitmux->state);
1198         break;
1199       default:
1200         loop_again = FALSE;
1201         break;
1202     }
1203   } while (loop_again);
1204
1205   if (keyframe) {
1206     splitmux->queued_gops++;
1207     buf_info->keyframe = TRUE;
1208   }
1209
1210   /* Now add this buffer to the queue just before returning */
1211   g_queue_push_head (&ctx->queued_bufs, buf_info);
1212
1213   /* Check the buffer will fit in the mq */
1214   check_queue_length (splitmux, ctx);
1215
1216   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1217       " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time));
1218
1219   GST_SPLITMUX_UNLOCK (splitmux);
1220   return GST_PAD_PROBE_PASS;
1221
1222 beach:
1223   GST_SPLITMUX_UNLOCK (splitmux);
1224   if (buf_info)
1225     mq_stream_buf_free (buf_info);
1226   return GST_PAD_PROBE_PASS;
1227 }
1228
1229 static GstPad *
1230 gst_splitmux_sink_request_new_pad (GstElement * element,
1231     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1232 {
1233   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1234   GstPadTemplate *mux_template = NULL;
1235   GstPad *res = NULL;
1236   GstPad *mq_sink, *mq_src;
1237   gchar *gname;
1238   gboolean is_video = FALSE;
1239   MqStreamCtx *ctx;
1240
1241   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1242
1243   GST_SPLITMUX_LOCK (splitmux);
1244   if (!create_elements (splitmux))
1245     goto fail;
1246
1247   if (templ->name_template) {
1248     if (g_str_equal (templ->name_template, "video")) {
1249       /* FIXME: Look for a pad template with matching caps, rather than by name */
1250       mux_template =
1251           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1252           (splitmux->muxer), "video_%u");
1253       is_video = TRUE;
1254       name = NULL;
1255     } else {
1256       mux_template =
1257           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1258           (splitmux->muxer), templ->name_template);
1259     }
1260     if (mux_template == NULL) {
1261       /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1262       mux_template =
1263           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1264           (splitmux->muxer), "sink_%d");
1265     }
1266   }
1267
1268   res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1269   if (res == NULL)
1270     goto fail;
1271
1272   if (is_video)
1273     gname = g_strdup ("video");
1274   else if (name == NULL)
1275     gname = gst_pad_get_name (res);
1276   else
1277     gname = g_strdup (name);
1278
1279   if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1280     gst_element_release_request_pad (splitmux->muxer, res);
1281     gst_object_unref (GST_OBJECT (res));
1282     goto fail;
1283   }
1284
1285   if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1286     gst_element_release_request_pad (splitmux->muxer, res);
1287     gst_object_unref (GST_OBJECT (res));
1288     gst_element_release_request_pad (splitmux->mq, mq_sink);
1289     gst_object_unref (GST_OBJECT (mq_sink));
1290     goto fail;
1291   }
1292
1293   gst_object_unref (GST_OBJECT (res));
1294
1295   ctx = mq_stream_ctx_new (splitmux);
1296   ctx->srcpad = mq_src;
1297   ctx->sinkpad = mq_sink;
1298
1299   mq_stream_ctx_ref (ctx);
1300   ctx->src_pad_block_id =
1301       gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1302       (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1303       _pad_block_destroy_src_notify);
1304   if (is_video && splitmux->reference_ctx != NULL) {
1305     splitmux->reference_ctx->is_reference = FALSE;
1306     splitmux->reference_ctx = NULL;
1307   }
1308   if (splitmux->reference_ctx == NULL) {
1309     splitmux->reference_ctx = ctx;
1310     ctx->is_reference = TRUE;
1311   }
1312
1313   res = gst_ghost_pad_new (gname, mq_sink);
1314   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1315
1316   mq_stream_ctx_ref (ctx);
1317   ctx->sink_pad_block_id =
1318       gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1319       (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1320       _pad_block_destroy_sink_notify);
1321
1322   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1323       " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1324
1325   splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1326
1327   g_free (gname);
1328
1329   gst_object_unref (mq_sink);
1330   gst_object_unref (mq_src);
1331
1332   gst_pad_set_active (res, TRUE);
1333   gst_element_add_pad (element, res);
1334   GST_SPLITMUX_UNLOCK (splitmux);
1335
1336   return res;
1337 fail:
1338   GST_SPLITMUX_UNLOCK (splitmux);
1339   return NULL;
1340 }
1341
1342 static void
1343 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1344 {
1345   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1346   GstPad *mqsink, *mqsrc, *muxpad;
1347   MqStreamCtx *ctx =
1348       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1349
1350   GST_SPLITMUX_LOCK (splitmux);
1351
1352   if (splitmux->muxer == NULL || splitmux->mq == NULL)
1353     goto fail;                  /* Elements don't exist yet - nothing to release */
1354
1355   GST_INFO_OBJECT (pad, "releasing request pad");
1356
1357   mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1358   mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1359   muxpad = gst_pad_get_peer (mqsrc);
1360
1361   /* Remove the context from our consideration */
1362   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1363
1364   if (ctx->sink_pad_block_id)
1365     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1366
1367   if (ctx->src_pad_block_id)
1368     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1369
1370   /* Can release the context now */
1371   mq_stream_ctx_unref (ctx);
1372
1373   /* Release and free the mq input */
1374   gst_element_release_request_pad (splitmux->mq, mqsink);
1375
1376   /* Release and free the muxer input */
1377   gst_element_release_request_pad (splitmux->muxer, muxpad);
1378
1379   gst_object_unref (mqsink);
1380   gst_object_unref (mqsrc);
1381   gst_object_unref (muxpad);
1382
1383   gst_element_remove_pad (element, pad);
1384
1385   /* Reset the internal elements only after all request pads are released */
1386   if (splitmux->contexts == NULL)
1387     gst_splitmux_reset (splitmux);
1388
1389 fail:
1390   GST_SPLITMUX_UNLOCK (splitmux);
1391 }
1392
1393 static GstElement *
1394 create_element (GstSplitMuxSink * splitmux,
1395     const gchar * factory, const gchar * name)
1396 {
1397   GstElement *ret = gst_element_factory_make (factory, name);
1398   if (ret == NULL) {
1399     g_warning ("Failed to create %s - splitmuxsink will not work", name);
1400     return NULL;
1401   }
1402
1403   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1404     g_warning ("Could not add %s element - splitmuxsink will not work", name);
1405     gst_object_unref (ret);
1406     return NULL;
1407   }
1408
1409   return ret;
1410 }
1411
1412 static gboolean
1413 create_elements (GstSplitMuxSink * splitmux)
1414 {
1415   /* Create internal elements */
1416   if (splitmux->mq == NULL) {
1417     if ((splitmux->mq =
1418             create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1419       goto fail;
1420
1421     splitmux->mq_max_buffers = 5;
1422     /* No bytes or time limit, we limit buffers manually */
1423     g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1424         (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1425   }
1426
1427   if (splitmux->muxer == NULL) {
1428     GstElement *provided_muxer = NULL;
1429
1430     GST_OBJECT_LOCK (splitmux);
1431     if (splitmux->provided_muxer != NULL)
1432       provided_muxer = gst_object_ref (splitmux->provided_muxer);
1433     GST_OBJECT_UNLOCK (splitmux);
1434
1435     if (provided_muxer == NULL) {
1436       if ((splitmux->muxer =
1437               create_element (splitmux, "mp4mux", "muxer")) == NULL)
1438         goto fail;
1439     } else {
1440       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1441         g_warning ("Could not add muxer element - splitmuxsink will not work");
1442         gst_object_unref (provided_muxer);
1443         goto fail;
1444       }
1445
1446       splitmux->muxer = provided_muxer;
1447       gst_object_unref (provided_muxer);
1448     }
1449   }
1450
1451   return TRUE;
1452 fail:
1453   return FALSE;
1454 }
1455
1456 static GstElement *
1457 find_sink (GstElement * e)
1458 {
1459   GstElement *res = NULL;
1460   GstIterator *iter;
1461   gboolean done = FALSE;
1462   GValue data = { 0, };
1463
1464   if (!GST_IS_BIN (e))
1465     return e;
1466
1467   iter = gst_bin_iterate_sinks (GST_BIN (e));
1468   while (!done) {
1469     switch (gst_iterator_next (iter, &data)) {
1470       case GST_ITERATOR_OK:
1471       {
1472         GstElement *child = g_value_get_object (&data);
1473         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1474                 "location") != NULL) {
1475           res = child;
1476           done = TRUE;
1477         }
1478         g_value_reset (&data);
1479         break;
1480       }
1481       case GST_ITERATOR_RESYNC:
1482         gst_iterator_resync (iter);
1483         break;
1484       case GST_ITERATOR_DONE:
1485         done = TRUE;
1486         break;
1487       case GST_ITERATOR_ERROR:
1488         g_assert_not_reached ();
1489         break;
1490     }
1491   }
1492   g_value_unset (&data);
1493   gst_iterator_free (iter);
1494
1495   return res;
1496 }
1497
1498 static gboolean
1499 create_sink (GstSplitMuxSink * splitmux)
1500 {
1501   GstElement *provided_sink = NULL;
1502
1503   if (splitmux->active_sink == NULL) {
1504
1505     GST_OBJECT_LOCK (splitmux);
1506     if (splitmux->provided_sink != NULL)
1507       provided_sink = gst_object_ref (splitmux->provided_sink);
1508     GST_OBJECT_UNLOCK (splitmux);
1509
1510     if (provided_sink == NULL) {
1511       if ((splitmux->sink =
1512               create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1513         goto fail;
1514       splitmux->active_sink = splitmux->sink;
1515     } else {
1516       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1517         g_warning ("Could not add sink elements - splitmuxsink will not work");
1518         gst_object_unref (provided_sink);
1519         goto fail;
1520       }
1521
1522       splitmux->active_sink = provided_sink;
1523
1524       /* The bin holds a ref now, we can drop our tmp ref */
1525       gst_object_unref (provided_sink);
1526
1527       /* Find the sink element */
1528       splitmux->sink = find_sink (splitmux->active_sink);
1529       if (splitmux->sink == NULL) {
1530         g_warning
1531             ("Could not locate sink element in provided sink - splitmuxsink will not work");
1532         goto fail;
1533       }
1534     }
1535
1536     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1537       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1538       goto fail;
1539     }
1540   }
1541
1542   return TRUE;
1543 fail:
1544   return FALSE;
1545 }
1546
1547 #ifdef __GNUC__
1548 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
1549 #endif
1550 static void
1551 set_next_filename (GstSplitMuxSink * splitmux)
1552 {
1553   gchar *fname = NULL;
1554   gst_splitmux_sink_ensure_max_files (splitmux);
1555
1556   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
1557       splitmux->fragment_id, &fname);
1558
1559   if (!fname)
1560     fname = splitmux->location ?
1561         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
1562
1563   if (fname) {
1564     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1565     g_object_set (splitmux->sink, "location", fname, NULL);
1566     g_free (fname);
1567
1568     splitmux->fragment_id++;
1569   }
1570 }
1571
1572 static GstStateChangeReturn
1573 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1574 {
1575   GstStateChangeReturn ret;
1576   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1577
1578   switch (transition) {
1579     case GST_STATE_CHANGE_NULL_TO_READY:{
1580       GST_SPLITMUX_LOCK (splitmux);
1581       if (!create_elements (splitmux) || !create_sink (splitmux)) {
1582         ret = GST_STATE_CHANGE_FAILURE;
1583         GST_SPLITMUX_UNLOCK (splitmux);
1584         goto beach;
1585       }
1586       GST_SPLITMUX_UNLOCK (splitmux);
1587       splitmux->fragment_id = 0;
1588       set_next_filename (splitmux);
1589       break;
1590     }
1591     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1592       GST_SPLITMUX_LOCK (splitmux);
1593       /* Start by collecting one input on each pad */
1594       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1595       splitmux->max_in_running_time = 0;
1596       splitmux->muxed_out_time = splitmux->mux_start_time = 0;
1597       splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1598       splitmux->opening_first_fragment = TRUE;
1599       GST_SPLITMUX_UNLOCK (splitmux);
1600       break;
1601     }
1602     case GST_STATE_CHANGE_PAUSED_TO_READY:
1603     case GST_STATE_CHANGE_READY_TO_NULL:
1604       GST_SPLITMUX_LOCK (splitmux);
1605       splitmux->state = SPLITMUX_STATE_STOPPED;
1606       /* Wake up any blocked threads */
1607       GST_LOG_OBJECT (splitmux,
1608           "State change -> NULL or READY. Waking threads");
1609       GST_SPLITMUX_BROADCAST (splitmux);
1610       GST_SPLITMUX_UNLOCK (splitmux);
1611       break;
1612     default:
1613       break;
1614   }
1615
1616   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1617   if (ret == GST_STATE_CHANGE_FAILURE)
1618     goto beach;
1619
1620   switch (transition) {
1621     case GST_STATE_CHANGE_READY_TO_NULL:
1622       GST_SPLITMUX_LOCK (splitmux);
1623       splitmux->fragment_id = 0;
1624       /* Reset internal elements only if no pad contexts are using them */
1625       if (splitmux->contexts == NULL)
1626         gst_splitmux_reset (splitmux);
1627       GST_SPLITMUX_UNLOCK (splitmux);
1628       break;
1629     default:
1630       break;
1631   }
1632
1633 beach:
1634
1635   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1636       ret == GST_STATE_CHANGE_FAILURE) {
1637     /* Cleanup elements on failed transition out of NULL */
1638     gst_splitmux_reset (splitmux);
1639   }
1640   return ret;
1641 }
1642
1643 gboolean
1644 register_splitmuxsink (GstPlugin * plugin)
1645 {
1646   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1647       "Split File Muxing Sink");
1648
1649   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1650       GST_TYPE_SPLITMUX_SINK);
1651 }
1652
1653 static void
1654 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
1655 {
1656   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
1657     splitmux->fragment_id = 0;
1658   }
1659 }