splitmuxsink: Mask async-start/done while switching files.
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * is required, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * The splitting process is driven by the video stream contents, and
37  * the video stream must contain closed GOPs for the output file parts
38  * to be played individually correctly.
39  *
40  * <refsect2>
41  * <title>Example pipelines</title>
42  * |[
43  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
44  * ]|
45  * Records a video stream captured from a v4l2 device and muxes it into
46  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
47  * and 1MB maximum size.
48  * </refsect2>
49  */
50
51 #ifdef HAVE_CONFIG_H
52 #include "config.h"
53 #endif
54
55 #include <string.h>
56 #include "gstsplitmuxsink.h"
57
58 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
59 #define GST_CAT_DEFAULT splitmux_debug
60
61 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
62 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
63 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
64 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
65
66 #define GST_SPLITMUX_WAIT_ASYNC_DONE(s) g_cond_wait (&(s)->async_cond, &(s)->lock)
67 #define GST_SPLITMUX_BROADCAST_ASYNC_DONE(s) g_cond_broadcast (&(s)->async_cond)
68
69 enum
70 {
71   PROP_0,
72   PROP_LOCATION,
73   PROP_MAX_SIZE_TIME,
74   PROP_MAX_SIZE_BYTES,
75   PROP_MUXER_OVERHEAD,
76   PROP_MUXER,
77   PROP_SINK
78 };
79
80 #define DEFAULT_MAX_SIZE_TIME       0
81 #define DEFAULT_MAX_SIZE_BYTES      0
82 #define DEFAULT_MUXER_OVERHEAD      0.02
83 #define DEFAULT_MUXER "mp4mux"
84 #define DEFAULT_SINK "filesink"
85
86 static GstStaticPadTemplate video_sink_template =
87 GST_STATIC_PAD_TEMPLATE ("video",
88     GST_PAD_SINK,
89     GST_PAD_REQUEST,
90     GST_STATIC_CAPS_ANY);
91 static GstStaticPadTemplate audio_sink_template =
92 GST_STATIC_PAD_TEMPLATE ("audio_%u",
93     GST_PAD_SINK,
94     GST_PAD_REQUEST,
95     GST_STATIC_CAPS_ANY);
96 static GstStaticPadTemplate subtitle_sink_template =
97 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
98     GST_PAD_SINK,
99     GST_PAD_REQUEST,
100     GST_STATIC_CAPS_ANY);
101
102 static GQuark PAD_CONTEXT;
103
104 static void
105 _do_init (void)
106 {
107   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
108 }
109
110 #define gst_splitmux_sink_parent_class parent_class
111 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
112     _do_init ());
113
114 static gboolean create_elements (GstSplitMuxSink * splitmux);
115 static gboolean create_sink (GstSplitMuxSink * splitmux);
116 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
117     const GValue * value, GParamSpec * pspec);
118 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
119     GValue * value, GParamSpec * pspec);
120 static void gst_splitmux_sink_dispose (GObject * object);
121 static void gst_splitmux_sink_finalize (GObject * object);
122
123 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
124     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
125 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
126
127 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
128     element, GstStateChange transition);
129
130 static void bus_handler (GstBin * bin, GstMessage * msg);
131 static void set_next_filename (GstSplitMuxSink * splitmux);
132 static void start_next_fragment (GstSplitMuxSink * splitmux);
133 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
134 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
135
136 static MqStreamBuf *
137 mq_stream_buf_new (void)
138 {
139   return g_slice_new0 (MqStreamBuf);
140 }
141
142 static void
143 mq_stream_buf_free (MqStreamBuf * data)
144 {
145   g_slice_free (MqStreamBuf, data);
146 }
147
148 static void
149 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
150 {
151   GObjectClass *gobject_class = (GObjectClass *) klass;
152   GstElementClass *gstelement_class = (GstElementClass *) klass;
153   GstBinClass *gstbin_class = (GstBinClass *) klass;
154
155   gobject_class->set_property = gst_splitmux_sink_set_property;
156   gobject_class->get_property = gst_splitmux_sink_get_property;
157   gobject_class->dispose = gst_splitmux_sink_dispose;
158   gobject_class->finalize = gst_splitmux_sink_finalize;
159
160   gst_element_class_set_static_metadata (gstelement_class,
161       "Split Muxing Bin", "Generic/Bin/Muxer",
162       "Convenience bin that muxes incoming streams into multiple time/size limited files",
163       "Jan Schmidt <jan@centricular.com>");
164
165   gst_element_class_add_pad_template (gstelement_class,
166       gst_static_pad_template_get (&video_sink_template));
167   gst_element_class_add_pad_template (gstelement_class,
168       gst_static_pad_template_get (&audio_sink_template));
169   gst_element_class_add_pad_template (gstelement_class,
170       gst_static_pad_template_get (&subtitle_sink_template));
171
172   gstelement_class->change_state =
173       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
174   gstelement_class->request_new_pad =
175       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
176   gstelement_class->release_pad =
177       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
178
179   gstbin_class->handle_message = bus_handler;
180
181   g_object_class_install_property (gobject_class, PROP_LOCATION,
182       g_param_spec_string ("location", "File Output Pattern",
183           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
184           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
185   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
186       g_param_spec_double ("mux-overhead", "Muxing Overhead",
187           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
188           DEFAULT_MUXER_OVERHEAD,
189           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
190
191   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
192       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
193           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
194           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
195   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
196       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
197           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
198           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
199
200   g_object_class_install_property (gobject_class, PROP_MUXER,
201       g_param_spec_object ("muxer", "Muxer",
202           "The muxer element to use (NULL = default mp4mux)",
203           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
204   g_object_class_install_property (gobject_class, PROP_SINK,
205       g_param_spec_object ("sink", "Sink",
206           "The sink element (or element chain) to use (NULL = default filesink)",
207           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
208 }
209
210 static void
211 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
212 {
213   g_mutex_init (&splitmux->lock);
214   g_cond_init (&splitmux->data_cond);
215   g_cond_init (&splitmux->async_cond);
216
217   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
218   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
219   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
220
221   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
222 }
223
224 static void
225 gst_splitmux_reset (GstSplitMuxSink * splitmux)
226 {
227   if (splitmux->mq)
228     gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
229   if (splitmux->muxer)
230     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
231   if (splitmux->active_sink)
232     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
233
234   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
235       NULL;
236 }
237
238 static void
239 gst_splitmux_sink_dispose (GObject * object)
240 {
241   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
242
243   G_OBJECT_CLASS (parent_class)->dispose (object);
244
245   /* Calling parent dispose invalidates all child pointers */
246   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
247       NULL;
248 }
249
250 static void
251 gst_splitmux_sink_finalize (GObject * object)
252 {
253   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
254   g_cond_clear (&splitmux->data_cond);
255   g_cond_clear (&splitmux->async_cond);
256   g_mutex_clear (&splitmux->lock);
257   if (splitmux->provided_sink)
258     gst_object_unref (splitmux->provided_sink);
259   if (splitmux->provided_muxer)
260     gst_object_unref (splitmux->provided_muxer);
261
262   g_free (splitmux->location);
263
264   /* Make sure to free any un-released contexts */
265   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
266   g_list_free (splitmux->contexts);
267
268   G_OBJECT_CLASS (parent_class)->finalize (object);
269 }
270
271 static void
272 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
273     const GValue * value, GParamSpec * pspec)
274 {
275   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
276
277   switch (prop_id) {
278     case PROP_LOCATION:{
279       GST_OBJECT_LOCK (splitmux);
280       g_free (splitmux->location);
281       splitmux->location = g_value_dup_string (value);
282       GST_OBJECT_UNLOCK (splitmux);
283       break;
284     }
285     case PROP_MAX_SIZE_BYTES:
286       GST_OBJECT_LOCK (splitmux);
287       splitmux->threshold_bytes = g_value_get_uint64 (value);
288       GST_OBJECT_UNLOCK (splitmux);
289       break;
290     case PROP_MAX_SIZE_TIME:
291       GST_OBJECT_LOCK (splitmux);
292       splitmux->threshold_time = g_value_get_uint64 (value);
293       GST_OBJECT_UNLOCK (splitmux);
294       break;
295     case PROP_MUXER_OVERHEAD:
296       GST_OBJECT_LOCK (splitmux);
297       splitmux->mux_overhead = g_value_get_double (value);
298       GST_OBJECT_UNLOCK (splitmux);
299       break;
300     case PROP_SINK:
301       GST_OBJECT_LOCK (splitmux);
302       if (splitmux->provided_sink)
303         gst_object_unref (splitmux->provided_sink);
304       splitmux->provided_sink = g_value_dup_object (value);
305       GST_OBJECT_UNLOCK (splitmux);
306       break;
307     case PROP_MUXER:
308       GST_OBJECT_LOCK (splitmux);
309       if (splitmux->provided_muxer)
310         gst_object_unref (splitmux->provided_muxer);
311       splitmux->provided_muxer = g_value_dup_object (value);
312       GST_OBJECT_UNLOCK (splitmux);
313       break;
314     default:
315       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
316       break;
317   }
318 }
319
320 static void
321 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
322     GValue * value, GParamSpec * pspec)
323 {
324   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
325
326   switch (prop_id) {
327     case PROP_LOCATION:
328       GST_OBJECT_LOCK (splitmux);
329       g_value_set_string (value, splitmux->location);
330       GST_OBJECT_UNLOCK (splitmux);
331       break;
332     case PROP_MAX_SIZE_BYTES:
333       GST_OBJECT_LOCK (splitmux);
334       g_value_set_uint64 (value, splitmux->threshold_bytes);
335       GST_OBJECT_UNLOCK (splitmux);
336       break;
337     case PROP_MAX_SIZE_TIME:
338       GST_OBJECT_LOCK (splitmux);
339       g_value_set_uint64 (value, splitmux->threshold_time);
340       GST_OBJECT_UNLOCK (splitmux);
341       break;
342     case PROP_MUXER_OVERHEAD:
343       GST_OBJECT_LOCK (splitmux);
344       g_value_set_double (value, splitmux->mux_overhead);
345       GST_OBJECT_UNLOCK (splitmux);
346       break;
347     case PROP_SINK:
348       GST_OBJECT_LOCK (splitmux);
349       g_value_set_object (value, splitmux->provided_sink);
350       GST_OBJECT_UNLOCK (splitmux);
351       break;
352     case PROP_MUXER:
353       GST_OBJECT_LOCK (splitmux);
354       g_value_set_object (value, splitmux->provided_muxer);
355       GST_OBJECT_UNLOCK (splitmux);
356       break;
357     default:
358       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
359       break;
360   }
361 }
362
363 static GstPad *
364 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
365 {
366   gchar *tmp, *sinkname, *srcname;
367   GstPad *mq_src;
368
369   sinkname = gst_pad_get_name (sink_pad);
370   tmp = sinkname + 5;
371   srcname = g_strdup_printf ("src_%s", tmp);
372
373   mq_src = gst_element_get_static_pad (mq, srcname);
374
375   g_free (sinkname);
376   g_free (srcname);
377
378   return mq_src;
379 }
380
381 static gboolean
382 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
383     GstPad ** src_pad)
384 {
385   GstPad *mq_sink;
386   GstPad *mq_src;
387
388   /* Request a pad from multiqueue, then connect this one, then
389    * discover the corresponding output pad and return both */
390   mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
391   if (mq_sink == NULL)
392     return FALSE;
393
394   mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
395   if (mq_src == NULL)
396     goto fail;
397
398   *sink_pad = mq_sink;
399   *src_pad = mq_src;
400
401   return TRUE;
402
403 fail:
404   gst_element_release_request_pad (splitmux->mq, mq_sink);
405   return FALSE;
406 }
407
408 static MqStreamCtx *
409 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
410 {
411   MqStreamCtx *ctx;
412
413   ctx = g_new0 (MqStreamCtx, 1);
414   g_atomic_int_set (&ctx->refcount, 1);
415   ctx->splitmux = splitmux;
416   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
417   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
418   ctx->in_running_time = ctx->out_running_time = 0;
419   g_queue_init (&ctx->queued_bufs);
420   return ctx;
421 }
422
423 static void
424 mq_stream_ctx_free (MqStreamCtx * ctx)
425 {
426   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
427   g_queue_clear (&ctx->queued_bufs);
428   g_free (ctx);
429 }
430
431 static void
432 mq_stream_ctx_unref (MqStreamCtx * ctx)
433 {
434   if (g_atomic_int_dec_and_test (&ctx->refcount))
435     mq_stream_ctx_free (ctx);
436 }
437
438 static void
439 mq_stream_ctx_ref (MqStreamCtx * ctx)
440 {
441   g_atomic_int_inc (&ctx->refcount);
442 }
443
444 static void
445 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
446 {
447   ctx->sink_pad_block_id = 0;
448   mq_stream_ctx_unref (ctx);
449 }
450
451 static void
452 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
453 {
454   ctx->src_pad_block_id = 0;
455   mq_stream_ctx_unref (ctx);
456 }
457
458 /* Called with lock held, drops the lock to send EOS to the
459  * pad
460  */
461 static void
462 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
463 {
464   GstEvent *eos;
465   GstPad *pad;
466
467   eos = gst_event_new_eos ();
468   pad = gst_pad_get_peer (ctx->srcpad);
469
470   ctx->out_eos = TRUE;
471
472   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
473   GST_SPLITMUX_UNLOCK (splitmux);
474   gst_pad_send_event (pad, eos);
475   GST_SPLITMUX_LOCK (splitmux);
476
477   gst_object_unref (pad);
478 }
479
480 /* Called with splitmux lock held to check if this output
481  * context needs to sleep to wait for the release of the
482  * next GOP, or to send EOS to close out the current file
483  */
484 static void
485 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
486 {
487   do {
488
489     GST_LOG_OBJECT (ctx->srcpad,
490         "Checking running time %" GST_TIME_FORMAT " against max %"
491         GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time),
492         GST_TIME_ARGS (splitmux->max_out_running_time));
493
494     if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE ||
495         ctx->out_running_time < splitmux->max_out_running_time) {
496       splitmux->have_muxed_something = TRUE;
497       return;
498     }
499
500     if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
501       return;
502
503     if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
504       if (ctx->out_eos == FALSE) {
505         send_eos (splitmux, ctx);
506         continue;
507       }
508     } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
509       start_next_fragment (splitmux);
510       continue;
511     }
512
513     GST_INFO_OBJECT (ctx->srcpad,
514         "Sleeping for running time %"
515         GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")",
516         GST_TIME_ARGS (ctx->out_running_time),
517         GST_TIME_ARGS (splitmux->max_out_running_time));
518     ctx->out_blocked = TRUE;
519     /* Expand the mq if needed before sleeping */
520     check_queue_length (splitmux, ctx);
521     GST_SPLITMUX_WAIT (splitmux);
522     ctx->out_blocked = FALSE;
523     GST_INFO_OBJECT (ctx->srcpad,
524         "Woken for new max running time %" GST_TIME_FORMAT,
525         GST_TIME_ARGS (splitmux->max_out_running_time));
526   } while (1);
527 }
528
529 static GstPadProbeReturn
530 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
531 {
532   GstSplitMuxSink *splitmux = ctx->splitmux;
533   MqStreamBuf *buf_info = NULL;
534
535   GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
536
537   /* FIXME: Handle buffer lists, until then make it clear they won't work */
538   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
539     g_warning ("Buffer list handling not implemented");
540     return GST_PAD_PROBE_DROP;
541   }
542   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
543     GstEvent *event = gst_pad_probe_info_get_event (info);
544
545     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
546
547     switch (GST_EVENT_TYPE (event)) {
548       case GST_EVENT_SEGMENT:
549         gst_event_copy_segment (event, &ctx->out_segment);
550         break;
551       case GST_EVENT_FLUSH_STOP:
552         GST_SPLITMUX_LOCK (splitmux);
553         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
554         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
555         g_queue_clear (&ctx->queued_bufs);
556         ctx->flushing = FALSE;
557         GST_SPLITMUX_UNLOCK (splitmux);
558         break;
559       case GST_EVENT_FLUSH_START:
560         GST_SPLITMUX_LOCK (splitmux);
561         GST_LOG_OBJECT (pad, "Flush start");
562         ctx->flushing = TRUE;
563         GST_SPLITMUX_BROADCAST (splitmux);
564         GST_SPLITMUX_UNLOCK (splitmux);
565         break;
566       case GST_EVENT_EOS:
567         GST_SPLITMUX_LOCK (splitmux);
568         if (splitmux->state == SPLITMUX_STATE_STOPPED)
569           goto beach;
570         ctx->out_eos = TRUE;
571         GST_SPLITMUX_UNLOCK (splitmux);
572         break;
573       case GST_EVENT_GAP:{
574         GstClockTime gap_ts;
575
576         gst_event_parse_gap (event, &gap_ts, NULL);
577         if (gap_ts == GST_CLOCK_TIME_NONE)
578           break;
579
580         GST_SPLITMUX_LOCK (splitmux);
581
582         gap_ts = gst_segment_to_running_time (&ctx->out_segment,
583             GST_FORMAT_TIME, gap_ts);
584
585         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT,
586             GST_TIME_ARGS (gap_ts));
587
588         if (splitmux->state == SPLITMUX_STATE_STOPPED)
589           goto beach;
590         ctx->out_running_time = gap_ts;
591         complete_or_wait_on_out (splitmux, ctx);
592         GST_SPLITMUX_UNLOCK (splitmux);
593         break;
594       }
595       default:
596         break;
597     }
598     return GST_PAD_PROBE_PASS;
599   }
600
601   /* Allow everything through until the configured next stopping point */
602   GST_SPLITMUX_LOCK (splitmux);
603
604   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
605   if (buf_info == NULL)
606     /* Can only happen due to a poorly timed flush */
607     goto beach;
608
609   /* If we have popped a keyframe, decrement the queued_gop count */
610   if (buf_info->keyframe && splitmux->queued_gops > 0)
611     splitmux->queued_gops--;
612
613   ctx->out_running_time = buf_info->run_ts;
614
615   GST_LOG_OBJECT (splitmux,
616       "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT
617       " size %" G_GSIZE_FORMAT,
618       pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size);
619
620   complete_or_wait_on_out (splitmux, ctx);
621
622   if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE ||
623       splitmux->muxed_out_time < buf_info->run_ts)
624     splitmux->muxed_out_time = buf_info->run_ts;
625
626   splitmux->muxed_out_bytes += buf_info->buf_size;
627
628 #ifndef GST_DISABLE_GST_DEBUG
629   {
630     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
631     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
632         " run ts %" GST_TIME_FORMAT, buf,
633         GST_TIME_ARGS (ctx->out_running_time));
634   }
635 #endif
636
637   GST_SPLITMUX_UNLOCK (splitmux);
638
639   mq_stream_buf_free (buf_info);
640
641   return GST_PAD_PROBE_PASS;
642
643 beach:
644   GST_SPLITMUX_UNLOCK (splitmux);
645   return GST_PAD_PROBE_DROP;
646 }
647
648 static gboolean
649 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
650 {
651   return gst_pad_send_event (peer, gst_event_ref (*event));
652 }
653
654 static void
655 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
656 {
657   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
658
659   gst_pad_sticky_events_foreach (ctx->srcpad,
660       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
661
662   /* Clear EOS flag */
663   ctx->out_eos = FALSE;
664
665   gst_object_unref (peer);
666 }
667
668 /* Called with lock held when a fragment
669  * reaches EOS and it is time to restart
670  * a new fragment
671  */
672 static void
673 start_next_fragment (GstSplitMuxSink * splitmux)
674 {
675   if (splitmux->is_async) {
676     /* Wait for async done on the sink */
677     GST_SPLITMUX_WAIT_ASYNC_DONE (splitmux);
678   }
679
680   /* 1 change to new file */
681   splitmux->restarting = TRUE;
682
683   GST_SPLITMUX_UNLOCK (splitmux);
684   GST_STATE_LOCK (splitmux);
685
686   gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
687   gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
688
689   set_next_filename (splitmux);
690
691   gst_element_sync_state_with_parent (splitmux->active_sink);
692   gst_element_sync_state_with_parent (splitmux->muxer);
693
694   GST_STATE_UNLOCK (splitmux);
695   GST_SPLITMUX_LOCK (splitmux);
696
697   splitmux->restarting = FALSE;
698
699   GST_LOG_OBJECT (splitmux, "Restarting contexts to push more data");
700   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
701
702   /* Switch state and go back to processing */
703   splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
704
705   if (!splitmux->video_ctx->in_eos) {
706     splitmux->max_out_running_time = splitmux->video_ctx->in_running_time;
707   } else {
708     splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
709     splitmux->have_muxed_something = FALSE;
710   }
711   splitmux->have_muxed_something =
712       (splitmux->video_ctx->in_running_time > splitmux->muxed_out_time);
713
714   /* Store the overflow parameters as the basis for the next fragment */
715   splitmux->mux_start_time = splitmux->muxed_out_time;
716   splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
717
718   GST_DEBUG_OBJECT (splitmux,
719       "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT,
720       GST_TIME_ARGS (splitmux->max_out_running_time));
721
722   GST_SPLITMUX_BROADCAST (splitmux);
723 }
724
725 static void
726 bus_handler (GstBin * bin, GstMessage * message)
727 {
728   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
729
730   switch (GST_MESSAGE_TYPE (message)) {
731     case GST_MESSAGE_EOS:
732       /* If the state is draining out the current file, drop this EOS */
733       GST_SPLITMUX_LOCK (splitmux);
734       if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
735           splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) {
736         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
737         splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
738         GST_SPLITMUX_BROADCAST (splitmux);
739         goto drop;
740       }
741       GST_SPLITMUX_UNLOCK (splitmux);
742       GST_INFO_OBJECT (splitmux, "Passing EOS message");
743       break;
744     case GST_MESSAGE_ASYNC_START:
745       GST_SPLITMUX_LOCK (splitmux);
746       if (splitmux->restarting)
747         goto drop;
748       splitmux->is_async = TRUE;
749       GST_SPLITMUX_UNLOCK (splitmux);
750       break;
751     case GST_MESSAGE_ASYNC_DONE:
752       GST_SPLITMUX_LOCK (splitmux);
753       if (splitmux->is_async) {
754         GST_SPLITMUX_BROADCAST_ASYNC_DONE (splitmux);
755         splitmux->is_async = FALSE;
756       }
757       if (splitmux->restarting)
758         goto drop;
759       GST_SPLITMUX_UNLOCK (splitmux);
760       break;
761     default:
762       break;
763   }
764
765   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
766   return;
767
768 drop:
769   gst_message_unref (message);
770   GST_SPLITMUX_UNLOCK (splitmux);
771 }
772
773 /* Called with splitmux lock held */
774 /* Called when entering ProcessingCompleteGop state
775  * Assess if mq contents overflowed the current file
776  *   -> If yes, need to switch to new file
777  *   -> if no, set max_out_running_time to let this GOP in and
778  *      go to COLLECTING_GOP_START state
779  */
780 static void
781 handle_gathered_gop (GstSplitMuxSink * splitmux)
782 {
783   GList *cur;
784   gsize queued_bytes = 0;
785   GstClockTime queued_time = 0;
786
787   /* Assess if the multiqueue contents overflowed the current file */
788   for (cur = g_list_first (splitmux->contexts);
789       cur != NULL; cur = g_list_next (cur)) {
790     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
791     if (tmpctx->in_running_time > queued_time)
792       queued_time = tmpctx->in_running_time;
793     queued_bytes += tmpctx->in_bytes;
794   }
795
796   g_assert (queued_bytes >= splitmux->mux_start_bytes);
797   g_assert (queued_time >= splitmux->mux_start_time);
798
799   queued_bytes -= splitmux->mux_start_bytes;
800   queued_time -= splitmux->mux_start_time;
801
802   /* Expand queued bytes estimate by muxer overhead */
803   queued_bytes += (queued_bytes * splitmux->mux_overhead);
804
805   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT
806       " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes);
807
808   /* Check for overrun - have we output at least one byte and overrun
809    * either threshold? */
810   if (splitmux->have_muxed_something &&
811       ((splitmux->threshold_bytes > 0 &&
812               queued_bytes >= splitmux->threshold_bytes) ||
813           (splitmux->threshold_time > 0 &&
814               queued_time >= splitmux->threshold_time))) {
815
816     splitmux->state = SPLITMUX_STATE_ENDING_FILE;
817
818     GST_INFO_OBJECT (splitmux,
819         "mq overflowed since last, draining out. max out TS is %"
820         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
821     GST_SPLITMUX_BROADCAST (splitmux);
822
823   } else {
824     /* No overflow */
825     GST_LOG_OBJECT (splitmux,
826         "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
827         " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.",
828         splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
829         queued_bytes, GST_TIME_ARGS (queued_time));
830
831     /* Wake everyone up to push this one GOP, then sleep */
832     splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
833     splitmux->have_muxed_something = TRUE;
834
835     if (!splitmux->video_ctx->in_eos)
836       splitmux->max_out_running_time = splitmux->video_ctx->in_running_time;
837     else
838       splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
839
840     GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
841         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
842     GST_SPLITMUX_BROADCAST (splitmux);
843   }
844
845 }
846
847 /* Called with splitmux lock held */
848 /* Called from each input pad when it is has all the pieces
849  * for a GOP or EOS, starting with the video pad which has set the
850  * splitmux->max_in_running_time
851  */
852 static void
853 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
854 {
855   GList *cur;
856   gboolean ready = TRUE;
857
858   if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
859     /* Iterate each pad, and check that the input running time is at least
860      * up to the video runnning time, and if so handle the collected GOP */
861     GST_LOG_OBJECT (splitmux, "Checking GOP collected, ctx %p", ctx);
862     for (cur = g_list_first (splitmux->contexts);
863         cur != NULL; cur = g_list_next (cur)) {
864       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
865
866       GST_LOG_OBJECT (splitmux,
867           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT
868           " EOS %d", tmpctx, tmpctx->srcpad,
869           GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
870
871       if (tmpctx->in_running_time < splitmux->max_in_running_time &&
872           !tmpctx->in_eos) {
873         GST_LOG_OBJECT (splitmux,
874             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
875             tmpctx, tmpctx->srcpad);
876         ready = FALSE;
877         break;
878       }
879     }
880     if (ready) {
881       GST_DEBUG_OBJECT (splitmux,
882           "Collected GOP is complete. Processing (ctx %p)", ctx);
883       /* All pads have a complete GOP, release it into the multiqueue */
884       handle_gathered_gop (splitmux);
885     }
886   }
887
888   /* Some pad is not yet ready, or GOP is being pushed
889    * either way, sleep and wait to get woken */
890   while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
891           splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
892       !ctx->flushing) {
893
894     GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
895         splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
896         "GOP complete" : "EOF draining", ctx);
897     GST_SPLITMUX_WAIT (splitmux);
898
899     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
900   }
901 }
902
903 static void
904 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
905 {
906   GList *cur;
907   guint cur_len = g_queue_get_length (&ctx->queued_bufs);
908
909   GST_DEBUG_OBJECT (ctx->sinkpad,
910       "Checking queue length len %u cur_max %u queued gops %u",
911       cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
912
913   if (cur_len >= splitmux->mq_max_buffers) {
914     gboolean allow_grow = FALSE;
915
916     /* If collecting a GOP and this pad might block,
917      * and there isn't already a pending GOP in the queue
918      * then grow
919      */
920     if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
921         ctx->in_running_time < splitmux->max_in_running_time &&
922         splitmux->queued_gops <= 1) {
923       allow_grow = TRUE;
924     } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
925         ctx->is_video && splitmux->queued_gops <= 1) {
926       allow_grow = TRUE;
927     }
928
929     if (!allow_grow) {
930       for (cur = g_list_first (splitmux->contexts);
931           cur != NULL; cur = g_list_next (cur)) {
932         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
933         GST_DEBUG_OBJECT (tmpctx->sinkpad,
934             " len %u out_blocked %d",
935             g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
936         /* If another stream is starving, grow */
937         if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
938           allow_grow = TRUE;
939         }
940       }
941     }
942
943     if (allow_grow) {
944       splitmux->mq_max_buffers = cur_len + 1;
945
946       GST_INFO_OBJECT (splitmux,
947           "Multiqueue overrun - enlarging to %u buffers ctx %p",
948           splitmux->mq_max_buffers, ctx);
949
950       g_object_set (splitmux->mq, "max-size-buffers",
951           splitmux->mq_max_buffers, NULL);
952     }
953   }
954 }
955
956 static GstPadProbeReturn
957 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
958 {
959   GstSplitMuxSink *splitmux = ctx->splitmux;
960   GstBuffer *buf;
961   MqStreamBuf *buf_info = NULL;
962   GstClockTime ts;
963   gboolean loop_again;
964   gboolean keyframe = FALSE;
965
966   GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
967
968   /* FIXME: Handle buffer lists, until then make it clear they won't work */
969   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
970     g_warning ("Buffer list handling not implemented");
971     return GST_PAD_PROBE_DROP;
972   }
973   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
974     GstEvent *event = gst_pad_probe_info_get_event (info);
975     switch (GST_EVENT_TYPE (event)) {
976       case GST_EVENT_SEGMENT:
977         gst_event_copy_segment (event, &ctx->in_segment);
978         break;
979       case GST_EVENT_FLUSH_STOP:
980         GST_SPLITMUX_LOCK (splitmux);
981         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
982         ctx->in_eos = FALSE;
983         ctx->in_bytes = 0;
984         ctx->in_running_time = 0;
985         GST_SPLITMUX_UNLOCK (splitmux);
986         break;
987       case GST_EVENT_EOS:
988         GST_SPLITMUX_LOCK (splitmux);
989         ctx->in_eos = TRUE;
990
991         if (splitmux->state == SPLITMUX_STATE_STOPPED)
992           goto beach;
993
994         if (ctx->is_video) {
995           GST_INFO_OBJECT (splitmux, "Got Video EOS. Finishing up");
996           /* Act as if this is a new keyframe with infinite timestamp */
997           splitmux->max_in_running_time = GST_CLOCK_TIME_NONE;
998           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
999           /* Wake up other input pads to collect this GOP */
1000           GST_SPLITMUX_BROADCAST (splitmux);
1001           check_completed_gop (splitmux, ctx);
1002         } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
1003           /* If we are waiting for a GOP to be completed (ie, for aux
1004            * pads to catch up), then this pad is complete, so check
1005            * if the whole GOP is.
1006            */
1007           check_completed_gop (splitmux, ctx);
1008         }
1009         GST_SPLITMUX_UNLOCK (splitmux);
1010         break;
1011       default:
1012         break;
1013     }
1014     return GST_PAD_PROBE_PASS;
1015   }
1016
1017   buf = gst_pad_probe_info_get_buffer (info);
1018   ctx->in_running_time = gst_segment_to_running_time (&ctx->in_segment,
1019       GST_FORMAT_TIME, GST_BUFFER_TIMESTAMP (buf));
1020   buf_info = mq_stream_buf_new ();
1021
1022   if (GST_BUFFER_PTS_IS_VALID (buf))
1023     ts = GST_BUFFER_PTS (buf);
1024   else
1025     ts = GST_BUFFER_DTS (buf);
1026
1027   GST_SPLITMUX_LOCK (splitmux);
1028
1029   if (splitmux->state == SPLITMUX_STATE_STOPPED)
1030     goto beach;
1031
1032   /* If this buffer has a timestamp, advance the input timestamp of the
1033    * stream */
1034   if (GST_CLOCK_TIME_IS_VALID (ts)) {
1035     GstClockTime running_time =
1036         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1037         GST_BUFFER_TIMESTAMP (buf));
1038
1039     if (GST_CLOCK_TIME_IS_VALID (running_time) &&
1040         (ctx->in_running_time == GST_CLOCK_TIME_NONE
1041             || running_time > ctx->in_running_time))
1042       ctx->in_running_time = running_time;
1043   }
1044
1045   /* Try to make sure we have a valid running time */
1046   if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) {
1047     ctx->in_running_time =
1048         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1049         ctx->in_segment.start);
1050   }
1051
1052   buf_info->run_ts = ctx->in_running_time;
1053   buf_info->buf_size = gst_buffer_get_size (buf);
1054
1055   /* Update total input byte counter for overflow detect */
1056   ctx->in_bytes += buf_info->buf_size;
1057
1058   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT
1059       " total in_bytes %" G_GSIZE_FORMAT,
1060       GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes);
1061
1062   loop_again = TRUE;
1063   do {
1064     if (ctx->flushing)
1065       break;
1066
1067     switch (splitmux->state) {
1068       case SPLITMUX_STATE_COLLECTING_GOP_START:
1069         if (ctx->is_video) {
1070           /* If a keyframe, we have a complete GOP */
1071           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1072               !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) ||
1073               splitmux->max_in_running_time >= ctx->in_running_time) {
1074             /* Pass this buffer through */
1075             loop_again = FALSE;
1076             break;
1077           }
1078           GST_INFO_OBJECT (pad,
1079               "Have keyframe with running time %" GST_TIME_FORMAT,
1080               GST_TIME_ARGS (ctx->in_running_time));
1081           keyframe = TRUE;
1082           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1083           splitmux->max_in_running_time = ctx->in_running_time;
1084           /* Wake up other input pads to collect this GOP */
1085           GST_SPLITMUX_BROADCAST (splitmux);
1086           check_completed_gop (splitmux, ctx);
1087         } else {
1088           /* We're still waiting for a keyframe on the video pad, sleep */
1089           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1090           GST_SPLITMUX_WAIT (splitmux);
1091           GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1092               splitmux->state);
1093         }
1094         break;
1095       case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1096         /* After a GOP start is found, this buffer might complete the GOP */
1097         /* If we overran the target timestamp, it might be time to process
1098          * the GOP, otherwise bail out for more data
1099          */
1100         GST_LOG_OBJECT (pad,
1101             "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT,
1102             GST_TIME_ARGS (ctx->in_running_time),
1103             GST_TIME_ARGS (splitmux->max_in_running_time));
1104
1105         if (ctx->in_running_time < splitmux->max_in_running_time) {
1106           loop_again = FALSE;
1107           break;
1108         }
1109
1110         GST_LOG_OBJECT (pad,
1111             "Collected last packet of GOP. Checking other pads");
1112         check_completed_gop (splitmux, ctx);
1113         break;
1114       case SPLITMUX_STATE_ENDING_FILE:
1115       case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1116         /* A fragment is ending, wait until that's done before continuing */
1117         GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1118         GST_SPLITMUX_WAIT (splitmux);
1119         GST_DEBUG_OBJECT (pad,
1120             "Done sleeping for fragment restart state now %d", splitmux->state);
1121         break;
1122       default:
1123         loop_again = FALSE;
1124         break;
1125     }
1126   } while (loop_again);
1127
1128   if (keyframe) {
1129     splitmux->queued_gops++;
1130     buf_info->keyframe = TRUE;
1131   }
1132
1133   /* Now add this buffer to the queue just before returning */
1134   g_queue_push_head (&ctx->queued_bufs, buf_info);
1135
1136   /* Check the buffer will fit in the mq */
1137   check_queue_length (splitmux, ctx);
1138
1139   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1140       " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time));
1141
1142   GST_SPLITMUX_UNLOCK (splitmux);
1143   return GST_PAD_PROBE_PASS;
1144
1145 beach:
1146   GST_SPLITMUX_UNLOCK (splitmux);
1147   if (buf_info)
1148     mq_stream_buf_free (buf_info);
1149   return GST_PAD_PROBE_PASS;
1150 }
1151
1152 static GstPad *
1153 gst_splitmux_sink_request_new_pad (GstElement * element,
1154     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1155 {
1156   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1157   GstPadTemplate *mux_template = NULL;
1158   GstPad *res = NULL;
1159   GstPad *mq_sink, *mq_src;
1160   gchar *gname;
1161   gboolean is_video = FALSE;
1162   MqStreamCtx *ctx;
1163
1164   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1165
1166   GST_SPLITMUX_LOCK (splitmux);
1167   if (!create_elements (splitmux))
1168     goto fail;
1169
1170   if (templ->name_template) {
1171     if (g_str_equal (templ->name_template, "video")) {
1172       /* FIXME: Look for a pad template with matching caps, rather than by name */
1173       mux_template =
1174           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1175           (splitmux->muxer), "video_%u");
1176       is_video = TRUE;
1177       name = NULL;
1178     } else {
1179       mux_template =
1180           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1181           (splitmux->muxer), templ->name_template);
1182     }
1183   }
1184
1185   res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1186   if (res == NULL)
1187     goto fail;
1188
1189   if (is_video)
1190     gname = g_strdup ("video");
1191   else if (name == NULL)
1192     gname = gst_pad_get_name (res);
1193   else
1194     gname = g_strdup (name);
1195
1196   if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1197     gst_element_release_request_pad (splitmux->muxer, res);
1198     gst_object_unref (GST_OBJECT (res));
1199     goto fail;
1200   }
1201
1202   if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1203     gst_element_release_request_pad (splitmux->muxer, res);
1204     gst_object_unref (GST_OBJECT (res));
1205     gst_element_release_request_pad (splitmux->mq, mq_sink);
1206     gst_object_unref (GST_OBJECT (mq_sink));
1207     goto fail;
1208   }
1209
1210   gst_object_unref (GST_OBJECT (res));
1211
1212   ctx = mq_stream_ctx_new (splitmux);
1213   ctx->is_video = is_video;
1214   ctx->srcpad = mq_src;
1215   ctx->sinkpad = mq_sink;
1216
1217   mq_stream_ctx_ref (ctx);
1218   ctx->src_pad_block_id =
1219       gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1220       (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1221       _pad_block_destroy_src_notify);
1222   if (is_video)
1223     splitmux->video_ctx = ctx;
1224
1225   res = gst_ghost_pad_new (gname, mq_sink);
1226   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1227
1228   mq_stream_ctx_ref (ctx);
1229   ctx->sink_pad_block_id =
1230       gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1231       (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1232       _pad_block_destroy_sink_notify);
1233
1234   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1235       " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1236
1237   splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1238
1239   g_free (gname);
1240
1241   gst_object_unref (mq_sink);
1242   gst_object_unref (mq_src);
1243
1244   gst_pad_set_active (res, TRUE);
1245   gst_element_add_pad (element, res);
1246   GST_SPLITMUX_UNLOCK (splitmux);
1247
1248   return res;
1249 fail:
1250   GST_SPLITMUX_UNLOCK (splitmux);
1251   return NULL;
1252 }
1253
1254 static void
1255 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1256 {
1257   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1258   GstPad *mqsink, *mqsrc, *muxpad;
1259   MqStreamCtx *ctx =
1260       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1261
1262   GST_SPLITMUX_LOCK (splitmux);
1263
1264   if (splitmux->muxer == NULL || splitmux->mq == NULL)
1265     goto fail;                  /* Elements don't exist yet - nothing to release */
1266
1267   GST_INFO_OBJECT (pad, "releasing request pad");
1268
1269   mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1270   mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1271   muxpad = gst_pad_get_peer (mqsrc);
1272
1273   /* Remove the context from our consideration */
1274   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1275
1276   if (ctx->sink_pad_block_id)
1277     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1278
1279   if (ctx->src_pad_block_id)
1280     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1281
1282   /* Can release the context now */
1283   mq_stream_ctx_unref (ctx);
1284
1285   /* Release and free the mq input */
1286   gst_element_release_request_pad (splitmux->mq, mqsink);
1287
1288   /* Release and free the muxer input */
1289   gst_element_release_request_pad (splitmux->muxer, muxpad);
1290
1291   gst_object_unref (mqsink);
1292   gst_object_unref (mqsrc);
1293   gst_object_unref (muxpad);
1294
1295   gst_element_remove_pad (element, pad);
1296
1297 fail:
1298   GST_SPLITMUX_UNLOCK (splitmux);
1299 }
1300
1301 static GstElement *
1302 create_element (GstSplitMuxSink * splitmux,
1303     const gchar * factory, const gchar * name)
1304 {
1305   GstElement *ret = gst_element_factory_make (factory, name);
1306   if (ret == NULL) {
1307     g_warning ("Failed to create %s - splitmuxsink will not work", name);
1308     return NULL;
1309   }
1310
1311   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1312     g_warning ("Could not add %s element - splitmuxsink will not work", name);
1313     gst_object_unref (ret);
1314     return NULL;
1315   }
1316
1317   return ret;
1318 }
1319
1320 static gboolean
1321 create_elements (GstSplitMuxSink * splitmux)
1322 {
1323   /* Create internal elements */
1324   if (splitmux->mq == NULL) {
1325     if ((splitmux->mq =
1326             create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1327       goto fail;
1328
1329     splitmux->mq_max_buffers = 5;
1330     /* No bytes or time limit, we limit buffers manually */
1331     g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1332         (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1333   }
1334
1335   if (splitmux->muxer == NULL) {
1336     GstElement *provided_muxer = NULL;
1337
1338     GST_OBJECT_LOCK (splitmux);
1339     if (splitmux->provided_muxer != NULL)
1340       provided_muxer = gst_object_ref (splitmux->provided_muxer);
1341     GST_OBJECT_UNLOCK (splitmux);
1342
1343     if (provided_muxer == NULL) {
1344       if ((splitmux->muxer =
1345               create_element (splitmux, "mp4mux", "muxer")) == NULL)
1346         goto fail;
1347     } else {
1348       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1349         g_warning ("Could not add muxer element - splitmuxsink will not work");
1350         gst_object_unref (provided_muxer);
1351         goto fail;
1352       }
1353
1354       splitmux->muxer = provided_muxer;
1355       gst_object_unref (provided_muxer);
1356     }
1357   }
1358
1359   return TRUE;
1360 fail:
1361   return FALSE;
1362 }
1363
1364 static GstElement *
1365 find_sink (GstElement * e)
1366 {
1367   GstElement *res = NULL;
1368   GstIterator *iter;
1369   gboolean done = FALSE;
1370   GValue data = { 0, };
1371
1372   if (!GST_IS_BIN (e))
1373     return e;
1374
1375   iter = gst_bin_iterate_sinks (GST_BIN (e));
1376   while (!done) {
1377     switch (gst_iterator_next (iter, &data)) {
1378       case GST_ITERATOR_OK:
1379       {
1380         GstElement *child = g_value_get_object (&data);
1381         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1382                 "location") != NULL) {
1383           res = child;
1384           done = TRUE;
1385         }
1386         g_value_reset (&data);
1387         break;
1388       }
1389       case GST_ITERATOR_RESYNC:
1390         gst_iterator_resync (iter);
1391         break;
1392       case GST_ITERATOR_DONE:
1393         done = TRUE;
1394         break;
1395       case GST_ITERATOR_ERROR:
1396         g_assert_not_reached ();
1397         break;
1398     }
1399   }
1400   g_value_unset (&data);
1401   gst_iterator_free (iter);
1402
1403   return res;
1404 }
1405
1406 static gboolean
1407 create_sink (GstSplitMuxSink * splitmux)
1408 {
1409   GstElement *provided_sink = NULL;
1410
1411   g_return_val_if_fail (splitmux->active_sink == NULL, TRUE);
1412
1413   GST_OBJECT_LOCK (splitmux);
1414   if (splitmux->provided_sink != NULL)
1415     provided_sink = gst_object_ref (splitmux->provided_sink);
1416   GST_OBJECT_UNLOCK (splitmux);
1417
1418   if (provided_sink == NULL) {
1419     if ((splitmux->sink =
1420             create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1421       goto fail;
1422     splitmux->active_sink = splitmux->sink;
1423   } else {
1424     if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1425       g_warning ("Could not add sink elements - splitmuxsink will not work");
1426       gst_object_unref (provided_sink);
1427       goto fail;
1428     }
1429
1430     splitmux->active_sink = provided_sink;
1431
1432     /* The bin holds a ref now, we can drop our tmp ref */
1433     gst_object_unref (provided_sink);
1434
1435     /* Find the sink element */
1436     splitmux->sink = find_sink (splitmux->active_sink);
1437     if (splitmux->sink == NULL) {
1438       g_warning
1439           ("Could not locate sink element in provided sink - splitmuxsink will not work");
1440       goto fail;
1441     }
1442   }
1443
1444   if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1445     g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1446     goto fail;
1447   }
1448
1449   return TRUE;
1450 fail:
1451   return FALSE;
1452 }
1453
1454 #ifdef __GNUC__
1455 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
1456 #endif
1457 static void
1458 set_next_filename (GstSplitMuxSink * splitmux)
1459 {
1460   if (splitmux->location) {
1461     gchar *fname;
1462
1463     fname = g_strdup_printf (splitmux->location, splitmux->fragment_id);
1464     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1465     g_object_set (splitmux->sink, "location", fname, NULL);
1466     g_free (fname);
1467
1468     splitmux->fragment_id++;
1469   }
1470 }
1471
1472 static GstStateChangeReturn
1473 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1474 {
1475   GstStateChangeReturn ret;
1476   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1477
1478   switch (transition) {
1479     case GST_STATE_CHANGE_NULL_TO_READY:{
1480       GST_SPLITMUX_LOCK (splitmux);
1481       if (!create_elements (splitmux) || !create_sink (splitmux)) {
1482         ret = GST_STATE_CHANGE_FAILURE;
1483         GST_SPLITMUX_UNLOCK (splitmux);
1484         goto beach;
1485       }
1486       GST_SPLITMUX_UNLOCK (splitmux);
1487       splitmux->fragment_id = 0;
1488       set_next_filename (splitmux);
1489       break;
1490     }
1491     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1492       GST_SPLITMUX_LOCK (splitmux);
1493       /* Start by collecting one input on each pad */
1494       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1495       splitmux->max_in_running_time = 0;
1496       splitmux->muxed_out_time = splitmux->mux_start_time = 0;
1497       splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1498       GST_SPLITMUX_UNLOCK (splitmux);
1499       break;
1500     }
1501     case GST_STATE_CHANGE_PAUSED_TO_READY:
1502     case GST_STATE_CHANGE_READY_TO_NULL:
1503       GST_SPLITMUX_LOCK (splitmux);
1504       splitmux->state = SPLITMUX_STATE_STOPPED;
1505       /* Wake up any blocked threads */
1506       GST_LOG_OBJECT (splitmux,
1507           "State change -> NULL or READY. Waking threads");
1508       GST_SPLITMUX_BROADCAST (splitmux);
1509       GST_SPLITMUX_UNLOCK (splitmux);
1510       break;
1511     default:
1512       break;
1513   }
1514
1515   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1516   if (ret == GST_STATE_CHANGE_FAILURE)
1517     goto beach;
1518
1519   switch (transition) {
1520     case GST_STATE_CHANGE_READY_TO_NULL:
1521       GST_SPLITMUX_LOCK (splitmux);
1522       splitmux->fragment_id = 0;
1523       gst_splitmux_reset (splitmux);
1524       GST_SPLITMUX_UNLOCK (splitmux);
1525       break;
1526     default:
1527       break;
1528   }
1529
1530 beach:
1531
1532   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1533       ret == GST_STATE_CHANGE_FAILURE) {
1534     /* Cleanup elements on failed transition out of NULL */
1535     gst_splitmux_reset (splitmux);
1536   }
1537   return ret;
1538 }
1539
1540 gboolean
1541 register_splitmuxsink (GstPlugin * plugin)
1542 {
1543   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1544       "Split File Muxing Sink");
1545
1546   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1547       GST_TYPE_SPLITMUX_SINK);
1548 }