splitmuxsink: flag as sink from the start
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * is required, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * The splitting process is driven by the video stream contents, and
37  * the video stream must contain closed GOPs for the output file parts
38  * to be played individually correctly.
39  *
40  * <refsect2>
41  * <title>Example pipelines</title>
42  * |[
43  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
44  * ]|
45  *
46  * Records a video stream captured from a v4l2 device and muxes it into
47  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48  * and 1MB maximum size.
49  *
50  * </refsect2>
51  */
52
53 #ifdef HAVE_CONFIG_H
54 #include "config.h"
55 #endif
56
57 #include <string.h>
58 #include "gstsplitmuxsink.h"
59
60 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
61 #define GST_CAT_DEFAULT splitmux_debug
62
63 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
64 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
65 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
66 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
67
68 enum
69 {
70   PROP_0,
71   PROP_LOCATION,
72   PROP_MAX_SIZE_TIME,
73   PROP_MAX_SIZE_BYTES,
74   PROP_MUXER_OVERHEAD,
75   PROP_MUXER,
76   PROP_SINK
77 };
78
79 #define DEFAULT_MAX_SIZE_TIME       0
80 #define DEFAULT_MAX_SIZE_BYTES      0
81 #define DEFAULT_MUXER_OVERHEAD      0.02
82 #define DEFAULT_MUXER "mp4mux"
83 #define DEFAULT_SINK "filesink"
84
85 static GstStaticPadTemplate video_sink_template =
86 GST_STATIC_PAD_TEMPLATE ("video",
87     GST_PAD_SINK,
88     GST_PAD_REQUEST,
89     GST_STATIC_CAPS_ANY);
90 static GstStaticPadTemplate audio_sink_template =
91 GST_STATIC_PAD_TEMPLATE ("audio_%u",
92     GST_PAD_SINK,
93     GST_PAD_REQUEST,
94     GST_STATIC_CAPS_ANY);
95 static GstStaticPadTemplate subtitle_sink_template =
96 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
97     GST_PAD_SINK,
98     GST_PAD_REQUEST,
99     GST_STATIC_CAPS_ANY);
100
101 static GQuark PAD_CONTEXT;
102
103 static void
104 _do_init (void)
105 {
106   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
107 }
108
109 #define gst_splitmux_sink_parent_class parent_class
110 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
111     _do_init ());
112
113 static gboolean create_elements (GstSplitMuxSink * splitmux);
114 static gboolean create_sink (GstSplitMuxSink * splitmux);
115 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
116     const GValue * value, GParamSpec * pspec);
117 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
118     GValue * value, GParamSpec * pspec);
119 static void gst_splitmux_sink_dispose (GObject * object);
120 static void gst_splitmux_sink_finalize (GObject * object);
121
122 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
123     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
124 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
125
126 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
127     element, GstStateChange transition);
128
129 static void bus_handler (GstBin * bin, GstMessage * msg);
130 static void set_next_filename (GstSplitMuxSink * splitmux);
131 static void start_next_fragment (GstSplitMuxSink * splitmux);
132 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
133 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
134
135 static MqStreamBuf *
136 mq_stream_buf_new (void)
137 {
138   return g_slice_new0 (MqStreamBuf);
139 }
140
141 static void
142 mq_stream_buf_free (MqStreamBuf * data)
143 {
144   g_slice_free (MqStreamBuf, data);
145 }
146
147 static void
148 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
149 {
150   GObjectClass *gobject_class = (GObjectClass *) klass;
151   GstElementClass *gstelement_class = (GstElementClass *) klass;
152   GstBinClass *gstbin_class = (GstBinClass *) klass;
153
154   gobject_class->set_property = gst_splitmux_sink_set_property;
155   gobject_class->get_property = gst_splitmux_sink_get_property;
156   gobject_class->dispose = gst_splitmux_sink_dispose;
157   gobject_class->finalize = gst_splitmux_sink_finalize;
158
159   gst_element_class_set_static_metadata (gstelement_class,
160       "Split Muxing Bin", "Generic/Bin/Muxer",
161       "Convenience bin that muxes incoming streams into multiple time/size limited files",
162       "Jan Schmidt <jan@centricular.com>");
163
164   gst_element_class_add_pad_template (gstelement_class,
165       gst_static_pad_template_get (&video_sink_template));
166   gst_element_class_add_pad_template (gstelement_class,
167       gst_static_pad_template_get (&audio_sink_template));
168   gst_element_class_add_pad_template (gstelement_class,
169       gst_static_pad_template_get (&subtitle_sink_template));
170
171   gstelement_class->change_state =
172       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
173   gstelement_class->request_new_pad =
174       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
175   gstelement_class->release_pad =
176       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
177
178   gstbin_class->handle_message = bus_handler;
179
180   g_object_class_install_property (gobject_class, PROP_LOCATION,
181       g_param_spec_string ("location", "File Output Pattern",
182           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
183           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
184   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
185       g_param_spec_double ("mux-overhead", "Muxing Overhead",
186           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
187           DEFAULT_MUXER_OVERHEAD,
188           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
189
190   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
191       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
192           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
193           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
194   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
195       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
196           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
197           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
198
199   g_object_class_install_property (gobject_class, PROP_MUXER,
200       g_param_spec_object ("muxer", "Muxer",
201           "The muxer element to use (NULL = default mp4mux)",
202           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
203   g_object_class_install_property (gobject_class, PROP_SINK,
204       g_param_spec_object ("sink", "Sink",
205           "The sink element (or element chain) to use (NULL = default filesink)",
206           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
207 }
208
209 static void
210 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
211 {
212   g_mutex_init (&splitmux->lock);
213   g_cond_init (&splitmux->data_cond);
214
215   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
216   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
217   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
218
219   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
220 }
221
222 static void
223 gst_splitmux_reset (GstSplitMuxSink * splitmux)
224 {
225   if (splitmux->mq)
226     gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
227   if (splitmux->muxer)
228     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
229   if (splitmux->active_sink)
230     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
231
232   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
233       NULL;
234 }
235
236 static void
237 gst_splitmux_sink_dispose (GObject * object)
238 {
239   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
240
241   G_OBJECT_CLASS (parent_class)->dispose (object);
242
243   /* Calling parent dispose invalidates all child pointers */
244   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
245       NULL;
246 }
247
248 static void
249 gst_splitmux_sink_finalize (GObject * object)
250 {
251   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
252   g_cond_clear (&splitmux->data_cond);
253   if (splitmux->provided_sink)
254     gst_object_unref (splitmux->provided_sink);
255   if (splitmux->provided_muxer)
256     gst_object_unref (splitmux->provided_muxer);
257
258   g_free (splitmux->location);
259
260   /* Make sure to free any un-released contexts */
261   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
262   g_list_free (splitmux->contexts);
263
264   G_OBJECT_CLASS (parent_class)->finalize (object);
265 }
266
267 static void
268 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
269     const GValue * value, GParamSpec * pspec)
270 {
271   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
272
273   switch (prop_id) {
274     case PROP_LOCATION:{
275       GST_OBJECT_LOCK (splitmux);
276       g_free (splitmux->location);
277       splitmux->location = g_value_dup_string (value);
278       GST_OBJECT_UNLOCK (splitmux);
279       break;
280     }
281     case PROP_MAX_SIZE_BYTES:
282       GST_OBJECT_LOCK (splitmux);
283       splitmux->threshold_bytes = g_value_get_uint64 (value);
284       GST_OBJECT_UNLOCK (splitmux);
285       break;
286     case PROP_MAX_SIZE_TIME:
287       GST_OBJECT_LOCK (splitmux);
288       splitmux->threshold_time = g_value_get_uint64 (value);
289       GST_OBJECT_UNLOCK (splitmux);
290       break;
291     case PROP_MUXER_OVERHEAD:
292       GST_OBJECT_LOCK (splitmux);
293       splitmux->mux_overhead = g_value_get_double (value);
294       GST_OBJECT_UNLOCK (splitmux);
295       break;
296     case PROP_SINK:
297       GST_SPLITMUX_LOCK (splitmux);
298       if (splitmux->provided_sink)
299         gst_object_unref (splitmux->provided_sink);
300       splitmux->provided_sink = g_value_dup_object (value);
301       GST_SPLITMUX_UNLOCK (splitmux);
302       break;
303     case PROP_MUXER:
304       GST_SPLITMUX_LOCK (splitmux);
305       if (splitmux->provided_muxer)
306         gst_object_unref (splitmux->provided_muxer);
307       splitmux->provided_muxer = g_value_dup_object (value);
308       GST_SPLITMUX_UNLOCK (splitmux);
309       break;
310     default:
311       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
312       break;
313   }
314 }
315
316 static void
317 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
318     GValue * value, GParamSpec * pspec)
319 {
320   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
321
322   switch (prop_id) {
323     case PROP_LOCATION:
324       GST_OBJECT_LOCK (splitmux);
325       g_value_set_string (value, splitmux->location);
326       GST_OBJECT_UNLOCK (splitmux);
327       break;
328     case PROP_MAX_SIZE_BYTES:
329       GST_OBJECT_LOCK (splitmux);
330       g_value_set_uint64 (value, splitmux->threshold_bytes);
331       GST_OBJECT_UNLOCK (splitmux);
332       break;
333     case PROP_MAX_SIZE_TIME:
334       GST_OBJECT_LOCK (splitmux);
335       g_value_set_uint64 (value, splitmux->threshold_time);
336       GST_OBJECT_UNLOCK (splitmux);
337       break;
338     case PROP_MUXER_OVERHEAD:
339       GST_OBJECT_LOCK (splitmux);
340       g_value_set_double (value, splitmux->mux_overhead);
341       GST_OBJECT_UNLOCK (splitmux);
342       break;
343     case PROP_SINK:
344       GST_SPLITMUX_LOCK (splitmux);
345       g_value_set_object (value, splitmux->provided_sink);
346       GST_SPLITMUX_UNLOCK (splitmux);
347       break;
348     case PROP_MUXER:
349       GST_SPLITMUX_LOCK (splitmux);
350       g_value_set_object (value, splitmux->provided_muxer);
351       GST_SPLITMUX_UNLOCK (splitmux);
352       break;
353     default:
354       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
355       break;
356   }
357 }
358
359 static GstPad *
360 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
361 {
362   gchar *tmp, *sinkname, *srcname;
363   GstPad *mq_src;
364
365   sinkname = gst_pad_get_name (sink_pad);
366   tmp = sinkname + 5;
367   srcname = g_strdup_printf ("src_%s", tmp);
368
369   mq_src = gst_element_get_static_pad (mq, srcname);
370
371   g_free (sinkname);
372   g_free (srcname);
373
374   return mq_src;
375 }
376
377 static gboolean
378 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
379     GstPad ** src_pad)
380 {
381   GstPad *mq_sink;
382   GstPad *mq_src;
383
384   /* Request a pad from multiqueue, then connect this one, then
385    * discover the corresponding output pad and return both */
386   mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
387   if (mq_sink == NULL)
388     return FALSE;
389
390   mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
391   if (mq_src == NULL)
392     goto fail;
393
394   *sink_pad = mq_sink;
395   *src_pad = mq_src;
396
397   return TRUE;
398
399 fail:
400   gst_element_release_request_pad (splitmux->mq, mq_sink);
401   return FALSE;
402 }
403
404 static MqStreamCtx *
405 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
406 {
407   MqStreamCtx *ctx;
408
409   ctx = g_new0 (MqStreamCtx, 1);
410   g_atomic_int_set (&ctx->refcount, 1);
411   ctx->splitmux = splitmux;
412   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
413   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
414   ctx->in_running_time = ctx->out_running_time = 0;
415   g_queue_init (&ctx->queued_bufs);
416   return ctx;
417 }
418
419 static void
420 mq_stream_ctx_free (MqStreamCtx * ctx)
421 {
422   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
423   g_queue_clear (&ctx->queued_bufs);
424   g_free (ctx);
425 }
426
427 static void
428 mq_stream_ctx_unref (MqStreamCtx * ctx)
429 {
430   if (g_atomic_int_dec_and_test (&ctx->refcount))
431     mq_stream_ctx_free (ctx);
432 }
433
434 static void
435 mq_stream_ctx_ref (MqStreamCtx * ctx)
436 {
437   g_atomic_int_inc (&ctx->refcount);
438 }
439
440 static void
441 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
442 {
443   ctx->sink_pad_block_id = 0;
444   mq_stream_ctx_unref (ctx);
445 }
446
447 static void
448 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
449 {
450   ctx->src_pad_block_id = 0;
451   mq_stream_ctx_unref (ctx);
452 }
453
454 /* Called with lock held, drops the lock to send EOS to the
455  * pad
456  */
457 static void
458 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
459 {
460   GstEvent *eos;
461   GstPad *pad;
462
463   eos = gst_event_new_eos ();
464   pad = gst_pad_get_peer (ctx->srcpad);
465
466   ctx->out_eos = TRUE;
467
468   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
469   GST_SPLITMUX_UNLOCK (splitmux);
470   gst_pad_send_event (pad, eos);
471   GST_SPLITMUX_LOCK (splitmux);
472
473   gst_object_unref (pad);
474 }
475
476 /* Called with splitmux lock held to check if this output
477  * context needs to sleep to wait for the release of the
478  * next GOP, or to send EOS to close out the current file
479  */
480 static void
481 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
482 {
483   do {
484
485     GST_LOG_OBJECT (ctx->srcpad,
486         "Checking running time %" GST_TIME_FORMAT " against max %"
487         GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time),
488         GST_TIME_ARGS (splitmux->max_out_running_time));
489
490     if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE ||
491         ctx->out_running_time < splitmux->max_out_running_time)
492       return;
493
494     if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
495       return;
496
497     if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
498       if (ctx->out_eos == FALSE) {
499         send_eos (splitmux, ctx);
500         continue;
501       }
502     } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
503       start_next_fragment (splitmux);
504       continue;
505     }
506
507     GST_INFO_OBJECT (ctx->srcpad,
508         "Sleeping for running time %"
509         GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")",
510         GST_TIME_ARGS (ctx->out_running_time),
511         GST_TIME_ARGS (splitmux->max_out_running_time));
512     ctx->out_blocked = TRUE;
513     /* Expand the mq if needed before sleeping */
514     check_queue_length (splitmux, ctx);
515     GST_SPLITMUX_WAIT (splitmux);
516     ctx->out_blocked = FALSE;
517     GST_INFO_OBJECT (ctx->srcpad,
518         "Woken for new max running time %" GST_TIME_FORMAT,
519         GST_TIME_ARGS (splitmux->max_out_running_time));
520   } while (1);
521 }
522
523 static GstPadProbeReturn
524 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
525 {
526   GstSplitMuxSink *splitmux = ctx->splitmux;
527   MqStreamBuf *buf_info = NULL;
528
529   GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
530
531   /* FIXME: Handle buffer lists, until then make it clear they won't work */
532   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
533     g_warning ("Buffer list handling not implemented");
534     return GST_PAD_PROBE_DROP;
535   }
536   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
537     GstEvent *event = gst_pad_probe_info_get_event (info);
538
539     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
540
541     switch (GST_EVENT_TYPE (event)) {
542       case GST_EVENT_SEGMENT:
543         gst_event_copy_segment (event, &ctx->out_segment);
544         break;
545       case GST_EVENT_FLUSH_STOP:
546         GST_SPLITMUX_LOCK (splitmux);
547         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
548         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
549         g_queue_clear (&ctx->queued_bufs);
550         ctx->flushing = FALSE;
551         GST_SPLITMUX_UNLOCK (splitmux);
552         break;
553       case GST_EVENT_FLUSH_START:
554         GST_SPLITMUX_LOCK (splitmux);
555         GST_LOG_OBJECT (pad, "Flush start");
556         ctx->flushing = TRUE;
557         GST_SPLITMUX_BROADCAST (splitmux);
558         GST_SPLITMUX_UNLOCK (splitmux);
559         break;
560       case GST_EVENT_EOS:
561         GST_SPLITMUX_LOCK (splitmux);
562         if (splitmux->state == SPLITMUX_STATE_STOPPED)
563           goto beach;
564         ctx->out_eos = TRUE;
565         GST_SPLITMUX_UNLOCK (splitmux);
566         break;
567       case GST_EVENT_GAP:{
568         GstClockTime gap_ts;
569
570         gst_event_parse_gap (event, &gap_ts, NULL);
571         if (gap_ts == GST_CLOCK_TIME_NONE)
572           break;
573
574         GST_SPLITMUX_LOCK (splitmux);
575
576         gap_ts = gst_segment_to_running_time (&ctx->out_segment,
577             GST_FORMAT_TIME, gap_ts);
578
579         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT,
580             GST_TIME_ARGS (gap_ts));
581
582         if (splitmux->state == SPLITMUX_STATE_STOPPED)
583           goto beach;
584         ctx->out_running_time = gap_ts;
585         complete_or_wait_on_out (splitmux, ctx);
586         GST_SPLITMUX_UNLOCK (splitmux);
587         break;
588       }
589       default:
590         break;
591     }
592     return GST_PAD_PROBE_PASS;
593   }
594
595   /* Allow everything through until the configured next stopping point */
596   GST_SPLITMUX_LOCK (splitmux);
597
598   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
599   if (buf_info == NULL)
600     /* Can only happen due to a poorly timed flush */
601     goto beach;
602
603   /* If we have popped a keyframe, decrement the queued_gop count */
604   if (buf_info->keyframe && splitmux->queued_gops > 0)
605     splitmux->queued_gops--;
606
607   ctx->out_running_time = buf_info->run_ts;
608
609   GST_LOG_OBJECT (splitmux,
610       "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT
611       " size %" G_GSIZE_FORMAT,
612       pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size);
613
614   complete_or_wait_on_out (splitmux, ctx);
615
616   if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE ||
617       splitmux->muxed_out_time < buf_info->run_ts)
618     splitmux->muxed_out_time = buf_info->run_ts;
619
620   splitmux->muxed_out_bytes += buf_info->buf_size;
621
622 #ifndef GST_DISABLE_GST_DEBUG
623   {
624     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
625     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
626         " run ts %" GST_TIME_FORMAT, buf,
627         GST_TIME_ARGS (ctx->out_running_time));
628   }
629 #endif
630
631   GST_SPLITMUX_UNLOCK (splitmux);
632
633   mq_stream_buf_free (buf_info);
634
635   return GST_PAD_PROBE_PASS;
636
637 beach:
638   GST_SPLITMUX_UNLOCK (splitmux);
639   if (buf_info)
640     mq_stream_buf_free (buf_info);
641   return GST_PAD_PROBE_DROP;
642 }
643
644 static gboolean
645 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
646 {
647   return gst_pad_send_event (peer, gst_event_ref (*event));
648 }
649
650 static void
651 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
652 {
653   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
654
655   gst_pad_sticky_events_foreach (ctx->srcpad,
656       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
657
658   /* Clear EOS flag */
659   ctx->out_eos = FALSE;
660
661   gst_object_unref (peer);
662 }
663
664 /* Called with lock held when a fragment
665  * reaches EOS and it is time to restart
666  * a new fragment
667  */
668 static void
669 start_next_fragment (GstSplitMuxSink * splitmux)
670 {
671   /* 1 change to new file */
672   gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
673   gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
674
675   set_next_filename (splitmux);
676
677   gst_element_sync_state_with_parent (splitmux->active_sink);
678   gst_element_sync_state_with_parent (splitmux->muxer);
679
680   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
681
682   /* Switch state and go back to processing */
683   splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
684   if (!splitmux->video_ctx->in_eos)
685     splitmux->max_out_running_time = splitmux->video_ctx->in_running_time;
686   else
687     splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
688
689   /* Store the overflow parameters as the basis for the next fragment */
690   splitmux->mux_start_time = splitmux->muxed_out_time;
691   splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
692
693   GST_DEBUG_OBJECT (splitmux,
694       "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT,
695       GST_TIME_ARGS (splitmux->max_out_running_time));
696
697   GST_SPLITMUX_BROADCAST (splitmux);
698 }
699
700 static void
701 bus_handler (GstBin * bin, GstMessage * message)
702 {
703   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
704
705   switch (GST_MESSAGE_TYPE (message)) {
706     case GST_MESSAGE_EOS:
707       /* If the state is draining out the current file, drop this EOS */
708       GST_SPLITMUX_LOCK (splitmux);
709       if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
710           splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) {
711         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
712         splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
713         GST_SPLITMUX_BROADCAST (splitmux);
714
715         gst_message_unref (message);
716         GST_SPLITMUX_UNLOCK (splitmux);
717         return;
718       }
719       GST_SPLITMUX_UNLOCK (splitmux);
720       break;
721     default:
722       break;
723   }
724
725   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
726 }
727
728 /* Called with splitmux lock held */
729 /* Called when entering ProcessingCompleteGop state
730  * Assess if mq contents overflowed the current file
731  *   -> If yes, need to switch to new file
732  *   -> if no, set max_out_running_time to let this GOP in and
733  *      go to COLLECTING_GOP_START state
734  */
735 static void
736 handle_gathered_gop (GstSplitMuxSink * splitmux)
737 {
738   GList *cur;
739   gsize queued_bytes = 0;
740   GstClockTime queued_time = 0;
741
742   /* Assess if the multiqueue contents overflowed the current file */
743   for (cur = g_list_first (splitmux->contexts);
744       cur != NULL; cur = g_list_next (cur)) {
745     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
746     if (tmpctx->in_running_time > queued_time)
747       queued_time = tmpctx->in_running_time;
748     queued_bytes += tmpctx->in_bytes;
749   }
750
751   g_assert (queued_bytes >= splitmux->mux_start_bytes);
752   g_assert (queued_time >= splitmux->mux_start_time);
753
754   queued_bytes -= splitmux->mux_start_bytes;
755   queued_time -= splitmux->mux_start_time;
756
757   /* Expand queued bytes estimate by muxer overhead */
758   queued_bytes += (queued_bytes * splitmux->mux_overhead);
759
760   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT
761       " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes);
762
763   /* Check for overrun - have we output at least one byte and overrun
764    * either threshold? */
765   if ((splitmux->mux_start_bytes < splitmux->muxed_out_bytes) &&
766       ((splitmux->threshold_bytes > 0 &&
767               queued_bytes >= splitmux->threshold_bytes) ||
768           (splitmux->threshold_time > 0 &&
769               queued_time >= splitmux->threshold_time))) {
770
771     splitmux->state = SPLITMUX_STATE_ENDING_FILE;
772
773     GST_INFO_OBJECT (splitmux,
774         "mq overflowed since last, draining out. max out TS is %"
775         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
776     GST_SPLITMUX_BROADCAST (splitmux);
777
778   } else {
779     /* No overflow */
780     GST_LOG_OBJECT (splitmux,
781         "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
782         " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.",
783         splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
784         queued_bytes, GST_TIME_ARGS (queued_time));
785
786     /* Wake everyone up to push this one GOP, then sleep */
787     splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
788     if (!splitmux->video_ctx->in_eos)
789       splitmux->max_out_running_time = splitmux->video_ctx->in_running_time;
790     else
791       splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
792
793     GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
794         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
795     GST_SPLITMUX_BROADCAST (splitmux);
796   }
797
798 }
799
800 /* Called with splitmux lock held */
801 /* Called from each input pad when it is has all the pieces
802  * for a GOP or EOS, starting with the video pad which has set the
803  * splitmux->max_in_running_time
804  */
805 static void
806 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
807 {
808   GList *cur;
809   gboolean ready = TRUE;
810
811   if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
812     /* Iterate each pad, and check that the input running time is at least
813      * up to the video runnning time, and if so handle the collected GOP */
814     GST_LOG_OBJECT (splitmux, "Checking GOP collected, ctx %p", ctx);
815     for (cur = g_list_first (splitmux->contexts);
816         cur != NULL; cur = g_list_next (cur)) {
817       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
818
819       GST_LOG_OBJECT (splitmux,
820           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT
821           " EOS %d", tmpctx, tmpctx->srcpad,
822           GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
823
824       if (tmpctx->in_running_time < splitmux->max_in_running_time &&
825           !tmpctx->in_eos) {
826         GST_LOG_OBJECT (splitmux,
827             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
828             tmpctx, tmpctx->srcpad);
829         ready = FALSE;
830         break;
831       }
832     }
833     if (ready) {
834       GST_DEBUG_OBJECT (splitmux,
835           "Collected GOP is complete. Processing (ctx %p)", ctx);
836       /* All pads have a complete GOP, release it into the multiqueue */
837       handle_gathered_gop (splitmux);
838     }
839   }
840
841   /* Some pad is not yet ready, or GOP is being pushed
842    * either way, sleep and wait to get woken */
843   while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
844           splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
845       !ctx->flushing) {
846
847     GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
848         splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
849         "GOP complete" : "EOF draining", ctx);
850     GST_SPLITMUX_WAIT (splitmux);
851
852     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
853   }
854 }
855
856 static void
857 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
858 {
859   GList *cur;
860   guint cur_len = g_queue_get_length (&ctx->queued_bufs);
861
862   GST_DEBUG_OBJECT (ctx->sinkpad,
863       "Checking queue length len %u cur_max %u queued gops %u",
864       cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
865
866   if (cur_len >= splitmux->mq_max_buffers) {
867     gboolean allow_grow = FALSE;
868
869     /* If collecting a GOP and this pad might block,
870      * and there isn't already a pending GOP in the queue
871      * then grow
872      */
873     if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
874         ctx->in_running_time < splitmux->max_in_running_time &&
875         splitmux->queued_gops <= 1) {
876       allow_grow = TRUE;
877     } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
878         ctx->is_video) {
879       allow_grow = TRUE;
880     }
881
882     if (!allow_grow) {
883       for (cur = g_list_first (splitmux->contexts);
884           cur != NULL; cur = g_list_next (cur)) {
885         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
886         GST_DEBUG_OBJECT (tmpctx->sinkpad,
887             " len %u out_blocked %d",
888             g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
889         /* If another stream is starving, grow */
890         if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
891           allow_grow = TRUE;
892         }
893       }
894     }
895
896     if (allow_grow) {
897       splitmux->mq_max_buffers = cur_len + 1;
898
899       GST_INFO_OBJECT (splitmux,
900           "Multiqueue overrun - enlarging to %u buffers ctx %p",
901           splitmux->mq_max_buffers, ctx);
902
903       g_object_set (splitmux->mq, "max-size-buffers",
904           splitmux->mq_max_buffers, NULL);
905     }
906   }
907 }
908
909 static GstPadProbeReturn
910 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
911 {
912   GstSplitMuxSink *splitmux = ctx->splitmux;
913   GstBuffer *buf;
914   MqStreamBuf *buf_info = NULL;
915   GstClockTime ts;
916   gboolean loop_again;
917   gboolean keyframe = FALSE;
918
919   GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
920
921   /* FIXME: Handle buffer lists, until then make it clear they won't work */
922   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
923     g_warning ("Buffer list handling not implemented");
924     return GST_PAD_PROBE_DROP;
925   }
926   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
927     GstEvent *event = gst_pad_probe_info_get_event (info);
928     switch (GST_EVENT_TYPE (event)) {
929       case GST_EVENT_SEGMENT:
930         gst_event_copy_segment (event, &ctx->in_segment);
931         break;
932       case GST_EVENT_FLUSH_STOP:
933         GST_SPLITMUX_LOCK (splitmux);
934         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
935         ctx->in_eos = FALSE;
936         ctx->in_bytes = 0;
937         ctx->in_running_time = 0;
938         GST_SPLITMUX_UNLOCK (splitmux);
939         break;
940       case GST_EVENT_EOS:
941         GST_SPLITMUX_LOCK (splitmux);
942         ctx->in_eos = TRUE;
943
944         if (splitmux->state == SPLITMUX_STATE_STOPPED)
945           goto beach;
946
947         if (ctx->is_video) {
948           GST_INFO_OBJECT (splitmux, "Got Video EOS. Finishing up");
949           /* Act as if this is a new keyframe with infinite timestamp */
950           splitmux->max_in_running_time = GST_CLOCK_TIME_NONE;
951           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
952           /* Wake up other input pads to collect this GOP */
953           GST_SPLITMUX_BROADCAST (splitmux);
954           check_completed_gop (splitmux, ctx);
955         } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
956           /* If we are waiting for a GOP to be completed (ie, for aux
957            * pads to catch up), then this pad is complete, so check
958            * if the whole GOP is.
959            */
960           check_completed_gop (splitmux, ctx);
961         }
962         GST_SPLITMUX_UNLOCK (splitmux);
963         break;
964       default:
965         break;
966     }
967     return GST_PAD_PROBE_PASS;
968   }
969
970   buf = gst_pad_probe_info_get_buffer (info);
971   ctx->in_running_time = gst_segment_to_running_time (&ctx->in_segment,
972       GST_FORMAT_TIME, GST_BUFFER_TIMESTAMP (buf));
973   buf_info = mq_stream_buf_new ();
974
975   if (GST_BUFFER_PTS_IS_VALID (buf))
976     ts = GST_BUFFER_PTS (buf);
977   else
978     ts = GST_BUFFER_DTS (buf);
979
980   GST_SPLITMUX_LOCK (splitmux);
981
982   if (splitmux->state == SPLITMUX_STATE_STOPPED)
983     goto beach;
984
985   /* If this buffer has a timestamp, advance the input timestamp of the
986    * stream */
987   if (GST_CLOCK_TIME_IS_VALID (ts)) {
988     GstClockTime running_time =
989         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
990         GST_BUFFER_TIMESTAMP (buf));
991
992     if (GST_CLOCK_TIME_IS_VALID (running_time) &&
993         (ctx->in_running_time == GST_CLOCK_TIME_NONE
994             || running_time > ctx->in_running_time))
995       ctx->in_running_time = running_time;
996   }
997
998   /* Try to make sure we have a valid running time */
999   if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) {
1000     ctx->in_running_time =
1001         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1002         ctx->in_segment.start);
1003   }
1004
1005   buf_info->run_ts = ctx->in_running_time;
1006   buf_info->buf_size = gst_buffer_get_size (buf);
1007
1008   /* Update total input byte counter for overflow detect */
1009   ctx->in_bytes += buf_info->buf_size;
1010
1011   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT
1012       " total in_bytes %" G_GSIZE_FORMAT,
1013       GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes);
1014
1015   loop_again = TRUE;
1016   do {
1017     if (ctx->flushing)
1018       break;
1019
1020     switch (splitmux->state) {
1021       case SPLITMUX_STATE_COLLECTING_GOP_START:
1022         if (ctx->is_video) {
1023           /* If a keyframe, we have a complete GOP */
1024           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1025               !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) ||
1026               splitmux->max_in_running_time >= ctx->in_running_time) {
1027             /* Pass this buffer through */
1028             loop_again = FALSE;
1029             break;
1030           }
1031           GST_INFO_OBJECT (pad,
1032               "Have keyframe with running time %" GST_TIME_FORMAT,
1033               GST_TIME_ARGS (ctx->in_running_time));
1034           keyframe = TRUE;
1035           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1036           splitmux->max_in_running_time = ctx->in_running_time;
1037           /* Wake up other input pads to collect this GOP */
1038           GST_SPLITMUX_BROADCAST (splitmux);
1039           check_completed_gop (splitmux, ctx);
1040         } else {
1041           /* We're still waiting for a keyframe on the video pad, sleep */
1042           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1043           GST_SPLITMUX_WAIT (splitmux);
1044           GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1045               splitmux->state);
1046         }
1047         break;
1048       case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1049         /* After a GOP start is found, this buffer might complete the GOP */
1050         /* If we overran the target timestamp, it might be time to process
1051          * the GOP, otherwise bail out for more data
1052          */
1053         GST_LOG_OBJECT (pad,
1054             "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT,
1055             GST_TIME_ARGS (ctx->in_running_time),
1056             GST_TIME_ARGS (splitmux->max_in_running_time));
1057
1058         if (ctx->in_running_time < splitmux->max_in_running_time) {
1059           loop_again = FALSE;
1060           break;
1061         }
1062
1063         GST_LOG_OBJECT (pad,
1064             "Collected last packet of GOP. Checking other pads");
1065         check_completed_gop (splitmux, ctx);
1066         break;
1067       case SPLITMUX_STATE_ENDING_FILE:
1068       case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1069         /* A fragment is ending, wait until that's done before continuing */
1070         GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1071         GST_SPLITMUX_WAIT (splitmux);
1072         GST_DEBUG_OBJECT (pad,
1073             "Done sleeping for fragment restart state now %d", splitmux->state);
1074         break;
1075       default:
1076         loop_again = FALSE;
1077         break;
1078     }
1079   } while (loop_again);
1080
1081   if (keyframe) {
1082     splitmux->queued_gops++;
1083     buf_info->keyframe = TRUE;
1084   }
1085
1086   /* Now add this buffer to the queue just before returning */
1087   g_queue_push_head (&ctx->queued_bufs, buf_info);
1088
1089   /* Check the buffer will fit in the mq */
1090   check_queue_length (splitmux, ctx);
1091
1092   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1093       " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time));
1094
1095   GST_SPLITMUX_UNLOCK (splitmux);
1096   return GST_PAD_PROBE_PASS;
1097
1098 beach:
1099   GST_SPLITMUX_UNLOCK (splitmux);
1100   if (buf_info)
1101     mq_stream_buf_free (buf_info);
1102   return GST_PAD_PROBE_PASS;
1103 }
1104
1105 static GstPad *
1106 gst_splitmux_sink_request_new_pad (GstElement * element,
1107     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1108 {
1109   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1110   GstPadTemplate *mux_template = NULL;
1111   GstPad *res = NULL;
1112   GstPad *mq_sink, *mq_src;
1113   gchar *gname;
1114   gboolean is_video = FALSE;
1115   MqStreamCtx *ctx;
1116
1117   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1118
1119   GST_SPLITMUX_LOCK (splitmux);
1120   if (!create_elements (splitmux))
1121     goto fail;
1122
1123   if (templ->name_template) {
1124     if (g_str_equal (templ->name_template, "video")) {
1125       /* FIXME: Look for a pad template with matching caps, rather than by name */
1126       mux_template =
1127           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1128           (splitmux->muxer), "video_%u");
1129       is_video = TRUE;
1130       name = NULL;
1131     } else {
1132       mux_template =
1133           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1134           (splitmux->muxer), templ->name_template);
1135     }
1136   }
1137
1138   res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1139   if (res == NULL)
1140     goto fail;
1141
1142   if (is_video)
1143     gname = g_strdup ("video");
1144   else if (name == NULL)
1145     gname = gst_pad_get_name (res);
1146   else
1147     gname = g_strdup (name);
1148
1149   if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1150     gst_element_release_request_pad (splitmux->muxer, res);
1151     gst_object_unref (GST_OBJECT (res));
1152     goto fail;
1153   }
1154
1155   if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1156     gst_element_release_request_pad (splitmux->muxer, res);
1157     gst_object_unref (GST_OBJECT (res));
1158     gst_element_release_request_pad (splitmux->mq, mq_sink);
1159     gst_object_unref (GST_OBJECT (mq_sink));
1160     goto fail;
1161   }
1162
1163   gst_object_unref (GST_OBJECT (res));
1164
1165   ctx = mq_stream_ctx_new (splitmux);
1166   ctx->is_video = is_video;
1167   ctx->srcpad = mq_src;
1168   ctx->sinkpad = mq_sink;
1169
1170   mq_stream_ctx_ref (ctx);
1171   ctx->src_pad_block_id =
1172       gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1173       (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1174       _pad_block_destroy_src_notify);
1175   if (is_video)
1176     splitmux->video_ctx = ctx;
1177
1178   res = gst_ghost_pad_new (gname, mq_sink);
1179   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1180
1181   mq_stream_ctx_ref (ctx);
1182   ctx->sink_pad_block_id =
1183       gst_pad_add_probe (res, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1184       (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1185       _pad_block_destroy_sink_notify);
1186
1187   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1188       " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1189
1190   splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1191
1192   g_free (gname);
1193
1194   gst_object_unref (mq_sink);
1195   gst_object_unref (mq_src);
1196
1197   gst_pad_set_active (res, TRUE);
1198   gst_element_add_pad (element, res);
1199   GST_SPLITMUX_UNLOCK (splitmux);
1200
1201   return res;
1202 fail:
1203   GST_SPLITMUX_UNLOCK (splitmux);
1204   return NULL;
1205 }
1206
1207 static void
1208 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1209 {
1210   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1211   GstPad *mqsink, *mqsrc, *muxpad;
1212   MqStreamCtx *ctx =
1213       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1214
1215   GST_SPLITMUX_LOCK (splitmux);
1216
1217   if (splitmux->muxer == NULL || splitmux->mq == NULL)
1218     goto fail;                  /* Elements don't exist yet - nothing to release */
1219
1220   GST_INFO_OBJECT (pad, "releasing request pad");
1221
1222   mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1223   mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1224   muxpad = gst_pad_get_peer (mqsrc);
1225
1226   /* Remove the context from our consideration */
1227   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1228
1229   if (ctx->sink_pad_block_id)
1230     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1231
1232   if (ctx->src_pad_block_id)
1233     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1234
1235   /* Can release the context now */
1236   mq_stream_ctx_unref (ctx);
1237
1238   /* Release and free the mq input */
1239   gst_element_release_request_pad (splitmux->mq, mqsink);
1240
1241   /* Release and free the muxer input */
1242   gst_element_release_request_pad (splitmux->muxer, muxpad);
1243
1244   gst_object_unref (mqsink);
1245   gst_object_unref (mqsrc);
1246   gst_object_unref (muxpad);
1247
1248   gst_element_remove_pad (element, pad);
1249
1250 fail:
1251   GST_SPLITMUX_UNLOCK (splitmux);
1252 }
1253
1254 static GstElement *
1255 create_element (GstSplitMuxSink * splitmux,
1256     const gchar * factory, const gchar * name)
1257 {
1258   GstElement *ret = gst_element_factory_make (factory, name);
1259   if (ret == NULL) {
1260     g_warning ("Failed to create %s - splitmuxsink will not work", name);
1261     return NULL;
1262   }
1263
1264   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1265     g_warning ("Could not add %s element - splitmuxsink will not work", name);
1266     gst_object_unref (ret);
1267     return NULL;
1268   }
1269
1270   return ret;
1271 }
1272
1273 static gboolean
1274 create_elements (GstSplitMuxSink * splitmux)
1275 {
1276   /* Create internal elements */
1277   if (splitmux->mq == NULL) {
1278     if ((splitmux->mq =
1279             create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1280       goto fail;
1281
1282     splitmux->mq_max_buffers = 5;
1283     /* No bytes or time limit, we limit buffers manually */
1284     g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1285         (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1286   }
1287
1288   if (splitmux->muxer == NULL) {
1289     if (splitmux->provided_muxer == NULL) {
1290       if ((splitmux->muxer =
1291               create_element (splitmux, "mp4mux", "muxer")) == NULL)
1292         goto fail;
1293     } else {
1294       splitmux->muxer = splitmux->provided_muxer;
1295       if (!gst_bin_add (GST_BIN (splitmux), splitmux->provided_muxer)) {
1296         g_warning ("Could not add muxer element - splitmuxsink will not work");
1297         goto fail;
1298       }
1299     }
1300   }
1301
1302   return TRUE;
1303 fail:
1304   return FALSE;
1305 }
1306
1307 static GstElement *
1308 find_sink (GstElement * e)
1309 {
1310   GstElement *res = NULL;
1311   GstIterator *iter;
1312   gboolean done = FALSE;
1313   GValue data = { 0, };
1314
1315   if (!GST_IS_BIN (e))
1316     return e;
1317
1318   iter = gst_bin_iterate_sinks (GST_BIN (e));
1319   while (!done) {
1320     switch (gst_iterator_next (iter, &data)) {
1321       case GST_ITERATOR_OK:
1322       {
1323         GstElement *child = g_value_get_object (&data);
1324         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1325                 "location") != NULL) {
1326           res = child;
1327           done = TRUE;
1328         }
1329         g_value_reset (&data);
1330         break;
1331       }
1332       case GST_ITERATOR_RESYNC:
1333         gst_iterator_resync (iter);
1334         break;
1335       case GST_ITERATOR_DONE:
1336         done = TRUE;
1337         break;
1338       case GST_ITERATOR_ERROR:
1339         g_assert_not_reached ();
1340         break;
1341     }
1342   }
1343   g_value_unset (&data);
1344   gst_iterator_free (iter);
1345
1346   return res;
1347 }
1348
1349 static gboolean
1350 create_sink (GstSplitMuxSink * splitmux)
1351 {
1352   g_return_val_if_fail (splitmux->active_sink == NULL, TRUE);
1353
1354   if (splitmux->provided_sink == NULL) {
1355     if ((splitmux->sink =
1356             create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1357       goto fail;
1358     splitmux->active_sink = splitmux->sink;
1359   } else {
1360     if (!gst_bin_add (GST_BIN (splitmux), splitmux->provided_sink)) {
1361       g_warning ("Could not add sink elements - splitmuxsink will not work");
1362       goto fail;
1363     }
1364
1365     splitmux->active_sink = splitmux->provided_sink;
1366
1367     /* Find the sink element */
1368     splitmux->sink = find_sink (splitmux->active_sink);
1369     if (splitmux->sink == NULL) {
1370       g_warning
1371           ("Could not locate sink element in provided sink - splitmuxsink will not work");
1372       goto fail;
1373     }
1374   }
1375
1376   if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1377     g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1378     goto fail;
1379   }
1380
1381   return TRUE;
1382 fail:
1383   return FALSE;
1384 }
1385
1386 #ifdef __GNUC__
1387 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
1388 #endif
1389 static void
1390 set_next_filename (GstSplitMuxSink * splitmux)
1391 {
1392   if (splitmux->location) {
1393     gchar *fname;
1394
1395     fname = g_strdup_printf (splitmux->location, splitmux->fragment_id);
1396     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1397     g_object_set (splitmux->sink, "location", fname, NULL);
1398     g_free (fname);
1399
1400     splitmux->fragment_id++;
1401   }
1402 }
1403
1404 static GstStateChangeReturn
1405 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1406 {
1407   GstStateChangeReturn ret;
1408   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1409
1410   switch (transition) {
1411     case GST_STATE_CHANGE_NULL_TO_READY:{
1412       GST_SPLITMUX_LOCK (splitmux);
1413       if (!create_elements (splitmux) || !create_sink (splitmux)) {
1414         ret = GST_STATE_CHANGE_FAILURE;
1415         GST_SPLITMUX_UNLOCK (splitmux);
1416         goto beach;
1417       }
1418       GST_SPLITMUX_UNLOCK (splitmux);
1419       splitmux->fragment_id = 0;
1420       set_next_filename (splitmux);
1421       break;
1422     }
1423     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1424       GST_SPLITMUX_LOCK (splitmux);
1425       /* Start by collecting one input on each pad */
1426       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1427       splitmux->max_in_running_time = 0;
1428       splitmux->muxed_out_time = splitmux->mux_start_time = 0;
1429       splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1430       GST_SPLITMUX_UNLOCK (splitmux);
1431       break;
1432     }
1433     case GST_STATE_CHANGE_PAUSED_TO_READY:
1434     case GST_STATE_CHANGE_READY_TO_NULL:
1435       GST_SPLITMUX_LOCK (splitmux);
1436       splitmux->state = SPLITMUX_STATE_STOPPED;
1437       /* Wake up any blocked threads */
1438       GST_LOG_OBJECT (splitmux,
1439           "State change -> NULL or READY. Waking threads");
1440       GST_SPLITMUX_BROADCAST (splitmux);
1441       GST_SPLITMUX_UNLOCK (splitmux);
1442       break;
1443     default:
1444       break;
1445   }
1446
1447   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1448   if (ret == GST_STATE_CHANGE_FAILURE)
1449     goto beach;
1450
1451   switch (transition) {
1452     case GST_STATE_CHANGE_READY_TO_NULL:
1453       GST_SPLITMUX_LOCK (splitmux);
1454       splitmux->fragment_id = 0;
1455       gst_splitmux_reset (splitmux);
1456       GST_SPLITMUX_UNLOCK (splitmux);
1457       break;
1458     default:
1459       break;
1460   }
1461
1462 beach:
1463
1464   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1465       ret == GST_STATE_CHANGE_FAILURE) {
1466     /* Cleanup elements on failed transition out of NULL */
1467     gst_splitmux_reset (splitmux);
1468   }
1469   return ret;
1470 }
1471
1472 gboolean
1473 register_splitmuxsink (GstPlugin * plugin)
1474 {
1475   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1476       "Split File Muxing Sink");
1477
1478   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1479       GST_TYPE_SPLITMUX_SINK);
1480 }