Revert "splitmuxsink: Use GstBin async-handling instead of our own."
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * can be muxed, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * If a video stream is available, the splitting process is driven by the video
37  * stream contents, and the video stream must contain closed GOPs for the output
38  * file parts to be played individually correctly. In the absence of a video
39  * stream, the first available stream is used as reference for synchronization.
40  *
41  * <refsect2>
42  * <title>Example pipelines</title>
43  * |[
44  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
45  * ]|
46  * Records a video stream captured from a v4l2 device and muxes it into
47  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48  * and 1MB maximum size.
49  * </refsect2>
50  */
51
52 #ifdef HAVE_CONFIG_H
53 #include "config.h"
54 #endif
55
56 #include <string.h>
57 #include <glib/gstdio.h>
58 #include "gstsplitmuxsink.h"
59
60 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
61 #define GST_CAT_DEFAULT splitmux_debug
62
63 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
64 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
65 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
66 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
67
68 enum
69 {
70   PROP_0,
71   PROP_LOCATION,
72   PROP_MAX_SIZE_TIME,
73   PROP_MAX_SIZE_BYTES,
74   PROP_MAX_FILES,
75   PROP_MUXER_OVERHEAD,
76   PROP_MUXER,
77   PROP_SINK
78 };
79
80 #define DEFAULT_MAX_SIZE_TIME       0
81 #define DEFAULT_MAX_SIZE_BYTES      0
82 #define DEFAULT_MAX_FILES           0
83 #define DEFAULT_MUXER_OVERHEAD      0.02
84 #define DEFAULT_MUXER "mp4mux"
85 #define DEFAULT_SINK "filesink"
86
87 enum
88 {
89   SIGNAL_FORMAT_LOCATION,
90   SIGNAL_LAST
91 };
92
93 static guint signals[SIGNAL_LAST];
94
95 static GstStaticPadTemplate video_sink_template =
96 GST_STATIC_PAD_TEMPLATE ("video",
97     GST_PAD_SINK,
98     GST_PAD_REQUEST,
99     GST_STATIC_CAPS_ANY);
100 static GstStaticPadTemplate audio_sink_template =
101 GST_STATIC_PAD_TEMPLATE ("audio_%u",
102     GST_PAD_SINK,
103     GST_PAD_REQUEST,
104     GST_STATIC_CAPS_ANY);
105 static GstStaticPadTemplate subtitle_sink_template =
106 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
107     GST_PAD_SINK,
108     GST_PAD_REQUEST,
109     GST_STATIC_CAPS_ANY);
110
111 static GQuark PAD_CONTEXT;
112
113 static void
114 _do_init (void)
115 {
116   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
117 }
118
119 #define gst_splitmux_sink_parent_class parent_class
120 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
121     _do_init ());
122
123 static gboolean create_elements (GstSplitMuxSink * splitmux);
124 static gboolean create_sink (GstSplitMuxSink * splitmux);
125 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
126     const GValue * value, GParamSpec * pspec);
127 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
128     GValue * value, GParamSpec * pspec);
129 static void gst_splitmux_sink_dispose (GObject * object);
130 static void gst_splitmux_sink_finalize (GObject * object);
131
132 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
133     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
134 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
135
136 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
137     element, GstStateChange transition);
138
139 static void bus_handler (GstBin * bin, GstMessage * msg);
140 static void set_next_filename (GstSplitMuxSink * splitmux);
141 static void start_next_fragment (GstSplitMuxSink * splitmux);
142 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
143 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
144
145 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
146
147 static MqStreamBuf *
148 mq_stream_buf_new (void)
149 {
150   return g_slice_new0 (MqStreamBuf);
151 }
152
153 static void
154 mq_stream_buf_free (MqStreamBuf * data)
155 {
156   g_slice_free (MqStreamBuf, data);
157 }
158
159 static void
160 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
161 {
162   GObjectClass *gobject_class = (GObjectClass *) klass;
163   GstElementClass *gstelement_class = (GstElementClass *) klass;
164   GstBinClass *gstbin_class = (GstBinClass *) klass;
165
166   gobject_class->set_property = gst_splitmux_sink_set_property;
167   gobject_class->get_property = gst_splitmux_sink_get_property;
168   gobject_class->dispose = gst_splitmux_sink_dispose;
169   gobject_class->finalize = gst_splitmux_sink_finalize;
170
171   gst_element_class_set_static_metadata (gstelement_class,
172       "Split Muxing Bin", "Generic/Bin/Muxer",
173       "Convenience bin that muxes incoming streams into multiple time/size limited files",
174       "Jan Schmidt <jan@centricular.com>");
175
176   gst_element_class_add_static_pad_template (gstelement_class,
177       &video_sink_template);
178   gst_element_class_add_static_pad_template (gstelement_class,
179       &audio_sink_template);
180   gst_element_class_add_static_pad_template (gstelement_class,
181       &subtitle_sink_template);
182
183   gstelement_class->change_state =
184       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
185   gstelement_class->request_new_pad =
186       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
187   gstelement_class->release_pad =
188       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
189
190   gstbin_class->handle_message = bus_handler;
191
192   g_object_class_install_property (gobject_class, PROP_LOCATION,
193       g_param_spec_string ("location", "File Output Pattern",
194           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
195           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
197       g_param_spec_double ("mux-overhead", "Muxing Overhead",
198           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
199           DEFAULT_MUXER_OVERHEAD,
200           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
201
202   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
203       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
204           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
205           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
206   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
207       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
208           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
209           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
210   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
211       g_param_spec_uint ("max-files", "Max files",
212           "Maximum number of files to keep on disk. Once the maximum is reached,"
213           "old files start to be deleted to make room for new ones.",
214           0, G_MAXUINT, DEFAULT_MAX_FILES,
215           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
216
217
218   g_object_class_install_property (gobject_class, PROP_MUXER,
219       g_param_spec_object ("muxer", "Muxer",
220           "The muxer element to use (NULL = default mp4mux)",
221           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222   g_object_class_install_property (gobject_class, PROP_SINK,
223       g_param_spec_object ("sink", "Sink",
224           "The sink element (or element chain) to use (NULL = default filesink)",
225           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
226
227   /**
228    * GstSplitMuxSink::format-location:
229    * @splitmux: the #GstSplitMuxSink
230    * @fragment_id: the sequence number of the file to be created
231    *
232    * Returns: the location to be used for the next output file
233    */
234   signals[SIGNAL_FORMAT_LOCATION] =
235       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
236       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
237 }
238
239 static void
240 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
241 {
242   g_mutex_init (&splitmux->lock);
243   g_cond_init (&splitmux->data_cond);
244
245   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
246   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
247   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
248   splitmux->max_files = DEFAULT_MAX_FILES;
249
250   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
251 }
252
253 static void
254 gst_splitmux_reset (GstSplitMuxSink * splitmux)
255 {
256   if (splitmux->mq)
257     gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
258   if (splitmux->muxer)
259     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
260   if (splitmux->active_sink)
261     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
262
263   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
264       NULL;
265 }
266
267 static void
268 gst_splitmux_sink_dispose (GObject * object)
269 {
270   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
271
272   G_OBJECT_CLASS (parent_class)->dispose (object);
273
274   /* Calling parent dispose invalidates all child pointers */
275   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
276       NULL;
277 }
278
279 static void
280 gst_splitmux_sink_finalize (GObject * object)
281 {
282   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
283   g_cond_clear (&splitmux->data_cond);
284   g_mutex_clear (&splitmux->lock);
285   if (splitmux->provided_sink)
286     gst_object_unref (splitmux->provided_sink);
287   if (splitmux->provided_muxer)
288     gst_object_unref (splitmux->provided_muxer);
289
290   g_free (splitmux->location);
291
292   /* Make sure to free any un-released contexts */
293   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
294   g_list_free (splitmux->contexts);
295
296   G_OBJECT_CLASS (parent_class)->finalize (object);
297 }
298
299 static void
300 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
301     const GValue * value, GParamSpec * pspec)
302 {
303   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
304
305   switch (prop_id) {
306     case PROP_LOCATION:{
307       GST_OBJECT_LOCK (splitmux);
308       g_free (splitmux->location);
309       splitmux->location = g_value_dup_string (value);
310       GST_OBJECT_UNLOCK (splitmux);
311       break;
312     }
313     case PROP_MAX_SIZE_BYTES:
314       GST_OBJECT_LOCK (splitmux);
315       splitmux->threshold_bytes = g_value_get_uint64 (value);
316       GST_OBJECT_UNLOCK (splitmux);
317       break;
318     case PROP_MAX_SIZE_TIME:
319       GST_OBJECT_LOCK (splitmux);
320       splitmux->threshold_time = g_value_get_uint64 (value);
321       GST_OBJECT_UNLOCK (splitmux);
322       break;
323     case PROP_MAX_FILES:
324       GST_OBJECT_LOCK (splitmux);
325       splitmux->max_files = g_value_get_uint (value);
326       GST_OBJECT_UNLOCK (splitmux);
327       break;
328     case PROP_MUXER_OVERHEAD:
329       GST_OBJECT_LOCK (splitmux);
330       splitmux->mux_overhead = g_value_get_double (value);
331       GST_OBJECT_UNLOCK (splitmux);
332       break;
333     case PROP_SINK:
334       GST_OBJECT_LOCK (splitmux);
335       if (splitmux->provided_sink)
336         gst_object_unref (splitmux->provided_sink);
337       splitmux->provided_sink = g_value_dup_object (value);
338       GST_OBJECT_UNLOCK (splitmux);
339       break;
340     case PROP_MUXER:
341       GST_OBJECT_LOCK (splitmux);
342       if (splitmux->provided_muxer)
343         gst_object_unref (splitmux->provided_muxer);
344       splitmux->provided_muxer = g_value_dup_object (value);
345       GST_OBJECT_UNLOCK (splitmux);
346       break;
347     default:
348       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
349       break;
350   }
351 }
352
353 static void
354 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
355     GValue * value, GParamSpec * pspec)
356 {
357   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
358
359   switch (prop_id) {
360     case PROP_LOCATION:
361       GST_OBJECT_LOCK (splitmux);
362       g_value_set_string (value, splitmux->location);
363       GST_OBJECT_UNLOCK (splitmux);
364       break;
365     case PROP_MAX_SIZE_BYTES:
366       GST_OBJECT_LOCK (splitmux);
367       g_value_set_uint64 (value, splitmux->threshold_bytes);
368       GST_OBJECT_UNLOCK (splitmux);
369       break;
370     case PROP_MAX_SIZE_TIME:
371       GST_OBJECT_LOCK (splitmux);
372       g_value_set_uint64 (value, splitmux->threshold_time);
373       GST_OBJECT_UNLOCK (splitmux);
374       break;
375     case PROP_MAX_FILES:
376       GST_OBJECT_LOCK (splitmux);
377       g_value_set_uint (value, splitmux->max_files);
378       GST_OBJECT_UNLOCK (splitmux);
379       break;
380     case PROP_MUXER_OVERHEAD:
381       GST_OBJECT_LOCK (splitmux);
382       g_value_set_double (value, splitmux->mux_overhead);
383       GST_OBJECT_UNLOCK (splitmux);
384       break;
385     case PROP_SINK:
386       GST_OBJECT_LOCK (splitmux);
387       g_value_set_object (value, splitmux->provided_sink);
388       GST_OBJECT_UNLOCK (splitmux);
389       break;
390     case PROP_MUXER:
391       GST_OBJECT_LOCK (splitmux);
392       g_value_set_object (value, splitmux->provided_muxer);
393       GST_OBJECT_UNLOCK (splitmux);
394       break;
395     default:
396       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
397       break;
398   }
399 }
400
401 /* Convenience function */
402 static inline GstClockTimeDiff
403 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
404 {
405   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
406
407   if (GST_CLOCK_TIME_IS_VALID (val)) {
408     gboolean sign =
409         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
410     if (sign > 0)
411       res = val;
412     else if (sign < 0)
413       res = -val;
414   }
415   return res;
416 }
417
418 static GstPad *
419 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
420 {
421   gchar *tmp, *sinkname, *srcname;
422   GstPad *mq_src;
423
424   sinkname = gst_pad_get_name (sink_pad);
425   tmp = sinkname + 5;
426   srcname = g_strdup_printf ("src_%s", tmp);
427
428   mq_src = gst_element_get_static_pad (mq, srcname);
429
430   g_free (sinkname);
431   g_free (srcname);
432
433   return mq_src;
434 }
435
436 static gboolean
437 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
438     GstPad ** src_pad)
439 {
440   GstPad *mq_sink;
441   GstPad *mq_src;
442
443   /* Request a pad from multiqueue, then connect this one, then
444    * discover the corresponding output pad and return both */
445   mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
446   if (mq_sink == NULL)
447     return FALSE;
448
449   mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
450   if (mq_src == NULL)
451     goto fail;
452
453   *sink_pad = mq_sink;
454   *src_pad = mq_src;
455
456   return TRUE;
457
458 fail:
459   gst_element_release_request_pad (splitmux->mq, mq_sink);
460   return FALSE;
461 }
462
463 static MqStreamCtx *
464 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
465 {
466   MqStreamCtx *ctx;
467
468   ctx = g_new0 (MqStreamCtx, 1);
469   g_atomic_int_set (&ctx->refcount, 1);
470   ctx->splitmux = splitmux;
471   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
472   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
473   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
474   g_queue_init (&ctx->queued_bufs);
475   return ctx;
476 }
477
478 static void
479 mq_stream_ctx_free (MqStreamCtx * ctx)
480 {
481   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
482   g_queue_clear (&ctx->queued_bufs);
483   g_free (ctx);
484 }
485
486 static void
487 mq_stream_ctx_unref (MqStreamCtx * ctx)
488 {
489   if (g_atomic_int_dec_and_test (&ctx->refcount))
490     mq_stream_ctx_free (ctx);
491 }
492
493 static void
494 mq_stream_ctx_ref (MqStreamCtx * ctx)
495 {
496   g_atomic_int_inc (&ctx->refcount);
497 }
498
499 static void
500 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
501 {
502   ctx->sink_pad_block_id = 0;
503   mq_stream_ctx_unref (ctx);
504 }
505
506 static void
507 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
508 {
509   ctx->src_pad_block_id = 0;
510   mq_stream_ctx_unref (ctx);
511 }
512
513 static void
514 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
515 {
516   gchar *location = NULL;
517   GstMessage *msg;
518   const gchar *msg_name = opened ?
519       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
520
521   g_object_get (splitmux->sink, "location", &location, NULL);
522
523   msg = gst_message_new_element (GST_OBJECT (splitmux),
524       gst_structure_new (msg_name,
525           "location", G_TYPE_STRING, location,
526           "running-time", GST_TYPE_CLOCK_TIME,
527           splitmux->reference_ctx->out_running_time, NULL));
528   gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
529
530   g_free (location);
531 }
532
533 /* Called with lock held, drops the lock to send EOS to the
534  * pad
535  */
536 static void
537 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
538 {
539   GstEvent *eos;
540   GstPad *pad;
541
542   eos = gst_event_new_eos ();
543   pad = gst_pad_get_peer (ctx->srcpad);
544
545   ctx->out_eos = TRUE;
546
547   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
548   GST_SPLITMUX_UNLOCK (splitmux);
549   gst_pad_send_event (pad, eos);
550   GST_SPLITMUX_LOCK (splitmux);
551
552   gst_object_unref (pad);
553 }
554
555 /* Called with splitmux lock held to check if this output
556  * context needs to sleep to wait for the release of the
557  * next GOP, or to send EOS to close out the current file
558  */
559 static void
560 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
561 {
562   do {
563
564     GST_LOG_OBJECT (ctx->srcpad,
565         "Checking running time %" GST_STIME_FORMAT " against max %"
566         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
567         GST_STIME_ARGS (splitmux->max_out_running_time));
568
569     if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
570         ctx->out_running_time < splitmux->max_out_running_time) {
571       splitmux->have_muxed_something = TRUE;
572       return;
573     }
574
575     if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
576       return;
577
578     if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
579       if (ctx->out_eos == FALSE) {
580         send_eos (splitmux, ctx);
581         continue;
582       }
583     } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
584       start_next_fragment (splitmux);
585       continue;
586     }
587
588     GST_INFO_OBJECT (ctx->srcpad,
589         "Sleeping for running time %"
590         GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ")",
591         GST_STIME_ARGS (ctx->out_running_time),
592         GST_STIME_ARGS (splitmux->max_out_running_time));
593     ctx->out_blocked = TRUE;
594     /* Expand the mq if needed before sleeping */
595     check_queue_length (splitmux, ctx);
596     GST_SPLITMUX_WAIT (splitmux);
597     ctx->out_blocked = FALSE;
598     GST_INFO_OBJECT (ctx->srcpad,
599         "Woken for new max running time %" GST_STIME_FORMAT,
600         GST_STIME_ARGS (splitmux->max_out_running_time));
601   } while (1);
602 }
603
604 static GstPadProbeReturn
605 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
606 {
607   GstSplitMuxSink *splitmux = ctx->splitmux;
608   MqStreamBuf *buf_info = NULL;
609
610   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
611
612   /* FIXME: Handle buffer lists, until then make it clear they won't work */
613   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
614     g_warning ("Buffer list handling not implemented");
615     return GST_PAD_PROBE_DROP;
616   }
617   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
618     GstEvent *event = gst_pad_probe_info_get_event (info);
619
620     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
621
622     switch (GST_EVENT_TYPE (event)) {
623       case GST_EVENT_SEGMENT:
624         gst_event_copy_segment (event, &ctx->out_segment);
625         break;
626       case GST_EVENT_FLUSH_STOP:
627         GST_SPLITMUX_LOCK (splitmux);
628         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
629         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
630         g_queue_clear (&ctx->queued_bufs);
631         ctx->flushing = FALSE;
632         GST_SPLITMUX_UNLOCK (splitmux);
633         break;
634       case GST_EVENT_FLUSH_START:
635         GST_SPLITMUX_LOCK (splitmux);
636         GST_LOG_OBJECT (pad, "Flush start");
637         ctx->flushing = TRUE;
638         GST_SPLITMUX_BROADCAST (splitmux);
639         GST_SPLITMUX_UNLOCK (splitmux);
640         break;
641       case GST_EVENT_EOS:
642         GST_SPLITMUX_LOCK (splitmux);
643         if (splitmux->state == SPLITMUX_STATE_STOPPED)
644           goto beach;
645         ctx->out_eos = TRUE;
646         GST_SPLITMUX_UNLOCK (splitmux);
647         break;
648       case GST_EVENT_GAP:{
649         GstClockTime gap_ts;
650         GstClockTimeDiff rtime;
651
652         gst_event_parse_gap (event, &gap_ts, NULL);
653         if (gap_ts == GST_CLOCK_TIME_NONE)
654           break;
655
656         GST_SPLITMUX_LOCK (splitmux);
657
658         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
659
660         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
661             GST_STIME_ARGS (rtime));
662
663         if (splitmux->state == SPLITMUX_STATE_STOPPED)
664           goto beach;
665
666         if (rtime != GST_CLOCK_STIME_NONE) {
667           ctx->out_running_time = rtime;
668           complete_or_wait_on_out (splitmux, ctx);
669         }
670         GST_SPLITMUX_UNLOCK (splitmux);
671         break;
672       }
673       case GST_EVENT_CUSTOM_DOWNSTREAM:{
674         const GstStructure *s;
675         GstClockTimeDiff ts = 0;
676
677         s = gst_event_get_structure (event);
678         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
679           break;
680
681         gst_structure_get_int64 (s, "timestamp", &ts);
682
683         GST_SPLITMUX_LOCK (splitmux);
684
685         if (splitmux->state == SPLITMUX_STATE_STOPPED)
686           goto beach;
687         ctx->out_running_time = ts;
688         complete_or_wait_on_out (splitmux, ctx);
689         GST_SPLITMUX_UNLOCK (splitmux);
690         return GST_PAD_PROBE_DROP;
691       }
692       default:
693         break;
694     }
695     return GST_PAD_PROBE_PASS;
696   }
697
698   /* Allow everything through until the configured next stopping point */
699   GST_SPLITMUX_LOCK (splitmux);
700
701   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
702   if (buf_info == NULL)
703     /* Can only happen due to a poorly timed flush */
704     goto beach;
705
706   /* If we have popped a keyframe, decrement the queued_gop count */
707   if (buf_info->keyframe && splitmux->queued_gops > 0)
708     splitmux->queued_gops--;
709
710   ctx->out_running_time = buf_info->run_ts;
711
712   GST_LOG_OBJECT (splitmux,
713       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
714       " size %" G_GSIZE_FORMAT,
715       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
716
717   if (splitmux->opening_first_fragment) {
718     send_fragment_opened_closed_msg (splitmux, TRUE);
719     splitmux->opening_first_fragment = FALSE;
720   }
721
722   complete_or_wait_on_out (splitmux, ctx);
723
724   if (splitmux->muxed_out_time == GST_CLOCK_STIME_NONE ||
725       splitmux->muxed_out_time < buf_info->run_ts)
726     splitmux->muxed_out_time = buf_info->run_ts;
727
728   splitmux->muxed_out_bytes += buf_info->buf_size;
729
730 #ifndef GST_DISABLE_GST_DEBUG
731   {
732     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
733     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
734         " run ts %" GST_STIME_FORMAT, buf,
735         GST_STIME_ARGS (ctx->out_running_time));
736   }
737 #endif
738
739   GST_SPLITMUX_UNLOCK (splitmux);
740
741   mq_stream_buf_free (buf_info);
742
743   return GST_PAD_PROBE_PASS;
744
745 beach:
746   GST_SPLITMUX_UNLOCK (splitmux);
747   return GST_PAD_PROBE_DROP;
748 }
749
750 static gboolean
751 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
752 {
753   return gst_pad_send_event (peer, gst_event_ref (*event));
754 }
755
756 static void
757 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
758 {
759   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
760
761   gst_pad_sticky_events_foreach (ctx->srcpad,
762       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
763
764   /* Clear EOS flag if not actually EOS */
765   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
766
767   gst_object_unref (peer);
768 }
769
770 /* Called with lock held when a fragment
771  * reaches EOS and it is time to restart
772  * a new fragment
773  */
774 static void
775 start_next_fragment (GstSplitMuxSink * splitmux)
776 {
777   /* 1 change to new file */
778   splitmux->switching_fragment = TRUE;
779
780   gst_element_set_locked_state (splitmux->muxer, TRUE);
781   gst_element_set_locked_state (splitmux->active_sink, TRUE);
782   gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
783   gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
784
785   set_next_filename (splitmux);
786
787   gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux));
788   gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux));
789   gst_element_set_locked_state (splitmux->muxer, FALSE);
790   gst_element_set_locked_state (splitmux->active_sink, FALSE);
791
792   splitmux->switching_fragment = FALSE;
793
794   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
795
796   /* Switch state and go back to processing */
797   if (!splitmux->reference_ctx->in_eos) {
798     splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
799     splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
800   } else {
801     splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
802     splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
803     splitmux->have_muxed_something = FALSE;
804   }
805   splitmux->have_muxed_something =
806       (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
807
808   /* Store the overflow parameters as the basis for the next fragment */
809   splitmux->mux_start_time = splitmux->muxed_out_time;
810   splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
811
812   GST_DEBUG_OBJECT (splitmux,
813       "Restarting flow for new fragment. New running time %" GST_STIME_FORMAT,
814       GST_STIME_ARGS (splitmux->max_out_running_time));
815
816   send_fragment_opened_closed_msg (splitmux, TRUE);
817
818   GST_SPLITMUX_BROADCAST (splitmux);
819 }
820
821 static void
822 bus_handler (GstBin * bin, GstMessage * message)
823 {
824   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
825
826   switch (GST_MESSAGE_TYPE (message)) {
827     case GST_MESSAGE_EOS:
828       /* If the state is draining out the current file, drop this EOS */
829       GST_SPLITMUX_LOCK (splitmux);
830
831       send_fragment_opened_closed_msg (splitmux, FALSE);
832
833       if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
834           splitmux->max_out_running_time != GST_CLOCK_STIME_NONE) {
835         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
836         splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
837         GST_SPLITMUX_BROADCAST (splitmux);
838
839         gst_message_unref (message);
840         GST_SPLITMUX_UNLOCK (splitmux);
841         return;
842       }
843       GST_SPLITMUX_UNLOCK (splitmux);
844       break;
845     case GST_MESSAGE_ASYNC_START:
846     case GST_MESSAGE_ASYNC_DONE:
847       /* Ignore state changes from our children while switching */
848       if (splitmux->switching_fragment) {
849         if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink ||
850             GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
851           GST_LOG_OBJECT (splitmux,
852               "Ignoring state change from child %" GST_PTR_FORMAT
853               " while switching", GST_MESSAGE_SRC (message));
854           gst_message_unref (message);
855           return;
856         }
857       }
858       break;
859     default:
860       break;
861   }
862
863   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
864 }
865
866 /* Called with splitmux lock held */
867 /* Called when entering ProcessingCompleteGop state
868  * Assess if mq contents overflowed the current file
869  *   -> If yes, need to switch to new file
870  *   -> if no, set max_out_running_time to let this GOP in and
871  *      go to COLLECTING_GOP_START state
872  */
873 static void
874 handle_gathered_gop (GstSplitMuxSink * splitmux)
875 {
876   GList *cur;
877   gsize queued_bytes = 0;
878   GstClockTimeDiff queued_time = 0;
879
880   /* Assess if the multiqueue contents overflowed the current file */
881   for (cur = g_list_first (splitmux->contexts);
882       cur != NULL; cur = g_list_next (cur)) {
883     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
884     if (tmpctx->in_running_time > queued_time)
885       queued_time = tmpctx->in_running_time;
886     queued_bytes += tmpctx->in_bytes;
887   }
888
889   g_assert (queued_bytes >= splitmux->mux_start_bytes);
890   g_assert (queued_time >= splitmux->mux_start_time);
891
892   queued_bytes -= splitmux->mux_start_bytes;
893   queued_time -= splitmux->mux_start_time;
894
895   /* Expand queued bytes estimate by muxer overhead */
896   queued_bytes += (queued_bytes * splitmux->mux_overhead);
897
898   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
899       " bytes %" G_GSIZE_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
900
901   /* Check for overrun - have we output at least one byte and overrun
902    * either threshold? */
903   if ((splitmux->have_muxed_something &&
904           ((splitmux->threshold_bytes > 0 &&
905                   queued_bytes >= splitmux->threshold_bytes) ||
906               (splitmux->threshold_time > 0 &&
907                   queued_time >= splitmux->threshold_time)))) {
908
909     splitmux->state = SPLITMUX_STATE_ENDING_FILE;
910
911     GST_INFO_OBJECT (splitmux,
912         "mq overflowed since last, draining out. max out TS is %"
913         GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
914     GST_SPLITMUX_BROADCAST (splitmux);
915
916   } else {
917     /* No overflow */
918     GST_LOG_OBJECT (splitmux,
919         "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
920         " queued %" G_GSIZE_FORMAT " time %" GST_STIME_FORMAT " Continuing.",
921         splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
922         queued_bytes, GST_STIME_ARGS (queued_time));
923
924     /* Wake everyone up to push this one GOP, then sleep */
925     splitmux->have_muxed_something = TRUE;
926
927     if (!splitmux->reference_ctx->in_eos) {
928       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
929       splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
930     } else {
931       splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
932       splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
933     }
934
935     GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
936         GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
937     GST_SPLITMUX_BROADCAST (splitmux);
938   }
939
940 }
941
942 /* Called with splitmux lock held */
943 /* Called from each input pad when it is has all the pieces
944  * for a GOP or EOS, starting with the reference pad which has set the
945  * splitmux->max_in_running_time
946  */
947 static void
948 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
949 {
950   GList *cur;
951   gboolean ready = TRUE;
952   GstClockTimeDiff current_max_in_running_time;
953
954   if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
955     /* Iterate each pad, and check that the input running time is at least
956      * up to the reference running time, and if so handle the collected GOP */
957     GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
958         GST_STIME_FORMAT " ctx %p",
959         GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
960     for (cur = g_list_first (splitmux->contexts); cur != NULL;
961         cur = g_list_next (cur)) {
962       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
963
964       GST_LOG_OBJECT (splitmux,
965           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
966           " EOS %d", tmpctx, tmpctx->srcpad,
967           GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
968
969       if (splitmux->max_in_running_time != G_MAXINT64 &&
970           tmpctx->in_running_time < splitmux->max_in_running_time &&
971           !tmpctx->in_eos) {
972         GST_LOG_OBJECT (splitmux,
973             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
974             tmpctx, tmpctx->srcpad);
975         ready = FALSE;
976         break;
977       }
978     }
979     if (ready) {
980       GST_DEBUG_OBJECT (splitmux,
981           "Collected GOP is complete. Processing (ctx %p)", ctx);
982       /* All pads have a complete GOP, release it into the multiqueue */
983       handle_gathered_gop (splitmux);
984     }
985   }
986
987   /* If upstream reached EOS we are not expecting more data, no need to wait
988    * here. */
989   if (ctx->in_eos)
990     return;
991
992   /* Some pad is not yet ready, or GOP is being pushed
993    * either way, sleep and wait to get woken */
994   current_max_in_running_time = splitmux->max_in_running_time;
995   while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
996           splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
997       !ctx->flushing &&
998       (current_max_in_running_time == splitmux->max_in_running_time)) {
999
1000     GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
1001         splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
1002         "GOP complete" : "EOF draining", ctx);
1003     GST_SPLITMUX_WAIT (splitmux);
1004
1005     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
1006   }
1007 }
1008
1009 static void
1010 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1011 {
1012   GList *cur;
1013   guint cur_len = g_queue_get_length (&ctx->queued_bufs);
1014
1015   GST_DEBUG_OBJECT (ctx->sinkpad,
1016       "Checking queue length len %u cur_max %u queued gops %u",
1017       cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
1018
1019   if (cur_len >= splitmux->mq_max_buffers) {
1020     gboolean allow_grow = FALSE;
1021
1022     /* If collecting a GOP and this pad might block,
1023      * and there isn't already a pending GOP in the queue
1024      * then grow
1025      */
1026     if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
1027         ctx->in_running_time < splitmux->max_in_running_time &&
1028         splitmux->queued_gops <= 1) {
1029       allow_grow = TRUE;
1030     } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
1031         ctx->is_reference && splitmux->queued_gops <= 1) {
1032       allow_grow = TRUE;
1033     }
1034
1035     if (!allow_grow) {
1036       for (cur = g_list_first (splitmux->contexts);
1037           cur != NULL; cur = g_list_next (cur)) {
1038         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1039         GST_DEBUG_OBJECT (tmpctx->sinkpad,
1040             " len %u out_blocked %d",
1041             g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
1042         /* If another stream is starving, grow */
1043         if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
1044           allow_grow = TRUE;
1045         }
1046       }
1047     }
1048
1049     if (allow_grow) {
1050       splitmux->mq_max_buffers = cur_len + 1;
1051
1052       GST_INFO_OBJECT (splitmux,
1053           "Multiqueue overrun - enlarging to %u buffers ctx %p",
1054           splitmux->mq_max_buffers, ctx);
1055
1056       g_object_set (splitmux->mq, "max-size-buffers",
1057           splitmux->mq_max_buffers, NULL);
1058     }
1059   }
1060 }
1061
1062 static GstPadProbeReturn
1063 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1064 {
1065   GstSplitMuxSink *splitmux = ctx->splitmux;
1066   GstBuffer *buf;
1067   MqStreamBuf *buf_info = NULL;
1068   GstClockTime ts;
1069   gboolean loop_again;
1070   gboolean keyframe = FALSE;
1071
1072   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1073
1074   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1075   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1076     g_warning ("Buffer list handling not implemented");
1077     return GST_PAD_PROBE_DROP;
1078   }
1079   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
1080     GstEvent *event = gst_pad_probe_info_get_event (info);
1081     switch (GST_EVENT_TYPE (event)) {
1082       case GST_EVENT_SEGMENT:
1083         gst_event_copy_segment (event, &ctx->in_segment);
1084         break;
1085       case GST_EVENT_FLUSH_STOP:
1086         GST_SPLITMUX_LOCK (splitmux);
1087         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1088         ctx->in_eos = FALSE;
1089         ctx->in_bytes = 0;
1090         ctx->in_running_time = GST_CLOCK_STIME_NONE;
1091         GST_SPLITMUX_UNLOCK (splitmux);
1092         break;
1093       case GST_EVENT_EOS:
1094         GST_SPLITMUX_LOCK (splitmux);
1095         ctx->in_eos = TRUE;
1096
1097         if (splitmux->state == SPLITMUX_STATE_STOPPED)
1098           goto beach;
1099
1100         if (ctx->is_reference) {
1101           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
1102           /* Act as if this is a new keyframe with infinite timestamp */
1103           splitmux->max_in_running_time = G_MAXINT64;
1104           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1105           /* Wake up other input pads to collect this GOP */
1106           GST_SPLITMUX_BROADCAST (splitmux);
1107           check_completed_gop (splitmux, ctx);
1108         } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
1109           /* If we are waiting for a GOP to be completed (ie, for aux
1110            * pads to catch up), then this pad is complete, so check
1111            * if the whole GOP is.
1112            */
1113           check_completed_gop (splitmux, ctx);
1114         }
1115         GST_SPLITMUX_UNLOCK (splitmux);
1116         break;
1117       default:
1118         break;
1119     }
1120     return GST_PAD_PROBE_PASS;
1121   }
1122
1123   buf = gst_pad_probe_info_get_buffer (info);
1124   buf_info = mq_stream_buf_new ();
1125
1126   if (GST_BUFFER_PTS_IS_VALID (buf))
1127     ts = GST_BUFFER_PTS (buf);
1128   else
1129     ts = GST_BUFFER_DTS (buf);
1130
1131   GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
1132
1133   GST_SPLITMUX_LOCK (splitmux);
1134
1135   if (splitmux->state == SPLITMUX_STATE_STOPPED)
1136     goto beach;
1137
1138   /* If this buffer has a timestamp, advance the input timestamp of the
1139    * stream */
1140   if (GST_CLOCK_TIME_IS_VALID (ts)) {
1141     GstClockTimeDiff running_time =
1142         my_segment_to_running_time (&ctx->in_segment,
1143         GST_BUFFER_TIMESTAMP (buf));
1144
1145     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
1146         GST_STIME_ARGS (running_time));
1147
1148     if (GST_CLOCK_STIME_IS_VALID (running_time)
1149         && running_time > ctx->in_running_time)
1150       ctx->in_running_time = running_time;
1151   }
1152
1153   /* Try to make sure we have a valid running time */
1154   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
1155     ctx->in_running_time =
1156         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
1157   }
1158
1159   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
1160       GST_STIME_ARGS (ctx->in_running_time));
1161
1162   buf_info->run_ts = ctx->in_running_time;
1163   buf_info->buf_size = gst_buffer_get_size (buf);
1164
1165   /* Update total input byte counter for overflow detect */
1166   ctx->in_bytes += buf_info->buf_size;
1167
1168   /* initialize mux_start_time */
1169   if (ctx->is_reference && splitmux->mux_start_time == GST_CLOCK_STIME_NONE) {
1170     splitmux->mux_start_time = buf_info->run_ts;
1171     GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
1172         GST_STIME_ARGS (splitmux->mux_start_time));
1173     /* Also take this as the first start time when starting up,
1174      * so that we start counting overflow from the first frame */
1175     if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
1176       splitmux->max_in_running_time = splitmux->mux_start_time;
1177   }
1178
1179   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
1180       " total in_bytes %" G_GSIZE_FORMAT,
1181       GST_STIME_ARGS (buf_info->run_ts), ctx->in_bytes);
1182
1183   loop_again = TRUE;
1184   do {
1185     if (ctx->flushing)
1186       break;
1187
1188     switch (splitmux->state) {
1189       case SPLITMUX_STATE_COLLECTING_GOP_START:
1190         if (ctx->is_reference) {
1191           /* If a keyframe, we have a complete GOP */
1192           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1193               !GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) ||
1194               splitmux->max_in_running_time >= ctx->in_running_time) {
1195             /* Pass this buffer through */
1196             loop_again = FALSE;
1197             break;
1198           }
1199           GST_INFO_OBJECT (pad,
1200               "Have keyframe with running time %" GST_STIME_FORMAT,
1201               GST_STIME_ARGS (ctx->in_running_time));
1202           keyframe = TRUE;
1203           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1204           splitmux->max_in_running_time = ctx->in_running_time;
1205           /* Wake up other input pads to collect this GOP */
1206           GST_SPLITMUX_BROADCAST (splitmux);
1207           check_completed_gop (splitmux, ctx);
1208         } else {
1209           /* We're still waiting for a keyframe on the reference pad, sleep */
1210           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1211           GST_SPLITMUX_WAIT (splitmux);
1212           GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1213               splitmux->state);
1214         }
1215         break;
1216       case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1217
1218         /* If we overran the target timestamp, it might be time to process
1219          * the GOP, otherwise bail out for more data
1220          */
1221         GST_LOG_OBJECT (pad,
1222             "Checking TS %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT,
1223             GST_STIME_ARGS (ctx->in_running_time),
1224             GST_STIME_ARGS (splitmux->max_in_running_time));
1225
1226         if (ctx->in_running_time < splitmux->max_in_running_time) {
1227           loop_again = FALSE;
1228           break;
1229         }
1230
1231         GST_LOG_OBJECT (pad,
1232             "Collected last packet of GOP. Checking other pads");
1233         check_completed_gop (splitmux, ctx);
1234         break;
1235       case SPLITMUX_STATE_ENDING_FILE:{
1236         GstEvent *event;
1237
1238         /* If somes streams received no buffer during the last GOP that overran,
1239          * because its next buffer has a timestamp bigger than
1240          * ctx->max_in_running_time, its queue is empty. In that case the only
1241          * way to wakeup the output thread is by injecting an event in the
1242          * queue. This usually happen with subtitle streams.
1243          * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
1244         GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event");
1245         event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
1246             GST_EVENT_TYPE_SERIALIZED,
1247             gst_structure_new ("splitmuxsink-unblock", "timestamp",
1248                 G_TYPE_INT64, splitmux->max_in_running_time, NULL));
1249
1250         GST_SPLITMUX_UNLOCK (splitmux);
1251         gst_pad_send_event (ctx->sinkpad, event);
1252         GST_SPLITMUX_LOCK (splitmux);
1253         /* state may have changed while we were unlocked. Loop again if so */
1254         if (splitmux->state != SPLITMUX_STATE_ENDING_FILE)
1255           break;
1256         /* fallthrough */
1257       }
1258       case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1259         /* A fragment is ending, wait until that's done before continuing */
1260         GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1261         GST_SPLITMUX_WAIT (splitmux);
1262         GST_DEBUG_OBJECT (pad,
1263             "Done sleeping for fragment restart state now %d", splitmux->state);
1264         break;
1265       default:
1266         loop_again = FALSE;
1267         break;
1268     }
1269   } while (loop_again);
1270
1271   if (keyframe) {
1272     splitmux->queued_gops++;
1273     buf_info->keyframe = TRUE;
1274   }
1275
1276   /* Now add this buffer to the queue just before returning */
1277   g_queue_push_head (&ctx->queued_bufs, buf_info);
1278
1279   /* Check the buffer will fit in the mq */
1280   check_queue_length (splitmux, ctx);
1281
1282   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1283       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
1284
1285   GST_SPLITMUX_UNLOCK (splitmux);
1286   return GST_PAD_PROBE_PASS;
1287
1288 beach:
1289   GST_SPLITMUX_UNLOCK (splitmux);
1290   if (buf_info)
1291     mq_stream_buf_free (buf_info);
1292   return GST_PAD_PROBE_PASS;
1293 }
1294
1295 static GstPad *
1296 gst_splitmux_sink_request_new_pad (GstElement * element,
1297     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1298 {
1299   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1300   GstPadTemplate *mux_template = NULL;
1301   GstPad *res = NULL;
1302   GstPad *mq_sink, *mq_src;
1303   gchar *gname;
1304   gboolean is_video = FALSE;
1305   MqStreamCtx *ctx;
1306
1307   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1308
1309   GST_SPLITMUX_LOCK (splitmux);
1310   if (!create_elements (splitmux))
1311     goto fail;
1312
1313   if (templ->name_template) {
1314     if (g_str_equal (templ->name_template, "video")) {
1315       /* FIXME: Look for a pad template with matching caps, rather than by name */
1316       mux_template =
1317           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1318           (splitmux->muxer), "video_%u");
1319       is_video = TRUE;
1320       name = NULL;
1321     } else {
1322       mux_template =
1323           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1324           (splitmux->muxer), templ->name_template);
1325     }
1326     if (mux_template == NULL) {
1327       /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1328       mux_template =
1329           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1330           (splitmux->muxer), "sink_%d");
1331     }
1332   }
1333
1334   res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1335   if (res == NULL)
1336     goto fail;
1337
1338   if (is_video)
1339     gname = g_strdup ("video");
1340   else if (name == NULL)
1341     gname = gst_pad_get_name (res);
1342   else
1343     gname = g_strdup (name);
1344
1345   if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1346     gst_element_release_request_pad (splitmux->muxer, res);
1347     gst_object_unref (GST_OBJECT (res));
1348     goto fail;
1349   }
1350
1351   if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1352     gst_element_release_request_pad (splitmux->muxer, res);
1353     gst_object_unref (GST_OBJECT (res));
1354     gst_element_release_request_pad (splitmux->mq, mq_sink);
1355     gst_object_unref (GST_OBJECT (mq_sink));
1356     goto fail;
1357   }
1358
1359   gst_object_unref (GST_OBJECT (res));
1360
1361   ctx = mq_stream_ctx_new (splitmux);
1362   ctx->srcpad = mq_src;
1363   ctx->sinkpad = mq_sink;
1364
1365   mq_stream_ctx_ref (ctx);
1366   ctx->src_pad_block_id =
1367       gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1368       (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1369       _pad_block_destroy_src_notify);
1370   if (is_video && splitmux->reference_ctx != NULL) {
1371     splitmux->reference_ctx->is_reference = FALSE;
1372     splitmux->reference_ctx = NULL;
1373   }
1374   if (splitmux->reference_ctx == NULL) {
1375     splitmux->reference_ctx = ctx;
1376     ctx->is_reference = TRUE;
1377   }
1378
1379   res = gst_ghost_pad_new (gname, mq_sink);
1380   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1381
1382   mq_stream_ctx_ref (ctx);
1383   ctx->sink_pad_block_id =
1384       gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1385       (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1386       _pad_block_destroy_sink_notify);
1387
1388   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1389       " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1390
1391   splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1392
1393   g_free (gname);
1394
1395   gst_object_unref (mq_sink);
1396   gst_object_unref (mq_src);
1397
1398   gst_pad_set_active (res, TRUE);
1399   gst_element_add_pad (element, res);
1400   GST_SPLITMUX_UNLOCK (splitmux);
1401
1402   return res;
1403 fail:
1404   GST_SPLITMUX_UNLOCK (splitmux);
1405   return NULL;
1406 }
1407
1408 static void
1409 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1410 {
1411   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1412   GstPad *mqsink, *mqsrc, *muxpad;
1413   MqStreamCtx *ctx =
1414       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1415
1416   GST_SPLITMUX_LOCK (splitmux);
1417
1418   if (splitmux->muxer == NULL || splitmux->mq == NULL)
1419     goto fail;                  /* Elements don't exist yet - nothing to release */
1420
1421   GST_INFO_OBJECT (pad, "releasing request pad");
1422
1423   mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1424   mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1425   muxpad = gst_pad_get_peer (mqsrc);
1426
1427   /* Remove the context from our consideration */
1428   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1429
1430   if (ctx->sink_pad_block_id)
1431     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1432
1433   if (ctx->src_pad_block_id)
1434     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1435
1436   /* Can release the context now */
1437   mq_stream_ctx_unref (ctx);
1438
1439   /* Release and free the mq input */
1440   gst_element_release_request_pad (splitmux->mq, mqsink);
1441
1442   /* Release and free the muxer input */
1443   gst_element_release_request_pad (splitmux->muxer, muxpad);
1444
1445   gst_object_unref (mqsink);
1446   gst_object_unref (mqsrc);
1447   gst_object_unref (muxpad);
1448
1449   gst_element_remove_pad (element, pad);
1450
1451   /* Reset the internal elements only after all request pads are released */
1452   if (splitmux->contexts == NULL)
1453     gst_splitmux_reset (splitmux);
1454
1455 fail:
1456   GST_SPLITMUX_UNLOCK (splitmux);
1457 }
1458
1459 static GstElement *
1460 create_element (GstSplitMuxSink * splitmux,
1461     const gchar * factory, const gchar * name)
1462 {
1463   GstElement *ret = gst_element_factory_make (factory, name);
1464   if (ret == NULL) {
1465     g_warning ("Failed to create %s - splitmuxsink will not work", name);
1466     return NULL;
1467   }
1468
1469   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1470     g_warning ("Could not add %s element - splitmuxsink will not work", name);
1471     gst_object_unref (ret);
1472     return NULL;
1473   }
1474
1475   return ret;
1476 }
1477
1478 static gboolean
1479 create_elements (GstSplitMuxSink * splitmux)
1480 {
1481   /* Create internal elements */
1482   if (splitmux->mq == NULL) {
1483     if ((splitmux->mq =
1484             create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1485       goto fail;
1486
1487     splitmux->mq_max_buffers = 5;
1488     /* No bytes or time limit, we limit buffers manually */
1489     g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1490         (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1491   }
1492
1493   if (splitmux->muxer == NULL) {
1494     GstElement *provided_muxer = NULL;
1495
1496     GST_OBJECT_LOCK (splitmux);
1497     if (splitmux->provided_muxer != NULL)
1498       provided_muxer = gst_object_ref (splitmux->provided_muxer);
1499     GST_OBJECT_UNLOCK (splitmux);
1500
1501     if (provided_muxer == NULL) {
1502       if ((splitmux->muxer =
1503               create_element (splitmux, "mp4mux", "muxer")) == NULL)
1504         goto fail;
1505     } else {
1506       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1507         g_warning ("Could not add muxer element - splitmuxsink will not work");
1508         gst_object_unref (provided_muxer);
1509         goto fail;
1510       }
1511
1512       splitmux->muxer = provided_muxer;
1513       gst_object_unref (provided_muxer);
1514     }
1515   }
1516
1517   return TRUE;
1518 fail:
1519   return FALSE;
1520 }
1521
1522 static GstElement *
1523 find_sink (GstElement * e)
1524 {
1525   GstElement *res = NULL;
1526   GstIterator *iter;
1527   gboolean done = FALSE;
1528   GValue data = { 0, };
1529
1530   if (!GST_IS_BIN (e))
1531     return e;
1532
1533   iter = gst_bin_iterate_sinks (GST_BIN (e));
1534   while (!done) {
1535     switch (gst_iterator_next (iter, &data)) {
1536       case GST_ITERATOR_OK:
1537       {
1538         GstElement *child = g_value_get_object (&data);
1539         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1540                 "location") != NULL) {
1541           res = child;
1542           done = TRUE;
1543         }
1544         g_value_reset (&data);
1545         break;
1546       }
1547       case GST_ITERATOR_RESYNC:
1548         gst_iterator_resync (iter);
1549         break;
1550       case GST_ITERATOR_DONE:
1551         done = TRUE;
1552         break;
1553       case GST_ITERATOR_ERROR:
1554         g_assert_not_reached ();
1555         break;
1556     }
1557   }
1558   g_value_unset (&data);
1559   gst_iterator_free (iter);
1560
1561   return res;
1562 }
1563
1564 static gboolean
1565 create_sink (GstSplitMuxSink * splitmux)
1566 {
1567   GstElement *provided_sink = NULL;
1568
1569   if (splitmux->active_sink == NULL) {
1570
1571     GST_OBJECT_LOCK (splitmux);
1572     if (splitmux->provided_sink != NULL)
1573       provided_sink = gst_object_ref (splitmux->provided_sink);
1574     GST_OBJECT_UNLOCK (splitmux);
1575
1576     if (provided_sink == NULL) {
1577       if ((splitmux->sink =
1578               create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1579         goto fail;
1580       splitmux->active_sink = splitmux->sink;
1581     } else {
1582       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1583         g_warning ("Could not add sink elements - splitmuxsink will not work");
1584         gst_object_unref (provided_sink);
1585         goto fail;
1586       }
1587
1588       splitmux->active_sink = provided_sink;
1589
1590       /* The bin holds a ref now, we can drop our tmp ref */
1591       gst_object_unref (provided_sink);
1592
1593       /* Find the sink element */
1594       splitmux->sink = find_sink (splitmux->active_sink);
1595       if (splitmux->sink == NULL) {
1596         g_warning
1597             ("Could not locate sink element in provided sink - splitmuxsink will not work");
1598         goto fail;
1599       }
1600     }
1601
1602     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1603       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1604       goto fail;
1605     }
1606   }
1607
1608   return TRUE;
1609 fail:
1610   return FALSE;
1611 }
1612
1613 #ifdef __GNUC__
1614 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
1615 #endif
1616 static void
1617 set_next_filename (GstSplitMuxSink * splitmux)
1618 {
1619   gchar *fname = NULL;
1620   gst_splitmux_sink_ensure_max_files (splitmux);
1621
1622   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
1623       splitmux->fragment_id, &fname);
1624
1625   if (!fname)
1626     fname = splitmux->location ?
1627         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
1628
1629   if (fname) {
1630     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1631     g_object_set (splitmux->sink, "location", fname, NULL);
1632     g_free (fname);
1633
1634     splitmux->fragment_id++;
1635   }
1636 }
1637
1638 static GstStateChangeReturn
1639 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1640 {
1641   GstStateChangeReturn ret;
1642   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1643
1644   switch (transition) {
1645     case GST_STATE_CHANGE_NULL_TO_READY:{
1646       GST_SPLITMUX_LOCK (splitmux);
1647       if (!create_elements (splitmux) || !create_sink (splitmux)) {
1648         ret = GST_STATE_CHANGE_FAILURE;
1649         GST_SPLITMUX_UNLOCK (splitmux);
1650         goto beach;
1651       }
1652       GST_SPLITMUX_UNLOCK (splitmux);
1653       splitmux->fragment_id = 0;
1654       set_next_filename (splitmux);
1655       break;
1656     }
1657     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1658       GST_SPLITMUX_LOCK (splitmux);
1659       /* Start by collecting one input on each pad */
1660       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1661       splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
1662       splitmux->muxed_out_time = splitmux->mux_start_time =
1663           GST_CLOCK_STIME_NONE;
1664       splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1665       splitmux->opening_first_fragment = TRUE;
1666       GST_SPLITMUX_UNLOCK (splitmux);
1667       break;
1668     }
1669     case GST_STATE_CHANGE_PAUSED_TO_READY:
1670     case GST_STATE_CHANGE_READY_TO_NULL:
1671       GST_SPLITMUX_LOCK (splitmux);
1672       splitmux->state = SPLITMUX_STATE_STOPPED;
1673       /* Wake up any blocked threads */
1674       GST_LOG_OBJECT (splitmux,
1675           "State change -> NULL or READY. Waking threads");
1676       GST_SPLITMUX_BROADCAST (splitmux);
1677       GST_SPLITMUX_UNLOCK (splitmux);
1678       break;
1679     default:
1680       break;
1681   }
1682
1683   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1684   if (ret == GST_STATE_CHANGE_FAILURE)
1685     goto beach;
1686
1687   switch (transition) {
1688     case GST_STATE_CHANGE_READY_TO_NULL:
1689       GST_SPLITMUX_LOCK (splitmux);
1690       splitmux->fragment_id = 0;
1691       /* Reset internal elements only if no pad contexts are using them */
1692       if (splitmux->contexts == NULL)
1693         gst_splitmux_reset (splitmux);
1694       GST_SPLITMUX_UNLOCK (splitmux);
1695       break;
1696     default:
1697       break;
1698   }
1699
1700 beach:
1701
1702   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1703       ret == GST_STATE_CHANGE_FAILURE) {
1704     /* Cleanup elements on failed transition out of NULL */
1705     gst_splitmux_reset (splitmux);
1706   }
1707   return ret;
1708 }
1709
1710 gboolean
1711 register_splitmuxsink (GstPlugin * plugin)
1712 {
1713   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1714       "Split File Muxing Sink");
1715
1716   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1717       GST_TYPE_SPLITMUX_SINK);
1718 }
1719
1720 static void
1721 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
1722 {
1723   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
1724     splitmux->fragment_id = 0;
1725   }
1726 }