splitmuxsrc: Connect to demux signals before activating
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * can be muxed, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * If a video stream is available, the splitting process is driven by the video
37  * stream contents, and the video stream must contain closed GOPs for the output
38  * file parts to be played individually correctly. In the absence of a video
39  * stream, the first available stream is used as reference for synchronization.
40  *
41  * <refsect2>
42  * <title>Example pipelines</title>
43  * |[
44  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
45  * ]|
46  * Records a video stream captured from a v4l2 device and muxes it into
47  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48  * and 1MB maximum size.
49  * </refsect2>
50  */
51
52 #ifdef HAVE_CONFIG_H
53 #include "config.h"
54 #endif
55
56 #include <string.h>
57 #include <glib/gstdio.h>
58 #include "gstsplitmuxsink.h"
59
60 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
61 #define GST_CAT_DEFAULT splitmux_debug
62
63 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
64 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
65 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
66 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
67
68 enum
69 {
70   PROP_0,
71   PROP_LOCATION,
72   PROP_MAX_SIZE_TIME,
73   PROP_MAX_SIZE_BYTES,
74   PROP_MAX_FILES,
75   PROP_MUXER_OVERHEAD,
76   PROP_MUXER,
77   PROP_SINK
78 };
79
80 #define DEFAULT_MAX_SIZE_TIME       0
81 #define DEFAULT_MAX_SIZE_BYTES      0
82 #define DEFAULT_MAX_FILES           0
83 #define DEFAULT_MUXER_OVERHEAD      0.02
84 #define DEFAULT_MUXER "mp4mux"
85 #define DEFAULT_SINK "filesink"
86
87 enum
88 {
89   SIGNAL_FORMAT_LOCATION,
90   SIGNAL_LAST
91 };
92
93 static guint signals[SIGNAL_LAST];
94
95 static GstStaticPadTemplate video_sink_template =
96 GST_STATIC_PAD_TEMPLATE ("video",
97     GST_PAD_SINK,
98     GST_PAD_REQUEST,
99     GST_STATIC_CAPS_ANY);
100 static GstStaticPadTemplate audio_sink_template =
101 GST_STATIC_PAD_TEMPLATE ("audio_%u",
102     GST_PAD_SINK,
103     GST_PAD_REQUEST,
104     GST_STATIC_CAPS_ANY);
105 static GstStaticPadTemplate subtitle_sink_template =
106 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
107     GST_PAD_SINK,
108     GST_PAD_REQUEST,
109     GST_STATIC_CAPS_ANY);
110
111 static GQuark PAD_CONTEXT;
112
113 static void
114 _do_init (void)
115 {
116   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
117 }
118
119 #define gst_splitmux_sink_parent_class parent_class
120 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
121     _do_init ());
122
123 static gboolean create_elements (GstSplitMuxSink * splitmux);
124 static gboolean create_sink (GstSplitMuxSink * splitmux);
125 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
126     const GValue * value, GParamSpec * pspec);
127 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
128     GValue * value, GParamSpec * pspec);
129 static void gst_splitmux_sink_dispose (GObject * object);
130 static void gst_splitmux_sink_finalize (GObject * object);
131
132 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
133     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
134 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
135
136 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
137     element, GstStateChange transition);
138
139 static void bus_handler (GstBin * bin, GstMessage * msg);
140 static void set_next_filename (GstSplitMuxSink * splitmux);
141 static void start_next_fragment (GstSplitMuxSink * splitmux);
142 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
143 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
144
145 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
146
147 static MqStreamBuf *
148 mq_stream_buf_new (void)
149 {
150   return g_slice_new0 (MqStreamBuf);
151 }
152
153 static void
154 mq_stream_buf_free (MqStreamBuf * data)
155 {
156   g_slice_free (MqStreamBuf, data);
157 }
158
159 static void
160 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
161 {
162   GObjectClass *gobject_class = (GObjectClass *) klass;
163   GstElementClass *gstelement_class = (GstElementClass *) klass;
164   GstBinClass *gstbin_class = (GstBinClass *) klass;
165
166   gobject_class->set_property = gst_splitmux_sink_set_property;
167   gobject_class->get_property = gst_splitmux_sink_get_property;
168   gobject_class->dispose = gst_splitmux_sink_dispose;
169   gobject_class->finalize = gst_splitmux_sink_finalize;
170
171   gst_element_class_set_static_metadata (gstelement_class,
172       "Split Muxing Bin", "Generic/Bin/Muxer",
173       "Convenience bin that muxes incoming streams into multiple time/size limited files",
174       "Jan Schmidt <jan@centricular.com>");
175
176   gst_element_class_add_static_pad_template (gstelement_class,
177       &video_sink_template);
178   gst_element_class_add_static_pad_template (gstelement_class,
179       &audio_sink_template);
180   gst_element_class_add_static_pad_template (gstelement_class,
181       &subtitle_sink_template);
182
183   gstelement_class->change_state =
184       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
185   gstelement_class->request_new_pad =
186       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
187   gstelement_class->release_pad =
188       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
189
190   gstbin_class->handle_message = bus_handler;
191
192   g_object_class_install_property (gobject_class, PROP_LOCATION,
193       g_param_spec_string ("location", "File Output Pattern",
194           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
195           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
197       g_param_spec_double ("mux-overhead", "Muxing Overhead",
198           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
199           DEFAULT_MUXER_OVERHEAD,
200           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
201
202   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
203       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
204           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
205           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
206   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
207       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
208           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
209           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
210   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
211       g_param_spec_uint ("max-files", "Max files",
212           "Maximum number of files to keep on disk. Once the maximum is reached,"
213           "old files start to be deleted to make room for new ones.",
214           0, G_MAXUINT, DEFAULT_MAX_FILES,
215           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
216
217
218   g_object_class_install_property (gobject_class, PROP_MUXER,
219       g_param_spec_object ("muxer", "Muxer",
220           "The muxer element to use (NULL = default mp4mux)",
221           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222   g_object_class_install_property (gobject_class, PROP_SINK,
223       g_param_spec_object ("sink", "Sink",
224           "The sink element (or element chain) to use (NULL = default filesink)",
225           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
226
227   /**
228    * GstSplitMuxSink::format-location:
229    * @splitmux: the #GstSplitMuxSink
230    * @fragment_id: the sequence number of the file to be created
231    *
232    * Returns: the location to be used for the next output file
233    */
234   signals[SIGNAL_FORMAT_LOCATION] =
235       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
236       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
237 }
238
239 static void
240 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
241 {
242   g_mutex_init (&splitmux->lock);
243   g_cond_init (&splitmux->data_cond);
244
245   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
246   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
247   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
248   splitmux->max_files = DEFAULT_MAX_FILES;
249
250   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
251   g_object_set (splitmux, "async-handling", TRUE, NULL);
252 }
253
254 static void
255 gst_splitmux_reset (GstSplitMuxSink * splitmux)
256 {
257   if (splitmux->mq)
258     gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
259   if (splitmux->muxer)
260     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
261   if (splitmux->active_sink)
262     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
263
264   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
265       NULL;
266 }
267
268 static void
269 gst_splitmux_sink_dispose (GObject * object)
270 {
271   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
272
273   G_OBJECT_CLASS (parent_class)->dispose (object);
274
275   /* Calling parent dispose invalidates all child pointers */
276   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
277       NULL;
278 }
279
280 static void
281 gst_splitmux_sink_finalize (GObject * object)
282 {
283   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
284   g_cond_clear (&splitmux->data_cond);
285   g_mutex_clear (&splitmux->lock);
286   if (splitmux->provided_sink)
287     gst_object_unref (splitmux->provided_sink);
288   if (splitmux->provided_muxer)
289     gst_object_unref (splitmux->provided_muxer);
290
291   g_free (splitmux->location);
292
293   /* Make sure to free any un-released contexts */
294   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
295   g_list_free (splitmux->contexts);
296
297   G_OBJECT_CLASS (parent_class)->finalize (object);
298 }
299
300 static void
301 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
302     const GValue * value, GParamSpec * pspec)
303 {
304   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
305
306   switch (prop_id) {
307     case PROP_LOCATION:{
308       GST_OBJECT_LOCK (splitmux);
309       g_free (splitmux->location);
310       splitmux->location = g_value_dup_string (value);
311       GST_OBJECT_UNLOCK (splitmux);
312       break;
313     }
314     case PROP_MAX_SIZE_BYTES:
315       GST_OBJECT_LOCK (splitmux);
316       splitmux->threshold_bytes = g_value_get_uint64 (value);
317       GST_OBJECT_UNLOCK (splitmux);
318       break;
319     case PROP_MAX_SIZE_TIME:
320       GST_OBJECT_LOCK (splitmux);
321       splitmux->threshold_time = g_value_get_uint64 (value);
322       GST_OBJECT_UNLOCK (splitmux);
323       break;
324     case PROP_MAX_FILES:
325       GST_OBJECT_LOCK (splitmux);
326       splitmux->max_files = g_value_get_uint (value);
327       GST_OBJECT_UNLOCK (splitmux);
328       break;
329     case PROP_MUXER_OVERHEAD:
330       GST_OBJECT_LOCK (splitmux);
331       splitmux->mux_overhead = g_value_get_double (value);
332       GST_OBJECT_UNLOCK (splitmux);
333       break;
334     case PROP_SINK:
335       GST_OBJECT_LOCK (splitmux);
336       if (splitmux->provided_sink)
337         gst_object_unref (splitmux->provided_sink);
338       splitmux->provided_sink = g_value_dup_object (value);
339       GST_OBJECT_UNLOCK (splitmux);
340       break;
341     case PROP_MUXER:
342       GST_OBJECT_LOCK (splitmux);
343       if (splitmux->provided_muxer)
344         gst_object_unref (splitmux->provided_muxer);
345       splitmux->provided_muxer = g_value_dup_object (value);
346       GST_OBJECT_UNLOCK (splitmux);
347       break;
348     default:
349       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
350       break;
351   }
352 }
353
354 static void
355 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
356     GValue * value, GParamSpec * pspec)
357 {
358   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
359
360   switch (prop_id) {
361     case PROP_LOCATION:
362       GST_OBJECT_LOCK (splitmux);
363       g_value_set_string (value, splitmux->location);
364       GST_OBJECT_UNLOCK (splitmux);
365       break;
366     case PROP_MAX_SIZE_BYTES:
367       GST_OBJECT_LOCK (splitmux);
368       g_value_set_uint64 (value, splitmux->threshold_bytes);
369       GST_OBJECT_UNLOCK (splitmux);
370       break;
371     case PROP_MAX_SIZE_TIME:
372       GST_OBJECT_LOCK (splitmux);
373       g_value_set_uint64 (value, splitmux->threshold_time);
374       GST_OBJECT_UNLOCK (splitmux);
375       break;
376     case PROP_MAX_FILES:
377       GST_OBJECT_LOCK (splitmux);
378       g_value_set_uint (value, splitmux->max_files);
379       GST_OBJECT_UNLOCK (splitmux);
380       break;
381     case PROP_MUXER_OVERHEAD:
382       GST_OBJECT_LOCK (splitmux);
383       g_value_set_double (value, splitmux->mux_overhead);
384       GST_OBJECT_UNLOCK (splitmux);
385       break;
386     case PROP_SINK:
387       GST_OBJECT_LOCK (splitmux);
388       g_value_set_object (value, splitmux->provided_sink);
389       GST_OBJECT_UNLOCK (splitmux);
390       break;
391     case PROP_MUXER:
392       GST_OBJECT_LOCK (splitmux);
393       g_value_set_object (value, splitmux->provided_muxer);
394       GST_OBJECT_UNLOCK (splitmux);
395       break;
396     default:
397       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
398       break;
399   }
400 }
401
402 static GstPad *
403 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
404 {
405   gchar *tmp, *sinkname, *srcname;
406   GstPad *mq_src;
407
408   sinkname = gst_pad_get_name (sink_pad);
409   tmp = sinkname + 5;
410   srcname = g_strdup_printf ("src_%s", tmp);
411
412   mq_src = gst_element_get_static_pad (mq, srcname);
413
414   g_free (sinkname);
415   g_free (srcname);
416
417   return mq_src;
418 }
419
420 static gboolean
421 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
422     GstPad ** src_pad)
423 {
424   GstPad *mq_sink;
425   GstPad *mq_src;
426
427   /* Request a pad from multiqueue, then connect this one, then
428    * discover the corresponding output pad and return both */
429   mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
430   if (mq_sink == NULL)
431     return FALSE;
432
433   mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
434   if (mq_src == NULL)
435     goto fail;
436
437   *sink_pad = mq_sink;
438   *src_pad = mq_src;
439
440   return TRUE;
441
442 fail:
443   gst_element_release_request_pad (splitmux->mq, mq_sink);
444   return FALSE;
445 }
446
447 static MqStreamCtx *
448 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
449 {
450   MqStreamCtx *ctx;
451
452   ctx = g_new0 (MqStreamCtx, 1);
453   g_atomic_int_set (&ctx->refcount, 1);
454   ctx->splitmux = splitmux;
455   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
456   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
457   ctx->in_running_time = ctx->out_running_time = 0;
458   g_queue_init (&ctx->queued_bufs);
459   return ctx;
460 }
461
462 static void
463 mq_stream_ctx_free (MqStreamCtx * ctx)
464 {
465   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
466   g_queue_clear (&ctx->queued_bufs);
467   g_free (ctx);
468 }
469
470 static void
471 mq_stream_ctx_unref (MqStreamCtx * ctx)
472 {
473   if (g_atomic_int_dec_and_test (&ctx->refcount))
474     mq_stream_ctx_free (ctx);
475 }
476
477 static void
478 mq_stream_ctx_ref (MqStreamCtx * ctx)
479 {
480   g_atomic_int_inc (&ctx->refcount);
481 }
482
483 static void
484 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
485 {
486   ctx->sink_pad_block_id = 0;
487   mq_stream_ctx_unref (ctx);
488 }
489
490 static void
491 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
492 {
493   ctx->src_pad_block_id = 0;
494   mq_stream_ctx_unref (ctx);
495 }
496
497 static void
498 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
499 {
500   gchar *location = NULL;
501   GstMessage *msg;
502   const gchar *msg_name = opened ?
503       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
504
505   g_object_get (splitmux->sink, "location", &location, NULL);
506
507   msg = gst_message_new_element (GST_OBJECT (splitmux),
508       gst_structure_new (msg_name,
509           "location", G_TYPE_STRING, location,
510           "running-time", GST_TYPE_CLOCK_TIME,
511           splitmux->reference_ctx->out_running_time, NULL));
512   gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
513
514   g_free (location);
515 }
516
517 /* Called with lock held, drops the lock to send EOS to the
518  * pad
519  */
520 static void
521 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
522 {
523   GstEvent *eos;
524   GstPad *pad;
525
526   eos = gst_event_new_eos ();
527   pad = gst_pad_get_peer (ctx->srcpad);
528
529   ctx->out_eos = TRUE;
530
531   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
532   GST_SPLITMUX_UNLOCK (splitmux);
533   gst_pad_send_event (pad, eos);
534   GST_SPLITMUX_LOCK (splitmux);
535
536   gst_object_unref (pad);
537 }
538
539 /* Called with splitmux lock held to check if this output
540  * context needs to sleep to wait for the release of the
541  * next GOP, or to send EOS to close out the current file
542  */
543 static void
544 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
545 {
546   do {
547
548     GST_LOG_OBJECT (ctx->srcpad,
549         "Checking running time %" GST_TIME_FORMAT " against max %"
550         GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time),
551         GST_TIME_ARGS (splitmux->max_out_running_time));
552
553     if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE ||
554         ctx->out_running_time < splitmux->max_out_running_time) {
555       splitmux->have_muxed_something = TRUE;
556       return;
557     }
558
559     if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
560       return;
561
562     if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
563       if (ctx->out_eos == FALSE) {
564         send_eos (splitmux, ctx);
565         continue;
566       }
567     } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
568       start_next_fragment (splitmux);
569       continue;
570     }
571
572     GST_INFO_OBJECT (ctx->srcpad,
573         "Sleeping for running time %"
574         GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")",
575         GST_TIME_ARGS (ctx->out_running_time),
576         GST_TIME_ARGS (splitmux->max_out_running_time));
577     ctx->out_blocked = TRUE;
578     /* Expand the mq if needed before sleeping */
579     check_queue_length (splitmux, ctx);
580     GST_SPLITMUX_WAIT (splitmux);
581     ctx->out_blocked = FALSE;
582     GST_INFO_OBJECT (ctx->srcpad,
583         "Woken for new max running time %" GST_TIME_FORMAT,
584         GST_TIME_ARGS (splitmux->max_out_running_time));
585   } while (1);
586 }
587
588 static GstPadProbeReturn
589 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
590 {
591   GstSplitMuxSink *splitmux = ctx->splitmux;
592   MqStreamBuf *buf_info = NULL;
593
594   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
595
596   /* FIXME: Handle buffer lists, until then make it clear they won't work */
597   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
598     g_warning ("Buffer list handling not implemented");
599     return GST_PAD_PROBE_DROP;
600   }
601   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
602     GstEvent *event = gst_pad_probe_info_get_event (info);
603
604     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
605
606     switch (GST_EVENT_TYPE (event)) {
607       case GST_EVENT_SEGMENT:
608         gst_event_copy_segment (event, &ctx->out_segment);
609         break;
610       case GST_EVENT_FLUSH_STOP:
611         GST_SPLITMUX_LOCK (splitmux);
612         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
613         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
614         g_queue_clear (&ctx->queued_bufs);
615         ctx->flushing = FALSE;
616         GST_SPLITMUX_UNLOCK (splitmux);
617         break;
618       case GST_EVENT_FLUSH_START:
619         GST_SPLITMUX_LOCK (splitmux);
620         GST_LOG_OBJECT (pad, "Flush start");
621         ctx->flushing = TRUE;
622         GST_SPLITMUX_BROADCAST (splitmux);
623         GST_SPLITMUX_UNLOCK (splitmux);
624         break;
625       case GST_EVENT_EOS:
626         GST_SPLITMUX_LOCK (splitmux);
627         if (splitmux->state == SPLITMUX_STATE_STOPPED)
628           goto beach;
629         ctx->out_eos = TRUE;
630         GST_SPLITMUX_UNLOCK (splitmux);
631         break;
632       case GST_EVENT_GAP:{
633         GstClockTime gap_ts;
634
635         gst_event_parse_gap (event, &gap_ts, NULL);
636         if (gap_ts == GST_CLOCK_TIME_NONE)
637           break;
638
639         GST_SPLITMUX_LOCK (splitmux);
640
641         gap_ts = gst_segment_to_running_time (&ctx->out_segment,
642             GST_FORMAT_TIME, gap_ts);
643
644         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT,
645             GST_TIME_ARGS (gap_ts));
646
647         if (splitmux->state == SPLITMUX_STATE_STOPPED)
648           goto beach;
649         ctx->out_running_time = gap_ts;
650         complete_or_wait_on_out (splitmux, ctx);
651         GST_SPLITMUX_UNLOCK (splitmux);
652         break;
653       }
654       case GST_EVENT_CUSTOM_DOWNSTREAM:{
655         const GstStructure *s;
656         GstClockTime ts = 0;
657
658         s = gst_event_get_structure (event);
659         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
660           break;
661
662         gst_structure_get_uint64 (s, "timestamp", &ts);
663
664         GST_SPLITMUX_LOCK (splitmux);
665
666         if (splitmux->state == SPLITMUX_STATE_STOPPED)
667           goto beach;
668         ctx->out_running_time = ts;
669         complete_or_wait_on_out (splitmux, ctx);
670         GST_SPLITMUX_UNLOCK (splitmux);
671         return GST_PAD_PROBE_DROP;
672       }
673       default:
674         break;
675     }
676     return GST_PAD_PROBE_PASS;
677   }
678
679   /* Allow everything through until the configured next stopping point */
680   GST_SPLITMUX_LOCK (splitmux);
681
682   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
683   if (buf_info == NULL)
684     /* Can only happen due to a poorly timed flush */
685     goto beach;
686
687   /* If we have popped a keyframe, decrement the queued_gop count */
688   if (buf_info->keyframe && splitmux->queued_gops > 0)
689     splitmux->queued_gops--;
690
691   ctx->out_running_time = buf_info->run_ts;
692
693   GST_LOG_OBJECT (splitmux,
694       "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT
695       " size %" G_GSIZE_FORMAT,
696       pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size);
697
698   if (splitmux->opening_first_fragment) {
699     send_fragment_opened_closed_msg (splitmux, TRUE);
700     splitmux->opening_first_fragment = FALSE;
701   }
702
703   complete_or_wait_on_out (splitmux, ctx);
704
705   if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE ||
706       splitmux->muxed_out_time < buf_info->run_ts)
707     splitmux->muxed_out_time = buf_info->run_ts;
708
709   splitmux->muxed_out_bytes += buf_info->buf_size;
710
711 #ifndef GST_DISABLE_GST_DEBUG
712   {
713     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
714     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
715         " run ts %" GST_TIME_FORMAT, buf,
716         GST_TIME_ARGS (ctx->out_running_time));
717   }
718 #endif
719
720   GST_SPLITMUX_UNLOCK (splitmux);
721
722   mq_stream_buf_free (buf_info);
723
724   return GST_PAD_PROBE_PASS;
725
726 beach:
727   GST_SPLITMUX_UNLOCK (splitmux);
728   return GST_PAD_PROBE_DROP;
729 }
730
731 static gboolean
732 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
733 {
734   return gst_pad_send_event (peer, gst_event_ref (*event));
735 }
736
737 static void
738 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
739 {
740   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
741
742   gst_pad_sticky_events_foreach (ctx->srcpad,
743       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
744
745   /* Clear EOS flag if not actually EOS */
746   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
747
748   gst_object_unref (peer);
749 }
750
751 /* Called with lock held when a fragment
752  * reaches EOS and it is time to restart
753  * a new fragment
754  */
755 static void
756 start_next_fragment (GstSplitMuxSink * splitmux)
757 {
758   /* 1 change to new file */
759   gst_element_set_locked_state (splitmux->muxer, TRUE);
760   gst_element_set_locked_state (splitmux->active_sink, TRUE);
761   gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
762   gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
763
764   set_next_filename (splitmux);
765
766   gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux));
767   gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux));
768   gst_element_set_locked_state (splitmux->muxer, FALSE);
769   gst_element_set_locked_state (splitmux->active_sink, FALSE);
770
771   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
772
773   /* Switch state and go back to processing */
774   if (!splitmux->reference_ctx->in_eos) {
775     splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
776     splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
777   } else {
778     splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
779     splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
780     splitmux->have_muxed_something = FALSE;
781   }
782   splitmux->have_muxed_something =
783       (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
784
785   /* Store the overflow parameters as the basis for the next fragment */
786   splitmux->mux_start_time = splitmux->muxed_out_time;
787   splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
788
789   GST_DEBUG_OBJECT (splitmux,
790       "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT,
791       GST_TIME_ARGS (splitmux->max_out_running_time));
792
793   send_fragment_opened_closed_msg (splitmux, TRUE);
794
795   GST_SPLITMUX_BROADCAST (splitmux);
796 }
797
798 static void
799 bus_handler (GstBin * bin, GstMessage * message)
800 {
801   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
802
803   switch (GST_MESSAGE_TYPE (message)) {
804     case GST_MESSAGE_EOS:
805       /* If the state is draining out the current file, drop this EOS */
806       GST_SPLITMUX_LOCK (splitmux);
807
808       send_fragment_opened_closed_msg (splitmux, FALSE);
809
810       if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
811           splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) {
812         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
813         splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
814         GST_SPLITMUX_BROADCAST (splitmux);
815
816         gst_message_unref (message);
817         GST_SPLITMUX_UNLOCK (splitmux);
818         return;
819       }
820       GST_SPLITMUX_UNLOCK (splitmux);
821       break;
822     default:
823       break;
824   }
825
826   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
827 }
828
829 /* Called with splitmux lock held */
830 /* Called when entering ProcessingCompleteGop state
831  * Assess if mq contents overflowed the current file
832  *   -> If yes, need to switch to new file
833  *   -> if no, set max_out_running_time to let this GOP in and
834  *      go to COLLECTING_GOP_START state
835  */
836 static void
837 handle_gathered_gop (GstSplitMuxSink * splitmux)
838 {
839   GList *cur;
840   gsize queued_bytes = 0;
841   GstClockTime queued_time = 0;
842
843   /* Assess if the multiqueue contents overflowed the current file */
844   for (cur = g_list_first (splitmux->contexts);
845       cur != NULL; cur = g_list_next (cur)) {
846     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
847     if (tmpctx->in_running_time > queued_time)
848       queued_time = tmpctx->in_running_time;
849     queued_bytes += tmpctx->in_bytes;
850   }
851
852   g_assert (queued_bytes >= splitmux->mux_start_bytes);
853   g_assert (queued_time >= splitmux->mux_start_time);
854
855   queued_bytes -= splitmux->mux_start_bytes;
856   queued_time -= splitmux->mux_start_time;
857
858   /* Expand queued bytes estimate by muxer overhead */
859   queued_bytes += (queued_bytes * splitmux->mux_overhead);
860
861   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT
862       " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes);
863
864   /* Check for overrun - have we output at least one byte and overrun
865    * either threshold? */
866   if ((splitmux->have_muxed_something &&
867           ((splitmux->threshold_bytes > 0 &&
868                   queued_bytes >= splitmux->threshold_bytes) ||
869               (splitmux->threshold_time > 0 &&
870                   queued_time >= splitmux->threshold_time)))) {
871
872     splitmux->state = SPLITMUX_STATE_ENDING_FILE;
873
874     GST_INFO_OBJECT (splitmux,
875         "mq overflowed since last, draining out. max out TS is %"
876         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
877     GST_SPLITMUX_BROADCAST (splitmux);
878
879   } else {
880     /* No overflow */
881     GST_LOG_OBJECT (splitmux,
882         "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
883         " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.",
884         splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
885         queued_bytes, GST_TIME_ARGS (queued_time));
886
887     /* Wake everyone up to push this one GOP, then sleep */
888     splitmux->have_muxed_something = TRUE;
889
890     if (!splitmux->reference_ctx->in_eos) {
891       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
892       splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
893     } else {
894       splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
895       splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
896     }
897
898     GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
899         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
900     GST_SPLITMUX_BROADCAST (splitmux);
901   }
902
903 }
904
905 /* Called with splitmux lock held */
906 /* Called from each input pad when it is has all the pieces
907  * for a GOP or EOS, starting with the reference pad which has set the
908  * splitmux->max_in_running_time
909  */
910 static void
911 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
912 {
913   GList *cur;
914   gboolean ready = TRUE;
915   GstClockTime current_max_in_running_time;
916
917   if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
918     /* Iterate each pad, and check that the input running time is at least
919      * up to the reference running time, and if so handle the collected GOP */
920     GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
921         GST_TIME_FORMAT " ctx %p",
922         GST_TIME_ARGS (splitmux->max_in_running_time), ctx);
923     for (cur = g_list_first (splitmux->contexts); cur != NULL;
924         cur = g_list_next (cur)) {
925       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
926
927       GST_LOG_OBJECT (splitmux,
928           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT
929           " EOS %d", tmpctx, tmpctx->srcpad,
930           GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
931
932       if (splitmux->max_in_running_time != GST_CLOCK_TIME_NONE &&
933           tmpctx->in_running_time < splitmux->max_in_running_time &&
934           !tmpctx->in_eos) {
935         GST_LOG_OBJECT (splitmux,
936             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
937             tmpctx, tmpctx->srcpad);
938         ready = FALSE;
939         break;
940       }
941     }
942     if (ready) {
943       GST_DEBUG_OBJECT (splitmux,
944           "Collected GOP is complete. Processing (ctx %p)", ctx);
945       /* All pads have a complete GOP, release it into the multiqueue */
946       handle_gathered_gop (splitmux);
947     }
948   }
949
950   /* If upstream reached EOS we are not expecting more data, no need to wait
951    * here. */
952   if (ctx->in_eos)
953     return;
954
955   /* Some pad is not yet ready, or GOP is being pushed
956    * either way, sleep and wait to get woken */
957   current_max_in_running_time = splitmux->max_in_running_time;
958   while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
959           splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
960       !ctx->flushing &&
961       (current_max_in_running_time == splitmux->max_in_running_time)) {
962
963     GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
964         splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
965         "GOP complete" : "EOF draining", ctx);
966     GST_SPLITMUX_WAIT (splitmux);
967
968     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
969   }
970 }
971
972 static void
973 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
974 {
975   GList *cur;
976   guint cur_len = g_queue_get_length (&ctx->queued_bufs);
977
978   GST_DEBUG_OBJECT (ctx->sinkpad,
979       "Checking queue length len %u cur_max %u queued gops %u",
980       cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
981
982   if (cur_len >= splitmux->mq_max_buffers) {
983     gboolean allow_grow = FALSE;
984
985     /* If collecting a GOP and this pad might block,
986      * and there isn't already a pending GOP in the queue
987      * then grow
988      */
989     if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
990         ctx->in_running_time < splitmux->max_in_running_time &&
991         splitmux->queued_gops <= 1) {
992       allow_grow = TRUE;
993     } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
994         ctx->is_reference && splitmux->queued_gops <= 1) {
995       allow_grow = TRUE;
996     }
997
998     if (!allow_grow) {
999       for (cur = g_list_first (splitmux->contexts);
1000           cur != NULL; cur = g_list_next (cur)) {
1001         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1002         GST_DEBUG_OBJECT (tmpctx->sinkpad,
1003             " len %u out_blocked %d",
1004             g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
1005         /* If another stream is starving, grow */
1006         if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
1007           allow_grow = TRUE;
1008         }
1009       }
1010     }
1011
1012     if (allow_grow) {
1013       splitmux->mq_max_buffers = cur_len + 1;
1014
1015       GST_INFO_OBJECT (splitmux,
1016           "Multiqueue overrun - enlarging to %u buffers ctx %p",
1017           splitmux->mq_max_buffers, ctx);
1018
1019       g_object_set (splitmux->mq, "max-size-buffers",
1020           splitmux->mq_max_buffers, NULL);
1021     }
1022   }
1023 }
1024
1025 static GstPadProbeReturn
1026 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1027 {
1028   GstSplitMuxSink *splitmux = ctx->splitmux;
1029   GstBuffer *buf;
1030   MqStreamBuf *buf_info = NULL;
1031   GstClockTime ts;
1032   gboolean loop_again;
1033   gboolean keyframe = FALSE;
1034
1035   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1036
1037   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1038   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1039     g_warning ("Buffer list handling not implemented");
1040     return GST_PAD_PROBE_DROP;
1041   }
1042   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
1043     GstEvent *event = gst_pad_probe_info_get_event (info);
1044     switch (GST_EVENT_TYPE (event)) {
1045       case GST_EVENT_SEGMENT:
1046         gst_event_copy_segment (event, &ctx->in_segment);
1047         break;
1048       case GST_EVENT_FLUSH_STOP:
1049         GST_SPLITMUX_LOCK (splitmux);
1050         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1051         ctx->in_eos = FALSE;
1052         ctx->in_bytes = 0;
1053         ctx->in_running_time = 0;
1054         GST_SPLITMUX_UNLOCK (splitmux);
1055         break;
1056       case GST_EVENT_EOS:
1057         GST_SPLITMUX_LOCK (splitmux);
1058         ctx->in_eos = TRUE;
1059
1060         if (splitmux->state == SPLITMUX_STATE_STOPPED)
1061           goto beach;
1062
1063         if (ctx->is_reference) {
1064           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
1065           /* Act as if this is a new keyframe with infinite timestamp */
1066           splitmux->max_in_running_time = GST_CLOCK_TIME_NONE;
1067           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1068           /* Wake up other input pads to collect this GOP */
1069           GST_SPLITMUX_BROADCAST (splitmux);
1070           check_completed_gop (splitmux, ctx);
1071         } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
1072           /* If we are waiting for a GOP to be completed (ie, for aux
1073            * pads to catch up), then this pad is complete, so check
1074            * if the whole GOP is.
1075            */
1076           check_completed_gop (splitmux, ctx);
1077         }
1078         GST_SPLITMUX_UNLOCK (splitmux);
1079         break;
1080       default:
1081         break;
1082     }
1083     return GST_PAD_PROBE_PASS;
1084   }
1085
1086   buf = gst_pad_probe_info_get_buffer (info);
1087   buf_info = mq_stream_buf_new ();
1088
1089   if (GST_BUFFER_PTS_IS_VALID (buf))
1090     ts = GST_BUFFER_PTS (buf);
1091   else
1092     ts = GST_BUFFER_DTS (buf);
1093
1094   GST_SPLITMUX_LOCK (splitmux);
1095
1096   if (splitmux->state == SPLITMUX_STATE_STOPPED)
1097     goto beach;
1098
1099   /* If this buffer has a timestamp, advance the input timestamp of the
1100    * stream */
1101   if (GST_CLOCK_TIME_IS_VALID (ts)) {
1102     GstClockTime running_time =
1103         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1104         GST_BUFFER_TIMESTAMP (buf));
1105
1106     if (GST_CLOCK_TIME_IS_VALID (running_time) &&
1107         (ctx->in_running_time == GST_CLOCK_TIME_NONE
1108             || running_time > ctx->in_running_time))
1109       ctx->in_running_time = running_time;
1110   }
1111
1112   /* Try to make sure we have a valid running time */
1113   if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) {
1114     ctx->in_running_time =
1115         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1116         ctx->in_segment.start);
1117   }
1118
1119   buf_info->run_ts = ctx->in_running_time;
1120   buf_info->buf_size = gst_buffer_get_size (buf);
1121
1122   /* Update total input byte counter for overflow detect */
1123   ctx->in_bytes += buf_info->buf_size;
1124
1125   /* initialize mux_start_time */
1126   if (ctx->is_reference && splitmux->mux_start_time == 0)
1127     splitmux->mux_start_time = buf_info->run_ts;
1128
1129   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT
1130       " total in_bytes %" G_GSIZE_FORMAT,
1131       GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes);
1132
1133   loop_again = TRUE;
1134   do {
1135     if (ctx->flushing)
1136       break;
1137
1138     switch (splitmux->state) {
1139       case SPLITMUX_STATE_COLLECTING_GOP_START:
1140         if (ctx->is_reference) {
1141           /* If a keyframe, we have a complete GOP */
1142           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1143               !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) ||
1144               splitmux->max_in_running_time >= ctx->in_running_time) {
1145             /* Pass this buffer through */
1146             loop_again = FALSE;
1147             break;
1148           }
1149           GST_INFO_OBJECT (pad,
1150               "Have keyframe with running time %" GST_TIME_FORMAT,
1151               GST_TIME_ARGS (ctx->in_running_time));
1152           keyframe = TRUE;
1153           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1154           splitmux->max_in_running_time = ctx->in_running_time;
1155           /* Wake up other input pads to collect this GOP */
1156           GST_SPLITMUX_BROADCAST (splitmux);
1157           check_completed_gop (splitmux, ctx);
1158         } else {
1159           /* We're still waiting for a keyframe on the reference pad, sleep */
1160           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1161           GST_SPLITMUX_WAIT (splitmux);
1162           GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1163               splitmux->state);
1164         }
1165         break;
1166       case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1167         /* After a GOP start is found, this buffer might complete the GOP */
1168         /* If we overran the target timestamp, it might be time to process
1169          * the GOP, otherwise bail out for more data
1170          */
1171         GST_LOG_OBJECT (pad,
1172             "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT,
1173             GST_TIME_ARGS (ctx->in_running_time),
1174             GST_TIME_ARGS (splitmux->max_in_running_time));
1175
1176         if (ctx->in_running_time < splitmux->max_in_running_time) {
1177           loop_again = FALSE;
1178           break;
1179         }
1180
1181         GST_LOG_OBJECT (pad,
1182             "Collected last packet of GOP. Checking other pads");
1183         check_completed_gop (splitmux, ctx);
1184         break;
1185       case SPLITMUX_STATE_ENDING_FILE:{
1186         GstEvent *event;
1187
1188         /* If somes streams received no buffer during the last GOP that overran,
1189          * because its next buffer has a timestamp bigger than
1190          * ctx->max_in_running_time, its queue is empty. In that case the only
1191          * way to wakeup the output thread is by injecting an event in the
1192          * queue. This usually happen with subtitle streams.
1193          * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
1194         GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event");
1195         event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
1196             GST_EVENT_TYPE_SERIALIZED,
1197             gst_structure_new ("splitmuxsink-unblock", "timestamp",
1198                 G_TYPE_UINT64, splitmux->max_in_running_time, NULL));
1199         gst_pad_send_event (ctx->sinkpad, event);
1200         /* fallthrough */
1201       }
1202       case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1203         /* A fragment is ending, wait until that's done before continuing */
1204         GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1205         GST_SPLITMUX_WAIT (splitmux);
1206         GST_DEBUG_OBJECT (pad,
1207             "Done sleeping for fragment restart state now %d", splitmux->state);
1208         break;
1209       default:
1210         loop_again = FALSE;
1211         break;
1212     }
1213   } while (loop_again);
1214
1215   if (keyframe) {
1216     splitmux->queued_gops++;
1217     buf_info->keyframe = TRUE;
1218   }
1219
1220   /* Now add this buffer to the queue just before returning */
1221   g_queue_push_head (&ctx->queued_bufs, buf_info);
1222
1223   /* Check the buffer will fit in the mq */
1224   check_queue_length (splitmux, ctx);
1225
1226   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1227       " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time));
1228
1229   GST_SPLITMUX_UNLOCK (splitmux);
1230   return GST_PAD_PROBE_PASS;
1231
1232 beach:
1233   GST_SPLITMUX_UNLOCK (splitmux);
1234   if (buf_info)
1235     mq_stream_buf_free (buf_info);
1236   return GST_PAD_PROBE_PASS;
1237 }
1238
1239 static GstPad *
1240 gst_splitmux_sink_request_new_pad (GstElement * element,
1241     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1242 {
1243   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1244   GstPadTemplate *mux_template = NULL;
1245   GstPad *res = NULL;
1246   GstPad *mq_sink, *mq_src;
1247   gchar *gname;
1248   gboolean is_video = FALSE;
1249   MqStreamCtx *ctx;
1250
1251   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1252
1253   GST_SPLITMUX_LOCK (splitmux);
1254   if (!create_elements (splitmux))
1255     goto fail;
1256
1257   if (templ->name_template) {
1258     if (g_str_equal (templ->name_template, "video")) {
1259       /* FIXME: Look for a pad template with matching caps, rather than by name */
1260       mux_template =
1261           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1262           (splitmux->muxer), "video_%u");
1263       is_video = TRUE;
1264       name = NULL;
1265     } else {
1266       mux_template =
1267           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1268           (splitmux->muxer), templ->name_template);
1269     }
1270     if (mux_template == NULL) {
1271       /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1272       mux_template =
1273           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1274           (splitmux->muxer), "sink_%d");
1275     }
1276   }
1277
1278   res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1279   if (res == NULL)
1280     goto fail;
1281
1282   if (is_video)
1283     gname = g_strdup ("video");
1284   else if (name == NULL)
1285     gname = gst_pad_get_name (res);
1286   else
1287     gname = g_strdup (name);
1288
1289   if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1290     gst_element_release_request_pad (splitmux->muxer, res);
1291     gst_object_unref (GST_OBJECT (res));
1292     goto fail;
1293   }
1294
1295   if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1296     gst_element_release_request_pad (splitmux->muxer, res);
1297     gst_object_unref (GST_OBJECT (res));
1298     gst_element_release_request_pad (splitmux->mq, mq_sink);
1299     gst_object_unref (GST_OBJECT (mq_sink));
1300     goto fail;
1301   }
1302
1303   gst_object_unref (GST_OBJECT (res));
1304
1305   ctx = mq_stream_ctx_new (splitmux);
1306   ctx->srcpad = mq_src;
1307   ctx->sinkpad = mq_sink;
1308
1309   mq_stream_ctx_ref (ctx);
1310   ctx->src_pad_block_id =
1311       gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1312       (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1313       _pad_block_destroy_src_notify);
1314   if (is_video && splitmux->reference_ctx != NULL) {
1315     splitmux->reference_ctx->is_reference = FALSE;
1316     splitmux->reference_ctx = NULL;
1317   }
1318   if (splitmux->reference_ctx == NULL) {
1319     splitmux->reference_ctx = ctx;
1320     ctx->is_reference = TRUE;
1321   }
1322
1323   res = gst_ghost_pad_new (gname, mq_sink);
1324   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1325
1326   mq_stream_ctx_ref (ctx);
1327   ctx->sink_pad_block_id =
1328       gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1329       (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1330       _pad_block_destroy_sink_notify);
1331
1332   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1333       " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1334
1335   splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1336
1337   g_free (gname);
1338
1339   gst_object_unref (mq_sink);
1340   gst_object_unref (mq_src);
1341
1342   gst_pad_set_active (res, TRUE);
1343   gst_element_add_pad (element, res);
1344   GST_SPLITMUX_UNLOCK (splitmux);
1345
1346   return res;
1347 fail:
1348   GST_SPLITMUX_UNLOCK (splitmux);
1349   return NULL;
1350 }
1351
1352 static void
1353 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1354 {
1355   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1356   GstPad *mqsink, *mqsrc, *muxpad;
1357   MqStreamCtx *ctx =
1358       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1359
1360   GST_SPLITMUX_LOCK (splitmux);
1361
1362   if (splitmux->muxer == NULL || splitmux->mq == NULL)
1363     goto fail;                  /* Elements don't exist yet - nothing to release */
1364
1365   GST_INFO_OBJECT (pad, "releasing request pad");
1366
1367   mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1368   mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1369   muxpad = gst_pad_get_peer (mqsrc);
1370
1371   /* Remove the context from our consideration */
1372   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1373
1374   if (ctx->sink_pad_block_id)
1375     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1376
1377   if (ctx->src_pad_block_id)
1378     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1379
1380   /* Can release the context now */
1381   mq_stream_ctx_unref (ctx);
1382
1383   /* Release and free the mq input */
1384   gst_element_release_request_pad (splitmux->mq, mqsink);
1385
1386   /* Release and free the muxer input */
1387   gst_element_release_request_pad (splitmux->muxer, muxpad);
1388
1389   gst_object_unref (mqsink);
1390   gst_object_unref (mqsrc);
1391   gst_object_unref (muxpad);
1392
1393   gst_element_remove_pad (element, pad);
1394
1395   /* Reset the internal elements only after all request pads are released */
1396   if (splitmux->contexts == NULL)
1397     gst_splitmux_reset (splitmux);
1398
1399 fail:
1400   GST_SPLITMUX_UNLOCK (splitmux);
1401 }
1402
1403 static GstElement *
1404 create_element (GstSplitMuxSink * splitmux,
1405     const gchar * factory, const gchar * name)
1406 {
1407   GstElement *ret = gst_element_factory_make (factory, name);
1408   if (ret == NULL) {
1409     g_warning ("Failed to create %s - splitmuxsink will not work", name);
1410     return NULL;
1411   }
1412
1413   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1414     g_warning ("Could not add %s element - splitmuxsink will not work", name);
1415     gst_object_unref (ret);
1416     return NULL;
1417   }
1418
1419   return ret;
1420 }
1421
1422 static gboolean
1423 create_elements (GstSplitMuxSink * splitmux)
1424 {
1425   /* Create internal elements */
1426   if (splitmux->mq == NULL) {
1427     if ((splitmux->mq =
1428             create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1429       goto fail;
1430
1431     splitmux->mq_max_buffers = 5;
1432     /* No bytes or time limit, we limit buffers manually */
1433     g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1434         (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1435   }
1436
1437   if (splitmux->muxer == NULL) {
1438     GstElement *provided_muxer = NULL;
1439
1440     GST_OBJECT_LOCK (splitmux);
1441     if (splitmux->provided_muxer != NULL)
1442       provided_muxer = gst_object_ref (splitmux->provided_muxer);
1443     GST_OBJECT_UNLOCK (splitmux);
1444
1445     if (provided_muxer == NULL) {
1446       if ((splitmux->muxer =
1447               create_element (splitmux, "mp4mux", "muxer")) == NULL)
1448         goto fail;
1449     } else {
1450       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1451         g_warning ("Could not add muxer element - splitmuxsink will not work");
1452         gst_object_unref (provided_muxer);
1453         goto fail;
1454       }
1455
1456       splitmux->muxer = provided_muxer;
1457       gst_object_unref (provided_muxer);
1458     }
1459   }
1460
1461   return TRUE;
1462 fail:
1463   return FALSE;
1464 }
1465
1466 static GstElement *
1467 find_sink (GstElement * e)
1468 {
1469   GstElement *res = NULL;
1470   GstIterator *iter;
1471   gboolean done = FALSE;
1472   GValue data = { 0, };
1473
1474   if (!GST_IS_BIN (e))
1475     return e;
1476
1477   iter = gst_bin_iterate_sinks (GST_BIN (e));
1478   while (!done) {
1479     switch (gst_iterator_next (iter, &data)) {
1480       case GST_ITERATOR_OK:
1481       {
1482         GstElement *child = g_value_get_object (&data);
1483         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1484                 "location") != NULL) {
1485           res = child;
1486           done = TRUE;
1487         }
1488         g_value_reset (&data);
1489         break;
1490       }
1491       case GST_ITERATOR_RESYNC:
1492         gst_iterator_resync (iter);
1493         break;
1494       case GST_ITERATOR_DONE:
1495         done = TRUE;
1496         break;
1497       case GST_ITERATOR_ERROR:
1498         g_assert_not_reached ();
1499         break;
1500     }
1501   }
1502   g_value_unset (&data);
1503   gst_iterator_free (iter);
1504
1505   return res;
1506 }
1507
1508 static gboolean
1509 create_sink (GstSplitMuxSink * splitmux)
1510 {
1511   GstElement *provided_sink = NULL;
1512
1513   if (splitmux->active_sink == NULL) {
1514
1515     GST_OBJECT_LOCK (splitmux);
1516     if (splitmux->provided_sink != NULL)
1517       provided_sink = gst_object_ref (splitmux->provided_sink);
1518     GST_OBJECT_UNLOCK (splitmux);
1519
1520     if (provided_sink == NULL) {
1521       if ((splitmux->sink =
1522               create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1523         goto fail;
1524       splitmux->active_sink = splitmux->sink;
1525     } else {
1526       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1527         g_warning ("Could not add sink elements - splitmuxsink will not work");
1528         gst_object_unref (provided_sink);
1529         goto fail;
1530       }
1531
1532       splitmux->active_sink = provided_sink;
1533
1534       /* The bin holds a ref now, we can drop our tmp ref */
1535       gst_object_unref (provided_sink);
1536
1537       /* Find the sink element */
1538       splitmux->sink = find_sink (splitmux->active_sink);
1539       if (splitmux->sink == NULL) {
1540         g_warning
1541             ("Could not locate sink element in provided sink - splitmuxsink will not work");
1542         goto fail;
1543       }
1544     }
1545
1546     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1547       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1548       goto fail;
1549     }
1550   }
1551
1552   return TRUE;
1553 fail:
1554   return FALSE;
1555 }
1556
1557 #ifdef __GNUC__
1558 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
1559 #endif
1560 static void
1561 set_next_filename (GstSplitMuxSink * splitmux)
1562 {
1563   gchar *fname = NULL;
1564   gst_splitmux_sink_ensure_max_files (splitmux);
1565
1566   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
1567       splitmux->fragment_id, &fname);
1568
1569   if (!fname)
1570     fname = splitmux->location ?
1571         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
1572
1573   if (fname) {
1574     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1575     g_object_set (splitmux->sink, "location", fname, NULL);
1576     g_free (fname);
1577
1578     splitmux->fragment_id++;
1579   }
1580 }
1581
1582 static GstStateChangeReturn
1583 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1584 {
1585   GstStateChangeReturn ret;
1586   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1587
1588   switch (transition) {
1589     case GST_STATE_CHANGE_NULL_TO_READY:{
1590       GST_SPLITMUX_LOCK (splitmux);
1591       if (!create_elements (splitmux) || !create_sink (splitmux)) {
1592         ret = GST_STATE_CHANGE_FAILURE;
1593         GST_SPLITMUX_UNLOCK (splitmux);
1594         goto beach;
1595       }
1596       GST_SPLITMUX_UNLOCK (splitmux);
1597       splitmux->fragment_id = 0;
1598       set_next_filename (splitmux);
1599       break;
1600     }
1601     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1602       GST_SPLITMUX_LOCK (splitmux);
1603       /* Start by collecting one input on each pad */
1604       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1605       splitmux->max_in_running_time = 0;
1606       splitmux->muxed_out_time = splitmux->mux_start_time = 0;
1607       splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1608       splitmux->opening_first_fragment = TRUE;
1609       GST_SPLITMUX_UNLOCK (splitmux);
1610       break;
1611     }
1612     case GST_STATE_CHANGE_PAUSED_TO_READY:
1613     case GST_STATE_CHANGE_READY_TO_NULL:
1614       GST_SPLITMUX_LOCK (splitmux);
1615       splitmux->state = SPLITMUX_STATE_STOPPED;
1616       /* Wake up any blocked threads */
1617       GST_LOG_OBJECT (splitmux,
1618           "State change -> NULL or READY. Waking threads");
1619       GST_SPLITMUX_BROADCAST (splitmux);
1620       GST_SPLITMUX_UNLOCK (splitmux);
1621       break;
1622     default:
1623       break;
1624   }
1625
1626   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1627   if (ret == GST_STATE_CHANGE_FAILURE)
1628     goto beach;
1629
1630   switch (transition) {
1631     case GST_STATE_CHANGE_READY_TO_NULL:
1632       GST_SPLITMUX_LOCK (splitmux);
1633       splitmux->fragment_id = 0;
1634       /* Reset internal elements only if no pad contexts are using them */
1635       if (splitmux->contexts == NULL)
1636         gst_splitmux_reset (splitmux);
1637       GST_SPLITMUX_UNLOCK (splitmux);
1638       break;
1639     default:
1640       break;
1641   }
1642
1643 beach:
1644
1645   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1646       ret == GST_STATE_CHANGE_FAILURE) {
1647     /* Cleanup elements on failed transition out of NULL */
1648     gst_splitmux_reset (splitmux);
1649   }
1650   return ret;
1651 }
1652
1653 gboolean
1654 register_splitmuxsink (GstPlugin * plugin)
1655 {
1656   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1657       "Split File Muxing Sink");
1658
1659   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1660       GST_TYPE_SPLITMUX_SINK);
1661 }
1662
1663 static void
1664 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
1665 {
1666   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
1667     splitmux->fragment_id = 0;
1668   }
1669 }