splitmux: Implement new elements for splitting files at mux level.
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * is required, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * The splitting process is driven by the video stream contents, and
37  * the video stream must contain closed GOPs for the output file parts
38  * to be played individually correctly.
39  *
40  * <refsect2>
41  * <title>Example pipelines</title>
42  * |[
43  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc max-key-int=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
44  * ]|
45  *
46  * Records a video stream captured from a v4l2 device and muxes it into
47  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48  * and 1MB maximum size.
49  *
50  * </refsect2>
51  */
52
53 #ifdef HAVE_CONFIG_H
54 #include "config.h"
55 #endif
56
57 #include <string.h>
58 #include "gstsplitmuxsink.h"
59
60 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
61 #define GST_CAT_DEFAULT splitmux_debug
62
63 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
64 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
65 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
66 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
67
68 enum
69 {
70   PROP_0,
71   PROP_LOCATION,
72   PROP_MAX_SIZE_TIME,
73   PROP_MAX_SIZE_BYTES,
74   PROP_MUXER_OVERHEAD,
75   PROP_MUXER,
76   PROP_SINK
77 };
78
79 #define DEFAULT_MAX_SIZE_TIME       0
80 #define DEFAULT_MAX_SIZE_BYTES      0
81 #define DEFAULT_MUXER_OVERHEAD      0.02
82 #define DEFAULT_MUXER "mp4mux"
83 #define DEFAULT_SINK "filesink"
84
85 static GstStaticPadTemplate video_sink_template =
86 GST_STATIC_PAD_TEMPLATE ("video",
87     GST_PAD_SINK,
88     GST_PAD_REQUEST,
89     GST_STATIC_CAPS_ANY);
90 static GstStaticPadTemplate audio_sink_template =
91 GST_STATIC_PAD_TEMPLATE ("audio_%u",
92     GST_PAD_SINK,
93     GST_PAD_REQUEST,
94     GST_STATIC_CAPS_ANY);
95 static GstStaticPadTemplate subtitle_sink_template =
96 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
97     GST_PAD_SINK,
98     GST_PAD_REQUEST,
99     GST_STATIC_CAPS_ANY);
100
101 static GQuark PAD_CONTEXT;
102
103 static void
104 _do_init (void)
105 {
106   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
107 }
108
109 #define gst_splitmux_sink_parent_class parent_class
110 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
111     _do_init ());
112
113 static gboolean create_elements (GstSplitMuxSink * splitmux);
114 static gboolean create_sink (GstSplitMuxSink * splitmux);
115 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
116     const GValue * value, GParamSpec * pspec);
117 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
118     GValue * value, GParamSpec * pspec);
119 static void gst_splitmux_sink_dispose (GObject * object);
120 static void gst_splitmux_sink_finalize (GObject * object);
121
122 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
123     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
124 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
125
126 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
127     element, GstStateChange transition);
128
129 static void bus_handler (GstBin * bin, GstMessage * msg);
130 static void set_next_filename (GstSplitMuxSink * splitmux);
131 static void start_next_fragment (GstSplitMuxSink * splitmux);
132 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
133
134 static MqStreamBuf *
135 mq_stream_buf_new (void)
136 {
137   return g_slice_new0 (MqStreamBuf);
138 }
139
140 static void
141 mq_stream_buf_free (MqStreamBuf * data)
142 {
143   g_slice_free (MqStreamBuf, data);
144 }
145
146 static void
147 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
148 {
149   GObjectClass *gobject_class = (GObjectClass *) klass;
150   GstElementClass *gstelement_class = (GstElementClass *) klass;
151   GstBinClass *gstbin_class = (GstBinClass *) klass;
152
153   gobject_class->set_property = gst_splitmux_sink_set_property;
154   gobject_class->get_property = gst_splitmux_sink_get_property;
155   gobject_class->dispose = gst_splitmux_sink_dispose;
156   gobject_class->finalize = gst_splitmux_sink_finalize;
157
158   gst_element_class_set_static_metadata (gstelement_class,
159       "Split Muxing Bin", "Generic/Bin/Muxer",
160       "Convenience bin that muxes incoming streams into multiple time/size limited files",
161       "Jan Schmidt <jan@centricular.com>");
162
163   gst_element_class_add_pad_template (gstelement_class,
164       gst_static_pad_template_get (&video_sink_template));
165   gst_element_class_add_pad_template (gstelement_class,
166       gst_static_pad_template_get (&audio_sink_template));
167   gst_element_class_add_pad_template (gstelement_class,
168       gst_static_pad_template_get (&subtitle_sink_template));
169
170   gstelement_class->change_state =
171       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
172   gstelement_class->request_new_pad =
173       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
174   gstelement_class->release_pad =
175       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
176
177   gstbin_class->handle_message = bus_handler;
178
179   g_object_class_install_property (gobject_class, PROP_LOCATION,
180       g_param_spec_string ("location", "File Output Pattern",
181           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
182           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
183   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
184       g_param_spec_double ("mux-overhead", "Muxing Overhead",
185           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
186           DEFAULT_MUXER_OVERHEAD,
187           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
188
189   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
190       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
191           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
192           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
193   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
194       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
195           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
196           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
197
198   g_object_class_install_property (gobject_class, PROP_MUXER,
199       g_param_spec_object ("muxer", "Muxer",
200           "The muxer element to use (NULL = default mp4mux)",
201           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
202   g_object_class_install_property (gobject_class, PROP_SINK,
203       g_param_spec_object ("sink", "Sink",
204           "The sink element (or element chain) to use (NULL = default filesink)",
205           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
206 }
207
208 static void
209 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
210 {
211   g_mutex_init (&splitmux->lock);
212   g_cond_init (&splitmux->data_cond);
213
214   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
215   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
216   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
217 }
218
219 static void
220 gst_splitmux_reset (GstSplitMuxSink * splitmux)
221 {
222   if (splitmux->mq)
223     gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
224   if (splitmux->muxer)
225     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
226   if (splitmux->active_sink)
227     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
228
229   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
230       NULL;
231 }
232
233 static void
234 gst_splitmux_sink_dispose (GObject * object)
235 {
236   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
237
238   G_OBJECT_CLASS (parent_class)->dispose (object);
239
240   /* Calling parent dispose invalidates all child pointers */
241   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
242       NULL;
243 }
244
245 static void
246 gst_splitmux_sink_finalize (GObject * object)
247 {
248   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
249   g_cond_clear (&splitmux->data_cond);
250   if (splitmux->provided_sink)
251     gst_object_unref (splitmux->provided_sink);
252
253   g_free (splitmux->location);
254
255   G_OBJECT_CLASS (parent_class)->finalize (object);
256 }
257
258 static void
259 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
260     const GValue * value, GParamSpec * pspec)
261 {
262   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
263
264   switch (prop_id) {
265     case PROP_LOCATION:{
266       GST_OBJECT_LOCK (splitmux);
267       g_free (splitmux->location);
268       splitmux->location = g_value_dup_string (value);
269       GST_OBJECT_UNLOCK (splitmux);
270       break;
271     }
272     case PROP_MAX_SIZE_BYTES:
273       GST_OBJECT_LOCK (splitmux);
274       splitmux->threshold_bytes = g_value_get_uint64 (value);
275       GST_OBJECT_UNLOCK (splitmux);
276       break;
277     case PROP_MAX_SIZE_TIME:
278       GST_OBJECT_LOCK (splitmux);
279       splitmux->threshold_time = g_value_get_uint64 (value);
280       GST_OBJECT_UNLOCK (splitmux);
281       break;
282     case PROP_MUXER_OVERHEAD:
283       GST_OBJECT_LOCK (splitmux);
284       splitmux->mux_overhead = g_value_get_double (value);
285       GST_OBJECT_UNLOCK (splitmux);
286       break;
287     case PROP_SINK:
288       GST_SPLITMUX_LOCK (splitmux);
289       if (splitmux->provided_sink)
290         gst_object_unref (splitmux->provided_sink);
291       splitmux->provided_sink = g_value_dup_object (value);
292       GST_SPLITMUX_UNLOCK (splitmux);
293       break;
294     case PROP_MUXER:
295       GST_SPLITMUX_LOCK (splitmux);
296       if (splitmux->provided_muxer)
297         gst_object_unref (splitmux->provided_muxer);
298       splitmux->provided_muxer = g_value_dup_object (value);
299       GST_SPLITMUX_UNLOCK (splitmux);
300       break;
301     default:
302       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
303       break;
304   }
305 }
306
307 static void
308 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
309     GValue * value, GParamSpec * pspec)
310 {
311   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
312
313   switch (prop_id) {
314     case PROP_LOCATION:
315       GST_OBJECT_LOCK (splitmux);
316       g_value_set_string (value, splitmux->location);
317       GST_OBJECT_UNLOCK (splitmux);
318       break;
319     case PROP_MAX_SIZE_BYTES:
320       GST_OBJECT_LOCK (splitmux);
321       g_value_set_uint64 (value, splitmux->threshold_bytes);
322       GST_OBJECT_UNLOCK (splitmux);
323       break;
324     case PROP_MAX_SIZE_TIME:
325       GST_OBJECT_LOCK (splitmux);
326       g_value_set_uint64 (value, splitmux->threshold_time);
327       GST_OBJECT_UNLOCK (splitmux);
328       break;
329     case PROP_MUXER_OVERHEAD:
330       GST_OBJECT_LOCK (splitmux);
331       g_value_set_double (value, splitmux->mux_overhead);
332       GST_OBJECT_UNLOCK (splitmux);
333       break;
334     case PROP_SINK:
335       GST_SPLITMUX_LOCK (splitmux);
336       g_value_set_object (value, splitmux->provided_sink);
337       GST_SPLITMUX_UNLOCK (splitmux);
338       break;
339     case PROP_MUXER:
340       GST_SPLITMUX_LOCK (splitmux);
341       g_value_set_object (value, splitmux->provided_muxer);
342       GST_SPLITMUX_UNLOCK (splitmux);
343       break;
344     default:
345       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
346       break;
347   }
348 }
349
350 static GstPad *
351 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
352 {
353   gchar *tmp, *sinkname, *srcname;
354   GstPad *mq_src;
355
356   sinkname = gst_pad_get_name (sink_pad);
357   tmp = sinkname + 5;
358   srcname = g_strdup_printf ("src_%s", tmp);
359
360   mq_src = gst_element_get_static_pad (mq, srcname);
361
362   g_free (sinkname);
363   g_free (srcname);
364
365   return mq_src;
366 }
367
368 static gboolean
369 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
370     GstPad ** src_pad)
371 {
372   GstPad *mq_sink;
373   GstPad *mq_src;
374
375   /* Request a pad from multiqueue, then connect this one, then
376    * discover the corresponding output pad and return both */
377   mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
378   if (mq_sink == NULL)
379     return FALSE;
380
381   mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
382   if (mq_src == NULL)
383     goto fail;
384
385   *sink_pad = mq_sink;
386   *src_pad = mq_src;
387
388   return TRUE;
389
390 fail:
391   gst_element_release_request_pad (splitmux->mq, mq_sink);
392   return FALSE;
393 }
394
395 static MqStreamCtx *
396 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
397 {
398   MqStreamCtx *ctx;
399
400   ctx = g_new0 (MqStreamCtx, 1);
401   g_atomic_int_set (&ctx->refcount, 1);
402   ctx->splitmux = splitmux;
403   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
404   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
405   ctx->in_running_time = ctx->out_running_time = 0;
406   g_queue_init (&ctx->queued_bufs);
407   return ctx;
408 }
409
410 static void
411 mq_stream_ctx_free (MqStreamCtx * ctx)
412 {
413   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
414   g_queue_clear (&ctx->queued_bufs);
415   g_free (ctx);
416 }
417
418 static void
419 mq_stream_ctx_unref (MqStreamCtx * ctx)
420 {
421   if (g_atomic_int_dec_and_test (&ctx->refcount))
422     mq_stream_ctx_free (ctx);
423 }
424
425 static void
426 mq_stream_ctx_ref (MqStreamCtx * ctx)
427 {
428   g_atomic_int_inc (&ctx->refcount);
429 }
430
431 static void
432 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
433 {
434   ctx->sink_pad_block_id = 0;
435   mq_stream_ctx_unref (ctx);
436 }
437
438 static void
439 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
440 {
441   ctx->src_pad_block_id = 0;
442   mq_stream_ctx_unref (ctx);
443 }
444
445 /* Called with lock held, drops the lock to send EOS to the
446  * pad
447  */
448 static void
449 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
450 {
451   GstEvent *eos;
452   GstPad *pad;
453
454   eos = gst_event_new_eos ();
455   pad = gst_pad_get_peer (ctx->srcpad);
456
457   ctx->out_eos = TRUE;
458
459   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
460   GST_SPLITMUX_UNLOCK (splitmux);
461   gst_pad_send_event (pad, eos);
462   GST_SPLITMUX_LOCK (splitmux);
463
464   gst_object_unref (pad);
465 }
466
467 /* Called with splitmux lock held to check if this output
468  * context needs to sleep to wait for the release of the
469  * next GOP, or to send EOS to close out the current file
470  */
471 static void
472 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
473 {
474   do {
475
476     GST_LOG_OBJECT (ctx->srcpad,
477         "Checking running time %" GST_TIME_FORMAT " against max %"
478         GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time),
479         GST_TIME_ARGS (splitmux->max_out_running_time));
480
481     if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE ||
482         ctx->out_running_time < splitmux->max_out_running_time)
483       return;
484
485     if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
486       return;
487
488     if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
489       if (ctx->out_eos == FALSE) {
490         send_eos (splitmux, ctx);
491         continue;
492       }
493     } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
494       start_next_fragment (splitmux);
495       continue;
496     }
497
498     GST_INFO_OBJECT (ctx->srcpad,
499         "Sleeping for running time %"
500         GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")",
501         GST_TIME_ARGS (ctx->out_running_time),
502         GST_TIME_ARGS (splitmux->max_out_running_time));
503     ctx->out_blocked = TRUE;
504     /* Expand the mq if needed before sleeping */
505     check_queue_length (splitmux, ctx);
506     GST_SPLITMUX_WAIT (splitmux);
507     ctx->out_blocked = FALSE;
508     GST_INFO_OBJECT (ctx->srcpad,
509         "Woken for new max running time %" GST_TIME_FORMAT,
510         GST_TIME_ARGS (splitmux->max_out_running_time));
511   } while (1);
512 }
513
514 static GstPadProbeReturn
515 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
516 {
517   GstSplitMuxSink *splitmux = ctx->splitmux;
518   MqStreamBuf *buf_info = NULL;
519
520   GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
521
522   /* FIXME: Handle buffer lists, until then make it clear they won't work */
523   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
524     g_warning ("Buffer list handling not implemented");
525     return GST_PAD_PROBE_DROP;
526   }
527   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
528     GstEvent *event = gst_pad_probe_info_get_event (info);
529
530     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
531
532     switch (GST_EVENT_TYPE (event)) {
533       case GST_EVENT_SEGMENT:
534         gst_event_copy_segment (event, &ctx->out_segment);
535         break;
536       case GST_EVENT_FLUSH_STOP:
537         GST_SPLITMUX_LOCK (splitmux);
538         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
539         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
540         g_queue_clear (&ctx->queued_bufs);
541         ctx->flushing = FALSE;
542         GST_SPLITMUX_UNLOCK (splitmux);
543         break;
544       case GST_EVENT_FLUSH_START:
545         GST_SPLITMUX_LOCK (splitmux);
546         GST_LOG_OBJECT (pad, "Flush start");
547         ctx->flushing = TRUE;
548         GST_SPLITMUX_BROADCAST (splitmux);
549         GST_SPLITMUX_UNLOCK (splitmux);
550         break;
551       case GST_EVENT_EOS:
552         GST_SPLITMUX_LOCK (splitmux);
553         if (splitmux->state == SPLITMUX_STATE_STOPPED)
554           goto beach;
555         ctx->out_eos = TRUE;
556         GST_SPLITMUX_UNLOCK (splitmux);
557         break;
558       case GST_EVENT_GAP:{
559         GstClockTime gap_ts;
560
561         gst_event_parse_gap (event, &gap_ts, NULL);
562         if (gap_ts == GST_CLOCK_TIME_NONE)
563           break;
564
565         GST_SPLITMUX_LOCK (splitmux);
566
567         gap_ts = gst_segment_to_running_time (&ctx->out_segment,
568             GST_FORMAT_TIME, gap_ts);
569
570         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT,
571             GST_TIME_ARGS (gap_ts));
572
573         if (splitmux->state == SPLITMUX_STATE_STOPPED)
574           goto beach;
575         ctx->out_running_time = gap_ts;
576         complete_or_wait_on_out (splitmux, ctx);
577         GST_SPLITMUX_UNLOCK (splitmux);
578         break;
579       }
580       default:
581         break;
582     }
583     return GST_PAD_PROBE_PASS;
584   }
585
586   /* Allow everything through until the configured next stopping point */
587   GST_SPLITMUX_LOCK (splitmux);
588
589   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
590   if (buf_info == NULL)
591     /* Can only happen due to a poorly timed flush */
592     goto beach;
593
594   /* If we have popped a keyframe, decrement the queued_gop count */
595   if (buf_info->keyframe && splitmux->queued_gops > 0)
596     splitmux->queued_gops--;
597
598   ctx->out_running_time = buf_info->run_ts;
599
600   GST_LOG_OBJECT (splitmux,
601       "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT
602       " size %" G_GSIZE_FORMAT,
603       pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size);
604
605   complete_or_wait_on_out (splitmux, ctx);
606
607   if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE ||
608       splitmux->muxed_out_time < buf_info->run_ts)
609     splitmux->muxed_out_time = buf_info->run_ts;
610
611   splitmux->muxed_out_bytes += buf_info->buf_size;
612
613 #ifndef GST_DISABLE_GST_DEBUG
614   {
615     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
616     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
617         " run ts %" GST_TIME_FORMAT, buf,
618         GST_TIME_ARGS (ctx->out_running_time));
619   }
620 #endif
621
622   GST_SPLITMUX_UNLOCK (splitmux);
623
624   mq_stream_buf_free (buf_info);
625
626   return GST_PAD_PROBE_PASS;
627
628 beach:
629   GST_SPLITMUX_UNLOCK (splitmux);
630   if (buf_info)
631     mq_stream_buf_free (buf_info);
632   return GST_PAD_PROBE_DROP;
633 }
634
635 static gboolean
636 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
637 {
638   return gst_pad_send_event (peer, gst_event_ref (*event));
639 }
640
641 static void
642 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
643 {
644   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
645
646   gst_pad_sticky_events_foreach (ctx->srcpad,
647       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
648
649   /* Clear EOS flag */
650   ctx->out_eos = FALSE;
651 }
652
653 /* Called with lock held when a fragment
654  * reaches EOS and it is time to restart
655  * a new fragment
656  */
657 static void
658 start_next_fragment (GstSplitMuxSink * splitmux)
659 {
660   /* 1 change to new file */
661   gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
662   gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
663
664   set_next_filename (splitmux);
665
666   gst_element_sync_state_with_parent (splitmux->active_sink);
667   gst_element_sync_state_with_parent (splitmux->muxer);
668
669   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
670
671   /* Switch state and go back to processing */
672   splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
673   if (!splitmux->video_ctx->in_eos)
674     splitmux->max_out_running_time = splitmux->video_ctx->in_running_time;
675   else
676     splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
677
678   /* Store the overflow parameters as the basis for the next fragment */
679   splitmux->mux_start_time = splitmux->muxed_out_time;
680   splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
681
682   GST_DEBUG_OBJECT (splitmux,
683       "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT,
684       GST_TIME_ARGS (splitmux->max_out_running_time));
685
686   GST_SPLITMUX_BROADCAST (splitmux);
687 }
688
689 static void
690 bus_handler (GstBin * bin, GstMessage * message)
691 {
692   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
693
694   switch (GST_MESSAGE_TYPE (message)) {
695     case GST_MESSAGE_EOS:
696       /* If the state is draining out the current file, drop this EOS */
697       GST_SPLITMUX_LOCK (splitmux);
698       if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
699           splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) {
700         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
701         splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
702         GST_SPLITMUX_BROADCAST (splitmux);
703
704         gst_message_unref (message);
705         GST_SPLITMUX_UNLOCK (splitmux);
706         return;
707       }
708       GST_SPLITMUX_UNLOCK (splitmux);
709       break;
710     default:
711       break;
712   }
713
714   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
715 }
716
717 /* Called with splitmux lock held */
718 /* Called when entering ProcessingCompleteGop state
719  * Assess if mq contents overflowed the current file
720  *   -> If yes, need to switch to new file
721  *   -> if no, set max_out_running_time to let this GOP in and
722  *      go to COLLECTING_GOP_START state
723  */
724 static void
725 handle_gathered_gop (GstSplitMuxSink * splitmux)
726 {
727   GList *cur;
728   gsize queued_bytes = 0;
729   GstClockTime queued_time = 0;
730
731   /* Assess if the multiqueue contents overflowed the current file */
732   for (cur = g_list_first (splitmux->contexts);
733       cur != NULL; cur = g_list_next (cur)) {
734     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
735     if (tmpctx->in_running_time > queued_time)
736       queued_time = tmpctx->in_running_time;
737     queued_bytes += tmpctx->in_bytes;
738   }
739
740   g_assert (queued_bytes >= splitmux->mux_start_bytes);
741   g_assert (queued_time >= splitmux->mux_start_time);
742
743   queued_bytes -= splitmux->mux_start_bytes;
744   queued_time -= splitmux->mux_start_time;
745
746   /* Expand queued bytes estimate by muxer overhead */
747   queued_bytes += (queued_bytes * splitmux->mux_overhead);
748
749   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT
750       " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes);
751
752   /* Check for overrun - have we output at least one byte and overrun
753    * either threshold? */
754   if ((splitmux->mux_start_bytes < splitmux->muxed_out_bytes) &&
755       ((splitmux->threshold_bytes > 0 &&
756               queued_bytes >= splitmux->threshold_bytes) ||
757           (splitmux->threshold_time > 0 &&
758               queued_time >= splitmux->threshold_time))) {
759
760     splitmux->state = SPLITMUX_STATE_ENDING_FILE;
761
762     GST_INFO_OBJECT (splitmux,
763         "mq overflowed since last, draining out. max out TS is %"
764         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
765     GST_SPLITMUX_BROADCAST (splitmux);
766
767   } else {
768     /* No overflow */
769     GST_LOG_OBJECT (splitmux,
770         "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
771         " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.",
772         splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
773         queued_bytes, GST_TIME_ARGS (queued_time));
774
775     /* Wake everyone up to push this one GOP, then sleep */
776     splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
777     if (!splitmux->video_ctx->in_eos)
778       splitmux->max_out_running_time = splitmux->video_ctx->in_running_time;
779     else
780       splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
781
782     GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
783         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
784     GST_SPLITMUX_BROADCAST (splitmux);
785   }
786
787 }
788
789 /* Called with splitmux lock held */
790 /* Called from each input pad when it is has all the pieces
791  * for a GOP or EOS, starting with the video pad which has set the
792  * splitmux->max_in_running_time
793  */
794 static void
795 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
796 {
797   GList *cur;
798   gboolean ready = TRUE;
799
800   if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
801     /* Iterate each pad, and check that the input running time is at least
802      * up to the video runnning time, and if so handle the collected GOP */
803     GST_LOG_OBJECT (splitmux, "Checking GOP collected, ctx %p", ctx);
804     for (cur = g_list_first (splitmux->contexts);
805         cur != NULL; cur = g_list_next (cur)) {
806       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
807
808       GST_LOG_OBJECT (splitmux,
809           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT
810           " EOS %d", tmpctx, tmpctx->srcpad,
811           GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
812
813       if (tmpctx->in_running_time < splitmux->max_in_running_time &&
814           !tmpctx->in_eos) {
815         GST_LOG_OBJECT (splitmux,
816             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
817             tmpctx, tmpctx->srcpad);
818         ready = FALSE;
819         break;
820       }
821     }
822     if (ready) {
823       GST_DEBUG_OBJECT (splitmux,
824           "Collected GOP is complete. Processing (ctx %p)", ctx);
825       /* All pads have a complete GOP, release it into the multiqueue */
826       handle_gathered_gop (splitmux);
827     }
828   }
829
830   /* Some pad is not yet ready, or GOP is being pushed
831    * either way, sleep and wait to get woken */
832   while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
833           splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
834       !ctx->flushing) {
835
836     GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
837         splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
838         "GOP complete" : "EOF draining", ctx);
839     GST_SPLITMUX_WAIT (splitmux);
840
841     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
842   }
843 }
844
845 static void
846 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
847 {
848   GList *cur;
849   guint cur_len = g_queue_get_length (&ctx->queued_bufs);
850
851   GST_DEBUG_OBJECT (ctx->sinkpad,
852       "Checking queue length len %u cur_max %u queued gops %u",
853       cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
854
855   if (cur_len >= splitmux->mq_max_buffers) {
856     gboolean allow_grow = FALSE;
857
858     /* If collecting a GOP and this pad might block,
859      * and there isn't already a pending GOP in the queue
860      * then grow
861      */
862     if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
863         ctx->in_running_time < splitmux->max_in_running_time &&
864         splitmux->queued_gops <= 1) {
865       allow_grow = TRUE;
866     } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
867         ctx->is_video) {
868       allow_grow = TRUE;
869     }
870
871     if (!allow_grow) {
872       for (cur = g_list_first (splitmux->contexts);
873           cur != NULL; cur = g_list_next (cur)) {
874         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
875         GST_DEBUG_OBJECT (tmpctx->sinkpad,
876             " len %u out_blocked %d",
877             g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
878         /* If another stream is starving, grow */
879         if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
880           allow_grow = TRUE;
881         }
882       }
883     }
884
885     if (allow_grow) {
886       splitmux->mq_max_buffers = cur_len + 1;
887
888       GST_INFO_OBJECT (splitmux,
889           "Multiqueue overrun - enlarging to %u buffers ctx %p",
890           splitmux->mq_max_buffers, ctx);
891
892       g_object_set (splitmux->mq, "max-size-buffers",
893           splitmux->mq_max_buffers, NULL);
894     }
895   }
896 }
897
898 static GstPadProbeReturn
899 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
900 {
901   GstSplitMuxSink *splitmux = ctx->splitmux;
902   GstBuffer *buf;
903   MqStreamBuf *buf_info;
904   GstClockTime ts;
905   gboolean loop_again;
906   gboolean keyframe = FALSE;
907
908   GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
909
910   /* FIXME: Handle buffer lists, until then make it clear they won't work */
911   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
912     g_warning ("Buffer list handling not implemented");
913     return GST_PAD_PROBE_DROP;
914   }
915   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
916     GstEvent *event = gst_pad_probe_info_get_event (info);
917     switch (GST_EVENT_TYPE (event)) {
918       case GST_EVENT_SEGMENT:
919         gst_event_copy_segment (event, &ctx->in_segment);
920         break;
921       case GST_EVENT_FLUSH_STOP:
922         GST_SPLITMUX_LOCK (splitmux);
923         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
924         ctx->in_eos = FALSE;
925         ctx->in_bytes = 0;
926         ctx->in_running_time = 0;
927         GST_SPLITMUX_UNLOCK (splitmux);
928         break;
929       case GST_EVENT_EOS:
930         GST_SPLITMUX_LOCK (splitmux);
931         ctx->in_eos = TRUE;
932
933         if (splitmux->state == SPLITMUX_STATE_STOPPED)
934           goto beach;
935
936         if (ctx->is_video) {
937           GST_INFO_OBJECT (splitmux, "Got Video EOS. Finishing up");
938           /* Act as if this is a new keyframe with infinite timestamp */
939           splitmux->max_in_running_time = GST_CLOCK_TIME_NONE;
940           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
941           /* Wake up other input pads to collect this GOP */
942           GST_SPLITMUX_BROADCAST (splitmux);
943           check_completed_gop (splitmux, ctx);
944         } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
945           /* If we are waiting for a GOP to be completed (ie, for aux
946            * pads to catch up), then this pad is complete, so check
947            * if the whole GOP is.
948            */
949           check_completed_gop (splitmux, ctx);
950         }
951         GST_SPLITMUX_UNLOCK (splitmux);
952         break;
953       default:
954         break;
955     }
956     return GST_PAD_PROBE_PASS;
957   }
958
959   buf = gst_pad_probe_info_get_buffer (info);
960   ctx->in_running_time = gst_segment_to_running_time (&ctx->in_segment,
961       GST_FORMAT_TIME, GST_BUFFER_TIMESTAMP (buf));
962   buf_info = mq_stream_buf_new ();
963
964   if (GST_BUFFER_PTS_IS_VALID (buf))
965     ts = GST_BUFFER_PTS (buf);
966   else
967     ts = GST_BUFFER_DTS (buf);
968
969   GST_SPLITMUX_LOCK (splitmux);
970
971   if (splitmux->state == SPLITMUX_STATE_STOPPED)
972     goto beach;
973
974   /* If this buffer has a timestamp, advance the input timestamp of the
975    * stream */
976   if (GST_CLOCK_TIME_IS_VALID (ts)) {
977     GstClockTime running_time =
978         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
979         GST_BUFFER_TIMESTAMP (buf));
980
981     if (GST_CLOCK_TIME_IS_VALID (running_time) &&
982         (ctx->in_running_time == GST_CLOCK_TIME_NONE
983             || running_time > ctx->in_running_time))
984       ctx->in_running_time = running_time;
985   }
986
987   /* Try to make sure we have a valid running time */
988   if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) {
989     ctx->in_running_time =
990         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
991         ctx->in_segment.start);
992   }
993
994   buf_info->run_ts = ctx->in_running_time;
995   buf_info->buf_size = gst_buffer_get_size (buf);
996
997   /* Update total input byte counter for overflow detect */
998   ctx->in_bytes += buf_info->buf_size;
999
1000   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT
1001       " total in_bytes %" G_GSIZE_FORMAT,
1002       GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes);
1003
1004   loop_again = TRUE;
1005   do {
1006     if (ctx->flushing)
1007       break;
1008
1009     switch (splitmux->state) {
1010       case SPLITMUX_STATE_COLLECTING_GOP_START:
1011         if (ctx->is_video) {
1012           /* If a keyframe, we have a complete GOP */
1013           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1014               !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) ||
1015               splitmux->max_in_running_time >= ctx->in_running_time) {
1016             /* Pass this buffer through */
1017             loop_again = FALSE;
1018             break;
1019           }
1020           GST_INFO_OBJECT (pad,
1021               "Have keyframe with running time %" GST_TIME_FORMAT,
1022               GST_TIME_ARGS (ctx->in_running_time));
1023           keyframe = TRUE;
1024           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1025           splitmux->max_in_running_time = ctx->in_running_time;
1026           /* Wake up other input pads to collect this GOP */
1027           GST_SPLITMUX_BROADCAST (splitmux);
1028           check_completed_gop (splitmux, ctx);
1029         } else {
1030           /* We're still waiting for a keyframe on the video pad, sleep */
1031           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1032           GST_SPLITMUX_WAIT (splitmux);
1033           GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1034               splitmux->state);
1035         }
1036         break;
1037       case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1038         /* After a GOP start is found, this buffer might complete the GOP */
1039         /* If we overran the target timestamp, it might be time to process
1040          * the GOP, otherwise bail out for more data
1041          */
1042         GST_LOG_OBJECT (pad,
1043             "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT,
1044             GST_TIME_ARGS (ctx->in_running_time),
1045             GST_TIME_ARGS (splitmux->max_in_running_time));
1046
1047         if (ctx->in_running_time < splitmux->max_in_running_time) {
1048           loop_again = FALSE;
1049           break;
1050         }
1051
1052         GST_LOG_OBJECT (pad,
1053             "Collected last packet of GOP. Checking other pads");
1054         check_completed_gop (splitmux, ctx);
1055         break;
1056       case SPLITMUX_STATE_ENDING_FILE:
1057       case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1058         /* A fragment is ending, wait until that's done before continuing */
1059         GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1060         GST_SPLITMUX_WAIT (splitmux);
1061         GST_DEBUG_OBJECT (pad,
1062             "Done sleeping for fragment restart state now %d", splitmux->state);
1063         break;
1064       default:
1065         loop_again = FALSE;
1066         break;
1067     }
1068   } while (loop_again);
1069
1070   if (keyframe) {
1071     splitmux->queued_gops++;
1072     buf_info->keyframe = TRUE;
1073   }
1074
1075   /* Now add this buffer to the queue just before returning */
1076   g_queue_push_head (&ctx->queued_bufs, buf_info);
1077
1078   /* Check the buffer will fit in the mq */
1079   check_queue_length (splitmux, ctx);
1080
1081   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1082       " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time));
1083
1084 beach:
1085   GST_SPLITMUX_UNLOCK (splitmux);
1086
1087   return GST_PAD_PROBE_PASS;
1088 }
1089
1090 static GstPad *
1091 gst_splitmux_sink_request_new_pad (GstElement * element,
1092     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1093 {
1094   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1095   GstPadTemplate *mux_template = NULL;
1096   GstPad *res = NULL;
1097   GstPad *mq_sink, *mq_src;
1098   gchar *gname;
1099   gboolean is_video = FALSE;
1100   MqStreamCtx *ctx;
1101
1102   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1103
1104   GST_SPLITMUX_LOCK (splitmux);
1105   if (!create_elements (splitmux))
1106     goto fail;
1107
1108   if (templ->name_template) {
1109     if (g_str_equal (templ->name_template, "video")) {
1110       /* FIXME: Look for a pad template with matching caps, rather than by name */
1111       mux_template =
1112           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1113           (splitmux->muxer), "video_%u");
1114       is_video = TRUE;
1115       name = NULL;
1116     } else {
1117       mux_template =
1118           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1119           (splitmux->muxer), templ->name_template);
1120     }
1121   }
1122
1123   res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1124   if (res == NULL)
1125     goto fail;
1126
1127   if (is_video)
1128     gname = g_strdup ("video");
1129   else if (name == NULL)
1130     gname = gst_pad_get_name (res);
1131   else
1132     gname = g_strdup (name);
1133
1134   if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1135     gst_element_release_request_pad (splitmux->muxer, res);
1136     goto fail;
1137   }
1138
1139   if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1140     gst_element_release_request_pad (splitmux->muxer, res);
1141     gst_element_release_request_pad (splitmux->mq, mq_sink);
1142     goto fail;
1143   }
1144
1145   ctx = mq_stream_ctx_new (splitmux);
1146   ctx->is_video = is_video;
1147   ctx->srcpad = mq_src;
1148   ctx->sinkpad = mq_sink;
1149
1150   mq_stream_ctx_ref (ctx);
1151   ctx->src_pad_block_id =
1152       gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1153       (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1154       _pad_block_destroy_src_notify);
1155   if (is_video)
1156     splitmux->video_ctx = ctx;
1157
1158   res = gst_ghost_pad_new (gname, mq_sink);
1159   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1160
1161   mq_stream_ctx_ref (ctx);
1162   ctx->sink_pad_block_id =
1163       gst_pad_add_probe (res, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1164       (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1165       _pad_block_destroy_sink_notify);
1166
1167   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1168       " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1169
1170   splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1171
1172   g_free (gname);
1173
1174   gst_object_unref (mq_sink);
1175   gst_object_unref (mq_src);
1176
1177   gst_pad_set_active (res, TRUE);
1178   gst_element_add_pad (element, res);
1179   GST_SPLITMUX_UNLOCK (splitmux);
1180
1181   return res;
1182 fail:
1183   GST_SPLITMUX_UNLOCK (splitmux);
1184   return NULL;
1185 }
1186
1187 static void
1188 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1189 {
1190   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1191   GstPad *mqsink, *mqsrc, *muxpad;
1192   MqStreamCtx *ctx =
1193       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1194
1195   GST_SPLITMUX_LOCK (splitmux);
1196
1197   if (splitmux->muxer == NULL || splitmux->mq == NULL)
1198     goto fail;                  /* Elements don't exist yet - nothing to release */
1199
1200   GST_INFO_OBJECT (pad, "releasing request pad");
1201
1202   mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1203   mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1204   muxpad = gst_pad_get_peer (mqsrc);
1205
1206   /* Remove the context from our consideration */
1207   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1208
1209   if (ctx->sink_pad_block_id)
1210     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1211
1212   if (ctx->src_pad_block_id)
1213     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1214
1215   /* Can release the context now */
1216   mq_stream_ctx_unref (ctx);
1217
1218   /* Release and free the mq input */
1219   gst_element_release_request_pad (splitmux->mq, mqsink);
1220
1221   /* Release and free the muxer input */
1222   gst_element_release_request_pad (splitmux->muxer, muxpad);
1223
1224   gst_object_unref (mqsink);
1225   gst_object_unref (mqsrc);
1226   gst_object_unref (muxpad);
1227
1228   gst_element_remove_pad (element, pad);
1229
1230 fail:
1231   GST_SPLITMUX_UNLOCK (splitmux);
1232 }
1233
1234 static GstElement *
1235 create_element (GstSplitMuxSink * splitmux,
1236     const gchar * factory, const gchar * name)
1237 {
1238   GstElement *ret = gst_element_factory_make (factory, name);
1239   if (ret == NULL) {
1240     g_warning ("Failed to create %s - splitmuxsink will not work", name);
1241     return NULL;
1242   }
1243
1244   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1245     g_warning ("Could not add %s element - splitmuxsink will not work", name);
1246     gst_object_unref (ret);
1247     return NULL;
1248   }
1249
1250   return ret;
1251 }
1252
1253 static gboolean
1254 create_elements (GstSplitMuxSink * splitmux)
1255 {
1256   /* Create internal elements */
1257   if (splitmux->mq == NULL) {
1258     if ((splitmux->mq =
1259             create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1260       goto fail;
1261
1262     splitmux->mq_max_buffers = 5;
1263     /* No bytes or time limit, we limit buffers manually */
1264     g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1265         (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1266   }
1267
1268   if (splitmux->muxer == NULL) {
1269     if (splitmux->provided_muxer == NULL) {
1270       if ((splitmux->muxer =
1271               create_element (splitmux, "mp4mux", "muxer")) == NULL)
1272         goto fail;
1273     } else {
1274       splitmux->muxer = splitmux->provided_muxer;
1275       if (!gst_bin_add (GST_BIN (splitmux), splitmux->provided_muxer)) {
1276         g_warning ("Could not add muxer element - splitmuxsink will not work");
1277         goto fail;
1278       }
1279     }
1280   }
1281
1282   return TRUE;
1283 fail:
1284   return FALSE;
1285 }
1286
1287 static GstElement *
1288 find_sink (GstElement * e)
1289 {
1290   GstElement *res = NULL;
1291   GstIterator *iter;
1292   gboolean done = FALSE;
1293   GValue data = { 0, };
1294
1295   if (!GST_IS_BIN (e))
1296     return e;
1297
1298   iter = gst_bin_iterate_sinks (GST_BIN (e));
1299   while (!done) {
1300     switch (gst_iterator_next (iter, &data)) {
1301       case GST_ITERATOR_OK:
1302       {
1303         GstElement *child = g_value_get_object (&data);
1304         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1305                 "location") != NULL) {
1306           res = child;
1307           done = TRUE;
1308         }
1309         g_value_reset (&data);
1310         break;
1311       }
1312       case GST_ITERATOR_RESYNC:
1313         gst_iterator_resync (iter);
1314         break;
1315       case GST_ITERATOR_DONE:
1316         done = TRUE;
1317         break;
1318       case GST_ITERATOR_ERROR:
1319         g_assert_not_reached ();
1320         break;
1321     }
1322   }
1323   g_value_unset (&data);
1324   gst_iterator_free (iter);
1325
1326   return res;
1327 }
1328
1329 static gboolean
1330 create_sink (GstSplitMuxSink * splitmux)
1331 {
1332   g_return_val_if_fail (splitmux->active_sink == NULL, TRUE);
1333
1334   if (splitmux->provided_sink == NULL) {
1335     if ((splitmux->sink =
1336             create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1337       goto fail;
1338     splitmux->active_sink = splitmux->sink;
1339   } else {
1340     if (!gst_bin_add (GST_BIN (splitmux), splitmux->provided_sink)) {
1341       g_warning ("Could not add sink elements - splitmuxsink will not work");
1342       goto fail;
1343     }
1344
1345     splitmux->active_sink = splitmux->provided_sink;
1346
1347     /* Find the sink element */
1348     splitmux->sink = find_sink (splitmux->active_sink);
1349     if (splitmux->sink == NULL) {
1350       g_warning
1351           ("Could not locate sink element in provided sink - splitmuxsink will not work");
1352       goto fail;
1353     }
1354   }
1355
1356   if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1357     g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1358     goto fail;
1359   }
1360
1361   return TRUE;
1362 fail:
1363   return FALSE;
1364 }
1365
1366 #ifdef __GNUC__
1367 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
1368 #endif
1369 static void
1370 set_next_filename (GstSplitMuxSink * splitmux)
1371 {
1372   if (splitmux->location) {
1373     gchar *fname;
1374
1375     fname = g_strdup_printf (splitmux->location, splitmux->fragment_id);
1376     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1377     g_object_set (splitmux->sink, "location", fname, NULL);
1378     g_free (fname);
1379
1380     splitmux->fragment_id++;
1381   }
1382 }
1383
1384 static GstStateChangeReturn
1385 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1386 {
1387   GstStateChangeReturn ret;
1388   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1389
1390   switch (transition) {
1391     case GST_STATE_CHANGE_NULL_TO_READY:{
1392       GST_SPLITMUX_LOCK (splitmux);
1393       if (!create_elements (splitmux) || !create_sink (splitmux)) {
1394         ret = GST_STATE_CHANGE_FAILURE;
1395         GST_SPLITMUX_UNLOCK (splitmux);
1396         goto beach;
1397       }
1398       GST_SPLITMUX_UNLOCK (splitmux);
1399       splitmux->fragment_id = 0;
1400       set_next_filename (splitmux);
1401       break;
1402     }
1403     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1404       GST_SPLITMUX_LOCK (splitmux);
1405       /* Start by collecting one input on each pad */
1406       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1407       splitmux->max_in_running_time = 0;
1408       splitmux->muxed_out_time = splitmux->mux_start_time = 0;
1409       splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1410       GST_SPLITMUX_UNLOCK (splitmux);
1411       break;
1412     }
1413     case GST_STATE_CHANGE_PAUSED_TO_READY:
1414     case GST_STATE_CHANGE_READY_TO_NULL:
1415       GST_SPLITMUX_LOCK (splitmux);
1416       splitmux->state = SPLITMUX_STATE_STOPPED;
1417       /* Wake up any blocked threads */
1418       GST_LOG_OBJECT (splitmux,
1419           "State change -> NULL or READY. Waking threads");
1420       GST_SPLITMUX_BROADCAST (splitmux);
1421       GST_SPLITMUX_UNLOCK (splitmux);
1422       break;
1423     default:
1424       break;
1425   }
1426
1427   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1428   if (ret == GST_STATE_CHANGE_FAILURE)
1429     goto beach;
1430
1431   switch (transition) {
1432     case GST_STATE_CHANGE_READY_TO_NULL:
1433       GST_SPLITMUX_LOCK (splitmux);
1434       splitmux->fragment_id = 0;
1435       gst_splitmux_reset (splitmux);
1436       GST_SPLITMUX_UNLOCK (splitmux);
1437       break;
1438     default:
1439       break;
1440   }
1441
1442 beach:
1443
1444   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1445       ret == GST_STATE_CHANGE_FAILURE) {
1446     /* Cleanup elements on failed transition out of NULL */
1447     gst_splitmux_reset (splitmux);
1448   }
1449   return ret;
1450 }
1451
1452 gboolean
1453 register_splitmuxsink (GstPlugin * plugin)
1454 {
1455   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1456       "Split File Muxing Sink");
1457
1458   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1459       GST_TYPE_SPLITMUX_SINK);
1460 }