splitmuxsink: initialize mux_start_time properly
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * is required, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * The splitting process is driven by the video stream contents, and
37  * the video stream must contain closed GOPs for the output file parts
38  * to be played individually correctly.
39  *
40  * <refsect2>
41  * <title>Example pipelines</title>
42  * |[
43  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
44  * ]|
45  * Records a video stream captured from a v4l2 device and muxes it into
46  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
47  * and 1MB maximum size.
48  * </refsect2>
49  */
50
51 #ifdef HAVE_CONFIG_H
52 #include "config.h"
53 #endif
54
55 #include <string.h>
56 #include "gstsplitmuxsink.h"
57
58 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
59 #define GST_CAT_DEFAULT splitmux_debug
60
61 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
62 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
63 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
64 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
65
66 enum
67 {
68   PROP_0,
69   PROP_LOCATION,
70   PROP_MAX_SIZE_TIME,
71   PROP_MAX_SIZE_BYTES,
72   PROP_MUXER_OVERHEAD,
73   PROP_MUXER,
74   PROP_SINK
75 };
76
77 #define DEFAULT_MAX_SIZE_TIME       0
78 #define DEFAULT_MAX_SIZE_BYTES      0
79 #define DEFAULT_MUXER_OVERHEAD      0.02
80 #define DEFAULT_MUXER "mp4mux"
81 #define DEFAULT_SINK "filesink"
82
83 enum
84 {
85   SIGNAL_FORMAT_LOCATION,
86   SIGNAL_LAST
87 };
88
89 static guint signals[SIGNAL_LAST];
90
91 static GstStaticPadTemplate video_sink_template =
92 GST_STATIC_PAD_TEMPLATE ("video",
93     GST_PAD_SINK,
94     GST_PAD_REQUEST,
95     GST_STATIC_CAPS_ANY);
96 static GstStaticPadTemplate audio_sink_template =
97 GST_STATIC_PAD_TEMPLATE ("audio_%u",
98     GST_PAD_SINK,
99     GST_PAD_REQUEST,
100     GST_STATIC_CAPS_ANY);
101 static GstStaticPadTemplate subtitle_sink_template =
102 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
103     GST_PAD_SINK,
104     GST_PAD_REQUEST,
105     GST_STATIC_CAPS_ANY);
106
107 static GQuark PAD_CONTEXT;
108
109 static void
110 _do_init (void)
111 {
112   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
113 }
114
115 #define gst_splitmux_sink_parent_class parent_class
116 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
117     _do_init ());
118
119 static gboolean create_elements (GstSplitMuxSink * splitmux);
120 static gboolean create_sink (GstSplitMuxSink * splitmux);
121 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
122     const GValue * value, GParamSpec * pspec);
123 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
124     GValue * value, GParamSpec * pspec);
125 static void gst_splitmux_sink_dispose (GObject * object);
126 static void gst_splitmux_sink_finalize (GObject * object);
127
128 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
129     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
130 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
131
132 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
133     element, GstStateChange transition);
134
135 static void bus_handler (GstBin * bin, GstMessage * msg);
136 static void set_next_filename (GstSplitMuxSink * splitmux);
137 static void start_next_fragment (GstSplitMuxSink * splitmux);
138 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
139 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
140
141 static MqStreamBuf *
142 mq_stream_buf_new (void)
143 {
144   return g_slice_new0 (MqStreamBuf);
145 }
146
147 static void
148 mq_stream_buf_free (MqStreamBuf * data)
149 {
150   g_slice_free (MqStreamBuf, data);
151 }
152
153 static void
154 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
155 {
156   GObjectClass *gobject_class = (GObjectClass *) klass;
157   GstElementClass *gstelement_class = (GstElementClass *) klass;
158   GstBinClass *gstbin_class = (GstBinClass *) klass;
159
160   gobject_class->set_property = gst_splitmux_sink_set_property;
161   gobject_class->get_property = gst_splitmux_sink_get_property;
162   gobject_class->dispose = gst_splitmux_sink_dispose;
163   gobject_class->finalize = gst_splitmux_sink_finalize;
164
165   gst_element_class_set_static_metadata (gstelement_class,
166       "Split Muxing Bin", "Generic/Bin/Muxer",
167       "Convenience bin that muxes incoming streams into multiple time/size limited files",
168       "Jan Schmidt <jan@centricular.com>");
169
170   gst_element_class_add_pad_template (gstelement_class,
171       gst_static_pad_template_get (&video_sink_template));
172   gst_element_class_add_pad_template (gstelement_class,
173       gst_static_pad_template_get (&audio_sink_template));
174   gst_element_class_add_pad_template (gstelement_class,
175       gst_static_pad_template_get (&subtitle_sink_template));
176
177   gstelement_class->change_state =
178       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
179   gstelement_class->request_new_pad =
180       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
181   gstelement_class->release_pad =
182       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
183
184   gstbin_class->handle_message = bus_handler;
185
186   g_object_class_install_property (gobject_class, PROP_LOCATION,
187       g_param_spec_string ("location", "File Output Pattern",
188           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
189           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
190   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
191       g_param_spec_double ("mux-overhead", "Muxing Overhead",
192           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
193           DEFAULT_MUXER_OVERHEAD,
194           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
195
196   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
197       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
198           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
199           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
200   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
201       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
202           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
203           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
204
205   g_object_class_install_property (gobject_class, PROP_MUXER,
206       g_param_spec_object ("muxer", "Muxer",
207           "The muxer element to use (NULL = default mp4mux)",
208           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
209   g_object_class_install_property (gobject_class, PROP_SINK,
210       g_param_spec_object ("sink", "Sink",
211           "The sink element (or element chain) to use (NULL = default filesink)",
212           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
213
214   /**
215    * GstSplitMuxSink::format-location:
216    * @splitmux: the #GstSplitMuxSink
217    * @fragment_id: the sequence number of the file to be created
218    *
219    * Returns: the location to be used for the next output file
220    */
221   signals[SIGNAL_FORMAT_LOCATION] =
222       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
223       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
224 }
225
226 static void
227 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
228 {
229   g_mutex_init (&splitmux->lock);
230   g_cond_init (&splitmux->data_cond);
231
232   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
233   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
234   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
235
236   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
237 }
238
239 static void
240 gst_splitmux_reset (GstSplitMuxSink * splitmux)
241 {
242   if (splitmux->mq)
243     gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
244   if (splitmux->muxer)
245     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
246   if (splitmux->active_sink)
247     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
248
249   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
250       NULL;
251 }
252
253 static void
254 gst_splitmux_sink_dispose (GObject * object)
255 {
256   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
257
258   G_OBJECT_CLASS (parent_class)->dispose (object);
259
260   /* Calling parent dispose invalidates all child pointers */
261   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
262       NULL;
263 }
264
265 static void
266 gst_splitmux_sink_finalize (GObject * object)
267 {
268   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
269   g_cond_clear (&splitmux->data_cond);
270   g_mutex_clear (&splitmux->lock);
271   if (splitmux->provided_sink)
272     gst_object_unref (splitmux->provided_sink);
273   if (splitmux->provided_muxer)
274     gst_object_unref (splitmux->provided_muxer);
275
276   g_free (splitmux->location);
277
278   /* Make sure to free any un-released contexts */
279   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
280   g_list_free (splitmux->contexts);
281
282   G_OBJECT_CLASS (parent_class)->finalize (object);
283 }
284
285 static void
286 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
287     const GValue * value, GParamSpec * pspec)
288 {
289   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
290
291   switch (prop_id) {
292     case PROP_LOCATION:{
293       GST_OBJECT_LOCK (splitmux);
294       g_free (splitmux->location);
295       splitmux->location = g_value_dup_string (value);
296       GST_OBJECT_UNLOCK (splitmux);
297       break;
298     }
299     case PROP_MAX_SIZE_BYTES:
300       GST_OBJECT_LOCK (splitmux);
301       splitmux->threshold_bytes = g_value_get_uint64 (value);
302       GST_OBJECT_UNLOCK (splitmux);
303       break;
304     case PROP_MAX_SIZE_TIME:
305       GST_OBJECT_LOCK (splitmux);
306       splitmux->threshold_time = g_value_get_uint64 (value);
307       GST_OBJECT_UNLOCK (splitmux);
308       break;
309     case PROP_MUXER_OVERHEAD:
310       GST_OBJECT_LOCK (splitmux);
311       splitmux->mux_overhead = g_value_get_double (value);
312       GST_OBJECT_UNLOCK (splitmux);
313       break;
314     case PROP_SINK:
315       GST_OBJECT_LOCK (splitmux);
316       if (splitmux->provided_sink)
317         gst_object_unref (splitmux->provided_sink);
318       splitmux->provided_sink = g_value_dup_object (value);
319       GST_OBJECT_UNLOCK (splitmux);
320       break;
321     case PROP_MUXER:
322       GST_OBJECT_LOCK (splitmux);
323       if (splitmux->provided_muxer)
324         gst_object_unref (splitmux->provided_muxer);
325       splitmux->provided_muxer = g_value_dup_object (value);
326       GST_OBJECT_UNLOCK (splitmux);
327       break;
328     default:
329       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
330       break;
331   }
332 }
333
334 static void
335 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
336     GValue * value, GParamSpec * pspec)
337 {
338   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
339
340   switch (prop_id) {
341     case PROP_LOCATION:
342       GST_OBJECT_LOCK (splitmux);
343       g_value_set_string (value, splitmux->location);
344       GST_OBJECT_UNLOCK (splitmux);
345       break;
346     case PROP_MAX_SIZE_BYTES:
347       GST_OBJECT_LOCK (splitmux);
348       g_value_set_uint64 (value, splitmux->threshold_bytes);
349       GST_OBJECT_UNLOCK (splitmux);
350       break;
351     case PROP_MAX_SIZE_TIME:
352       GST_OBJECT_LOCK (splitmux);
353       g_value_set_uint64 (value, splitmux->threshold_time);
354       GST_OBJECT_UNLOCK (splitmux);
355       break;
356     case PROP_MUXER_OVERHEAD:
357       GST_OBJECT_LOCK (splitmux);
358       g_value_set_double (value, splitmux->mux_overhead);
359       GST_OBJECT_UNLOCK (splitmux);
360       break;
361     case PROP_SINK:
362       GST_OBJECT_LOCK (splitmux);
363       g_value_set_object (value, splitmux->provided_sink);
364       GST_OBJECT_UNLOCK (splitmux);
365       break;
366     case PROP_MUXER:
367       GST_OBJECT_LOCK (splitmux);
368       g_value_set_object (value, splitmux->provided_muxer);
369       GST_OBJECT_UNLOCK (splitmux);
370       break;
371     default:
372       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
373       break;
374   }
375 }
376
377 static GstPad *
378 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
379 {
380   gchar *tmp, *sinkname, *srcname;
381   GstPad *mq_src;
382
383   sinkname = gst_pad_get_name (sink_pad);
384   tmp = sinkname + 5;
385   srcname = g_strdup_printf ("src_%s", tmp);
386
387   mq_src = gst_element_get_static_pad (mq, srcname);
388
389   g_free (sinkname);
390   g_free (srcname);
391
392   return mq_src;
393 }
394
395 static gboolean
396 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
397     GstPad ** src_pad)
398 {
399   GstPad *mq_sink;
400   GstPad *mq_src;
401
402   /* Request a pad from multiqueue, then connect this one, then
403    * discover the corresponding output pad and return both */
404   mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
405   if (mq_sink == NULL)
406     return FALSE;
407
408   mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
409   if (mq_src == NULL)
410     goto fail;
411
412   *sink_pad = mq_sink;
413   *src_pad = mq_src;
414
415   return TRUE;
416
417 fail:
418   gst_element_release_request_pad (splitmux->mq, mq_sink);
419   return FALSE;
420 }
421
422 static MqStreamCtx *
423 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
424 {
425   MqStreamCtx *ctx;
426
427   ctx = g_new0 (MqStreamCtx, 1);
428   g_atomic_int_set (&ctx->refcount, 1);
429   ctx->splitmux = splitmux;
430   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
431   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
432   ctx->in_running_time = ctx->out_running_time = 0;
433   g_queue_init (&ctx->queued_bufs);
434   return ctx;
435 }
436
437 static void
438 mq_stream_ctx_free (MqStreamCtx * ctx)
439 {
440   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
441   g_queue_clear (&ctx->queued_bufs);
442   g_free (ctx);
443 }
444
445 static void
446 mq_stream_ctx_unref (MqStreamCtx * ctx)
447 {
448   if (g_atomic_int_dec_and_test (&ctx->refcount))
449     mq_stream_ctx_free (ctx);
450 }
451
452 static void
453 mq_stream_ctx_ref (MqStreamCtx * ctx)
454 {
455   g_atomic_int_inc (&ctx->refcount);
456 }
457
458 static void
459 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
460 {
461   ctx->sink_pad_block_id = 0;
462   mq_stream_ctx_unref (ctx);
463 }
464
465 static void
466 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
467 {
468   ctx->src_pad_block_id = 0;
469   mq_stream_ctx_unref (ctx);
470 }
471
472 /* Called with lock held, drops the lock to send EOS to the
473  * pad
474  */
475 static void
476 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
477 {
478   GstEvent *eos;
479   GstPad *pad;
480
481   eos = gst_event_new_eos ();
482   pad = gst_pad_get_peer (ctx->srcpad);
483
484   ctx->out_eos = TRUE;
485
486   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
487   GST_SPLITMUX_UNLOCK (splitmux);
488   gst_pad_send_event (pad, eos);
489   GST_SPLITMUX_LOCK (splitmux);
490
491   gst_object_unref (pad);
492 }
493
494 /* Called with splitmux lock held to check if this output
495  * context needs to sleep to wait for the release of the
496  * next GOP, or to send EOS to close out the current file
497  */
498 static void
499 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
500 {
501   do {
502
503     GST_LOG_OBJECT (ctx->srcpad,
504         "Checking running time %" GST_TIME_FORMAT " against max %"
505         GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time),
506         GST_TIME_ARGS (splitmux->max_out_running_time));
507
508     if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE ||
509         ctx->out_running_time < splitmux->max_out_running_time) {
510       splitmux->have_muxed_something = TRUE;
511       return;
512     }
513
514     if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
515       return;
516
517     if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
518       if (ctx->out_eos == FALSE) {
519         send_eos (splitmux, ctx);
520         continue;
521       }
522     } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
523       start_next_fragment (splitmux);
524       continue;
525     }
526
527     GST_INFO_OBJECT (ctx->srcpad,
528         "Sleeping for running time %"
529         GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")",
530         GST_TIME_ARGS (ctx->out_running_time),
531         GST_TIME_ARGS (splitmux->max_out_running_time));
532     ctx->out_blocked = TRUE;
533     /* Expand the mq if needed before sleeping */
534     check_queue_length (splitmux, ctx);
535     GST_SPLITMUX_WAIT (splitmux);
536     ctx->out_blocked = FALSE;
537     GST_INFO_OBJECT (ctx->srcpad,
538         "Woken for new max running time %" GST_TIME_FORMAT,
539         GST_TIME_ARGS (splitmux->max_out_running_time));
540   } while (1);
541 }
542
543 static GstPadProbeReturn
544 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
545 {
546   GstSplitMuxSink *splitmux = ctx->splitmux;
547   MqStreamBuf *buf_info = NULL;
548
549   GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
550
551   /* FIXME: Handle buffer lists, until then make it clear they won't work */
552   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
553     g_warning ("Buffer list handling not implemented");
554     return GST_PAD_PROBE_DROP;
555   }
556   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
557     GstEvent *event = gst_pad_probe_info_get_event (info);
558
559     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
560
561     switch (GST_EVENT_TYPE (event)) {
562       case GST_EVENT_SEGMENT:
563         gst_event_copy_segment (event, &ctx->out_segment);
564         break;
565       case GST_EVENT_FLUSH_STOP:
566         GST_SPLITMUX_LOCK (splitmux);
567         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
568         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
569         g_queue_clear (&ctx->queued_bufs);
570         ctx->flushing = FALSE;
571         GST_SPLITMUX_UNLOCK (splitmux);
572         break;
573       case GST_EVENT_FLUSH_START:
574         GST_SPLITMUX_LOCK (splitmux);
575         GST_LOG_OBJECT (pad, "Flush start");
576         ctx->flushing = TRUE;
577         GST_SPLITMUX_BROADCAST (splitmux);
578         GST_SPLITMUX_UNLOCK (splitmux);
579         break;
580       case GST_EVENT_EOS:
581         GST_SPLITMUX_LOCK (splitmux);
582         if (splitmux->state == SPLITMUX_STATE_STOPPED)
583           goto beach;
584         ctx->out_eos = TRUE;
585         GST_SPLITMUX_UNLOCK (splitmux);
586         break;
587       case GST_EVENT_GAP:{
588         GstClockTime gap_ts;
589
590         gst_event_parse_gap (event, &gap_ts, NULL);
591         if (gap_ts == GST_CLOCK_TIME_NONE)
592           break;
593
594         GST_SPLITMUX_LOCK (splitmux);
595
596         gap_ts = gst_segment_to_running_time (&ctx->out_segment,
597             GST_FORMAT_TIME, gap_ts);
598
599         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT,
600             GST_TIME_ARGS (gap_ts));
601
602         if (splitmux->state == SPLITMUX_STATE_STOPPED)
603           goto beach;
604         ctx->out_running_time = gap_ts;
605         complete_or_wait_on_out (splitmux, ctx);
606         GST_SPLITMUX_UNLOCK (splitmux);
607         break;
608       }
609       default:
610         break;
611     }
612     return GST_PAD_PROBE_PASS;
613   }
614
615   /* Allow everything through until the configured next stopping point */
616   GST_SPLITMUX_LOCK (splitmux);
617
618   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
619   if (buf_info == NULL)
620     /* Can only happen due to a poorly timed flush */
621     goto beach;
622
623   /* If we have popped a keyframe, decrement the queued_gop count */
624   if (buf_info->keyframe && splitmux->queued_gops > 0)
625     splitmux->queued_gops--;
626
627   ctx->out_running_time = buf_info->run_ts;
628
629   GST_LOG_OBJECT (splitmux,
630       "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT
631       " size %" G_GSIZE_FORMAT,
632       pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size);
633
634   complete_or_wait_on_out (splitmux, ctx);
635
636   if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE ||
637       splitmux->muxed_out_time < buf_info->run_ts)
638     splitmux->muxed_out_time = buf_info->run_ts;
639
640   splitmux->muxed_out_bytes += buf_info->buf_size;
641
642 #ifndef GST_DISABLE_GST_DEBUG
643   {
644     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
645     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
646         " run ts %" GST_TIME_FORMAT, buf,
647         GST_TIME_ARGS (ctx->out_running_time));
648   }
649 #endif
650
651   GST_SPLITMUX_UNLOCK (splitmux);
652
653   mq_stream_buf_free (buf_info);
654
655   return GST_PAD_PROBE_PASS;
656
657 beach:
658   GST_SPLITMUX_UNLOCK (splitmux);
659   return GST_PAD_PROBE_DROP;
660 }
661
662 static gboolean
663 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
664 {
665   return gst_pad_send_event (peer, gst_event_ref (*event));
666 }
667
668 static void
669 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
670 {
671   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
672
673   gst_pad_sticky_events_foreach (ctx->srcpad,
674       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
675
676   /* Clear EOS flag */
677   ctx->out_eos = FALSE;
678
679   gst_object_unref (peer);
680 }
681
682 /* Called with lock held when a fragment
683  * reaches EOS and it is time to restart
684  * a new fragment
685  */
686 static void
687 start_next_fragment (GstSplitMuxSink * splitmux)
688 {
689   /* 1 change to new file */
690   gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
691   gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
692
693   set_next_filename (splitmux);
694
695   gst_element_sync_state_with_parent (splitmux->active_sink);
696   gst_element_sync_state_with_parent (splitmux->muxer);
697
698   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
699
700   /* Switch state and go back to processing */
701   splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
702
703   if (!splitmux->video_ctx->in_eos) {
704     splitmux->max_out_running_time = splitmux->video_ctx->in_running_time;
705   } else {
706     splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
707     splitmux->have_muxed_something = FALSE;
708   }
709   splitmux->have_muxed_something =
710       (splitmux->video_ctx->in_running_time > splitmux->muxed_out_time);
711
712   /* Store the overflow parameters as the basis for the next fragment */
713   splitmux->mux_start_time = splitmux->muxed_out_time;
714   splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
715
716   GST_DEBUG_OBJECT (splitmux,
717       "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT,
718       GST_TIME_ARGS (splitmux->max_out_running_time));
719
720   GST_SPLITMUX_BROADCAST (splitmux);
721 }
722
723 static void
724 bus_handler (GstBin * bin, GstMessage * message)
725 {
726   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
727
728   switch (GST_MESSAGE_TYPE (message)) {
729     case GST_MESSAGE_EOS:
730       /* If the state is draining out the current file, drop this EOS */
731       GST_SPLITMUX_LOCK (splitmux);
732       if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
733           splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) {
734         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
735         splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
736         GST_SPLITMUX_BROADCAST (splitmux);
737
738         gst_message_unref (message);
739         GST_SPLITMUX_UNLOCK (splitmux);
740         return;
741       }
742       GST_SPLITMUX_UNLOCK (splitmux);
743       break;
744     default:
745       break;
746   }
747
748   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
749 }
750
751 /* Called with splitmux lock held */
752 /* Called when entering ProcessingCompleteGop state
753  * Assess if mq contents overflowed the current file
754  *   -> If yes, need to switch to new file
755  *   -> if no, set max_out_running_time to let this GOP in and
756  *      go to COLLECTING_GOP_START state
757  */
758 static void
759 handle_gathered_gop (GstSplitMuxSink * splitmux)
760 {
761   GList *cur;
762   gsize queued_bytes = 0;
763   GstClockTime queued_time = 0;
764
765   /* Assess if the multiqueue contents overflowed the current file */
766   for (cur = g_list_first (splitmux->contexts);
767       cur != NULL; cur = g_list_next (cur)) {
768     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
769     if (tmpctx->in_running_time > queued_time)
770       queued_time = tmpctx->in_running_time;
771     queued_bytes += tmpctx->in_bytes;
772   }
773
774   g_assert (queued_bytes >= splitmux->mux_start_bytes);
775   g_assert (queued_time >= splitmux->mux_start_time);
776
777   queued_bytes -= splitmux->mux_start_bytes;
778   queued_time -= splitmux->mux_start_time;
779
780   /* Expand queued bytes estimate by muxer overhead */
781   queued_bytes += (queued_bytes * splitmux->mux_overhead);
782
783   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT
784       " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes);
785
786   /* Check for overrun - have we output at least one byte and overrun
787    * either threshold? */
788   if (splitmux->have_muxed_something &&
789       ((splitmux->threshold_bytes > 0 &&
790               queued_bytes >= splitmux->threshold_bytes) ||
791           (splitmux->threshold_time > 0 &&
792               queued_time >= splitmux->threshold_time))) {
793
794     splitmux->state = SPLITMUX_STATE_ENDING_FILE;
795
796     GST_INFO_OBJECT (splitmux,
797         "mq overflowed since last, draining out. max out TS is %"
798         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
799     GST_SPLITMUX_BROADCAST (splitmux);
800
801   } else {
802     /* No overflow */
803     GST_LOG_OBJECT (splitmux,
804         "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
805         " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.",
806         splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
807         queued_bytes, GST_TIME_ARGS (queued_time));
808
809     /* Wake everyone up to push this one GOP, then sleep */
810     splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
811     splitmux->have_muxed_something = TRUE;
812
813     if (!splitmux->video_ctx->in_eos)
814       splitmux->max_out_running_time = splitmux->video_ctx->in_running_time;
815     else
816       splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
817
818     GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
819         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
820     GST_SPLITMUX_BROADCAST (splitmux);
821   }
822
823 }
824
825 /* Called with splitmux lock held */
826 /* Called from each input pad when it is has all the pieces
827  * for a GOP or EOS, starting with the video pad which has set the
828  * splitmux->max_in_running_time
829  */
830 static void
831 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
832 {
833   GList *cur;
834   gboolean ready = TRUE;
835   GstClockTime current_max_in_running_time;
836
837   if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
838     /* Iterate each pad, and check that the input running time is at least
839      * up to the video runnning time, and if so handle the collected GOP */
840     GST_LOG_OBJECT (splitmux, "Checking GOP collected, ctx %p", ctx);
841     for (cur = g_list_first (splitmux->contexts);
842         cur != NULL; cur = g_list_next (cur)) {
843       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
844
845       GST_LOG_OBJECT (splitmux,
846           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT
847           " EOS %d", tmpctx, tmpctx->srcpad,
848           GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
849
850       if (tmpctx->in_running_time < splitmux->max_in_running_time &&
851           !tmpctx->in_eos) {
852         GST_LOG_OBJECT (splitmux,
853             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
854             tmpctx, tmpctx->srcpad);
855         ready = FALSE;
856         break;
857       }
858     }
859     if (ready) {
860       GST_DEBUG_OBJECT (splitmux,
861           "Collected GOP is complete. Processing (ctx %p)", ctx);
862       /* All pads have a complete GOP, release it into the multiqueue */
863       handle_gathered_gop (splitmux);
864     }
865   }
866
867   /* Some pad is not yet ready, or GOP is being pushed
868    * either way, sleep and wait to get woken */
869   current_max_in_running_time = splitmux->max_in_running_time;
870   while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
871           splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
872       !ctx->flushing &&
873       (current_max_in_running_time == splitmux->max_in_running_time)) {
874
875     GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
876         splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
877         "GOP complete" : "EOF draining", ctx);
878     GST_SPLITMUX_WAIT (splitmux);
879
880     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
881   }
882 }
883
884 static void
885 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
886 {
887   GList *cur;
888   guint cur_len = g_queue_get_length (&ctx->queued_bufs);
889
890   GST_DEBUG_OBJECT (ctx->sinkpad,
891       "Checking queue length len %u cur_max %u queued gops %u",
892       cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
893
894   if (cur_len >= splitmux->mq_max_buffers) {
895     gboolean allow_grow = FALSE;
896
897     /* If collecting a GOP and this pad might block,
898      * and there isn't already a pending GOP in the queue
899      * then grow
900      */
901     if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
902         ctx->in_running_time < splitmux->max_in_running_time &&
903         splitmux->queued_gops <= 1) {
904       allow_grow = TRUE;
905     } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
906         ctx->is_video && splitmux->queued_gops <= 1) {
907       allow_grow = TRUE;
908     }
909
910     if (!allow_grow) {
911       for (cur = g_list_first (splitmux->contexts);
912           cur != NULL; cur = g_list_next (cur)) {
913         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
914         GST_DEBUG_OBJECT (tmpctx->sinkpad,
915             " len %u out_blocked %d",
916             g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
917         /* If another stream is starving, grow */
918         if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
919           allow_grow = TRUE;
920         }
921       }
922     }
923
924     if (allow_grow) {
925       splitmux->mq_max_buffers = cur_len + 1;
926
927       GST_INFO_OBJECT (splitmux,
928           "Multiqueue overrun - enlarging to %u buffers ctx %p",
929           splitmux->mq_max_buffers, ctx);
930
931       g_object_set (splitmux->mq, "max-size-buffers",
932           splitmux->mq_max_buffers, NULL);
933     }
934   }
935 }
936
937 static GstPadProbeReturn
938 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
939 {
940   GstSplitMuxSink *splitmux = ctx->splitmux;
941   GstBuffer *buf;
942   MqStreamBuf *buf_info = NULL;
943   GstClockTime ts;
944   gboolean loop_again;
945   gboolean keyframe = FALSE;
946
947   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
948
949   /* FIXME: Handle buffer lists, until then make it clear they won't work */
950   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
951     g_warning ("Buffer list handling not implemented");
952     return GST_PAD_PROBE_DROP;
953   }
954   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
955     GstEvent *event = gst_pad_probe_info_get_event (info);
956     switch (GST_EVENT_TYPE (event)) {
957       case GST_EVENT_SEGMENT:
958         gst_event_copy_segment (event, &ctx->in_segment);
959         break;
960       case GST_EVENT_FLUSH_STOP:
961         GST_SPLITMUX_LOCK (splitmux);
962         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
963         ctx->in_eos = FALSE;
964         ctx->in_bytes = 0;
965         ctx->in_running_time = 0;
966         GST_SPLITMUX_UNLOCK (splitmux);
967         break;
968       case GST_EVENT_EOS:
969         GST_SPLITMUX_LOCK (splitmux);
970         ctx->in_eos = TRUE;
971
972         if (splitmux->state == SPLITMUX_STATE_STOPPED)
973           goto beach;
974
975         if (ctx->is_video) {
976           GST_INFO_OBJECT (splitmux, "Got Video EOS. Finishing up");
977           /* Act as if this is a new keyframe with infinite timestamp */
978           splitmux->max_in_running_time = GST_CLOCK_TIME_NONE;
979           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
980           /* Wake up other input pads to collect this GOP */
981           GST_SPLITMUX_BROADCAST (splitmux);
982           check_completed_gop (splitmux, ctx);
983         } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
984           /* If we are waiting for a GOP to be completed (ie, for aux
985            * pads to catch up), then this pad is complete, so check
986            * if the whole GOP is.
987            */
988           check_completed_gop (splitmux, ctx);
989         }
990         GST_SPLITMUX_UNLOCK (splitmux);
991         break;
992       default:
993         break;
994     }
995     return GST_PAD_PROBE_PASS;
996   }
997
998   buf = gst_pad_probe_info_get_buffer (info);
999   ctx->in_running_time = gst_segment_to_running_time (&ctx->in_segment,
1000       GST_FORMAT_TIME, GST_BUFFER_TIMESTAMP (buf));
1001   buf_info = mq_stream_buf_new ();
1002
1003   if (GST_BUFFER_PTS_IS_VALID (buf))
1004     ts = GST_BUFFER_PTS (buf);
1005   else
1006     ts = GST_BUFFER_DTS (buf);
1007
1008   GST_SPLITMUX_LOCK (splitmux);
1009
1010   if (splitmux->state == SPLITMUX_STATE_STOPPED)
1011     goto beach;
1012
1013   /* If this buffer has a timestamp, advance the input timestamp of the
1014    * stream */
1015   if (GST_CLOCK_TIME_IS_VALID (ts)) {
1016     GstClockTime running_time =
1017         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1018         GST_BUFFER_TIMESTAMP (buf));
1019
1020     if (GST_CLOCK_TIME_IS_VALID (running_time) &&
1021         (ctx->in_running_time == GST_CLOCK_TIME_NONE
1022             || running_time > ctx->in_running_time))
1023       ctx->in_running_time = running_time;
1024   }
1025
1026   /* Try to make sure we have a valid running time */
1027   if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) {
1028     ctx->in_running_time =
1029         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1030         ctx->in_segment.start);
1031   }
1032
1033   buf_info->run_ts = ctx->in_running_time;
1034   buf_info->buf_size = gst_buffer_get_size (buf);
1035
1036   /* Update total input byte counter for overflow detect */
1037   ctx->in_bytes += buf_info->buf_size;
1038
1039   /* initialize mux_start_time */
1040   if (ctx->is_reference && splitmux->mux_start_time == 0)
1041     splitmux->mux_start_time = buf_info->run_ts;
1042
1043   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT
1044       " total in_bytes %" G_GSIZE_FORMAT,
1045       GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes);
1046
1047   loop_again = TRUE;
1048   do {
1049     if (ctx->flushing)
1050       break;
1051
1052     switch (splitmux->state) {
1053       case SPLITMUX_STATE_COLLECTING_GOP_START:
1054         if (ctx->is_video) {
1055           /* If a keyframe, we have a complete GOP */
1056           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1057               !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) ||
1058               splitmux->max_in_running_time >= ctx->in_running_time) {
1059             /* Pass this buffer through */
1060             loop_again = FALSE;
1061             break;
1062           }
1063           GST_INFO_OBJECT (pad,
1064               "Have keyframe with running time %" GST_TIME_FORMAT,
1065               GST_TIME_ARGS (ctx->in_running_time));
1066           keyframe = TRUE;
1067           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1068           splitmux->max_in_running_time = ctx->in_running_time;
1069           /* Wake up other input pads to collect this GOP */
1070           GST_SPLITMUX_BROADCAST (splitmux);
1071           check_completed_gop (splitmux, ctx);
1072         } else {
1073           /* We're still waiting for a keyframe on the video pad, sleep */
1074           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1075           GST_SPLITMUX_WAIT (splitmux);
1076           GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1077               splitmux->state);
1078         }
1079         break;
1080       case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1081         /* After a GOP start is found, this buffer might complete the GOP */
1082         /* If we overran the target timestamp, it might be time to process
1083          * the GOP, otherwise bail out for more data
1084          */
1085         GST_LOG_OBJECT (pad,
1086             "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT,
1087             GST_TIME_ARGS (ctx->in_running_time),
1088             GST_TIME_ARGS (splitmux->max_in_running_time));
1089
1090         if (ctx->in_running_time < splitmux->max_in_running_time) {
1091           loop_again = FALSE;
1092           break;
1093         }
1094
1095         GST_LOG_OBJECT (pad,
1096             "Collected last packet of GOP. Checking other pads");
1097         check_completed_gop (splitmux, ctx);
1098         break;
1099       case SPLITMUX_STATE_ENDING_FILE:
1100       case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1101         /* A fragment is ending, wait until that's done before continuing */
1102         GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1103         GST_SPLITMUX_WAIT (splitmux);
1104         GST_DEBUG_OBJECT (pad,
1105             "Done sleeping for fragment restart state now %d", splitmux->state);
1106         break;
1107       default:
1108         loop_again = FALSE;
1109         break;
1110     }
1111   } while (loop_again);
1112
1113   if (keyframe) {
1114     splitmux->queued_gops++;
1115     buf_info->keyframe = TRUE;
1116   }
1117
1118   /* Now add this buffer to the queue just before returning */
1119   g_queue_push_head (&ctx->queued_bufs, buf_info);
1120
1121   /* Check the buffer will fit in the mq */
1122   check_queue_length (splitmux, ctx);
1123
1124   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1125       " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time));
1126
1127   GST_SPLITMUX_UNLOCK (splitmux);
1128   return GST_PAD_PROBE_PASS;
1129
1130 beach:
1131   GST_SPLITMUX_UNLOCK (splitmux);
1132   if (buf_info)
1133     mq_stream_buf_free (buf_info);
1134   return GST_PAD_PROBE_PASS;
1135 }
1136
1137 static GstPad *
1138 gst_splitmux_sink_request_new_pad (GstElement * element,
1139     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1140 {
1141   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1142   GstPadTemplate *mux_template = NULL;
1143   GstPad *res = NULL;
1144   GstPad *mq_sink, *mq_src;
1145   gchar *gname;
1146   gboolean is_video = FALSE;
1147   MqStreamCtx *ctx;
1148
1149   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1150
1151   GST_SPLITMUX_LOCK (splitmux);
1152   if (!create_elements (splitmux))
1153     goto fail;
1154
1155   if (templ->name_template) {
1156     if (g_str_equal (templ->name_template, "video")) {
1157       /* FIXME: Look for a pad template with matching caps, rather than by name */
1158       mux_template =
1159           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1160           (splitmux->muxer), "video_%u");
1161       is_video = TRUE;
1162       name = NULL;
1163     } else {
1164       mux_template =
1165           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1166           (splitmux->muxer), templ->name_template);
1167     }
1168     if (mux_template == NULL) {
1169       /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1170       mux_template =
1171           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1172           (splitmux->muxer), "sink_%d");
1173     }
1174   }
1175
1176   res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1177   if (res == NULL)
1178     goto fail;
1179
1180   if (is_video)
1181     gname = g_strdup ("video");
1182   else if (name == NULL)
1183     gname = gst_pad_get_name (res);
1184   else
1185     gname = g_strdup (name);
1186
1187   if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1188     gst_element_release_request_pad (splitmux->muxer, res);
1189     gst_object_unref (GST_OBJECT (res));
1190     goto fail;
1191   }
1192
1193   if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1194     gst_element_release_request_pad (splitmux->muxer, res);
1195     gst_object_unref (GST_OBJECT (res));
1196     gst_element_release_request_pad (splitmux->mq, mq_sink);
1197     gst_object_unref (GST_OBJECT (mq_sink));
1198     goto fail;
1199   }
1200
1201   gst_object_unref (GST_OBJECT (res));
1202
1203   ctx = mq_stream_ctx_new (splitmux);
1204   ctx->is_video = is_video;
1205   ctx->srcpad = mq_src;
1206   ctx->sinkpad = mq_sink;
1207
1208   mq_stream_ctx_ref (ctx);
1209   ctx->src_pad_block_id =
1210       gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1211       (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1212       _pad_block_destroy_src_notify);
1213   if (is_video)
1214     splitmux->video_ctx = ctx;
1215
1216   res = gst_ghost_pad_new (gname, mq_sink);
1217   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1218
1219   mq_stream_ctx_ref (ctx);
1220   ctx->sink_pad_block_id =
1221       gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1222       (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1223       _pad_block_destroy_sink_notify);
1224
1225   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1226       " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1227
1228   splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1229
1230   g_free (gname);
1231
1232   gst_object_unref (mq_sink);
1233   gst_object_unref (mq_src);
1234
1235   gst_pad_set_active (res, TRUE);
1236   gst_element_add_pad (element, res);
1237   GST_SPLITMUX_UNLOCK (splitmux);
1238
1239   return res;
1240 fail:
1241   GST_SPLITMUX_UNLOCK (splitmux);
1242   return NULL;
1243 }
1244
1245 static void
1246 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1247 {
1248   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1249   GstPad *mqsink, *mqsrc, *muxpad;
1250   MqStreamCtx *ctx =
1251       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1252
1253   GST_SPLITMUX_LOCK (splitmux);
1254
1255   if (splitmux->muxer == NULL || splitmux->mq == NULL)
1256     goto fail;                  /* Elements don't exist yet - nothing to release */
1257
1258   GST_INFO_OBJECT (pad, "releasing request pad");
1259
1260   mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1261   mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1262   muxpad = gst_pad_get_peer (mqsrc);
1263
1264   /* Remove the context from our consideration */
1265   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1266
1267   if (ctx->sink_pad_block_id)
1268     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1269
1270   if (ctx->src_pad_block_id)
1271     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1272
1273   /* Can release the context now */
1274   mq_stream_ctx_unref (ctx);
1275
1276   /* Release and free the mq input */
1277   gst_element_release_request_pad (splitmux->mq, mqsink);
1278
1279   /* Release and free the muxer input */
1280   gst_element_release_request_pad (splitmux->muxer, muxpad);
1281
1282   gst_object_unref (mqsink);
1283   gst_object_unref (mqsrc);
1284   gst_object_unref (muxpad);
1285
1286   gst_element_remove_pad (element, pad);
1287
1288 fail:
1289   GST_SPLITMUX_UNLOCK (splitmux);
1290 }
1291
1292 static GstElement *
1293 create_element (GstSplitMuxSink * splitmux,
1294     const gchar * factory, const gchar * name)
1295 {
1296   GstElement *ret = gst_element_factory_make (factory, name);
1297   if (ret == NULL) {
1298     g_warning ("Failed to create %s - splitmuxsink will not work", name);
1299     return NULL;
1300   }
1301
1302   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1303     g_warning ("Could not add %s element - splitmuxsink will not work", name);
1304     gst_object_unref (ret);
1305     return NULL;
1306   }
1307
1308   return ret;
1309 }
1310
1311 static gboolean
1312 create_elements (GstSplitMuxSink * splitmux)
1313 {
1314   /* Create internal elements */
1315   if (splitmux->mq == NULL) {
1316     if ((splitmux->mq =
1317             create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1318       goto fail;
1319
1320     splitmux->mq_max_buffers = 5;
1321     /* No bytes or time limit, we limit buffers manually */
1322     g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1323         (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1324   }
1325
1326   if (splitmux->muxer == NULL) {
1327     GstElement *provided_muxer = NULL;
1328
1329     GST_OBJECT_LOCK (splitmux);
1330     if (splitmux->provided_muxer != NULL)
1331       provided_muxer = gst_object_ref (splitmux->provided_muxer);
1332     GST_OBJECT_UNLOCK (splitmux);
1333
1334     if (provided_muxer == NULL) {
1335       if ((splitmux->muxer =
1336               create_element (splitmux, "mp4mux", "muxer")) == NULL)
1337         goto fail;
1338     } else {
1339       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1340         g_warning ("Could not add muxer element - splitmuxsink will not work");
1341         gst_object_unref (provided_muxer);
1342         goto fail;
1343       }
1344
1345       splitmux->muxer = provided_muxer;
1346       gst_object_unref (provided_muxer);
1347     }
1348   }
1349
1350   return TRUE;
1351 fail:
1352   return FALSE;
1353 }
1354
1355 static GstElement *
1356 find_sink (GstElement * e)
1357 {
1358   GstElement *res = NULL;
1359   GstIterator *iter;
1360   gboolean done = FALSE;
1361   GValue data = { 0, };
1362
1363   if (!GST_IS_BIN (e))
1364     return e;
1365
1366   iter = gst_bin_iterate_sinks (GST_BIN (e));
1367   while (!done) {
1368     switch (gst_iterator_next (iter, &data)) {
1369       case GST_ITERATOR_OK:
1370       {
1371         GstElement *child = g_value_get_object (&data);
1372         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1373                 "location") != NULL) {
1374           res = child;
1375           done = TRUE;
1376         }
1377         g_value_reset (&data);
1378         break;
1379       }
1380       case GST_ITERATOR_RESYNC:
1381         gst_iterator_resync (iter);
1382         break;
1383       case GST_ITERATOR_DONE:
1384         done = TRUE;
1385         break;
1386       case GST_ITERATOR_ERROR:
1387         g_assert_not_reached ();
1388         break;
1389     }
1390   }
1391   g_value_unset (&data);
1392   gst_iterator_free (iter);
1393
1394   return res;
1395 }
1396
1397 static gboolean
1398 create_sink (GstSplitMuxSink * splitmux)
1399 {
1400   GstElement *provided_sink = NULL;
1401
1402   g_return_val_if_fail (splitmux->active_sink == NULL, TRUE);
1403
1404   GST_OBJECT_LOCK (splitmux);
1405   if (splitmux->provided_sink != NULL)
1406     provided_sink = gst_object_ref (splitmux->provided_sink);
1407   GST_OBJECT_UNLOCK (splitmux);
1408
1409   if (provided_sink == NULL) {
1410     if ((splitmux->sink =
1411             create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1412       goto fail;
1413     splitmux->active_sink = splitmux->sink;
1414   } else {
1415     if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1416       g_warning ("Could not add sink elements - splitmuxsink will not work");
1417       gst_object_unref (provided_sink);
1418       goto fail;
1419     }
1420
1421     splitmux->active_sink = provided_sink;
1422
1423     /* The bin holds a ref now, we can drop our tmp ref */
1424     gst_object_unref (provided_sink);
1425
1426     /* Find the sink element */
1427     splitmux->sink = find_sink (splitmux->active_sink);
1428     if (splitmux->sink == NULL) {
1429       g_warning
1430           ("Could not locate sink element in provided sink - splitmuxsink will not work");
1431       goto fail;
1432     }
1433   }
1434
1435   if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1436     g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1437     goto fail;
1438   }
1439
1440   return TRUE;
1441 fail:
1442   return FALSE;
1443 }
1444
1445 #ifdef __GNUC__
1446 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
1447 #endif
1448 static void
1449 set_next_filename (GstSplitMuxSink * splitmux)
1450 {
1451   gchar *fname = NULL;
1452
1453   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
1454       splitmux->fragment_id, &fname);
1455
1456   if (!fname)
1457     fname = splitmux->location ?
1458         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
1459
1460   if (fname) {
1461     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1462     g_object_set (splitmux->sink, "location", fname, NULL);
1463     g_free (fname);
1464
1465     splitmux->fragment_id++;
1466   }
1467 }
1468
1469 static GstStateChangeReturn
1470 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1471 {
1472   GstStateChangeReturn ret;
1473   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1474
1475   switch (transition) {
1476     case GST_STATE_CHANGE_NULL_TO_READY:{
1477       GST_SPLITMUX_LOCK (splitmux);
1478       if (!create_elements (splitmux) || !create_sink (splitmux)) {
1479         ret = GST_STATE_CHANGE_FAILURE;
1480         GST_SPLITMUX_UNLOCK (splitmux);
1481         goto beach;
1482       }
1483       GST_SPLITMUX_UNLOCK (splitmux);
1484       splitmux->fragment_id = 0;
1485       set_next_filename (splitmux);
1486       break;
1487     }
1488     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1489       GST_SPLITMUX_LOCK (splitmux);
1490       /* Start by collecting one input on each pad */
1491       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1492       splitmux->max_in_running_time = 0;
1493       splitmux->muxed_out_time = splitmux->mux_start_time = 0;
1494       splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1495       GST_SPLITMUX_UNLOCK (splitmux);
1496       break;
1497     }
1498     case GST_STATE_CHANGE_PAUSED_TO_READY:
1499     case GST_STATE_CHANGE_READY_TO_NULL:
1500       GST_SPLITMUX_LOCK (splitmux);
1501       splitmux->state = SPLITMUX_STATE_STOPPED;
1502       /* Wake up any blocked threads */
1503       GST_LOG_OBJECT (splitmux,
1504           "State change -> NULL or READY. Waking threads");
1505       GST_SPLITMUX_BROADCAST (splitmux);
1506       GST_SPLITMUX_UNLOCK (splitmux);
1507       break;
1508     default:
1509       break;
1510   }
1511
1512   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1513   if (ret == GST_STATE_CHANGE_FAILURE)
1514     goto beach;
1515
1516   switch (transition) {
1517     case GST_STATE_CHANGE_READY_TO_NULL:
1518       GST_SPLITMUX_LOCK (splitmux);
1519       splitmux->fragment_id = 0;
1520       gst_splitmux_reset (splitmux);
1521       GST_SPLITMUX_UNLOCK (splitmux);
1522       break;
1523     default:
1524       break;
1525   }
1526
1527 beach:
1528
1529   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1530       ret == GST_STATE_CHANGE_FAILURE) {
1531     /* Cleanup elements on failed transition out of NULL */
1532     gst_splitmux_reset (splitmux);
1533   }
1534   return ret;
1535 }
1536
1537 gboolean
1538 register_splitmuxsink (GstPlugin * plugin)
1539 {
1540   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1541       "Split File Muxing Sink");
1542
1543   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1544       GST_TYPE_SPLITMUX_SINK);
1545 }