bf66a113f50a583b38e7b7cae3cf07d61058cd45
[platform/upstream/gstreamer.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * can be muxed, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * If a video stream is available, the splitting process is driven by the video
37  * stream contents, and the video stream must contain closed GOPs for the output
38  * file parts to be played individually correctly. In the absence of a video
39  * stream, the first available stream is used as reference for synchronization.
40  *
41  * <refsect2>
42  * <title>Example pipelines</title>
43  * |[
44  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
45  * ]|
46  * Records a video stream captured from a v4l2 device and muxes it into
47  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48  * and 1MB maximum size.
49  * </refsect2>
50  */
51
52 #ifdef HAVE_CONFIG_H
53 #include "config.h"
54 #endif
55
56 #include <string.h>
57 #include <glib/gstdio.h>
58 #include "gstsplitmuxsink.h"
59
60 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
61 #define GST_CAT_DEFAULT splitmux_debug
62
63 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
64 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
65 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
66 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
67
68 enum
69 {
70   PROP_0,
71   PROP_LOCATION,
72   PROP_MAX_SIZE_TIME,
73   PROP_MAX_SIZE_BYTES,
74   PROP_MAX_FILES,
75   PROP_MUXER_OVERHEAD,
76   PROP_MUXER,
77   PROP_SINK
78 };
79
80 #define DEFAULT_MAX_SIZE_TIME       0
81 #define DEFAULT_MAX_SIZE_BYTES      0
82 #define DEFAULT_MAX_FILES           0
83 #define DEFAULT_MUXER_OVERHEAD      0.02
84 #define DEFAULT_MUXER "mp4mux"
85 #define DEFAULT_SINK "filesink"
86
87 enum
88 {
89   SIGNAL_FORMAT_LOCATION,
90   SIGNAL_LAST
91 };
92
93 static guint signals[SIGNAL_LAST];
94
95 static GstStaticPadTemplate video_sink_template =
96 GST_STATIC_PAD_TEMPLATE ("video",
97     GST_PAD_SINK,
98     GST_PAD_REQUEST,
99     GST_STATIC_CAPS_ANY);
100 static GstStaticPadTemplate audio_sink_template =
101 GST_STATIC_PAD_TEMPLATE ("audio_%u",
102     GST_PAD_SINK,
103     GST_PAD_REQUEST,
104     GST_STATIC_CAPS_ANY);
105 static GstStaticPadTemplate subtitle_sink_template =
106 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
107     GST_PAD_SINK,
108     GST_PAD_REQUEST,
109     GST_STATIC_CAPS_ANY);
110
111 static GQuark PAD_CONTEXT;
112
113 static void
114 _do_init (void)
115 {
116   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
117 }
118
119 #define gst_splitmux_sink_parent_class parent_class
120 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
121     _do_init ());
122
123 static gboolean create_elements (GstSplitMuxSink * splitmux);
124 static gboolean create_sink (GstSplitMuxSink * splitmux);
125 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
126     const GValue * value, GParamSpec * pspec);
127 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
128     GValue * value, GParamSpec * pspec);
129 static void gst_splitmux_sink_dispose (GObject * object);
130 static void gst_splitmux_sink_finalize (GObject * object);
131
132 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
133     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
134 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
135
136 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
137     element, GstStateChange transition);
138
139 static void bus_handler (GstBin * bin, GstMessage * msg);
140 static void set_next_filename (GstSplitMuxSink * splitmux);
141 static void start_next_fragment (GstSplitMuxSink * splitmux);
142 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
143 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
144
145 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
146
147 static MqStreamBuf *
148 mq_stream_buf_new (void)
149 {
150   return g_slice_new0 (MqStreamBuf);
151 }
152
153 static void
154 mq_stream_buf_free (MqStreamBuf * data)
155 {
156   g_slice_free (MqStreamBuf, data);
157 }
158
159 static void
160 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
161 {
162   GObjectClass *gobject_class = (GObjectClass *) klass;
163   GstElementClass *gstelement_class = (GstElementClass *) klass;
164   GstBinClass *gstbin_class = (GstBinClass *) klass;
165
166   gobject_class->set_property = gst_splitmux_sink_set_property;
167   gobject_class->get_property = gst_splitmux_sink_get_property;
168   gobject_class->dispose = gst_splitmux_sink_dispose;
169   gobject_class->finalize = gst_splitmux_sink_finalize;
170
171   gst_element_class_set_static_metadata (gstelement_class,
172       "Split Muxing Bin", "Generic/Bin/Muxer",
173       "Convenience bin that muxes incoming streams into multiple time/size limited files",
174       "Jan Schmidt <jan@centricular.com>");
175
176   gst_element_class_add_static_pad_template (gstelement_class,
177       &video_sink_template);
178   gst_element_class_add_static_pad_template (gstelement_class,
179       &audio_sink_template);
180   gst_element_class_add_static_pad_template (gstelement_class,
181       &subtitle_sink_template);
182
183   gstelement_class->change_state =
184       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
185   gstelement_class->request_new_pad =
186       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
187   gstelement_class->release_pad =
188       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
189
190   gstbin_class->handle_message = bus_handler;
191
192   g_object_class_install_property (gobject_class, PROP_LOCATION,
193       g_param_spec_string ("location", "File Output Pattern",
194           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
195           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
197       g_param_spec_double ("mux-overhead", "Muxing Overhead",
198           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
199           DEFAULT_MUXER_OVERHEAD,
200           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
201
202   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
203       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
204           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
205           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
206   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
207       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
208           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
209           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
210   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
211       g_param_spec_uint ("max-files", "Max files",
212           "Maximum number of files to keep on disk. Once the maximum is reached,"
213           "old files start to be deleted to make room for new ones.",
214           0, G_MAXUINT, DEFAULT_MAX_FILES,
215           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
216
217
218   g_object_class_install_property (gobject_class, PROP_MUXER,
219       g_param_spec_object ("muxer", "Muxer",
220           "The muxer element to use (NULL = default mp4mux)",
221           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222   g_object_class_install_property (gobject_class, PROP_SINK,
223       g_param_spec_object ("sink", "Sink",
224           "The sink element (or element chain) to use (NULL = default filesink)",
225           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
226
227   /**
228    * GstSplitMuxSink::format-location:
229    * @splitmux: the #GstSplitMuxSink
230    * @fragment_id: the sequence number of the file to be created
231    *
232    * Returns: the location to be used for the next output file
233    */
234   signals[SIGNAL_FORMAT_LOCATION] =
235       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
236       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
237 }
238
239 static void
240 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
241 {
242   g_mutex_init (&splitmux->lock);
243   g_cond_init (&splitmux->data_cond);
244
245   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
246   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
247   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
248   splitmux->max_files = DEFAULT_MAX_FILES;
249
250   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
251   g_object_set (splitmux, "async-handling", TRUE, NULL);
252 }
253
254 static void
255 gst_splitmux_reset (GstSplitMuxSink * splitmux)
256 {
257   if (splitmux->mq)
258     gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
259   if (splitmux->muxer)
260     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
261   if (splitmux->active_sink)
262     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
263
264   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
265       NULL;
266 }
267
268 static void
269 gst_splitmux_sink_dispose (GObject * object)
270 {
271   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
272
273   G_OBJECT_CLASS (parent_class)->dispose (object);
274
275   /* Calling parent dispose invalidates all child pointers */
276   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
277       NULL;
278 }
279
280 static void
281 gst_splitmux_sink_finalize (GObject * object)
282 {
283   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
284   g_cond_clear (&splitmux->data_cond);
285   g_mutex_clear (&splitmux->lock);
286   if (splitmux->provided_sink)
287     gst_object_unref (splitmux->provided_sink);
288   if (splitmux->provided_muxer)
289     gst_object_unref (splitmux->provided_muxer);
290
291   g_free (splitmux->location);
292
293   /* Make sure to free any un-released contexts */
294   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
295   g_list_free (splitmux->contexts);
296
297   G_OBJECT_CLASS (parent_class)->finalize (object);
298 }
299
300 static void
301 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
302     const GValue * value, GParamSpec * pspec)
303 {
304   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
305
306   switch (prop_id) {
307     case PROP_LOCATION:{
308       GST_OBJECT_LOCK (splitmux);
309       g_free (splitmux->location);
310       splitmux->location = g_value_dup_string (value);
311       GST_OBJECT_UNLOCK (splitmux);
312       break;
313     }
314     case PROP_MAX_SIZE_BYTES:
315       GST_OBJECT_LOCK (splitmux);
316       splitmux->threshold_bytes = g_value_get_uint64 (value);
317       GST_OBJECT_UNLOCK (splitmux);
318       break;
319     case PROP_MAX_SIZE_TIME:
320       GST_OBJECT_LOCK (splitmux);
321       splitmux->threshold_time = g_value_get_uint64 (value);
322       GST_OBJECT_UNLOCK (splitmux);
323       break;
324     case PROP_MAX_FILES:
325       GST_OBJECT_LOCK (splitmux);
326       splitmux->max_files = g_value_get_uint (value);
327       GST_OBJECT_UNLOCK (splitmux);
328       break;
329     case PROP_MUXER_OVERHEAD:
330       GST_OBJECT_LOCK (splitmux);
331       splitmux->mux_overhead = g_value_get_double (value);
332       GST_OBJECT_UNLOCK (splitmux);
333       break;
334     case PROP_SINK:
335       GST_OBJECT_LOCK (splitmux);
336       if (splitmux->provided_sink)
337         gst_object_unref (splitmux->provided_sink);
338       splitmux->provided_sink = g_value_dup_object (value);
339       GST_OBJECT_UNLOCK (splitmux);
340       break;
341     case PROP_MUXER:
342       GST_OBJECT_LOCK (splitmux);
343       if (splitmux->provided_muxer)
344         gst_object_unref (splitmux->provided_muxer);
345       splitmux->provided_muxer = g_value_dup_object (value);
346       GST_OBJECT_UNLOCK (splitmux);
347       break;
348     default:
349       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
350       break;
351   }
352 }
353
354 static void
355 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
356     GValue * value, GParamSpec * pspec)
357 {
358   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
359
360   switch (prop_id) {
361     case PROP_LOCATION:
362       GST_OBJECT_LOCK (splitmux);
363       g_value_set_string (value, splitmux->location);
364       GST_OBJECT_UNLOCK (splitmux);
365       break;
366     case PROP_MAX_SIZE_BYTES:
367       GST_OBJECT_LOCK (splitmux);
368       g_value_set_uint64 (value, splitmux->threshold_bytes);
369       GST_OBJECT_UNLOCK (splitmux);
370       break;
371     case PROP_MAX_SIZE_TIME:
372       GST_OBJECT_LOCK (splitmux);
373       g_value_set_uint64 (value, splitmux->threshold_time);
374       GST_OBJECT_UNLOCK (splitmux);
375       break;
376     case PROP_MAX_FILES:
377       GST_OBJECT_LOCK (splitmux);
378       g_value_set_uint (value, splitmux->max_files);
379       GST_OBJECT_UNLOCK (splitmux);
380       break;
381     case PROP_MUXER_OVERHEAD:
382       GST_OBJECT_LOCK (splitmux);
383       g_value_set_double (value, splitmux->mux_overhead);
384       GST_OBJECT_UNLOCK (splitmux);
385       break;
386     case PROP_SINK:
387       GST_OBJECT_LOCK (splitmux);
388       g_value_set_object (value, splitmux->provided_sink);
389       GST_OBJECT_UNLOCK (splitmux);
390       break;
391     case PROP_MUXER:
392       GST_OBJECT_LOCK (splitmux);
393       g_value_set_object (value, splitmux->provided_muxer);
394       GST_OBJECT_UNLOCK (splitmux);
395       break;
396     default:
397       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
398       break;
399   }
400 }
401
402 /* Convenience function */
403 static inline GstClockTimeDiff
404 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
405 {
406   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
407
408   if (GST_CLOCK_TIME_IS_VALID (val)) {
409     gboolean sign =
410         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
411     if (sign > 0)
412       res = val;
413     else if (sign < 0)
414       res = -val;
415   }
416   return res;
417 }
418
419 static GstPad *
420 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
421 {
422   gchar *tmp, *sinkname, *srcname;
423   GstPad *mq_src;
424
425   sinkname = gst_pad_get_name (sink_pad);
426   tmp = sinkname + 5;
427   srcname = g_strdup_printf ("src_%s", tmp);
428
429   mq_src = gst_element_get_static_pad (mq, srcname);
430
431   g_free (sinkname);
432   g_free (srcname);
433
434   return mq_src;
435 }
436
437 static gboolean
438 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
439     GstPad ** src_pad)
440 {
441   GstPad *mq_sink;
442   GstPad *mq_src;
443
444   /* Request a pad from multiqueue, then connect this one, then
445    * discover the corresponding output pad and return both */
446   mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
447   if (mq_sink == NULL)
448     return FALSE;
449
450   mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
451   if (mq_src == NULL)
452     goto fail;
453
454   *sink_pad = mq_sink;
455   *src_pad = mq_src;
456
457   return TRUE;
458
459 fail:
460   gst_element_release_request_pad (splitmux->mq, mq_sink);
461   return FALSE;
462 }
463
464 static MqStreamCtx *
465 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
466 {
467   MqStreamCtx *ctx;
468
469   ctx = g_new0 (MqStreamCtx, 1);
470   g_atomic_int_set (&ctx->refcount, 1);
471   ctx->splitmux = splitmux;
472   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
473   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
474   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
475   g_queue_init (&ctx->queued_bufs);
476   return ctx;
477 }
478
479 static void
480 mq_stream_ctx_free (MqStreamCtx * ctx)
481 {
482   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
483   g_queue_clear (&ctx->queued_bufs);
484   g_free (ctx);
485 }
486
487 static void
488 mq_stream_ctx_unref (MqStreamCtx * ctx)
489 {
490   if (g_atomic_int_dec_and_test (&ctx->refcount))
491     mq_stream_ctx_free (ctx);
492 }
493
494 static void
495 mq_stream_ctx_ref (MqStreamCtx * ctx)
496 {
497   g_atomic_int_inc (&ctx->refcount);
498 }
499
500 static void
501 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
502 {
503   ctx->sink_pad_block_id = 0;
504   mq_stream_ctx_unref (ctx);
505 }
506
507 static void
508 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
509 {
510   ctx->src_pad_block_id = 0;
511   mq_stream_ctx_unref (ctx);
512 }
513
514 static void
515 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
516 {
517   gchar *location = NULL;
518   GstMessage *msg;
519   const gchar *msg_name = opened ?
520       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
521
522   g_object_get (splitmux->sink, "location", &location, NULL);
523
524   msg = gst_message_new_element (GST_OBJECT (splitmux),
525       gst_structure_new (msg_name,
526           "location", G_TYPE_STRING, location,
527           "running-time", GST_TYPE_CLOCK_TIME,
528           splitmux->reference_ctx->out_running_time, NULL));
529   gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
530
531   g_free (location);
532 }
533
534 /* Called with lock held, drops the lock to send EOS to the
535  * pad
536  */
537 static void
538 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
539 {
540   GstEvent *eos;
541   GstPad *pad;
542
543   eos = gst_event_new_eos ();
544   pad = gst_pad_get_peer (ctx->srcpad);
545
546   ctx->out_eos = TRUE;
547
548   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
549   GST_SPLITMUX_UNLOCK (splitmux);
550   gst_pad_send_event (pad, eos);
551   GST_SPLITMUX_LOCK (splitmux);
552
553   gst_object_unref (pad);
554 }
555
556 /* Called with splitmux lock held to check if this output
557  * context needs to sleep to wait for the release of the
558  * next GOP, or to send EOS to close out the current file
559  */
560 static void
561 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
562 {
563   do {
564
565     GST_LOG_OBJECT (ctx->srcpad,
566         "Checking running time %" GST_STIME_FORMAT " against max %"
567         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
568         GST_STIME_ARGS (splitmux->max_out_running_time));
569
570     if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
571         ctx->out_running_time < splitmux->max_out_running_time) {
572       splitmux->have_muxed_something = TRUE;
573       return;
574     }
575
576     if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
577       return;
578
579     if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
580       if (ctx->out_eos == FALSE) {
581         send_eos (splitmux, ctx);
582         continue;
583       }
584     } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
585       start_next_fragment (splitmux);
586       continue;
587     }
588
589     GST_INFO_OBJECT (ctx->srcpad,
590         "Sleeping for running time %"
591         GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ")",
592         GST_STIME_ARGS (ctx->out_running_time),
593         GST_STIME_ARGS (splitmux->max_out_running_time));
594     ctx->out_blocked = TRUE;
595     /* Expand the mq if needed before sleeping */
596     check_queue_length (splitmux, ctx);
597     GST_SPLITMUX_WAIT (splitmux);
598     ctx->out_blocked = FALSE;
599     GST_INFO_OBJECT (ctx->srcpad,
600         "Woken for new max running time %" GST_STIME_FORMAT,
601         GST_STIME_ARGS (splitmux->max_out_running_time));
602   } while (1);
603 }
604
605 static GstPadProbeReturn
606 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
607 {
608   GstSplitMuxSink *splitmux = ctx->splitmux;
609   MqStreamBuf *buf_info = NULL;
610
611   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
612
613   /* FIXME: Handle buffer lists, until then make it clear they won't work */
614   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
615     g_warning ("Buffer list handling not implemented");
616     return GST_PAD_PROBE_DROP;
617   }
618   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
619     GstEvent *event = gst_pad_probe_info_get_event (info);
620
621     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
622
623     switch (GST_EVENT_TYPE (event)) {
624       case GST_EVENT_SEGMENT:
625         gst_event_copy_segment (event, &ctx->out_segment);
626         break;
627       case GST_EVENT_FLUSH_STOP:
628         GST_SPLITMUX_LOCK (splitmux);
629         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
630         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
631         g_queue_clear (&ctx->queued_bufs);
632         ctx->flushing = FALSE;
633         GST_SPLITMUX_UNLOCK (splitmux);
634         break;
635       case GST_EVENT_FLUSH_START:
636         GST_SPLITMUX_LOCK (splitmux);
637         GST_LOG_OBJECT (pad, "Flush start");
638         ctx->flushing = TRUE;
639         GST_SPLITMUX_BROADCAST (splitmux);
640         GST_SPLITMUX_UNLOCK (splitmux);
641         break;
642       case GST_EVENT_EOS:
643         GST_SPLITMUX_LOCK (splitmux);
644         if (splitmux->state == SPLITMUX_STATE_STOPPED)
645           goto beach;
646         ctx->out_eos = TRUE;
647         GST_SPLITMUX_UNLOCK (splitmux);
648         break;
649       case GST_EVENT_GAP:{
650         GstClockTime gap_ts;
651         GstClockTimeDiff rtime;
652
653         gst_event_parse_gap (event, &gap_ts, NULL);
654         if (gap_ts == GST_CLOCK_TIME_NONE)
655           break;
656
657         GST_SPLITMUX_LOCK (splitmux);
658
659         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
660
661         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
662             GST_STIME_ARGS (rtime));
663
664         if (splitmux->state == SPLITMUX_STATE_STOPPED)
665           goto beach;
666
667         if (rtime != GST_CLOCK_STIME_NONE) {
668           ctx->out_running_time = rtime;
669           complete_or_wait_on_out (splitmux, ctx);
670         }
671         GST_SPLITMUX_UNLOCK (splitmux);
672         break;
673       }
674       case GST_EVENT_CUSTOM_DOWNSTREAM:{
675         const GstStructure *s;
676         GstClockTimeDiff ts = 0;
677
678         s = gst_event_get_structure (event);
679         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
680           break;
681
682         gst_structure_get_int64 (s, "timestamp", &ts);
683
684         GST_SPLITMUX_LOCK (splitmux);
685
686         if (splitmux->state == SPLITMUX_STATE_STOPPED)
687           goto beach;
688         ctx->out_running_time = ts;
689         complete_or_wait_on_out (splitmux, ctx);
690         GST_SPLITMUX_UNLOCK (splitmux);
691         return GST_PAD_PROBE_DROP;
692       }
693       default:
694         break;
695     }
696     return GST_PAD_PROBE_PASS;
697   }
698
699   /* Allow everything through until the configured next stopping point */
700   GST_SPLITMUX_LOCK (splitmux);
701
702   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
703   if (buf_info == NULL)
704     /* Can only happen due to a poorly timed flush */
705     goto beach;
706
707   /* If we have popped a keyframe, decrement the queued_gop count */
708   if (buf_info->keyframe && splitmux->queued_gops > 0)
709     splitmux->queued_gops--;
710
711   ctx->out_running_time = buf_info->run_ts;
712
713   GST_LOG_OBJECT (splitmux,
714       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
715       " size %" G_GSIZE_FORMAT,
716       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
717
718   if (splitmux->opening_first_fragment) {
719     send_fragment_opened_closed_msg (splitmux, TRUE);
720     splitmux->opening_first_fragment = FALSE;
721   }
722
723   complete_or_wait_on_out (splitmux, ctx);
724
725   if (splitmux->muxed_out_time == GST_CLOCK_STIME_NONE ||
726       splitmux->muxed_out_time < buf_info->run_ts)
727     splitmux->muxed_out_time = buf_info->run_ts;
728
729   splitmux->muxed_out_bytes += buf_info->buf_size;
730
731 #ifndef GST_DISABLE_GST_DEBUG
732   {
733     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
734     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
735         " run ts %" GST_STIME_FORMAT, buf,
736         GST_STIME_ARGS (ctx->out_running_time));
737   }
738 #endif
739
740   GST_SPLITMUX_UNLOCK (splitmux);
741
742   mq_stream_buf_free (buf_info);
743
744   return GST_PAD_PROBE_PASS;
745
746 beach:
747   GST_SPLITMUX_UNLOCK (splitmux);
748   return GST_PAD_PROBE_DROP;
749 }
750
751 static gboolean
752 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
753 {
754   return gst_pad_send_event (peer, gst_event_ref (*event));
755 }
756
757 static void
758 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
759 {
760   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
761
762   gst_pad_sticky_events_foreach (ctx->srcpad,
763       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
764
765   /* Clear EOS flag if not actually EOS */
766   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
767
768   gst_object_unref (peer);
769 }
770
771 /* Called with lock held when a fragment
772  * reaches EOS and it is time to restart
773  * a new fragment
774  */
775 static void
776 start_next_fragment (GstSplitMuxSink * splitmux)
777 {
778   /* 1 change to new file */
779   gst_element_set_locked_state (splitmux->muxer, TRUE);
780   gst_element_set_locked_state (splitmux->active_sink, TRUE);
781   gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
782   gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
783
784   set_next_filename (splitmux);
785
786   gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux));
787   gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux));
788   gst_element_set_locked_state (splitmux->muxer, FALSE);
789   gst_element_set_locked_state (splitmux->active_sink, FALSE);
790
791   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
792
793   /* Switch state and go back to processing */
794   if (!splitmux->reference_ctx->in_eos) {
795     splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
796     splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
797   } else {
798     splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
799     splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
800     splitmux->have_muxed_something = FALSE;
801   }
802   splitmux->have_muxed_something =
803       (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
804
805   /* Store the overflow parameters as the basis for the next fragment */
806   splitmux->mux_start_time = splitmux->muxed_out_time;
807   splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
808
809   GST_DEBUG_OBJECT (splitmux,
810       "Restarting flow for new fragment. New running time %" GST_STIME_FORMAT,
811       GST_STIME_ARGS (splitmux->max_out_running_time));
812
813   send_fragment_opened_closed_msg (splitmux, TRUE);
814
815   GST_SPLITMUX_BROADCAST (splitmux);
816 }
817
818 static void
819 bus_handler (GstBin * bin, GstMessage * message)
820 {
821   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
822
823   switch (GST_MESSAGE_TYPE (message)) {
824     case GST_MESSAGE_EOS:
825       /* If the state is draining out the current file, drop this EOS */
826       GST_SPLITMUX_LOCK (splitmux);
827
828       send_fragment_opened_closed_msg (splitmux, FALSE);
829
830       if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
831           splitmux->max_out_running_time != GST_CLOCK_STIME_NONE) {
832         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
833         splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
834         GST_SPLITMUX_BROADCAST (splitmux);
835
836         gst_message_unref (message);
837         GST_SPLITMUX_UNLOCK (splitmux);
838         return;
839       }
840       GST_SPLITMUX_UNLOCK (splitmux);
841       break;
842     default:
843       break;
844   }
845
846   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
847 }
848
849 /* Called with splitmux lock held */
850 /* Called when entering ProcessingCompleteGop state
851  * Assess if mq contents overflowed the current file
852  *   -> If yes, need to switch to new file
853  *   -> if no, set max_out_running_time to let this GOP in and
854  *      go to COLLECTING_GOP_START state
855  */
856 static void
857 handle_gathered_gop (GstSplitMuxSink * splitmux)
858 {
859   GList *cur;
860   gsize queued_bytes = 0;
861   GstClockTimeDiff queued_time = 0;
862
863   /* Assess if the multiqueue contents overflowed the current file */
864   for (cur = g_list_first (splitmux->contexts);
865       cur != NULL; cur = g_list_next (cur)) {
866     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
867     if (tmpctx->in_running_time > queued_time)
868       queued_time = tmpctx->in_running_time;
869     queued_bytes += tmpctx->in_bytes;
870   }
871
872   g_assert (queued_bytes >= splitmux->mux_start_bytes);
873   g_assert (queued_time >= splitmux->mux_start_time);
874
875   queued_bytes -= splitmux->mux_start_bytes;
876   queued_time -= splitmux->mux_start_time;
877
878   /* Expand queued bytes estimate by muxer overhead */
879   queued_bytes += (queued_bytes * splitmux->mux_overhead);
880
881   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
882       " bytes %" G_GSIZE_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
883
884   /* Check for overrun - have we output at least one byte and overrun
885    * either threshold? */
886   if ((splitmux->have_muxed_something &&
887           ((splitmux->threshold_bytes > 0 &&
888                   queued_bytes >= splitmux->threshold_bytes) ||
889               (splitmux->threshold_time > 0 &&
890                   queued_time >= splitmux->threshold_time)))) {
891
892     splitmux->state = SPLITMUX_STATE_ENDING_FILE;
893
894     GST_INFO_OBJECT (splitmux,
895         "mq overflowed since last, draining out. max out TS is %"
896         GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
897     GST_SPLITMUX_BROADCAST (splitmux);
898
899   } else {
900     /* No overflow */
901     GST_LOG_OBJECT (splitmux,
902         "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
903         " queued %" G_GSIZE_FORMAT " time %" GST_STIME_FORMAT " Continuing.",
904         splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
905         queued_bytes, GST_STIME_ARGS (queued_time));
906
907     /* Wake everyone up to push this one GOP, then sleep */
908     splitmux->have_muxed_something = TRUE;
909
910     if (!splitmux->reference_ctx->in_eos) {
911       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
912       splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
913     } else {
914       splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
915       splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
916     }
917
918     GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
919         GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
920     GST_SPLITMUX_BROADCAST (splitmux);
921   }
922
923 }
924
925 /* Called with splitmux lock held */
926 /* Called from each input pad when it is has all the pieces
927  * for a GOP or EOS, starting with the reference pad which has set the
928  * splitmux->max_in_running_time
929  */
930 static void
931 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
932 {
933   GList *cur;
934   gboolean ready = TRUE;
935   GstClockTimeDiff current_max_in_running_time;
936
937   if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
938     /* Iterate each pad, and check that the input running time is at least
939      * up to the reference running time, and if so handle the collected GOP */
940     GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
941         GST_STIME_FORMAT " ctx %p",
942         GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
943     for (cur = g_list_first (splitmux->contexts); cur != NULL;
944         cur = g_list_next (cur)) {
945       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
946
947       GST_LOG_OBJECT (splitmux,
948           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
949           " EOS %d", tmpctx, tmpctx->srcpad,
950           GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
951
952       if (splitmux->max_in_running_time != G_MAXINT64 &&
953           tmpctx->in_running_time < splitmux->max_in_running_time &&
954           !tmpctx->in_eos) {
955         GST_LOG_OBJECT (splitmux,
956             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
957             tmpctx, tmpctx->srcpad);
958         ready = FALSE;
959         break;
960       }
961     }
962     if (ready) {
963       GST_DEBUG_OBJECT (splitmux,
964           "Collected GOP is complete. Processing (ctx %p)", ctx);
965       /* All pads have a complete GOP, release it into the multiqueue */
966       handle_gathered_gop (splitmux);
967     }
968   }
969
970   /* If upstream reached EOS we are not expecting more data, no need to wait
971    * here. */
972   if (ctx->in_eos)
973     return;
974
975   /* Some pad is not yet ready, or GOP is being pushed
976    * either way, sleep and wait to get woken */
977   current_max_in_running_time = splitmux->max_in_running_time;
978   while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
979           splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
980       !ctx->flushing &&
981       (current_max_in_running_time == splitmux->max_in_running_time)) {
982
983     GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
984         splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
985         "GOP complete" : "EOF draining", ctx);
986     GST_SPLITMUX_WAIT (splitmux);
987
988     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
989   }
990 }
991
992 static void
993 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
994 {
995   GList *cur;
996   guint cur_len = g_queue_get_length (&ctx->queued_bufs);
997
998   GST_DEBUG_OBJECT (ctx->sinkpad,
999       "Checking queue length len %u cur_max %u queued gops %u",
1000       cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
1001
1002   if (cur_len >= splitmux->mq_max_buffers) {
1003     gboolean allow_grow = FALSE;
1004
1005     /* If collecting a GOP and this pad might block,
1006      * and there isn't already a pending GOP in the queue
1007      * then grow
1008      */
1009     if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
1010         ctx->in_running_time < splitmux->max_in_running_time &&
1011         splitmux->queued_gops <= 1) {
1012       allow_grow = TRUE;
1013     } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
1014         ctx->is_reference && splitmux->queued_gops <= 1) {
1015       allow_grow = TRUE;
1016     }
1017
1018     if (!allow_grow) {
1019       for (cur = g_list_first (splitmux->contexts);
1020           cur != NULL; cur = g_list_next (cur)) {
1021         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1022         GST_DEBUG_OBJECT (tmpctx->sinkpad,
1023             " len %u out_blocked %d",
1024             g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
1025         /* If another stream is starving, grow */
1026         if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
1027           allow_grow = TRUE;
1028         }
1029       }
1030     }
1031
1032     if (allow_grow) {
1033       splitmux->mq_max_buffers = cur_len + 1;
1034
1035       GST_INFO_OBJECT (splitmux,
1036           "Multiqueue overrun - enlarging to %u buffers ctx %p",
1037           splitmux->mq_max_buffers, ctx);
1038
1039       g_object_set (splitmux->mq, "max-size-buffers",
1040           splitmux->mq_max_buffers, NULL);
1041     }
1042   }
1043 }
1044
1045 static GstPadProbeReturn
1046 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1047 {
1048   GstSplitMuxSink *splitmux = ctx->splitmux;
1049   GstBuffer *buf;
1050   MqStreamBuf *buf_info = NULL;
1051   GstClockTime ts;
1052   gboolean loop_again;
1053   gboolean keyframe = FALSE;
1054
1055   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1056
1057   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1058   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1059     g_warning ("Buffer list handling not implemented");
1060     return GST_PAD_PROBE_DROP;
1061   }
1062   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
1063     GstEvent *event = gst_pad_probe_info_get_event (info);
1064     switch (GST_EVENT_TYPE (event)) {
1065       case GST_EVENT_SEGMENT:
1066         gst_event_copy_segment (event, &ctx->in_segment);
1067         break;
1068       case GST_EVENT_FLUSH_STOP:
1069         GST_SPLITMUX_LOCK (splitmux);
1070         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1071         ctx->in_eos = FALSE;
1072         ctx->in_bytes = 0;
1073         ctx->in_running_time = GST_CLOCK_STIME_NONE;
1074         GST_SPLITMUX_UNLOCK (splitmux);
1075         break;
1076       case GST_EVENT_EOS:
1077         GST_SPLITMUX_LOCK (splitmux);
1078         ctx->in_eos = TRUE;
1079
1080         if (splitmux->state == SPLITMUX_STATE_STOPPED)
1081           goto beach;
1082
1083         if (ctx->is_reference) {
1084           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
1085           /* Act as if this is a new keyframe with infinite timestamp */
1086           splitmux->max_in_running_time = G_MAXINT64;
1087           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1088           /* Wake up other input pads to collect this GOP */
1089           GST_SPLITMUX_BROADCAST (splitmux);
1090           check_completed_gop (splitmux, ctx);
1091         } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
1092           /* If we are waiting for a GOP to be completed (ie, for aux
1093            * pads to catch up), then this pad is complete, so check
1094            * if the whole GOP is.
1095            */
1096           check_completed_gop (splitmux, ctx);
1097         }
1098         GST_SPLITMUX_UNLOCK (splitmux);
1099         break;
1100       default:
1101         break;
1102     }
1103     return GST_PAD_PROBE_PASS;
1104   }
1105
1106   buf = gst_pad_probe_info_get_buffer (info);
1107   buf_info = mq_stream_buf_new ();
1108
1109   if (GST_BUFFER_PTS_IS_VALID (buf))
1110     ts = GST_BUFFER_PTS (buf);
1111   else
1112     ts = GST_BUFFER_DTS (buf);
1113
1114   GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
1115
1116   GST_SPLITMUX_LOCK (splitmux);
1117
1118   if (splitmux->state == SPLITMUX_STATE_STOPPED)
1119     goto beach;
1120
1121   /* If this buffer has a timestamp, advance the input timestamp of the
1122    * stream */
1123   if (GST_CLOCK_TIME_IS_VALID (ts)) {
1124     GstClockTimeDiff running_time =
1125         my_segment_to_running_time (&ctx->in_segment,
1126         GST_BUFFER_TIMESTAMP (buf));
1127
1128     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
1129         GST_STIME_ARGS (running_time));
1130
1131     if (GST_CLOCK_STIME_IS_VALID (running_time)
1132         && running_time > ctx->in_running_time)
1133       ctx->in_running_time = running_time;
1134   }
1135
1136   /* Try to make sure we have a valid running time */
1137   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
1138     ctx->in_running_time =
1139         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
1140   }
1141
1142   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
1143       GST_STIME_ARGS (ctx->in_running_time));
1144
1145   buf_info->run_ts = ctx->in_running_time;
1146   buf_info->buf_size = gst_buffer_get_size (buf);
1147
1148   /* Update total input byte counter for overflow detect */
1149   ctx->in_bytes += buf_info->buf_size;
1150
1151   /* initialize mux_start_time */
1152   if (ctx->is_reference && splitmux->mux_start_time == GST_CLOCK_STIME_NONE) {
1153     splitmux->mux_start_time = buf_info->run_ts;
1154     GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
1155         GST_STIME_ARGS (splitmux->mux_start_time));
1156     /* Also take this as the first start time when starting up,
1157      * so that we start counting overflow from the first frame */
1158     if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
1159       splitmux->max_in_running_time = splitmux->mux_start_time;
1160   }
1161
1162   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
1163       " total in_bytes %" G_GSIZE_FORMAT,
1164       GST_STIME_ARGS (buf_info->run_ts), ctx->in_bytes);
1165
1166   loop_again = TRUE;
1167   do {
1168     if (ctx->flushing)
1169       break;
1170
1171     switch (splitmux->state) {
1172       case SPLITMUX_STATE_COLLECTING_GOP_START:
1173         if (ctx->is_reference) {
1174           /* If a keyframe, we have a complete GOP */
1175           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1176               !GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) ||
1177               splitmux->max_in_running_time >= ctx->in_running_time) {
1178             /* Pass this buffer through */
1179             loop_again = FALSE;
1180             break;
1181           }
1182           GST_INFO_OBJECT (pad,
1183               "Have keyframe with running time %" GST_STIME_FORMAT,
1184               GST_STIME_ARGS (ctx->in_running_time));
1185           keyframe = TRUE;
1186           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1187           splitmux->max_in_running_time = ctx->in_running_time;
1188           /* Wake up other input pads to collect this GOP */
1189           GST_SPLITMUX_BROADCAST (splitmux);
1190           check_completed_gop (splitmux, ctx);
1191         } else {
1192           /* We're still waiting for a keyframe on the reference pad, sleep */
1193           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1194           GST_SPLITMUX_WAIT (splitmux);
1195           GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1196               splitmux->state);
1197         }
1198         break;
1199       case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1200
1201         /* If we overran the target timestamp, it might be time to process
1202          * the GOP, otherwise bail out for more data
1203          */
1204         GST_LOG_OBJECT (pad,
1205             "Checking TS %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT,
1206             GST_STIME_ARGS (ctx->in_running_time),
1207             GST_STIME_ARGS (splitmux->max_in_running_time));
1208
1209         if (ctx->in_running_time < splitmux->max_in_running_time) {
1210           loop_again = FALSE;
1211           break;
1212         }
1213
1214         GST_LOG_OBJECT (pad,
1215             "Collected last packet of GOP. Checking other pads");
1216         check_completed_gop (splitmux, ctx);
1217         break;
1218       case SPLITMUX_STATE_ENDING_FILE:{
1219         GstEvent *event;
1220
1221         /* If somes streams received no buffer during the last GOP that overran,
1222          * because its next buffer has a timestamp bigger than
1223          * ctx->max_in_running_time, its queue is empty. In that case the only
1224          * way to wakeup the output thread is by injecting an event in the
1225          * queue. This usually happen with subtitle streams.
1226          * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
1227         GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event");
1228         event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
1229             GST_EVENT_TYPE_SERIALIZED,
1230             gst_structure_new ("splitmuxsink-unblock", "timestamp",
1231                 G_TYPE_INT64, splitmux->max_in_running_time, NULL));
1232
1233         GST_SPLITMUX_UNLOCK (splitmux);
1234         gst_pad_send_event (ctx->sinkpad, event);
1235         GST_SPLITMUX_LOCK (splitmux);
1236         /* fallthrough */
1237       }
1238       case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1239         /* A fragment is ending, wait until that's done before continuing */
1240         GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1241         GST_SPLITMUX_WAIT (splitmux);
1242         GST_DEBUG_OBJECT (pad,
1243             "Done sleeping for fragment restart state now %d", splitmux->state);
1244         break;
1245       default:
1246         loop_again = FALSE;
1247         break;
1248     }
1249   } while (loop_again);
1250
1251   if (keyframe) {
1252     splitmux->queued_gops++;
1253     buf_info->keyframe = TRUE;
1254   }
1255
1256   /* Now add this buffer to the queue just before returning */
1257   g_queue_push_head (&ctx->queued_bufs, buf_info);
1258
1259   /* Check the buffer will fit in the mq */
1260   check_queue_length (splitmux, ctx);
1261
1262   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1263       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
1264
1265   GST_SPLITMUX_UNLOCK (splitmux);
1266   return GST_PAD_PROBE_PASS;
1267
1268 beach:
1269   GST_SPLITMUX_UNLOCK (splitmux);
1270   if (buf_info)
1271     mq_stream_buf_free (buf_info);
1272   return GST_PAD_PROBE_PASS;
1273 }
1274
1275 static GstPad *
1276 gst_splitmux_sink_request_new_pad (GstElement * element,
1277     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1278 {
1279   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1280   GstPadTemplate *mux_template = NULL;
1281   GstPad *res = NULL;
1282   GstPad *mq_sink, *mq_src;
1283   gchar *gname;
1284   gboolean is_video = FALSE;
1285   MqStreamCtx *ctx;
1286
1287   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1288
1289   GST_SPLITMUX_LOCK (splitmux);
1290   if (!create_elements (splitmux))
1291     goto fail;
1292
1293   if (templ->name_template) {
1294     if (g_str_equal (templ->name_template, "video")) {
1295       /* FIXME: Look for a pad template with matching caps, rather than by name */
1296       mux_template =
1297           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1298           (splitmux->muxer), "video_%u");
1299       is_video = TRUE;
1300       name = NULL;
1301     } else {
1302       mux_template =
1303           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1304           (splitmux->muxer), templ->name_template);
1305     }
1306     if (mux_template == NULL) {
1307       /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1308       mux_template =
1309           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1310           (splitmux->muxer), "sink_%d");
1311     }
1312   }
1313
1314   res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1315   if (res == NULL)
1316     goto fail;
1317
1318   if (is_video)
1319     gname = g_strdup ("video");
1320   else if (name == NULL)
1321     gname = gst_pad_get_name (res);
1322   else
1323     gname = g_strdup (name);
1324
1325   if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1326     gst_element_release_request_pad (splitmux->muxer, res);
1327     gst_object_unref (GST_OBJECT (res));
1328     goto fail;
1329   }
1330
1331   if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1332     gst_element_release_request_pad (splitmux->muxer, res);
1333     gst_object_unref (GST_OBJECT (res));
1334     gst_element_release_request_pad (splitmux->mq, mq_sink);
1335     gst_object_unref (GST_OBJECT (mq_sink));
1336     goto fail;
1337   }
1338
1339   gst_object_unref (GST_OBJECT (res));
1340
1341   ctx = mq_stream_ctx_new (splitmux);
1342   ctx->srcpad = mq_src;
1343   ctx->sinkpad = mq_sink;
1344
1345   mq_stream_ctx_ref (ctx);
1346   ctx->src_pad_block_id =
1347       gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1348       (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1349       _pad_block_destroy_src_notify);
1350   if (is_video && splitmux->reference_ctx != NULL) {
1351     splitmux->reference_ctx->is_reference = FALSE;
1352     splitmux->reference_ctx = NULL;
1353   }
1354   if (splitmux->reference_ctx == NULL) {
1355     splitmux->reference_ctx = ctx;
1356     ctx->is_reference = TRUE;
1357   }
1358
1359   res = gst_ghost_pad_new (gname, mq_sink);
1360   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1361
1362   mq_stream_ctx_ref (ctx);
1363   ctx->sink_pad_block_id =
1364       gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1365       (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1366       _pad_block_destroy_sink_notify);
1367
1368   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1369       " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1370
1371   splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1372
1373   g_free (gname);
1374
1375   gst_object_unref (mq_sink);
1376   gst_object_unref (mq_src);
1377
1378   gst_pad_set_active (res, TRUE);
1379   gst_element_add_pad (element, res);
1380   GST_SPLITMUX_UNLOCK (splitmux);
1381
1382   return res;
1383 fail:
1384   GST_SPLITMUX_UNLOCK (splitmux);
1385   return NULL;
1386 }
1387
1388 static void
1389 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1390 {
1391   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1392   GstPad *mqsink, *mqsrc, *muxpad;
1393   MqStreamCtx *ctx =
1394       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1395
1396   GST_SPLITMUX_LOCK (splitmux);
1397
1398   if (splitmux->muxer == NULL || splitmux->mq == NULL)
1399     goto fail;                  /* Elements don't exist yet - nothing to release */
1400
1401   GST_INFO_OBJECT (pad, "releasing request pad");
1402
1403   mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1404   mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1405   muxpad = gst_pad_get_peer (mqsrc);
1406
1407   /* Remove the context from our consideration */
1408   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1409
1410   if (ctx->sink_pad_block_id)
1411     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1412
1413   if (ctx->src_pad_block_id)
1414     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1415
1416   /* Can release the context now */
1417   mq_stream_ctx_unref (ctx);
1418
1419   /* Release and free the mq input */
1420   gst_element_release_request_pad (splitmux->mq, mqsink);
1421
1422   /* Release and free the muxer input */
1423   gst_element_release_request_pad (splitmux->muxer, muxpad);
1424
1425   gst_object_unref (mqsink);
1426   gst_object_unref (mqsrc);
1427   gst_object_unref (muxpad);
1428
1429   gst_element_remove_pad (element, pad);
1430
1431   /* Reset the internal elements only after all request pads are released */
1432   if (splitmux->contexts == NULL)
1433     gst_splitmux_reset (splitmux);
1434
1435 fail:
1436   GST_SPLITMUX_UNLOCK (splitmux);
1437 }
1438
1439 static GstElement *
1440 create_element (GstSplitMuxSink * splitmux,
1441     const gchar * factory, const gchar * name)
1442 {
1443   GstElement *ret = gst_element_factory_make (factory, name);
1444   if (ret == NULL) {
1445     g_warning ("Failed to create %s - splitmuxsink will not work", name);
1446     return NULL;
1447   }
1448
1449   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1450     g_warning ("Could not add %s element - splitmuxsink will not work", name);
1451     gst_object_unref (ret);
1452     return NULL;
1453   }
1454
1455   return ret;
1456 }
1457
1458 static gboolean
1459 create_elements (GstSplitMuxSink * splitmux)
1460 {
1461   /* Create internal elements */
1462   if (splitmux->mq == NULL) {
1463     if ((splitmux->mq =
1464             create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1465       goto fail;
1466
1467     splitmux->mq_max_buffers = 5;
1468     /* No bytes or time limit, we limit buffers manually */
1469     g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1470         (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1471   }
1472
1473   if (splitmux->muxer == NULL) {
1474     GstElement *provided_muxer = NULL;
1475
1476     GST_OBJECT_LOCK (splitmux);
1477     if (splitmux->provided_muxer != NULL)
1478       provided_muxer = gst_object_ref (splitmux->provided_muxer);
1479     GST_OBJECT_UNLOCK (splitmux);
1480
1481     if (provided_muxer == NULL) {
1482       if ((splitmux->muxer =
1483               create_element (splitmux, "mp4mux", "muxer")) == NULL)
1484         goto fail;
1485     } else {
1486       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1487         g_warning ("Could not add muxer element - splitmuxsink will not work");
1488         gst_object_unref (provided_muxer);
1489         goto fail;
1490       }
1491
1492       splitmux->muxer = provided_muxer;
1493       gst_object_unref (provided_muxer);
1494     }
1495   }
1496
1497   return TRUE;
1498 fail:
1499   return FALSE;
1500 }
1501
1502 static GstElement *
1503 find_sink (GstElement * e)
1504 {
1505   GstElement *res = NULL;
1506   GstIterator *iter;
1507   gboolean done = FALSE;
1508   GValue data = { 0, };
1509
1510   if (!GST_IS_BIN (e))
1511     return e;
1512
1513   iter = gst_bin_iterate_sinks (GST_BIN (e));
1514   while (!done) {
1515     switch (gst_iterator_next (iter, &data)) {
1516       case GST_ITERATOR_OK:
1517       {
1518         GstElement *child = g_value_get_object (&data);
1519         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1520                 "location") != NULL) {
1521           res = child;
1522           done = TRUE;
1523         }
1524         g_value_reset (&data);
1525         break;
1526       }
1527       case GST_ITERATOR_RESYNC:
1528         gst_iterator_resync (iter);
1529         break;
1530       case GST_ITERATOR_DONE:
1531         done = TRUE;
1532         break;
1533       case GST_ITERATOR_ERROR:
1534         g_assert_not_reached ();
1535         break;
1536     }
1537   }
1538   g_value_unset (&data);
1539   gst_iterator_free (iter);
1540
1541   return res;
1542 }
1543
1544 static gboolean
1545 create_sink (GstSplitMuxSink * splitmux)
1546 {
1547   GstElement *provided_sink = NULL;
1548
1549   if (splitmux->active_sink == NULL) {
1550
1551     GST_OBJECT_LOCK (splitmux);
1552     if (splitmux->provided_sink != NULL)
1553       provided_sink = gst_object_ref (splitmux->provided_sink);
1554     GST_OBJECT_UNLOCK (splitmux);
1555
1556     if (provided_sink == NULL) {
1557       if ((splitmux->sink =
1558               create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1559         goto fail;
1560       splitmux->active_sink = splitmux->sink;
1561     } else {
1562       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1563         g_warning ("Could not add sink elements - splitmuxsink will not work");
1564         gst_object_unref (provided_sink);
1565         goto fail;
1566       }
1567
1568       splitmux->active_sink = provided_sink;
1569
1570       /* The bin holds a ref now, we can drop our tmp ref */
1571       gst_object_unref (provided_sink);
1572
1573       /* Find the sink element */
1574       splitmux->sink = find_sink (splitmux->active_sink);
1575       if (splitmux->sink == NULL) {
1576         g_warning
1577             ("Could not locate sink element in provided sink - splitmuxsink will not work");
1578         goto fail;
1579       }
1580     }
1581
1582     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1583       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1584       goto fail;
1585     }
1586   }
1587
1588   return TRUE;
1589 fail:
1590   return FALSE;
1591 }
1592
1593 #ifdef __GNUC__
1594 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
1595 #endif
1596 static void
1597 set_next_filename (GstSplitMuxSink * splitmux)
1598 {
1599   gchar *fname = NULL;
1600   gst_splitmux_sink_ensure_max_files (splitmux);
1601
1602   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
1603       splitmux->fragment_id, &fname);
1604
1605   if (!fname)
1606     fname = splitmux->location ?
1607         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
1608
1609   if (fname) {
1610     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1611     g_object_set (splitmux->sink, "location", fname, NULL);
1612     g_free (fname);
1613
1614     splitmux->fragment_id++;
1615   }
1616 }
1617
1618 static GstStateChangeReturn
1619 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1620 {
1621   GstStateChangeReturn ret;
1622   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1623
1624   switch (transition) {
1625     case GST_STATE_CHANGE_NULL_TO_READY:{
1626       GST_SPLITMUX_LOCK (splitmux);
1627       if (!create_elements (splitmux) || !create_sink (splitmux)) {
1628         ret = GST_STATE_CHANGE_FAILURE;
1629         GST_SPLITMUX_UNLOCK (splitmux);
1630         goto beach;
1631       }
1632       GST_SPLITMUX_UNLOCK (splitmux);
1633       splitmux->fragment_id = 0;
1634       set_next_filename (splitmux);
1635       break;
1636     }
1637     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1638       GST_SPLITMUX_LOCK (splitmux);
1639       /* Start by collecting one input on each pad */
1640       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1641       splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
1642       splitmux->muxed_out_time = splitmux->mux_start_time =
1643           GST_CLOCK_STIME_NONE;
1644       splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1645       splitmux->opening_first_fragment = TRUE;
1646       GST_SPLITMUX_UNLOCK (splitmux);
1647       break;
1648     }
1649     case GST_STATE_CHANGE_PAUSED_TO_READY:
1650     case GST_STATE_CHANGE_READY_TO_NULL:
1651       GST_SPLITMUX_LOCK (splitmux);
1652       splitmux->state = SPLITMUX_STATE_STOPPED;
1653       /* Wake up any blocked threads */
1654       GST_LOG_OBJECT (splitmux,
1655           "State change -> NULL or READY. Waking threads");
1656       GST_SPLITMUX_BROADCAST (splitmux);
1657       GST_SPLITMUX_UNLOCK (splitmux);
1658       break;
1659     default:
1660       break;
1661   }
1662
1663   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1664   if (ret == GST_STATE_CHANGE_FAILURE)
1665     goto beach;
1666
1667   switch (transition) {
1668     case GST_STATE_CHANGE_READY_TO_NULL:
1669       GST_SPLITMUX_LOCK (splitmux);
1670       splitmux->fragment_id = 0;
1671       /* Reset internal elements only if no pad contexts are using them */
1672       if (splitmux->contexts == NULL)
1673         gst_splitmux_reset (splitmux);
1674       GST_SPLITMUX_UNLOCK (splitmux);
1675       break;
1676     default:
1677       break;
1678   }
1679
1680 beach:
1681
1682   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1683       ret == GST_STATE_CHANGE_FAILURE) {
1684     /* Cleanup elements on failed transition out of NULL */
1685     gst_splitmux_reset (splitmux);
1686   }
1687   return ret;
1688 }
1689
1690 gboolean
1691 register_splitmuxsink (GstPlugin * plugin)
1692 {
1693   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1694       "Split File Muxing Sink");
1695
1696   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1697       GST_TYPE_SPLITMUX_SINK);
1698 }
1699
1700 static void
1701 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
1702 {
1703   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
1704     splitmux->fragment_id = 0;
1705   }
1706 }