98f744ff6e32ce3986eddec6b22813cbef299707
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * can be muxed, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * If a video stream is available, the splitting process is driven by the video
37  * stream contents, and the video stream must contain closed GOPs for the output
38  * file parts to be played individually correctly. In the absence of a video
39  * stream, the first available stream is used as reference for synchronization.
40  *
41  * <refsect2>
42  * <title>Example pipelines</title>
43  * |[
44  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
45  * ]|
46  * Records a video stream captured from a v4l2 device and muxes it into
47  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48  * and 1MB maximum size.
49  * </refsect2>
50  */
51
52 #ifdef HAVE_CONFIG_H
53 #include "config.h"
54 #endif
55
56 #include <string.h>
57 #include <glib/gstdio.h>
58 #include "gstsplitmuxsink.h"
59
60 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
61 #define GST_CAT_DEFAULT splitmux_debug
62
63 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
64 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
65 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
66 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
67
68 enum
69 {
70   PROP_0,
71   PROP_LOCATION,
72   PROP_MAX_SIZE_TIME,
73   PROP_MAX_SIZE_BYTES,
74   PROP_MAX_FILES,
75   PROP_MUXER_OVERHEAD,
76   PROP_MUXER,
77   PROP_SINK
78 };
79
80 #define DEFAULT_MAX_SIZE_TIME       0
81 #define DEFAULT_MAX_SIZE_BYTES      0
82 #define DEFAULT_MAX_FILES           0
83 #define DEFAULT_MUXER_OVERHEAD      0.02
84 #define DEFAULT_MUXER "mp4mux"
85 #define DEFAULT_SINK "filesink"
86
87 enum
88 {
89   SIGNAL_FORMAT_LOCATION,
90   SIGNAL_LAST
91 };
92
93 static guint signals[SIGNAL_LAST];
94
95 static GstStaticPadTemplate video_sink_template =
96 GST_STATIC_PAD_TEMPLATE ("video",
97     GST_PAD_SINK,
98     GST_PAD_REQUEST,
99     GST_STATIC_CAPS_ANY);
100 static GstStaticPadTemplate audio_sink_template =
101 GST_STATIC_PAD_TEMPLATE ("audio_%u",
102     GST_PAD_SINK,
103     GST_PAD_REQUEST,
104     GST_STATIC_CAPS_ANY);
105 static GstStaticPadTemplate subtitle_sink_template =
106 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
107     GST_PAD_SINK,
108     GST_PAD_REQUEST,
109     GST_STATIC_CAPS_ANY);
110
111 static GQuark PAD_CONTEXT;
112
113 static void
114 _do_init (void)
115 {
116   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
117 }
118
119 #define gst_splitmux_sink_parent_class parent_class
120 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
121     _do_init ());
122
123 static gboolean create_elements (GstSplitMuxSink * splitmux);
124 static gboolean create_sink (GstSplitMuxSink * splitmux);
125 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
126     const GValue * value, GParamSpec * pspec);
127 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
128     GValue * value, GParamSpec * pspec);
129 static void gst_splitmux_sink_dispose (GObject * object);
130 static void gst_splitmux_sink_finalize (GObject * object);
131
132 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
133     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
134 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
135
136 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
137     element, GstStateChange transition);
138
139 static void bus_handler (GstBin * bin, GstMessage * msg);
140 static void set_next_filename (GstSplitMuxSink * splitmux);
141 static void start_next_fragment (GstSplitMuxSink * splitmux);
142 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
143 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
144
145 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
146
147 static MqStreamBuf *
148 mq_stream_buf_new (void)
149 {
150   return g_slice_new0 (MqStreamBuf);
151 }
152
153 static void
154 mq_stream_buf_free (MqStreamBuf * data)
155 {
156   g_slice_free (MqStreamBuf, data);
157 }
158
159 static void
160 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
161 {
162   GObjectClass *gobject_class = (GObjectClass *) klass;
163   GstElementClass *gstelement_class = (GstElementClass *) klass;
164   GstBinClass *gstbin_class = (GstBinClass *) klass;
165
166   gobject_class->set_property = gst_splitmux_sink_set_property;
167   gobject_class->get_property = gst_splitmux_sink_get_property;
168   gobject_class->dispose = gst_splitmux_sink_dispose;
169   gobject_class->finalize = gst_splitmux_sink_finalize;
170
171   gst_element_class_set_static_metadata (gstelement_class,
172       "Split Muxing Bin", "Generic/Bin/Muxer",
173       "Convenience bin that muxes incoming streams into multiple time/size limited files",
174       "Jan Schmidt <jan@centricular.com>");
175
176   gst_element_class_add_static_pad_template (gstelement_class,
177       &video_sink_template);
178   gst_element_class_add_static_pad_template (gstelement_class,
179       &audio_sink_template);
180   gst_element_class_add_static_pad_template (gstelement_class,
181       &subtitle_sink_template);
182
183   gstelement_class->change_state =
184       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
185   gstelement_class->request_new_pad =
186       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
187   gstelement_class->release_pad =
188       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
189
190   gstbin_class->handle_message = bus_handler;
191
192   g_object_class_install_property (gobject_class, PROP_LOCATION,
193       g_param_spec_string ("location", "File Output Pattern",
194           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
195           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
197       g_param_spec_double ("mux-overhead", "Muxing Overhead",
198           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
199           DEFAULT_MUXER_OVERHEAD,
200           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
201
202   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
203       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
204           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
205           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
206   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
207       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
208           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
209           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
210   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
211       g_param_spec_uint ("max-files", "Max files",
212           "Maximum number of files to keep on disk. Once the maximum is reached,"
213           "old files start to be deleted to make room for new ones.",
214           0, G_MAXUINT, DEFAULT_MAX_FILES,
215           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
216
217
218   g_object_class_install_property (gobject_class, PROP_MUXER,
219       g_param_spec_object ("muxer", "Muxer",
220           "The muxer element to use (NULL = default mp4mux)",
221           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222   g_object_class_install_property (gobject_class, PROP_SINK,
223       g_param_spec_object ("sink", "Sink",
224           "The sink element (or element chain) to use (NULL = default filesink)",
225           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
226
227   /**
228    * GstSplitMuxSink::format-location:
229    * @splitmux: the #GstSplitMuxSink
230    * @fragment_id: the sequence number of the file to be created
231    *
232    * Returns: the location to be used for the next output file
233    */
234   signals[SIGNAL_FORMAT_LOCATION] =
235       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
236       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
237 }
238
239 static void
240 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
241 {
242   g_mutex_init (&splitmux->lock);
243   g_cond_init (&splitmux->data_cond);
244
245   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
246   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
247   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
248   splitmux->max_files = DEFAULT_MAX_FILES;
249
250   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
251 }
252
253 static void
254 gst_splitmux_reset (GstSplitMuxSink * splitmux)
255 {
256   if (splitmux->mq)
257     gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
258   if (splitmux->muxer)
259     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
260   if (splitmux->active_sink)
261     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
262
263   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
264       NULL;
265 }
266
267 static void
268 gst_splitmux_sink_dispose (GObject * object)
269 {
270   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
271
272   G_OBJECT_CLASS (parent_class)->dispose (object);
273
274   /* Calling parent dispose invalidates all child pointers */
275   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
276       NULL;
277 }
278
279 static void
280 gst_splitmux_sink_finalize (GObject * object)
281 {
282   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
283   g_cond_clear (&splitmux->data_cond);
284   g_mutex_clear (&splitmux->lock);
285   if (splitmux->provided_sink)
286     gst_object_unref (splitmux->provided_sink);
287   if (splitmux->provided_muxer)
288     gst_object_unref (splitmux->provided_muxer);
289
290   g_free (splitmux->location);
291
292   /* Make sure to free any un-released contexts */
293   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
294   g_list_free (splitmux->contexts);
295
296   G_OBJECT_CLASS (parent_class)->finalize (object);
297 }
298
299 static void
300 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
301     const GValue * value, GParamSpec * pspec)
302 {
303   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
304
305   switch (prop_id) {
306     case PROP_LOCATION:{
307       GST_OBJECT_LOCK (splitmux);
308       g_free (splitmux->location);
309       splitmux->location = g_value_dup_string (value);
310       GST_OBJECT_UNLOCK (splitmux);
311       break;
312     }
313     case PROP_MAX_SIZE_BYTES:
314       GST_OBJECT_LOCK (splitmux);
315       splitmux->threshold_bytes = g_value_get_uint64 (value);
316       GST_OBJECT_UNLOCK (splitmux);
317       break;
318     case PROP_MAX_SIZE_TIME:
319       GST_OBJECT_LOCK (splitmux);
320       splitmux->threshold_time = g_value_get_uint64 (value);
321       GST_OBJECT_UNLOCK (splitmux);
322       break;
323     case PROP_MAX_FILES:
324       GST_OBJECT_LOCK (splitmux);
325       splitmux->max_files = g_value_get_uint (value);
326       GST_OBJECT_UNLOCK (splitmux);
327       break;
328     case PROP_MUXER_OVERHEAD:
329       GST_OBJECT_LOCK (splitmux);
330       splitmux->mux_overhead = g_value_get_double (value);
331       GST_OBJECT_UNLOCK (splitmux);
332       break;
333     case PROP_SINK:
334       GST_OBJECT_LOCK (splitmux);
335       if (splitmux->provided_sink)
336         gst_object_unref (splitmux->provided_sink);
337       splitmux->provided_sink = g_value_dup_object (value);
338       GST_OBJECT_UNLOCK (splitmux);
339       break;
340     case PROP_MUXER:
341       GST_OBJECT_LOCK (splitmux);
342       if (splitmux->provided_muxer)
343         gst_object_unref (splitmux->provided_muxer);
344       splitmux->provided_muxer = g_value_dup_object (value);
345       GST_OBJECT_UNLOCK (splitmux);
346       break;
347     default:
348       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
349       break;
350   }
351 }
352
353 static void
354 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
355     GValue * value, GParamSpec * pspec)
356 {
357   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
358
359   switch (prop_id) {
360     case PROP_LOCATION:
361       GST_OBJECT_LOCK (splitmux);
362       g_value_set_string (value, splitmux->location);
363       GST_OBJECT_UNLOCK (splitmux);
364       break;
365     case PROP_MAX_SIZE_BYTES:
366       GST_OBJECT_LOCK (splitmux);
367       g_value_set_uint64 (value, splitmux->threshold_bytes);
368       GST_OBJECT_UNLOCK (splitmux);
369       break;
370     case PROP_MAX_SIZE_TIME:
371       GST_OBJECT_LOCK (splitmux);
372       g_value_set_uint64 (value, splitmux->threshold_time);
373       GST_OBJECT_UNLOCK (splitmux);
374       break;
375     case PROP_MAX_FILES:
376       GST_OBJECT_LOCK (splitmux);
377       g_value_set_uint (value, splitmux->max_files);
378       GST_OBJECT_UNLOCK (splitmux);
379       break;
380     case PROP_MUXER_OVERHEAD:
381       GST_OBJECT_LOCK (splitmux);
382       g_value_set_double (value, splitmux->mux_overhead);
383       GST_OBJECT_UNLOCK (splitmux);
384       break;
385     case PROP_SINK:
386       GST_OBJECT_LOCK (splitmux);
387       g_value_set_object (value, splitmux->provided_sink);
388       GST_OBJECT_UNLOCK (splitmux);
389       break;
390     case PROP_MUXER:
391       GST_OBJECT_LOCK (splitmux);
392       g_value_set_object (value, splitmux->provided_muxer);
393       GST_OBJECT_UNLOCK (splitmux);
394       break;
395     default:
396       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
397       break;
398   }
399 }
400
401 /* Convenience function */
402 static inline GstClockTimeDiff
403 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
404 {
405   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
406
407   if (GST_CLOCK_TIME_IS_VALID (val)) {
408     gboolean sign =
409         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
410     if (sign > 0)
411       res = val;
412     else if (sign < 0)
413       res = -val;
414   }
415   return res;
416 }
417
418 static GstPad *
419 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
420 {
421   gchar *tmp, *sinkname, *srcname;
422   GstPad *mq_src;
423
424   sinkname = gst_pad_get_name (sink_pad);
425   tmp = sinkname + 5;
426   srcname = g_strdup_printf ("src_%s", tmp);
427
428   mq_src = gst_element_get_static_pad (mq, srcname);
429
430   g_free (sinkname);
431   g_free (srcname);
432
433   return mq_src;
434 }
435
436 static gboolean
437 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
438     GstPad ** src_pad)
439 {
440   GstPad *mq_sink;
441   GstPad *mq_src;
442
443   /* Request a pad from multiqueue, then connect this one, then
444    * discover the corresponding output pad and return both */
445   mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
446   if (mq_sink == NULL)
447     return FALSE;
448
449   mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
450   if (mq_src == NULL)
451     goto fail;
452
453   *sink_pad = mq_sink;
454   *src_pad = mq_src;
455
456   return TRUE;
457
458 fail:
459   gst_element_release_request_pad (splitmux->mq, mq_sink);
460   return FALSE;
461 }
462
463 static MqStreamCtx *
464 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
465 {
466   MqStreamCtx *ctx;
467
468   ctx = g_new0 (MqStreamCtx, 1);
469   g_atomic_int_set (&ctx->refcount, 1);
470   ctx->splitmux = splitmux;
471   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
472   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
473   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
474   g_queue_init (&ctx->queued_bufs);
475   return ctx;
476 }
477
478 static void
479 mq_stream_ctx_free (MqStreamCtx * ctx)
480 {
481   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
482   g_queue_clear (&ctx->queued_bufs);
483   g_free (ctx);
484 }
485
486 static void
487 mq_stream_ctx_unref (MqStreamCtx * ctx)
488 {
489   if (g_atomic_int_dec_and_test (&ctx->refcount))
490     mq_stream_ctx_free (ctx);
491 }
492
493 static void
494 mq_stream_ctx_ref (MqStreamCtx * ctx)
495 {
496   g_atomic_int_inc (&ctx->refcount);
497 }
498
499 static void
500 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
501 {
502   ctx->sink_pad_block_id = 0;
503   mq_stream_ctx_unref (ctx);
504 }
505
506 static void
507 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
508 {
509   ctx->src_pad_block_id = 0;
510   mq_stream_ctx_unref (ctx);
511 }
512
513 static void
514 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
515 {
516   gchar *location = NULL;
517   GstMessage *msg;
518   const gchar *msg_name = opened ?
519       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
520
521   g_object_get (splitmux->sink, "location", &location, NULL);
522
523   msg = gst_message_new_element (GST_OBJECT (splitmux),
524       gst_structure_new (msg_name,
525           "location", G_TYPE_STRING, location,
526           "running-time", GST_TYPE_CLOCK_TIME,
527           splitmux->reference_ctx->out_running_time, NULL));
528   gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
529
530   g_free (location);
531 }
532
533 /* Called with lock held, drops the lock to send EOS to the
534  * pad
535  */
536 static void
537 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
538 {
539   GstEvent *eos;
540   GstPad *pad;
541
542   eos = gst_event_new_eos ();
543   pad = gst_pad_get_peer (ctx->srcpad);
544
545   ctx->out_eos = TRUE;
546
547   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
548   GST_SPLITMUX_UNLOCK (splitmux);
549   gst_pad_send_event (pad, eos);
550   GST_SPLITMUX_LOCK (splitmux);
551
552   gst_object_unref (pad);
553 }
554
555 /* Called with splitmux lock held to check if this output
556  * context needs to sleep to wait for the release of the
557  * next GOP, or to send EOS to close out the current file
558  */
559 static void
560 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
561 {
562   do {
563
564     GST_LOG_OBJECT (ctx->srcpad,
565         "Checking running time %" GST_STIME_FORMAT " against max %"
566         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
567         GST_STIME_ARGS (splitmux->max_out_running_time));
568
569     if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
570         ctx->out_running_time < splitmux->max_out_running_time) {
571       splitmux->have_muxed_something = TRUE;
572       return;
573     }
574
575     if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
576       return;
577
578     if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
579       if (ctx->out_eos == FALSE) {
580         send_eos (splitmux, ctx);
581         continue;
582       }
583     } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
584       start_next_fragment (splitmux);
585       continue;
586     }
587
588     GST_INFO_OBJECT (ctx->srcpad,
589         "Sleeping for running time %"
590         GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ")",
591         GST_STIME_ARGS (ctx->out_running_time),
592         GST_STIME_ARGS (splitmux->max_out_running_time));
593     ctx->out_blocked = TRUE;
594     /* Expand the mq if needed before sleeping */
595     check_queue_length (splitmux, ctx);
596     GST_SPLITMUX_WAIT (splitmux);
597     ctx->out_blocked = FALSE;
598     GST_INFO_OBJECT (ctx->srcpad,
599         "Woken for new max running time %" GST_STIME_FORMAT,
600         GST_STIME_ARGS (splitmux->max_out_running_time));
601   } while (1);
602 }
603
604 static GstPadProbeReturn
605 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
606 {
607   GstSplitMuxSink *splitmux = ctx->splitmux;
608   MqStreamBuf *buf_info = NULL;
609
610   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
611
612   /* FIXME: Handle buffer lists, until then make it clear they won't work */
613   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
614     g_warning ("Buffer list handling not implemented");
615     return GST_PAD_PROBE_DROP;
616   }
617   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
618     GstEvent *event = gst_pad_probe_info_get_event (info);
619
620     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
621
622     switch (GST_EVENT_TYPE (event)) {
623       case GST_EVENT_SEGMENT:
624         gst_event_copy_segment (event, &ctx->out_segment);
625         break;
626       case GST_EVENT_FLUSH_STOP:
627         GST_SPLITMUX_LOCK (splitmux);
628         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
629         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
630         g_queue_clear (&ctx->queued_bufs);
631         ctx->flushing = FALSE;
632         GST_SPLITMUX_UNLOCK (splitmux);
633         break;
634       case GST_EVENT_FLUSH_START:
635         GST_SPLITMUX_LOCK (splitmux);
636         GST_LOG_OBJECT (pad, "Flush start");
637         ctx->flushing = TRUE;
638         GST_SPLITMUX_BROADCAST (splitmux);
639         GST_SPLITMUX_UNLOCK (splitmux);
640         break;
641       case GST_EVENT_EOS:
642         GST_SPLITMUX_LOCK (splitmux);
643         if (splitmux->state == SPLITMUX_STATE_STOPPED)
644           goto beach;
645         ctx->out_eos = TRUE;
646         GST_SPLITMUX_UNLOCK (splitmux);
647         break;
648       case GST_EVENT_GAP:{
649         GstClockTime gap_ts;
650         GstClockTimeDiff rtime;
651
652         gst_event_parse_gap (event, &gap_ts, NULL);
653         if (gap_ts == GST_CLOCK_TIME_NONE)
654           break;
655
656         GST_SPLITMUX_LOCK (splitmux);
657
658         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
659
660         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
661             GST_STIME_ARGS (rtime));
662
663         if (splitmux->state == SPLITMUX_STATE_STOPPED)
664           goto beach;
665
666         if (rtime != GST_CLOCK_STIME_NONE) {
667           ctx->out_running_time = rtime;
668           complete_or_wait_on_out (splitmux, ctx);
669         }
670         GST_SPLITMUX_UNLOCK (splitmux);
671         break;
672       }
673       case GST_EVENT_CUSTOM_DOWNSTREAM:{
674         const GstStructure *s;
675         GstClockTimeDiff ts = 0;
676
677         s = gst_event_get_structure (event);
678         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
679           break;
680
681         gst_structure_get_int64 (s, "timestamp", &ts);
682
683         GST_SPLITMUX_LOCK (splitmux);
684
685         if (splitmux->state == SPLITMUX_STATE_STOPPED)
686           goto beach;
687         ctx->out_running_time = ts;
688         complete_or_wait_on_out (splitmux, ctx);
689         GST_SPLITMUX_UNLOCK (splitmux);
690         return GST_PAD_PROBE_DROP;
691       }
692       default:
693         break;
694     }
695     return GST_PAD_PROBE_PASS;
696   }
697
698   /* Allow everything through until the configured next stopping point */
699   GST_SPLITMUX_LOCK (splitmux);
700
701   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
702   if (buf_info == NULL)
703     /* Can only happen due to a poorly timed flush */
704     goto beach;
705
706   /* If we have popped a keyframe, decrement the queued_gop count */
707   if (buf_info->keyframe && splitmux->queued_gops > 0)
708     splitmux->queued_gops--;
709
710   ctx->out_running_time = buf_info->run_ts;
711
712   GST_LOG_OBJECT (splitmux,
713       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
714       " size %" G_GSIZE_FORMAT,
715       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
716
717   if (splitmux->opening_first_fragment) {
718     send_fragment_opened_closed_msg (splitmux, TRUE);
719     splitmux->opening_first_fragment = FALSE;
720   }
721
722   complete_or_wait_on_out (splitmux, ctx);
723
724   if (splitmux->muxed_out_time == GST_CLOCK_STIME_NONE ||
725       splitmux->muxed_out_time < buf_info->run_ts)
726     splitmux->muxed_out_time = buf_info->run_ts;
727
728   splitmux->muxed_out_bytes += buf_info->buf_size;
729
730 #ifndef GST_DISABLE_GST_DEBUG
731   {
732     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
733     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
734         " run ts %" GST_STIME_FORMAT, buf,
735         GST_STIME_ARGS (ctx->out_running_time));
736   }
737 #endif
738
739   GST_SPLITMUX_UNLOCK (splitmux);
740
741   mq_stream_buf_free (buf_info);
742
743   return GST_PAD_PROBE_PASS;
744
745 beach:
746   GST_SPLITMUX_UNLOCK (splitmux);
747   return GST_PAD_PROBE_DROP;
748 }
749
750 static gboolean
751 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
752 {
753   return gst_pad_send_event (peer, gst_event_ref (*event));
754 }
755
756 static void
757 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
758 {
759   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
760
761   gst_pad_sticky_events_foreach (ctx->srcpad,
762       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
763
764   /* Clear EOS flag if not actually EOS */
765   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
766
767   gst_object_unref (peer);
768 }
769
770 /* Called with lock held when a fragment
771  * reaches EOS and it is time to restart
772  * a new fragment
773  */
774 static void
775 start_next_fragment (GstSplitMuxSink * splitmux)
776 {
777   /* 1 change to new file */
778   splitmux->switching_fragment = TRUE;
779
780   gst_element_set_locked_state (splitmux->muxer, TRUE);
781   gst_element_set_locked_state (splitmux->active_sink, TRUE);
782   gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
783   gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
784
785   set_next_filename (splitmux);
786
787   gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux));
788   gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux));
789   gst_element_set_locked_state (splitmux->muxer, FALSE);
790   gst_element_set_locked_state (splitmux->active_sink, FALSE);
791
792   splitmux->switching_fragment = FALSE;
793
794   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
795
796   /* Switch state and go back to processing */
797   if (!splitmux->reference_ctx->in_eos) {
798     splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
799     splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
800   } else {
801     splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
802     splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
803     splitmux->have_muxed_something = FALSE;
804   }
805   splitmux->have_muxed_something =
806       (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
807
808   /* Store the overflow parameters as the basis for the next fragment */
809   splitmux->mux_start_time = splitmux->muxed_out_time;
810   splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
811
812   GST_DEBUG_OBJECT (splitmux,
813       "Restarting flow for new fragment. New running time %" GST_STIME_FORMAT,
814       GST_STIME_ARGS (splitmux->max_out_running_time));
815
816   send_fragment_opened_closed_msg (splitmux, TRUE);
817
818   GST_SPLITMUX_BROADCAST (splitmux);
819 }
820
821 static void
822 bus_handler (GstBin * bin, GstMessage * message)
823 {
824   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
825
826   switch (GST_MESSAGE_TYPE (message)) {
827     case GST_MESSAGE_EOS:
828       /* If the state is draining out the current file, drop this EOS */
829       GST_SPLITMUX_LOCK (splitmux);
830
831       send_fragment_opened_closed_msg (splitmux, FALSE);
832
833       if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
834           splitmux->max_out_running_time != GST_CLOCK_STIME_NONE) {
835         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
836         splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
837         GST_SPLITMUX_BROADCAST (splitmux);
838
839         gst_message_unref (message);
840         GST_SPLITMUX_UNLOCK (splitmux);
841         return;
842       }
843       GST_SPLITMUX_UNLOCK (splitmux);
844       break;
845     case GST_MESSAGE_ASYNC_START:
846     case GST_MESSAGE_ASYNC_DONE:
847       /* Ignore state changes from our children while switching */
848       if (splitmux->switching_fragment) {
849         if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink ||
850             GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
851           GST_LOG_OBJECT (splitmux,
852               "Ignoring state change from child %" GST_PTR_FORMAT
853               " while switching", GST_MESSAGE_SRC (message));
854           gst_message_unref (message);
855           return;
856         }
857       }
858       break;
859     default:
860       break;
861   }
862
863   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
864 }
865
866 /* Called with splitmux lock held */
867 /* Called when entering ProcessingCompleteGop state
868  * Assess if mq contents overflowed the current file
869  *   -> If yes, need to switch to new file
870  *   -> if no, set max_out_running_time to let this GOP in and
871  *      go to COLLECTING_GOP_START state
872  */
873 static void
874 handle_gathered_gop (GstSplitMuxSink * splitmux)
875 {
876   GList *cur;
877   gsize queued_bytes = 0;
878   GstClockTimeDiff queued_time = 0;
879
880   /* Assess if the multiqueue contents overflowed the current file */
881   for (cur = g_list_first (splitmux->contexts);
882       cur != NULL; cur = g_list_next (cur)) {
883     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
884     if (tmpctx->in_running_time > queued_time)
885       queued_time = tmpctx->in_running_time;
886     queued_bytes += tmpctx->in_bytes;
887   }
888
889   g_assert (queued_bytes >= splitmux->mux_start_bytes);
890   g_assert (queued_time >= splitmux->mux_start_time);
891
892   queued_bytes -= splitmux->mux_start_bytes;
893   queued_time -= splitmux->mux_start_time;
894
895   /* Expand queued bytes estimate by muxer overhead */
896   queued_bytes += (queued_bytes * splitmux->mux_overhead);
897
898   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
899       " bytes %" G_GSIZE_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
900
901   /* Check for overrun - have we output at least one byte and overrun
902    * either threshold? */
903   if ((splitmux->have_muxed_something &&
904           ((splitmux->threshold_bytes > 0 &&
905                   queued_bytes > splitmux->threshold_bytes) ||
906               (splitmux->threshold_time > 0 &&
907                   queued_time > splitmux->threshold_time)))) {
908
909     splitmux->state = SPLITMUX_STATE_ENDING_FILE;
910
911     GST_INFO_OBJECT (splitmux,
912         "mq overflowed since last, draining out. max out TS is %"
913         GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
914     GST_SPLITMUX_BROADCAST (splitmux);
915
916   } else {
917     /* No overflow */
918     GST_LOG_OBJECT (splitmux,
919         "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
920         " queued %" G_GSIZE_FORMAT " time %" GST_STIME_FORMAT " Continuing.",
921         splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
922         queued_bytes, GST_STIME_ARGS (queued_time));
923
924     /* Wake everyone up to push this one GOP, then sleep */
925     splitmux->have_muxed_something = TRUE;
926
927     if (!splitmux->reference_ctx->in_eos) {
928       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
929       splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
930     } else {
931       splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
932       splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
933     }
934
935     GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
936         GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
937     GST_SPLITMUX_BROADCAST (splitmux);
938   }
939
940 }
941
942 /* Called with splitmux lock held */
943 /* Called from each input pad when it is has all the pieces
944  * for a GOP or EOS, starting with the reference pad which has set the
945  * splitmux->max_in_running_time
946  */
947 static void
948 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
949 {
950   GList *cur;
951   gboolean ready = TRUE;
952   GstClockTimeDiff current_max_in_running_time;
953
954   if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
955     /* Iterate each pad, and check that the input running time is at least
956      * up to the reference running time, and if so handle the collected GOP */
957     GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
958         GST_STIME_FORMAT " ctx %p",
959         GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
960     for (cur = g_list_first (splitmux->contexts); cur != NULL;
961         cur = g_list_next (cur)) {
962       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
963
964       GST_LOG_OBJECT (splitmux,
965           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
966           " EOS %d", tmpctx, tmpctx->srcpad,
967           GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
968
969       if (splitmux->max_in_running_time != G_MAXINT64 &&
970           tmpctx->in_running_time < splitmux->max_in_running_time &&
971           !tmpctx->in_eos) {
972         GST_LOG_OBJECT (splitmux,
973             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
974             tmpctx, tmpctx->srcpad);
975         ready = FALSE;
976         break;
977       }
978     }
979     if (ready) {
980       GST_DEBUG_OBJECT (splitmux,
981           "Collected GOP is complete. Processing (ctx %p)", ctx);
982       /* All pads have a complete GOP, release it into the multiqueue */
983       handle_gathered_gop (splitmux);
984     }
985   }
986
987   /* If upstream reached EOS we are not expecting more data, no need to wait
988    * here. */
989   if (ctx->in_eos)
990     return;
991
992   /* Some pad is not yet ready, or GOP is being pushed
993    * either way, sleep and wait to get woken */
994   current_max_in_running_time = splitmux->max_in_running_time;
995   while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
996           splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
997       !ctx->flushing &&
998       (current_max_in_running_time == splitmux->max_in_running_time)) {
999
1000     GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
1001         splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
1002         "GOP complete" : "EOF draining", ctx);
1003     GST_SPLITMUX_WAIT (splitmux);
1004
1005     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
1006   }
1007 }
1008
1009 static void
1010 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1011 {
1012   GList *cur;
1013   guint cur_len = g_queue_get_length (&ctx->queued_bufs);
1014
1015   GST_DEBUG_OBJECT (ctx->sinkpad,
1016       "Checking queue length len %u cur_max %u queued gops %u",
1017       cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
1018
1019   if (cur_len >= splitmux->mq_max_buffers) {
1020     gboolean allow_grow = FALSE;
1021
1022     /* If collecting a GOP and this pad might block,
1023      * and there isn't already a pending GOP in the queue
1024      * then grow
1025      */
1026     if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
1027         ctx->in_running_time < splitmux->max_in_running_time &&
1028         splitmux->queued_gops <= 1) {
1029       allow_grow = TRUE;
1030     } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
1031         ctx->is_reference && splitmux->queued_gops <= 1) {
1032       allow_grow = TRUE;
1033     }
1034
1035     if (!allow_grow) {
1036       for (cur = g_list_first (splitmux->contexts);
1037           cur != NULL; cur = g_list_next (cur)) {
1038         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1039         GST_DEBUG_OBJECT (tmpctx->sinkpad,
1040             " len %u out_blocked %d",
1041             g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
1042         /* If another stream is starving, grow */
1043         if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
1044           allow_grow = TRUE;
1045         }
1046       }
1047     }
1048
1049     if (allow_grow) {
1050       splitmux->mq_max_buffers = cur_len + 1;
1051
1052       GST_INFO_OBJECT (splitmux,
1053           "Multiqueue overrun - enlarging to %u buffers ctx %p",
1054           splitmux->mq_max_buffers, ctx);
1055
1056       g_object_set (splitmux->mq, "max-size-buffers",
1057           splitmux->mq_max_buffers, NULL);
1058     }
1059   }
1060 }
1061
1062 static GstPadProbeReturn
1063 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1064 {
1065   GstSplitMuxSink *splitmux = ctx->splitmux;
1066   GstBuffer *buf;
1067   MqStreamBuf *buf_info = NULL;
1068   GstClockTime ts;
1069   gboolean loop_again;
1070   gboolean keyframe = FALSE;
1071
1072   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1073
1074   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1075   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1076     g_warning ("Buffer list handling not implemented");
1077     return GST_PAD_PROBE_DROP;
1078   }
1079   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
1080     GstEvent *event = gst_pad_probe_info_get_event (info);
1081     switch (GST_EVENT_TYPE (event)) {
1082       case GST_EVENT_SEGMENT:
1083         gst_event_copy_segment (event, &ctx->in_segment);
1084         break;
1085       case GST_EVENT_FLUSH_STOP:
1086         GST_SPLITMUX_LOCK (splitmux);
1087         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1088         ctx->in_eos = FALSE;
1089         ctx->in_bytes = 0;
1090         ctx->in_running_time = GST_CLOCK_STIME_NONE;
1091         GST_SPLITMUX_UNLOCK (splitmux);
1092         break;
1093       case GST_EVENT_EOS:
1094         GST_SPLITMUX_LOCK (splitmux);
1095         ctx->in_eos = TRUE;
1096
1097         if (splitmux->state == SPLITMUX_STATE_STOPPED)
1098           goto beach;
1099
1100         if (ctx->is_reference) {
1101           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
1102           /* Act as if this is a new keyframe with infinite timestamp */
1103           splitmux->max_in_running_time = G_MAXINT64;
1104           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1105           /* Wake up other input pads to collect this GOP */
1106           GST_SPLITMUX_BROADCAST (splitmux);
1107           check_completed_gop (splitmux, ctx);
1108         } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
1109           /* If we are waiting for a GOP to be completed (ie, for aux
1110            * pads to catch up), then this pad is complete, so check
1111            * if the whole GOP is.
1112            */
1113           check_completed_gop (splitmux, ctx);
1114         }
1115         GST_SPLITMUX_UNLOCK (splitmux);
1116         break;
1117       default:
1118         break;
1119     }
1120     return GST_PAD_PROBE_PASS;
1121   }
1122
1123   buf = gst_pad_probe_info_get_buffer (info);
1124   buf_info = mq_stream_buf_new ();
1125
1126   if (GST_BUFFER_PTS_IS_VALID (buf))
1127     ts = GST_BUFFER_PTS (buf);
1128   else
1129     ts = GST_BUFFER_DTS (buf);
1130
1131   GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
1132
1133   GST_SPLITMUX_LOCK (splitmux);
1134
1135   if (splitmux->state == SPLITMUX_STATE_STOPPED)
1136     goto beach;
1137
1138   /* If this buffer has a timestamp, advance the input timestamp of the
1139    * stream */
1140   if (GST_CLOCK_TIME_IS_VALID (ts)) {
1141     GstClockTimeDiff running_time =
1142         my_segment_to_running_time (&ctx->in_segment, ts);
1143
1144     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
1145         GST_STIME_ARGS (running_time));
1146
1147     if (GST_CLOCK_STIME_IS_VALID (running_time)
1148         && running_time > ctx->in_running_time)
1149       ctx->in_running_time = running_time;
1150   }
1151
1152   /* Try to make sure we have a valid running time */
1153   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
1154     ctx->in_running_time =
1155         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
1156   }
1157
1158   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
1159       GST_STIME_ARGS (ctx->in_running_time));
1160
1161   buf_info->run_ts = ctx->in_running_time;
1162   buf_info->buf_size = gst_buffer_get_size (buf);
1163
1164   /* Update total input byte counter for overflow detect */
1165   ctx->in_bytes += buf_info->buf_size;
1166
1167   /* initialize mux_start_time */
1168   if (ctx->is_reference && splitmux->mux_start_time == GST_CLOCK_STIME_NONE) {
1169     splitmux->mux_start_time = buf_info->run_ts;
1170     GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
1171         GST_STIME_ARGS (splitmux->mux_start_time));
1172     /* Also take this as the first start time when starting up,
1173      * so that we start counting overflow from the first frame */
1174     if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
1175       splitmux->max_in_running_time = splitmux->mux_start_time;
1176   }
1177
1178   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
1179       " total in_bytes %" G_GSIZE_FORMAT,
1180       GST_STIME_ARGS (buf_info->run_ts), ctx->in_bytes);
1181
1182   loop_again = TRUE;
1183   do {
1184     if (ctx->flushing)
1185       break;
1186
1187     switch (splitmux->state) {
1188       case SPLITMUX_STATE_COLLECTING_GOP_START:
1189         if (ctx->is_reference) {
1190           /* If a keyframe, we have a complete GOP */
1191           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1192               !GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) ||
1193               splitmux->max_in_running_time >= ctx->in_running_time) {
1194             /* Pass this buffer through */
1195             loop_again = FALSE;
1196             break;
1197           }
1198           GST_INFO_OBJECT (pad,
1199               "Have keyframe with running time %" GST_STIME_FORMAT,
1200               GST_STIME_ARGS (ctx->in_running_time));
1201           keyframe = TRUE;
1202           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1203           splitmux->max_in_running_time = ctx->in_running_time;
1204           /* Wake up other input pads to collect this GOP */
1205           GST_SPLITMUX_BROADCAST (splitmux);
1206           check_completed_gop (splitmux, ctx);
1207         } else {
1208           /* We're still waiting for a keyframe on the reference pad, sleep */
1209           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1210           GST_SPLITMUX_WAIT (splitmux);
1211           GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1212               splitmux->state);
1213         }
1214         break;
1215       case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1216
1217         /* If we overran the target timestamp, it might be time to process
1218          * the GOP, otherwise bail out for more data
1219          */
1220         GST_LOG_OBJECT (pad,
1221             "Checking TS %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT,
1222             GST_STIME_ARGS (ctx->in_running_time),
1223             GST_STIME_ARGS (splitmux->max_in_running_time));
1224
1225         if (ctx->in_running_time < splitmux->max_in_running_time) {
1226           loop_again = FALSE;
1227           break;
1228         }
1229
1230         GST_LOG_OBJECT (pad,
1231             "Collected last packet of GOP. Checking other pads");
1232         check_completed_gop (splitmux, ctx);
1233         break;
1234       case SPLITMUX_STATE_ENDING_FILE:{
1235         GstEvent *event;
1236
1237         /* If somes streams received no buffer during the last GOP that overran,
1238          * because its next buffer has a timestamp bigger than
1239          * ctx->max_in_running_time, its queue is empty. In that case the only
1240          * way to wakeup the output thread is by injecting an event in the
1241          * queue. This usually happen with subtitle streams.
1242          * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
1243         GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event");
1244         event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
1245             GST_EVENT_TYPE_SERIALIZED,
1246             gst_structure_new ("splitmuxsink-unblock", "timestamp",
1247                 G_TYPE_INT64, splitmux->max_in_running_time, NULL));
1248
1249         GST_SPLITMUX_UNLOCK (splitmux);
1250         gst_pad_send_event (ctx->sinkpad, event);
1251         GST_SPLITMUX_LOCK (splitmux);
1252         /* state may have changed while we were unlocked. Loop again if so */
1253         if (splitmux->state != SPLITMUX_STATE_ENDING_FILE)
1254           break;
1255         /* fallthrough */
1256       }
1257       case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1258         /* A fragment is ending, wait until that's done before continuing */
1259         GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1260         GST_SPLITMUX_WAIT (splitmux);
1261         GST_DEBUG_OBJECT (pad,
1262             "Done sleeping for fragment restart state now %d", splitmux->state);
1263         break;
1264       default:
1265         loop_again = FALSE;
1266         break;
1267     }
1268   } while (loop_again);
1269
1270   if (keyframe) {
1271     splitmux->queued_gops++;
1272     buf_info->keyframe = TRUE;
1273   }
1274
1275   /* Now add this buffer to the queue just before returning */
1276   g_queue_push_head (&ctx->queued_bufs, buf_info);
1277
1278   /* Check the buffer will fit in the mq */
1279   check_queue_length (splitmux, ctx);
1280
1281   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1282       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
1283
1284   GST_SPLITMUX_UNLOCK (splitmux);
1285   return GST_PAD_PROBE_PASS;
1286
1287 beach:
1288   GST_SPLITMUX_UNLOCK (splitmux);
1289   if (buf_info)
1290     mq_stream_buf_free (buf_info);
1291   return GST_PAD_PROBE_PASS;
1292 }
1293
1294 static GstPad *
1295 gst_splitmux_sink_request_new_pad (GstElement * element,
1296     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1297 {
1298   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1299   GstPadTemplate *mux_template = NULL;
1300   GstPad *res = NULL;
1301   GstPad *mq_sink, *mq_src;
1302   gchar *gname;
1303   gboolean is_video = FALSE;
1304   MqStreamCtx *ctx;
1305
1306   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1307
1308   GST_SPLITMUX_LOCK (splitmux);
1309   if (!create_elements (splitmux))
1310     goto fail;
1311
1312   if (templ->name_template) {
1313     if (g_str_equal (templ->name_template, "video")) {
1314       /* FIXME: Look for a pad template with matching caps, rather than by name */
1315       mux_template =
1316           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1317           (splitmux->muxer), "video_%u");
1318       is_video = TRUE;
1319       name = NULL;
1320     } else {
1321       mux_template =
1322           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1323           (splitmux->muxer), templ->name_template);
1324     }
1325     if (mux_template == NULL) {
1326       /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1327       mux_template =
1328           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1329           (splitmux->muxer), "sink_%d");
1330     }
1331   }
1332
1333   res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1334   if (res == NULL)
1335     goto fail;
1336
1337   if (is_video)
1338     gname = g_strdup ("video");
1339   else if (name == NULL)
1340     gname = gst_pad_get_name (res);
1341   else
1342     gname = g_strdup (name);
1343
1344   if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1345     gst_element_release_request_pad (splitmux->muxer, res);
1346     gst_object_unref (GST_OBJECT (res));
1347     goto fail;
1348   }
1349
1350   if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1351     gst_element_release_request_pad (splitmux->muxer, res);
1352     gst_object_unref (GST_OBJECT (res));
1353     gst_element_release_request_pad (splitmux->mq, mq_sink);
1354     gst_object_unref (GST_OBJECT (mq_sink));
1355     goto fail;
1356   }
1357
1358   gst_object_unref (GST_OBJECT (res));
1359
1360   ctx = mq_stream_ctx_new (splitmux);
1361   ctx->srcpad = mq_src;
1362   ctx->sinkpad = mq_sink;
1363
1364   mq_stream_ctx_ref (ctx);
1365   ctx->src_pad_block_id =
1366       gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1367       (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1368       _pad_block_destroy_src_notify);
1369   if (is_video && splitmux->reference_ctx != NULL) {
1370     splitmux->reference_ctx->is_reference = FALSE;
1371     splitmux->reference_ctx = NULL;
1372   }
1373   if (splitmux->reference_ctx == NULL) {
1374     splitmux->reference_ctx = ctx;
1375     ctx->is_reference = TRUE;
1376   }
1377
1378   res = gst_ghost_pad_new (gname, mq_sink);
1379   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1380
1381   mq_stream_ctx_ref (ctx);
1382   ctx->sink_pad_block_id =
1383       gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1384       (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1385       _pad_block_destroy_sink_notify);
1386
1387   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1388       " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1389
1390   splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1391
1392   g_free (gname);
1393
1394   gst_object_unref (mq_sink);
1395   gst_object_unref (mq_src);
1396
1397   gst_pad_set_active (res, TRUE);
1398   gst_element_add_pad (element, res);
1399   GST_SPLITMUX_UNLOCK (splitmux);
1400
1401   return res;
1402 fail:
1403   GST_SPLITMUX_UNLOCK (splitmux);
1404   return NULL;
1405 }
1406
1407 static void
1408 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1409 {
1410   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1411   GstPad *mqsink, *mqsrc, *muxpad;
1412   MqStreamCtx *ctx =
1413       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1414
1415   GST_SPLITMUX_LOCK (splitmux);
1416
1417   if (splitmux->muxer == NULL || splitmux->mq == NULL)
1418     goto fail;                  /* Elements don't exist yet - nothing to release */
1419
1420   GST_INFO_OBJECT (pad, "releasing request pad");
1421
1422   mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1423   mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1424   muxpad = gst_pad_get_peer (mqsrc);
1425
1426   /* Remove the context from our consideration */
1427   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1428
1429   if (ctx->sink_pad_block_id)
1430     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1431
1432   if (ctx->src_pad_block_id)
1433     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1434
1435   /* Can release the context now */
1436   mq_stream_ctx_unref (ctx);
1437
1438   /* Release and free the mq input */
1439   gst_element_release_request_pad (splitmux->mq, mqsink);
1440
1441   /* Release and free the muxer input */
1442   gst_element_release_request_pad (splitmux->muxer, muxpad);
1443
1444   gst_object_unref (mqsink);
1445   gst_object_unref (mqsrc);
1446   gst_object_unref (muxpad);
1447
1448   gst_element_remove_pad (element, pad);
1449
1450   /* Reset the internal elements only after all request pads are released */
1451   if (splitmux->contexts == NULL)
1452     gst_splitmux_reset (splitmux);
1453
1454 fail:
1455   GST_SPLITMUX_UNLOCK (splitmux);
1456 }
1457
1458 static GstElement *
1459 create_element (GstSplitMuxSink * splitmux,
1460     const gchar * factory, const gchar * name)
1461 {
1462   GstElement *ret = gst_element_factory_make (factory, name);
1463   if (ret == NULL) {
1464     g_warning ("Failed to create %s - splitmuxsink will not work", name);
1465     return NULL;
1466   }
1467
1468   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1469     g_warning ("Could not add %s element - splitmuxsink will not work", name);
1470     gst_object_unref (ret);
1471     return NULL;
1472   }
1473
1474   return ret;
1475 }
1476
1477 static gboolean
1478 create_elements (GstSplitMuxSink * splitmux)
1479 {
1480   /* Create internal elements */
1481   if (splitmux->mq == NULL) {
1482     if ((splitmux->mq =
1483             create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1484       goto fail;
1485
1486     splitmux->mq_max_buffers = 5;
1487     /* No bytes or time limit, we limit buffers manually */
1488     g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1489         (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1490   }
1491
1492   if (splitmux->muxer == NULL) {
1493     GstElement *provided_muxer = NULL;
1494
1495     GST_OBJECT_LOCK (splitmux);
1496     if (splitmux->provided_muxer != NULL)
1497       provided_muxer = gst_object_ref (splitmux->provided_muxer);
1498     GST_OBJECT_UNLOCK (splitmux);
1499
1500     if (provided_muxer == NULL) {
1501       if ((splitmux->muxer =
1502               create_element (splitmux, "mp4mux", "muxer")) == NULL)
1503         goto fail;
1504     } else {
1505       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1506         g_warning ("Could not add muxer element - splitmuxsink will not work");
1507         gst_object_unref (provided_muxer);
1508         goto fail;
1509       }
1510
1511       splitmux->muxer = provided_muxer;
1512       gst_object_unref (provided_muxer);
1513     }
1514   }
1515
1516   return TRUE;
1517 fail:
1518   return FALSE;
1519 }
1520
1521 static GstElement *
1522 find_sink (GstElement * e)
1523 {
1524   GstElement *res = NULL;
1525   GstIterator *iter;
1526   gboolean done = FALSE;
1527   GValue data = { 0, };
1528
1529   if (!GST_IS_BIN (e))
1530     return e;
1531
1532   iter = gst_bin_iterate_sinks (GST_BIN (e));
1533   while (!done) {
1534     switch (gst_iterator_next (iter, &data)) {
1535       case GST_ITERATOR_OK:
1536       {
1537         GstElement *child = g_value_get_object (&data);
1538         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1539                 "location") != NULL) {
1540           res = child;
1541           done = TRUE;
1542         }
1543         g_value_reset (&data);
1544         break;
1545       }
1546       case GST_ITERATOR_RESYNC:
1547         gst_iterator_resync (iter);
1548         break;
1549       case GST_ITERATOR_DONE:
1550         done = TRUE;
1551         break;
1552       case GST_ITERATOR_ERROR:
1553         g_assert_not_reached ();
1554         break;
1555     }
1556   }
1557   g_value_unset (&data);
1558   gst_iterator_free (iter);
1559
1560   return res;
1561 }
1562
1563 static gboolean
1564 create_sink (GstSplitMuxSink * splitmux)
1565 {
1566   GstElement *provided_sink = NULL;
1567
1568   if (splitmux->active_sink == NULL) {
1569
1570     GST_OBJECT_LOCK (splitmux);
1571     if (splitmux->provided_sink != NULL)
1572       provided_sink = gst_object_ref (splitmux->provided_sink);
1573     GST_OBJECT_UNLOCK (splitmux);
1574
1575     if (provided_sink == NULL) {
1576       if ((splitmux->sink =
1577               create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1578         goto fail;
1579       splitmux->active_sink = splitmux->sink;
1580     } else {
1581       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1582         g_warning ("Could not add sink elements - splitmuxsink will not work");
1583         gst_object_unref (provided_sink);
1584         goto fail;
1585       }
1586
1587       splitmux->active_sink = provided_sink;
1588
1589       /* The bin holds a ref now, we can drop our tmp ref */
1590       gst_object_unref (provided_sink);
1591
1592       /* Find the sink element */
1593       splitmux->sink = find_sink (splitmux->active_sink);
1594       if (splitmux->sink == NULL) {
1595         g_warning
1596             ("Could not locate sink element in provided sink - splitmuxsink will not work");
1597         goto fail;
1598       }
1599     }
1600
1601     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1602       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1603       goto fail;
1604     }
1605   }
1606
1607   return TRUE;
1608 fail:
1609   return FALSE;
1610 }
1611
1612 #ifdef __GNUC__
1613 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
1614 #endif
1615 static void
1616 set_next_filename (GstSplitMuxSink * splitmux)
1617 {
1618   gchar *fname = NULL;
1619   gst_splitmux_sink_ensure_max_files (splitmux);
1620
1621   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
1622       splitmux->fragment_id, &fname);
1623
1624   if (!fname)
1625     fname = splitmux->location ?
1626         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
1627
1628   if (fname) {
1629     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1630     g_object_set (splitmux->sink, "location", fname, NULL);
1631     g_free (fname);
1632
1633     splitmux->fragment_id++;
1634   }
1635 }
1636
1637 static GstStateChangeReturn
1638 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1639 {
1640   GstStateChangeReturn ret;
1641   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1642
1643   switch (transition) {
1644     case GST_STATE_CHANGE_NULL_TO_READY:{
1645       GST_SPLITMUX_LOCK (splitmux);
1646       if (!create_elements (splitmux) || !create_sink (splitmux)) {
1647         ret = GST_STATE_CHANGE_FAILURE;
1648         GST_SPLITMUX_UNLOCK (splitmux);
1649         goto beach;
1650       }
1651       GST_SPLITMUX_UNLOCK (splitmux);
1652       splitmux->fragment_id = 0;
1653       set_next_filename (splitmux);
1654       break;
1655     }
1656     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1657       GST_SPLITMUX_LOCK (splitmux);
1658       /* Start by collecting one input on each pad */
1659       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1660       splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
1661       splitmux->muxed_out_time = splitmux->mux_start_time =
1662           GST_CLOCK_STIME_NONE;
1663       splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1664       splitmux->opening_first_fragment = TRUE;
1665       GST_SPLITMUX_UNLOCK (splitmux);
1666       break;
1667     }
1668     case GST_STATE_CHANGE_PAUSED_TO_READY:
1669     case GST_STATE_CHANGE_READY_TO_NULL:
1670       GST_SPLITMUX_LOCK (splitmux);
1671       splitmux->state = SPLITMUX_STATE_STOPPED;
1672       /* Wake up any blocked threads */
1673       GST_LOG_OBJECT (splitmux,
1674           "State change -> NULL or READY. Waking threads");
1675       GST_SPLITMUX_BROADCAST (splitmux);
1676       GST_SPLITMUX_UNLOCK (splitmux);
1677       break;
1678     default:
1679       break;
1680   }
1681
1682   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1683   if (ret == GST_STATE_CHANGE_FAILURE)
1684     goto beach;
1685
1686   switch (transition) {
1687     case GST_STATE_CHANGE_READY_TO_NULL:
1688       GST_SPLITMUX_LOCK (splitmux);
1689       splitmux->fragment_id = 0;
1690       /* Reset internal elements only if no pad contexts are using them */
1691       if (splitmux->contexts == NULL)
1692         gst_splitmux_reset (splitmux);
1693       GST_SPLITMUX_UNLOCK (splitmux);
1694       break;
1695     default:
1696       break;
1697   }
1698
1699 beach:
1700
1701   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1702       ret == GST_STATE_CHANGE_FAILURE) {
1703     /* Cleanup elements on failed transition out of NULL */
1704     gst_splitmux_reset (splitmux);
1705   }
1706   return ret;
1707 }
1708
1709 gboolean
1710 register_splitmuxsink (GstPlugin * plugin)
1711 {
1712   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1713       "Split File Muxing Sink");
1714
1715   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1716       GST_TYPE_SPLITMUX_SINK);
1717 }
1718
1719 static void
1720 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
1721 {
1722   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
1723     splitmux->fragment_id = 0;
1724   }
1725 }