splitmuxsink: Drop lock when sending dummy event
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * can be muxed, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * If a video stream is available, the splitting process is driven by the video
37  * stream contents, and the video stream must contain closed GOPs for the output
38  * file parts to be played individually correctly. In the absence of a video
39  * stream, the first available stream is used as reference for synchronization.
40  *
41  * <refsect2>
42  * <title>Example pipelines</title>
43  * |[
44  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
45  * ]|
46  * Records a video stream captured from a v4l2 device and muxes it into
47  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48  * and 1MB maximum size.
49  * </refsect2>
50  */
51
52 #ifdef HAVE_CONFIG_H
53 #include "config.h"
54 #endif
55
56 #include <string.h>
57 #include <glib/gstdio.h>
58 #include "gstsplitmuxsink.h"
59
60 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
61 #define GST_CAT_DEFAULT splitmux_debug
62
63 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
64 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
65 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
66 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
67
68 enum
69 {
70   PROP_0,
71   PROP_LOCATION,
72   PROP_MAX_SIZE_TIME,
73   PROP_MAX_SIZE_BYTES,
74   PROP_MAX_FILES,
75   PROP_MUXER_OVERHEAD,
76   PROP_MUXER,
77   PROP_SINK
78 };
79
80 #define DEFAULT_MAX_SIZE_TIME       0
81 #define DEFAULT_MAX_SIZE_BYTES      0
82 #define DEFAULT_MAX_FILES           0
83 #define DEFAULT_MUXER_OVERHEAD      0.02
84 #define DEFAULT_MUXER "mp4mux"
85 #define DEFAULT_SINK "filesink"
86
87 enum
88 {
89   SIGNAL_FORMAT_LOCATION,
90   SIGNAL_LAST
91 };
92
93 static guint signals[SIGNAL_LAST];
94
95 static GstStaticPadTemplate video_sink_template =
96 GST_STATIC_PAD_TEMPLATE ("video",
97     GST_PAD_SINK,
98     GST_PAD_REQUEST,
99     GST_STATIC_CAPS_ANY);
100 static GstStaticPadTemplate audio_sink_template =
101 GST_STATIC_PAD_TEMPLATE ("audio_%u",
102     GST_PAD_SINK,
103     GST_PAD_REQUEST,
104     GST_STATIC_CAPS_ANY);
105 static GstStaticPadTemplate subtitle_sink_template =
106 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
107     GST_PAD_SINK,
108     GST_PAD_REQUEST,
109     GST_STATIC_CAPS_ANY);
110
111 static GQuark PAD_CONTEXT;
112
113 static void
114 _do_init (void)
115 {
116   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
117 }
118
119 #define gst_splitmux_sink_parent_class parent_class
120 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
121     _do_init ());
122
123 static gboolean create_elements (GstSplitMuxSink * splitmux);
124 static gboolean create_sink (GstSplitMuxSink * splitmux);
125 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
126     const GValue * value, GParamSpec * pspec);
127 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
128     GValue * value, GParamSpec * pspec);
129 static void gst_splitmux_sink_dispose (GObject * object);
130 static void gst_splitmux_sink_finalize (GObject * object);
131
132 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
133     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
134 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
135
136 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
137     element, GstStateChange transition);
138
139 static void bus_handler (GstBin * bin, GstMessage * msg);
140 static void set_next_filename (GstSplitMuxSink * splitmux);
141 static void start_next_fragment (GstSplitMuxSink * splitmux);
142 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
143 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
144
145 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
146
147 static MqStreamBuf *
148 mq_stream_buf_new (void)
149 {
150   return g_slice_new0 (MqStreamBuf);
151 }
152
153 static void
154 mq_stream_buf_free (MqStreamBuf * data)
155 {
156   g_slice_free (MqStreamBuf, data);
157 }
158
159 static void
160 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
161 {
162   GObjectClass *gobject_class = (GObjectClass *) klass;
163   GstElementClass *gstelement_class = (GstElementClass *) klass;
164   GstBinClass *gstbin_class = (GstBinClass *) klass;
165
166   gobject_class->set_property = gst_splitmux_sink_set_property;
167   gobject_class->get_property = gst_splitmux_sink_get_property;
168   gobject_class->dispose = gst_splitmux_sink_dispose;
169   gobject_class->finalize = gst_splitmux_sink_finalize;
170
171   gst_element_class_set_static_metadata (gstelement_class,
172       "Split Muxing Bin", "Generic/Bin/Muxer",
173       "Convenience bin that muxes incoming streams into multiple time/size limited files",
174       "Jan Schmidt <jan@centricular.com>");
175
176   gst_element_class_add_static_pad_template (gstelement_class,
177       &video_sink_template);
178   gst_element_class_add_static_pad_template (gstelement_class,
179       &audio_sink_template);
180   gst_element_class_add_static_pad_template (gstelement_class,
181       &subtitle_sink_template);
182
183   gstelement_class->change_state =
184       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
185   gstelement_class->request_new_pad =
186       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
187   gstelement_class->release_pad =
188       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
189
190   gstbin_class->handle_message = bus_handler;
191
192   g_object_class_install_property (gobject_class, PROP_LOCATION,
193       g_param_spec_string ("location", "File Output Pattern",
194           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
195           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
197       g_param_spec_double ("mux-overhead", "Muxing Overhead",
198           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
199           DEFAULT_MUXER_OVERHEAD,
200           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
201
202   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
203       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
204           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
205           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
206   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
207       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
208           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
209           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
210   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
211       g_param_spec_uint ("max-files", "Max files",
212           "Maximum number of files to keep on disk. Once the maximum is reached,"
213           "old files start to be deleted to make room for new ones.",
214           0, G_MAXUINT, DEFAULT_MAX_FILES,
215           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
216
217
218   g_object_class_install_property (gobject_class, PROP_MUXER,
219       g_param_spec_object ("muxer", "Muxer",
220           "The muxer element to use (NULL = default mp4mux)",
221           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222   g_object_class_install_property (gobject_class, PROP_SINK,
223       g_param_spec_object ("sink", "Sink",
224           "The sink element (or element chain) to use (NULL = default filesink)",
225           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
226
227   /**
228    * GstSplitMuxSink::format-location:
229    * @splitmux: the #GstSplitMuxSink
230    * @fragment_id: the sequence number of the file to be created
231    *
232    * Returns: the location to be used for the next output file
233    */
234   signals[SIGNAL_FORMAT_LOCATION] =
235       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
236       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
237 }
238
239 static void
240 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
241 {
242   g_mutex_init (&splitmux->lock);
243   g_cond_init (&splitmux->data_cond);
244
245   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
246   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
247   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
248   splitmux->max_files = DEFAULT_MAX_FILES;
249
250   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
251   g_object_set (splitmux, "async-handling", TRUE, NULL);
252 }
253
254 static void
255 gst_splitmux_reset (GstSplitMuxSink * splitmux)
256 {
257   if (splitmux->mq)
258     gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
259   if (splitmux->muxer)
260     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
261   if (splitmux->active_sink)
262     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
263
264   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
265       NULL;
266 }
267
268 static void
269 gst_splitmux_sink_dispose (GObject * object)
270 {
271   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
272
273   G_OBJECT_CLASS (parent_class)->dispose (object);
274
275   /* Calling parent dispose invalidates all child pointers */
276   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
277       NULL;
278 }
279
280 static void
281 gst_splitmux_sink_finalize (GObject * object)
282 {
283   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
284   g_cond_clear (&splitmux->data_cond);
285   g_mutex_clear (&splitmux->lock);
286   if (splitmux->provided_sink)
287     gst_object_unref (splitmux->provided_sink);
288   if (splitmux->provided_muxer)
289     gst_object_unref (splitmux->provided_muxer);
290
291   g_free (splitmux->location);
292
293   /* Make sure to free any un-released contexts */
294   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
295   g_list_free (splitmux->contexts);
296
297   G_OBJECT_CLASS (parent_class)->finalize (object);
298 }
299
300 static void
301 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
302     const GValue * value, GParamSpec * pspec)
303 {
304   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
305
306   switch (prop_id) {
307     case PROP_LOCATION:{
308       GST_OBJECT_LOCK (splitmux);
309       g_free (splitmux->location);
310       splitmux->location = g_value_dup_string (value);
311       GST_OBJECT_UNLOCK (splitmux);
312       break;
313     }
314     case PROP_MAX_SIZE_BYTES:
315       GST_OBJECT_LOCK (splitmux);
316       splitmux->threshold_bytes = g_value_get_uint64 (value);
317       GST_OBJECT_UNLOCK (splitmux);
318       break;
319     case PROP_MAX_SIZE_TIME:
320       GST_OBJECT_LOCK (splitmux);
321       splitmux->threshold_time = g_value_get_uint64 (value);
322       GST_OBJECT_UNLOCK (splitmux);
323       break;
324     case PROP_MAX_FILES:
325       GST_OBJECT_LOCK (splitmux);
326       splitmux->max_files = g_value_get_uint (value);
327       GST_OBJECT_UNLOCK (splitmux);
328       break;
329     case PROP_MUXER_OVERHEAD:
330       GST_OBJECT_LOCK (splitmux);
331       splitmux->mux_overhead = g_value_get_double (value);
332       GST_OBJECT_UNLOCK (splitmux);
333       break;
334     case PROP_SINK:
335       GST_OBJECT_LOCK (splitmux);
336       if (splitmux->provided_sink)
337         gst_object_unref (splitmux->provided_sink);
338       splitmux->provided_sink = g_value_dup_object (value);
339       GST_OBJECT_UNLOCK (splitmux);
340       break;
341     case PROP_MUXER:
342       GST_OBJECT_LOCK (splitmux);
343       if (splitmux->provided_muxer)
344         gst_object_unref (splitmux->provided_muxer);
345       splitmux->provided_muxer = g_value_dup_object (value);
346       GST_OBJECT_UNLOCK (splitmux);
347       break;
348     default:
349       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
350       break;
351   }
352 }
353
354 static void
355 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
356     GValue * value, GParamSpec * pspec)
357 {
358   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
359
360   switch (prop_id) {
361     case PROP_LOCATION:
362       GST_OBJECT_LOCK (splitmux);
363       g_value_set_string (value, splitmux->location);
364       GST_OBJECT_UNLOCK (splitmux);
365       break;
366     case PROP_MAX_SIZE_BYTES:
367       GST_OBJECT_LOCK (splitmux);
368       g_value_set_uint64 (value, splitmux->threshold_bytes);
369       GST_OBJECT_UNLOCK (splitmux);
370       break;
371     case PROP_MAX_SIZE_TIME:
372       GST_OBJECT_LOCK (splitmux);
373       g_value_set_uint64 (value, splitmux->threshold_time);
374       GST_OBJECT_UNLOCK (splitmux);
375       break;
376     case PROP_MAX_FILES:
377       GST_OBJECT_LOCK (splitmux);
378       g_value_set_uint (value, splitmux->max_files);
379       GST_OBJECT_UNLOCK (splitmux);
380       break;
381     case PROP_MUXER_OVERHEAD:
382       GST_OBJECT_LOCK (splitmux);
383       g_value_set_double (value, splitmux->mux_overhead);
384       GST_OBJECT_UNLOCK (splitmux);
385       break;
386     case PROP_SINK:
387       GST_OBJECT_LOCK (splitmux);
388       g_value_set_object (value, splitmux->provided_sink);
389       GST_OBJECT_UNLOCK (splitmux);
390       break;
391     case PROP_MUXER:
392       GST_OBJECT_LOCK (splitmux);
393       g_value_set_object (value, splitmux->provided_muxer);
394       GST_OBJECT_UNLOCK (splitmux);
395       break;
396     default:
397       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
398       break;
399   }
400 }
401
402 static GstPad *
403 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
404 {
405   gchar *tmp, *sinkname, *srcname;
406   GstPad *mq_src;
407
408   sinkname = gst_pad_get_name (sink_pad);
409   tmp = sinkname + 5;
410   srcname = g_strdup_printf ("src_%s", tmp);
411
412   mq_src = gst_element_get_static_pad (mq, srcname);
413
414   g_free (sinkname);
415   g_free (srcname);
416
417   return mq_src;
418 }
419
420 static gboolean
421 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
422     GstPad ** src_pad)
423 {
424   GstPad *mq_sink;
425   GstPad *mq_src;
426
427   /* Request a pad from multiqueue, then connect this one, then
428    * discover the corresponding output pad and return both */
429   mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
430   if (mq_sink == NULL)
431     return FALSE;
432
433   mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
434   if (mq_src == NULL)
435     goto fail;
436
437   *sink_pad = mq_sink;
438   *src_pad = mq_src;
439
440   return TRUE;
441
442 fail:
443   gst_element_release_request_pad (splitmux->mq, mq_sink);
444   return FALSE;
445 }
446
447 static MqStreamCtx *
448 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
449 {
450   MqStreamCtx *ctx;
451
452   ctx = g_new0 (MqStreamCtx, 1);
453   g_atomic_int_set (&ctx->refcount, 1);
454   ctx->splitmux = splitmux;
455   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
456   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
457   ctx->in_running_time = ctx->out_running_time = 0;
458   g_queue_init (&ctx->queued_bufs);
459   return ctx;
460 }
461
462 static void
463 mq_stream_ctx_free (MqStreamCtx * ctx)
464 {
465   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
466   g_queue_clear (&ctx->queued_bufs);
467   g_free (ctx);
468 }
469
470 static void
471 mq_stream_ctx_unref (MqStreamCtx * ctx)
472 {
473   if (g_atomic_int_dec_and_test (&ctx->refcount))
474     mq_stream_ctx_free (ctx);
475 }
476
477 static void
478 mq_stream_ctx_ref (MqStreamCtx * ctx)
479 {
480   g_atomic_int_inc (&ctx->refcount);
481 }
482
483 static void
484 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
485 {
486   ctx->sink_pad_block_id = 0;
487   mq_stream_ctx_unref (ctx);
488 }
489
490 static void
491 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
492 {
493   ctx->src_pad_block_id = 0;
494   mq_stream_ctx_unref (ctx);
495 }
496
497 static void
498 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
499 {
500   gchar *location = NULL;
501   GstMessage *msg;
502   const gchar *msg_name = opened ?
503       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
504
505   g_object_get (splitmux->sink, "location", &location, NULL);
506
507   msg = gst_message_new_element (GST_OBJECT (splitmux),
508       gst_structure_new (msg_name,
509           "location", G_TYPE_STRING, location,
510           "running-time", GST_TYPE_CLOCK_TIME,
511           splitmux->reference_ctx->out_running_time, NULL));
512   gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
513
514   g_free (location);
515 }
516
517 /* Called with lock held, drops the lock to send EOS to the
518  * pad
519  */
520 static void
521 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
522 {
523   GstEvent *eos;
524   GstPad *pad;
525
526   eos = gst_event_new_eos ();
527   pad = gst_pad_get_peer (ctx->srcpad);
528
529   ctx->out_eos = TRUE;
530
531   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
532   GST_SPLITMUX_UNLOCK (splitmux);
533   gst_pad_send_event (pad, eos);
534   GST_SPLITMUX_LOCK (splitmux);
535
536   gst_object_unref (pad);
537 }
538
539 /* Called with splitmux lock held to check if this output
540  * context needs to sleep to wait for the release of the
541  * next GOP, or to send EOS to close out the current file
542  */
543 static void
544 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
545 {
546   do {
547
548     GST_LOG_OBJECT (ctx->srcpad,
549         "Checking running time %" GST_TIME_FORMAT " against max %"
550         GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time),
551         GST_TIME_ARGS (splitmux->max_out_running_time));
552
553     if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE ||
554         ctx->out_running_time < splitmux->max_out_running_time) {
555       splitmux->have_muxed_something = TRUE;
556       return;
557     }
558
559     if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
560       return;
561
562     if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
563       if (ctx->out_eos == FALSE) {
564         send_eos (splitmux, ctx);
565         continue;
566       }
567     } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
568       start_next_fragment (splitmux);
569       continue;
570     }
571
572     GST_INFO_OBJECT (ctx->srcpad,
573         "Sleeping for running time %"
574         GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")",
575         GST_TIME_ARGS (ctx->out_running_time),
576         GST_TIME_ARGS (splitmux->max_out_running_time));
577     ctx->out_blocked = TRUE;
578     /* Expand the mq if needed before sleeping */
579     check_queue_length (splitmux, ctx);
580     GST_SPLITMUX_WAIT (splitmux);
581     ctx->out_blocked = FALSE;
582     GST_INFO_OBJECT (ctx->srcpad,
583         "Woken for new max running time %" GST_TIME_FORMAT,
584         GST_TIME_ARGS (splitmux->max_out_running_time));
585   } while (1);
586 }
587
588 static GstPadProbeReturn
589 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
590 {
591   GstSplitMuxSink *splitmux = ctx->splitmux;
592   MqStreamBuf *buf_info = NULL;
593
594   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
595
596   /* FIXME: Handle buffer lists, until then make it clear they won't work */
597   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
598     g_warning ("Buffer list handling not implemented");
599     return GST_PAD_PROBE_DROP;
600   }
601   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
602     GstEvent *event = gst_pad_probe_info_get_event (info);
603
604     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
605
606     switch (GST_EVENT_TYPE (event)) {
607       case GST_EVENT_SEGMENT:
608         gst_event_copy_segment (event, &ctx->out_segment);
609         break;
610       case GST_EVENT_FLUSH_STOP:
611         GST_SPLITMUX_LOCK (splitmux);
612         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
613         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
614         g_queue_clear (&ctx->queued_bufs);
615         ctx->flushing = FALSE;
616         GST_SPLITMUX_UNLOCK (splitmux);
617         break;
618       case GST_EVENT_FLUSH_START:
619         GST_SPLITMUX_LOCK (splitmux);
620         GST_LOG_OBJECT (pad, "Flush start");
621         ctx->flushing = TRUE;
622         GST_SPLITMUX_BROADCAST (splitmux);
623         GST_SPLITMUX_UNLOCK (splitmux);
624         break;
625       case GST_EVENT_EOS:
626         GST_SPLITMUX_LOCK (splitmux);
627         if (splitmux->state == SPLITMUX_STATE_STOPPED)
628           goto beach;
629         ctx->out_eos = TRUE;
630         GST_SPLITMUX_UNLOCK (splitmux);
631         break;
632       case GST_EVENT_GAP:{
633         GstClockTime gap_ts;
634
635         gst_event_parse_gap (event, &gap_ts, NULL);
636         if (gap_ts == GST_CLOCK_TIME_NONE)
637           break;
638
639         GST_SPLITMUX_LOCK (splitmux);
640
641         gap_ts = gst_segment_to_running_time (&ctx->out_segment,
642             GST_FORMAT_TIME, gap_ts);
643
644         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT,
645             GST_TIME_ARGS (gap_ts));
646
647         if (splitmux->state == SPLITMUX_STATE_STOPPED)
648           goto beach;
649         ctx->out_running_time = gap_ts;
650         complete_or_wait_on_out (splitmux, ctx);
651         GST_SPLITMUX_UNLOCK (splitmux);
652         break;
653       }
654       case GST_EVENT_CUSTOM_DOWNSTREAM:{
655         const GstStructure *s;
656         GstClockTime ts = 0;
657
658         s = gst_event_get_structure (event);
659         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
660           break;
661
662         gst_structure_get_uint64 (s, "timestamp", &ts);
663
664         GST_SPLITMUX_LOCK (splitmux);
665
666         if (splitmux->state == SPLITMUX_STATE_STOPPED)
667           goto beach;
668         ctx->out_running_time = ts;
669         complete_or_wait_on_out (splitmux, ctx);
670         GST_SPLITMUX_UNLOCK (splitmux);
671         return GST_PAD_PROBE_DROP;
672       }
673       default:
674         break;
675     }
676     return GST_PAD_PROBE_PASS;
677   }
678
679   /* Allow everything through until the configured next stopping point */
680   GST_SPLITMUX_LOCK (splitmux);
681
682   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
683   if (buf_info == NULL)
684     /* Can only happen due to a poorly timed flush */
685     goto beach;
686
687   /* If we have popped a keyframe, decrement the queued_gop count */
688   if (buf_info->keyframe && splitmux->queued_gops > 0)
689     splitmux->queued_gops--;
690
691   ctx->out_running_time = buf_info->run_ts;
692
693   GST_LOG_OBJECT (splitmux,
694       "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT
695       " size %" G_GSIZE_FORMAT,
696       pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size);
697
698   if (splitmux->opening_first_fragment) {
699     send_fragment_opened_closed_msg (splitmux, TRUE);
700     splitmux->opening_first_fragment = FALSE;
701   }
702
703   complete_or_wait_on_out (splitmux, ctx);
704
705   if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE ||
706       splitmux->muxed_out_time < buf_info->run_ts)
707     splitmux->muxed_out_time = buf_info->run_ts;
708
709   splitmux->muxed_out_bytes += buf_info->buf_size;
710
711 #ifndef GST_DISABLE_GST_DEBUG
712   {
713     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
714     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
715         " run ts %" GST_TIME_FORMAT, buf,
716         GST_TIME_ARGS (ctx->out_running_time));
717   }
718 #endif
719
720   GST_SPLITMUX_UNLOCK (splitmux);
721
722   mq_stream_buf_free (buf_info);
723
724   return GST_PAD_PROBE_PASS;
725
726 beach:
727   GST_SPLITMUX_UNLOCK (splitmux);
728   return GST_PAD_PROBE_DROP;
729 }
730
731 static gboolean
732 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
733 {
734   return gst_pad_send_event (peer, gst_event_ref (*event));
735 }
736
737 static void
738 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
739 {
740   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
741
742   gst_pad_sticky_events_foreach (ctx->srcpad,
743       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
744
745   /* Clear EOS flag if not actually EOS */
746   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
747
748   gst_object_unref (peer);
749 }
750
751 /* Called with lock held when a fragment
752  * reaches EOS and it is time to restart
753  * a new fragment
754  */
755 static void
756 start_next_fragment (GstSplitMuxSink * splitmux)
757 {
758   /* 1 change to new file */
759   gst_element_set_locked_state (splitmux->muxer, TRUE);
760   gst_element_set_locked_state (splitmux->active_sink, TRUE);
761   gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
762   gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
763
764   set_next_filename (splitmux);
765
766   gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux));
767   gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux));
768   gst_element_set_locked_state (splitmux->muxer, FALSE);
769   gst_element_set_locked_state (splitmux->active_sink, FALSE);
770
771   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
772
773   /* Switch state and go back to processing */
774   if (!splitmux->reference_ctx->in_eos) {
775     splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
776     splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
777   } else {
778     splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
779     splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
780     splitmux->have_muxed_something = FALSE;
781   }
782   splitmux->have_muxed_something =
783       (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
784
785   /* Store the overflow parameters as the basis for the next fragment */
786   splitmux->mux_start_time = splitmux->muxed_out_time;
787   splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
788
789   GST_DEBUG_OBJECT (splitmux,
790       "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT,
791       GST_TIME_ARGS (splitmux->max_out_running_time));
792
793   send_fragment_opened_closed_msg (splitmux, TRUE);
794
795   GST_SPLITMUX_BROADCAST (splitmux);
796 }
797
798 static void
799 bus_handler (GstBin * bin, GstMessage * message)
800 {
801   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
802
803   switch (GST_MESSAGE_TYPE (message)) {
804     case GST_MESSAGE_EOS:
805       /* If the state is draining out the current file, drop this EOS */
806       GST_SPLITMUX_LOCK (splitmux);
807
808       send_fragment_opened_closed_msg (splitmux, FALSE);
809
810       if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
811           splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) {
812         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
813         splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
814         GST_SPLITMUX_BROADCAST (splitmux);
815
816         gst_message_unref (message);
817         GST_SPLITMUX_UNLOCK (splitmux);
818         return;
819       }
820       GST_SPLITMUX_UNLOCK (splitmux);
821       break;
822     default:
823       break;
824   }
825
826   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
827 }
828
829 /* Called with splitmux lock held */
830 /* Called when entering ProcessingCompleteGop state
831  * Assess if mq contents overflowed the current file
832  *   -> If yes, need to switch to new file
833  *   -> if no, set max_out_running_time to let this GOP in and
834  *      go to COLLECTING_GOP_START state
835  */
836 static void
837 handle_gathered_gop (GstSplitMuxSink * splitmux)
838 {
839   GList *cur;
840   gsize queued_bytes = 0;
841   GstClockTime queued_time = 0;
842
843   /* Assess if the multiqueue contents overflowed the current file */
844   for (cur = g_list_first (splitmux->contexts);
845       cur != NULL; cur = g_list_next (cur)) {
846     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
847     if (tmpctx->in_running_time > queued_time)
848       queued_time = tmpctx->in_running_time;
849     queued_bytes += tmpctx->in_bytes;
850   }
851
852   g_assert (queued_bytes >= splitmux->mux_start_bytes);
853   g_assert (queued_time >= splitmux->mux_start_time);
854
855   queued_bytes -= splitmux->mux_start_bytes;
856   queued_time -= splitmux->mux_start_time;
857
858   /* Expand queued bytes estimate by muxer overhead */
859   queued_bytes += (queued_bytes * splitmux->mux_overhead);
860
861   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT
862       " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes);
863
864   /* Check for overrun - have we output at least one byte and overrun
865    * either threshold? */
866   if ((splitmux->have_muxed_something &&
867           ((splitmux->threshold_bytes > 0 &&
868                   queued_bytes >= splitmux->threshold_bytes) ||
869               (splitmux->threshold_time > 0 &&
870                   queued_time >= splitmux->threshold_time)))) {
871
872     splitmux->state = SPLITMUX_STATE_ENDING_FILE;
873
874     GST_INFO_OBJECT (splitmux,
875         "mq overflowed since last, draining out. max out TS is %"
876         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
877     GST_SPLITMUX_BROADCAST (splitmux);
878
879   } else {
880     /* No overflow */
881     GST_LOG_OBJECT (splitmux,
882         "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
883         " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.",
884         splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
885         queued_bytes, GST_TIME_ARGS (queued_time));
886
887     /* Wake everyone up to push this one GOP, then sleep */
888     splitmux->have_muxed_something = TRUE;
889
890     if (!splitmux->reference_ctx->in_eos) {
891       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
892       splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
893     } else {
894       splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
895       splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
896     }
897
898     GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
899         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
900     GST_SPLITMUX_BROADCAST (splitmux);
901   }
902
903 }
904
905 /* Called with splitmux lock held */
906 /* Called from each input pad when it is has all the pieces
907  * for a GOP or EOS, starting with the reference pad which has set the
908  * splitmux->max_in_running_time
909  */
910 static void
911 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
912 {
913   GList *cur;
914   gboolean ready = TRUE;
915   GstClockTime current_max_in_running_time;
916
917   if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
918     /* Iterate each pad, and check that the input running time is at least
919      * up to the reference running time, and if so handle the collected GOP */
920     GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
921         GST_TIME_FORMAT " ctx %p",
922         GST_TIME_ARGS (splitmux->max_in_running_time), ctx);
923     for (cur = g_list_first (splitmux->contexts); cur != NULL;
924         cur = g_list_next (cur)) {
925       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
926
927       GST_LOG_OBJECT (splitmux,
928           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT
929           " EOS %d", tmpctx, tmpctx->srcpad,
930           GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
931
932       if (splitmux->max_in_running_time != GST_CLOCK_TIME_NONE &&
933           tmpctx->in_running_time < splitmux->max_in_running_time &&
934           !tmpctx->in_eos) {
935         GST_LOG_OBJECT (splitmux,
936             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
937             tmpctx, tmpctx->srcpad);
938         ready = FALSE;
939         break;
940       }
941     }
942     if (ready) {
943       GST_DEBUG_OBJECT (splitmux,
944           "Collected GOP is complete. Processing (ctx %p)", ctx);
945       /* All pads have a complete GOP, release it into the multiqueue */
946       handle_gathered_gop (splitmux);
947     }
948   }
949
950   /* If upstream reached EOS we are not expecting more data, no need to wait
951    * here. */
952   if (ctx->in_eos)
953     return;
954
955   /* Some pad is not yet ready, or GOP is being pushed
956    * either way, sleep and wait to get woken */
957   current_max_in_running_time = splitmux->max_in_running_time;
958   while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
959           splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
960       !ctx->flushing &&
961       (current_max_in_running_time == splitmux->max_in_running_time)) {
962
963     GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
964         splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
965         "GOP complete" : "EOF draining", ctx);
966     GST_SPLITMUX_WAIT (splitmux);
967
968     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
969   }
970 }
971
972 static void
973 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
974 {
975   GList *cur;
976   guint cur_len = g_queue_get_length (&ctx->queued_bufs);
977
978   GST_DEBUG_OBJECT (ctx->sinkpad,
979       "Checking queue length len %u cur_max %u queued gops %u",
980       cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
981
982   if (cur_len >= splitmux->mq_max_buffers) {
983     gboolean allow_grow = FALSE;
984
985     /* If collecting a GOP and this pad might block,
986      * and there isn't already a pending GOP in the queue
987      * then grow
988      */
989     if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
990         ctx->in_running_time < splitmux->max_in_running_time &&
991         splitmux->queued_gops <= 1) {
992       allow_grow = TRUE;
993     } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
994         ctx->is_reference && splitmux->queued_gops <= 1) {
995       allow_grow = TRUE;
996     }
997
998     if (!allow_grow) {
999       for (cur = g_list_first (splitmux->contexts);
1000           cur != NULL; cur = g_list_next (cur)) {
1001         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1002         GST_DEBUG_OBJECT (tmpctx->sinkpad,
1003             " len %u out_blocked %d",
1004             g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
1005         /* If another stream is starving, grow */
1006         if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
1007           allow_grow = TRUE;
1008         }
1009       }
1010     }
1011
1012     if (allow_grow) {
1013       splitmux->mq_max_buffers = cur_len + 1;
1014
1015       GST_INFO_OBJECT (splitmux,
1016           "Multiqueue overrun - enlarging to %u buffers ctx %p",
1017           splitmux->mq_max_buffers, ctx);
1018
1019       g_object_set (splitmux->mq, "max-size-buffers",
1020           splitmux->mq_max_buffers, NULL);
1021     }
1022   }
1023 }
1024
1025 static GstPadProbeReturn
1026 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1027 {
1028   GstSplitMuxSink *splitmux = ctx->splitmux;
1029   GstBuffer *buf;
1030   MqStreamBuf *buf_info = NULL;
1031   GstClockTime ts;
1032   gboolean loop_again;
1033   gboolean keyframe = FALSE;
1034
1035   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1036
1037   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1038   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1039     g_warning ("Buffer list handling not implemented");
1040     return GST_PAD_PROBE_DROP;
1041   }
1042   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
1043     GstEvent *event = gst_pad_probe_info_get_event (info);
1044     switch (GST_EVENT_TYPE (event)) {
1045       case GST_EVENT_SEGMENT:
1046         gst_event_copy_segment (event, &ctx->in_segment);
1047         break;
1048       case GST_EVENT_FLUSH_STOP:
1049         GST_SPLITMUX_LOCK (splitmux);
1050         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1051         ctx->in_eos = FALSE;
1052         ctx->in_bytes = 0;
1053         ctx->in_running_time = 0;
1054         GST_SPLITMUX_UNLOCK (splitmux);
1055         break;
1056       case GST_EVENT_EOS:
1057         GST_SPLITMUX_LOCK (splitmux);
1058         ctx->in_eos = TRUE;
1059
1060         if (splitmux->state == SPLITMUX_STATE_STOPPED)
1061           goto beach;
1062
1063         if (ctx->is_reference) {
1064           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
1065           /* Act as if this is a new keyframe with infinite timestamp */
1066           splitmux->max_in_running_time = GST_CLOCK_TIME_NONE;
1067           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1068           /* Wake up other input pads to collect this GOP */
1069           GST_SPLITMUX_BROADCAST (splitmux);
1070           check_completed_gop (splitmux, ctx);
1071         } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
1072           /* If we are waiting for a GOP to be completed (ie, for aux
1073            * pads to catch up), then this pad is complete, so check
1074            * if the whole GOP is.
1075            */
1076           check_completed_gop (splitmux, ctx);
1077         }
1078         GST_SPLITMUX_UNLOCK (splitmux);
1079         break;
1080       default:
1081         break;
1082     }
1083     return GST_PAD_PROBE_PASS;
1084   }
1085
1086   buf = gst_pad_probe_info_get_buffer (info);
1087   buf_info = mq_stream_buf_new ();
1088
1089   if (GST_BUFFER_PTS_IS_VALID (buf))
1090     ts = GST_BUFFER_PTS (buf);
1091   else
1092     ts = GST_BUFFER_DTS (buf);
1093
1094   GST_SPLITMUX_LOCK (splitmux);
1095
1096   if (splitmux->state == SPLITMUX_STATE_STOPPED)
1097     goto beach;
1098
1099   /* If this buffer has a timestamp, advance the input timestamp of the
1100    * stream */
1101   if (GST_CLOCK_TIME_IS_VALID (ts)) {
1102     GstClockTime running_time =
1103         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1104         GST_BUFFER_TIMESTAMP (buf));
1105
1106     if (GST_CLOCK_TIME_IS_VALID (running_time) &&
1107         (ctx->in_running_time == GST_CLOCK_TIME_NONE
1108             || running_time > ctx->in_running_time))
1109       ctx->in_running_time = running_time;
1110   }
1111
1112   /* Try to make sure we have a valid running time */
1113   if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) {
1114     ctx->in_running_time =
1115         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1116         ctx->in_segment.start);
1117   }
1118
1119   buf_info->run_ts = ctx->in_running_time;
1120   buf_info->buf_size = gst_buffer_get_size (buf);
1121
1122   /* Update total input byte counter for overflow detect */
1123   ctx->in_bytes += buf_info->buf_size;
1124
1125   /* initialize mux_start_time */
1126   if (ctx->is_reference && splitmux->mux_start_time == 0)
1127     splitmux->mux_start_time = buf_info->run_ts;
1128
1129   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT
1130       " total in_bytes %" G_GSIZE_FORMAT,
1131       GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes);
1132
1133   loop_again = TRUE;
1134   do {
1135     if (ctx->flushing)
1136       break;
1137
1138     switch (splitmux->state) {
1139       case SPLITMUX_STATE_COLLECTING_GOP_START:
1140         if (ctx->is_reference) {
1141           /* If a keyframe, we have a complete GOP */
1142           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1143               !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) ||
1144               splitmux->max_in_running_time >= ctx->in_running_time) {
1145             /* Pass this buffer through */
1146             loop_again = FALSE;
1147             break;
1148           }
1149           GST_INFO_OBJECT (pad,
1150               "Have keyframe with running time %" GST_TIME_FORMAT,
1151               GST_TIME_ARGS (ctx->in_running_time));
1152           keyframe = TRUE;
1153           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1154           splitmux->max_in_running_time = ctx->in_running_time;
1155           /* Wake up other input pads to collect this GOP */
1156           GST_SPLITMUX_BROADCAST (splitmux);
1157           check_completed_gop (splitmux, ctx);
1158         } else {
1159           /* We're still waiting for a keyframe on the reference pad, sleep */
1160           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1161           GST_SPLITMUX_WAIT (splitmux);
1162           GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1163               splitmux->state);
1164         }
1165         break;
1166       case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1167         /* After a GOP start is found, this buffer might complete the GOP */
1168         /* If we overran the target timestamp, it might be time to process
1169          * the GOP, otherwise bail out for more data
1170          */
1171         GST_LOG_OBJECT (pad,
1172             "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT,
1173             GST_TIME_ARGS (ctx->in_running_time),
1174             GST_TIME_ARGS (splitmux->max_in_running_time));
1175
1176         if (ctx->in_running_time < splitmux->max_in_running_time) {
1177           loop_again = FALSE;
1178           break;
1179         }
1180
1181         GST_LOG_OBJECT (pad,
1182             "Collected last packet of GOP. Checking other pads");
1183         check_completed_gop (splitmux, ctx);
1184         break;
1185       case SPLITMUX_STATE_ENDING_FILE:{
1186         GstEvent *event;
1187
1188         /* If somes streams received no buffer during the last GOP that overran,
1189          * because its next buffer has a timestamp bigger than
1190          * ctx->max_in_running_time, its queue is empty. In that case the only
1191          * way to wakeup the output thread is by injecting an event in the
1192          * queue. This usually happen with subtitle streams.
1193          * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
1194         GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event");
1195         event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
1196             GST_EVENT_TYPE_SERIALIZED,
1197             gst_structure_new ("splitmuxsink-unblock", "timestamp",
1198                 G_TYPE_UINT64, splitmux->max_in_running_time, NULL));
1199
1200         GST_SPLITMUX_UNLOCK (splitmux);
1201         gst_pad_send_event (ctx->sinkpad, event);
1202         GST_SPLITMUX_LOCK (splitmux);
1203         /* fallthrough */
1204       }
1205       case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1206         /* A fragment is ending, wait until that's done before continuing */
1207         GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1208         GST_SPLITMUX_WAIT (splitmux);
1209         GST_DEBUG_OBJECT (pad,
1210             "Done sleeping for fragment restart state now %d", splitmux->state);
1211         break;
1212       default:
1213         loop_again = FALSE;
1214         break;
1215     }
1216   } while (loop_again);
1217
1218   if (keyframe) {
1219     splitmux->queued_gops++;
1220     buf_info->keyframe = TRUE;
1221   }
1222
1223   /* Now add this buffer to the queue just before returning */
1224   g_queue_push_head (&ctx->queued_bufs, buf_info);
1225
1226   /* Check the buffer will fit in the mq */
1227   check_queue_length (splitmux, ctx);
1228
1229   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1230       " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time));
1231
1232   GST_SPLITMUX_UNLOCK (splitmux);
1233   return GST_PAD_PROBE_PASS;
1234
1235 beach:
1236   GST_SPLITMUX_UNLOCK (splitmux);
1237   if (buf_info)
1238     mq_stream_buf_free (buf_info);
1239   return GST_PAD_PROBE_PASS;
1240 }
1241
1242 static GstPad *
1243 gst_splitmux_sink_request_new_pad (GstElement * element,
1244     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1245 {
1246   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1247   GstPadTemplate *mux_template = NULL;
1248   GstPad *res = NULL;
1249   GstPad *mq_sink, *mq_src;
1250   gchar *gname;
1251   gboolean is_video = FALSE;
1252   MqStreamCtx *ctx;
1253
1254   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1255
1256   GST_SPLITMUX_LOCK (splitmux);
1257   if (!create_elements (splitmux))
1258     goto fail;
1259
1260   if (templ->name_template) {
1261     if (g_str_equal (templ->name_template, "video")) {
1262       /* FIXME: Look for a pad template with matching caps, rather than by name */
1263       mux_template =
1264           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1265           (splitmux->muxer), "video_%u");
1266       is_video = TRUE;
1267       name = NULL;
1268     } else {
1269       mux_template =
1270           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1271           (splitmux->muxer), templ->name_template);
1272     }
1273     if (mux_template == NULL) {
1274       /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1275       mux_template =
1276           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1277           (splitmux->muxer), "sink_%d");
1278     }
1279   }
1280
1281   res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1282   if (res == NULL)
1283     goto fail;
1284
1285   if (is_video)
1286     gname = g_strdup ("video");
1287   else if (name == NULL)
1288     gname = gst_pad_get_name (res);
1289   else
1290     gname = g_strdup (name);
1291
1292   if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1293     gst_element_release_request_pad (splitmux->muxer, res);
1294     gst_object_unref (GST_OBJECT (res));
1295     goto fail;
1296   }
1297
1298   if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1299     gst_element_release_request_pad (splitmux->muxer, res);
1300     gst_object_unref (GST_OBJECT (res));
1301     gst_element_release_request_pad (splitmux->mq, mq_sink);
1302     gst_object_unref (GST_OBJECT (mq_sink));
1303     goto fail;
1304   }
1305
1306   gst_object_unref (GST_OBJECT (res));
1307
1308   ctx = mq_stream_ctx_new (splitmux);
1309   ctx->srcpad = mq_src;
1310   ctx->sinkpad = mq_sink;
1311
1312   mq_stream_ctx_ref (ctx);
1313   ctx->src_pad_block_id =
1314       gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1315       (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1316       _pad_block_destroy_src_notify);
1317   if (is_video && splitmux->reference_ctx != NULL) {
1318     splitmux->reference_ctx->is_reference = FALSE;
1319     splitmux->reference_ctx = NULL;
1320   }
1321   if (splitmux->reference_ctx == NULL) {
1322     splitmux->reference_ctx = ctx;
1323     ctx->is_reference = TRUE;
1324   }
1325
1326   res = gst_ghost_pad_new (gname, mq_sink);
1327   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1328
1329   mq_stream_ctx_ref (ctx);
1330   ctx->sink_pad_block_id =
1331       gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1332       (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1333       _pad_block_destroy_sink_notify);
1334
1335   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1336       " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1337
1338   splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1339
1340   g_free (gname);
1341
1342   gst_object_unref (mq_sink);
1343   gst_object_unref (mq_src);
1344
1345   gst_pad_set_active (res, TRUE);
1346   gst_element_add_pad (element, res);
1347   GST_SPLITMUX_UNLOCK (splitmux);
1348
1349   return res;
1350 fail:
1351   GST_SPLITMUX_UNLOCK (splitmux);
1352   return NULL;
1353 }
1354
1355 static void
1356 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1357 {
1358   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1359   GstPad *mqsink, *mqsrc, *muxpad;
1360   MqStreamCtx *ctx =
1361       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1362
1363   GST_SPLITMUX_LOCK (splitmux);
1364
1365   if (splitmux->muxer == NULL || splitmux->mq == NULL)
1366     goto fail;                  /* Elements don't exist yet - nothing to release */
1367
1368   GST_INFO_OBJECT (pad, "releasing request pad");
1369
1370   mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1371   mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1372   muxpad = gst_pad_get_peer (mqsrc);
1373
1374   /* Remove the context from our consideration */
1375   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1376
1377   if (ctx->sink_pad_block_id)
1378     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1379
1380   if (ctx->src_pad_block_id)
1381     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1382
1383   /* Can release the context now */
1384   mq_stream_ctx_unref (ctx);
1385
1386   /* Release and free the mq input */
1387   gst_element_release_request_pad (splitmux->mq, mqsink);
1388
1389   /* Release and free the muxer input */
1390   gst_element_release_request_pad (splitmux->muxer, muxpad);
1391
1392   gst_object_unref (mqsink);
1393   gst_object_unref (mqsrc);
1394   gst_object_unref (muxpad);
1395
1396   gst_element_remove_pad (element, pad);
1397
1398   /* Reset the internal elements only after all request pads are released */
1399   if (splitmux->contexts == NULL)
1400     gst_splitmux_reset (splitmux);
1401
1402 fail:
1403   GST_SPLITMUX_UNLOCK (splitmux);
1404 }
1405
1406 static GstElement *
1407 create_element (GstSplitMuxSink * splitmux,
1408     const gchar * factory, const gchar * name)
1409 {
1410   GstElement *ret = gst_element_factory_make (factory, name);
1411   if (ret == NULL) {
1412     g_warning ("Failed to create %s - splitmuxsink will not work", name);
1413     return NULL;
1414   }
1415
1416   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1417     g_warning ("Could not add %s element - splitmuxsink will not work", name);
1418     gst_object_unref (ret);
1419     return NULL;
1420   }
1421
1422   return ret;
1423 }
1424
1425 static gboolean
1426 create_elements (GstSplitMuxSink * splitmux)
1427 {
1428   /* Create internal elements */
1429   if (splitmux->mq == NULL) {
1430     if ((splitmux->mq =
1431             create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1432       goto fail;
1433
1434     splitmux->mq_max_buffers = 5;
1435     /* No bytes or time limit, we limit buffers manually */
1436     g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1437         (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1438   }
1439
1440   if (splitmux->muxer == NULL) {
1441     GstElement *provided_muxer = NULL;
1442
1443     GST_OBJECT_LOCK (splitmux);
1444     if (splitmux->provided_muxer != NULL)
1445       provided_muxer = gst_object_ref (splitmux->provided_muxer);
1446     GST_OBJECT_UNLOCK (splitmux);
1447
1448     if (provided_muxer == NULL) {
1449       if ((splitmux->muxer =
1450               create_element (splitmux, "mp4mux", "muxer")) == NULL)
1451         goto fail;
1452     } else {
1453       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1454         g_warning ("Could not add muxer element - splitmuxsink will not work");
1455         gst_object_unref (provided_muxer);
1456         goto fail;
1457       }
1458
1459       splitmux->muxer = provided_muxer;
1460       gst_object_unref (provided_muxer);
1461     }
1462   }
1463
1464   return TRUE;
1465 fail:
1466   return FALSE;
1467 }
1468
1469 static GstElement *
1470 find_sink (GstElement * e)
1471 {
1472   GstElement *res = NULL;
1473   GstIterator *iter;
1474   gboolean done = FALSE;
1475   GValue data = { 0, };
1476
1477   if (!GST_IS_BIN (e))
1478     return e;
1479
1480   iter = gst_bin_iterate_sinks (GST_BIN (e));
1481   while (!done) {
1482     switch (gst_iterator_next (iter, &data)) {
1483       case GST_ITERATOR_OK:
1484       {
1485         GstElement *child = g_value_get_object (&data);
1486         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1487                 "location") != NULL) {
1488           res = child;
1489           done = TRUE;
1490         }
1491         g_value_reset (&data);
1492         break;
1493       }
1494       case GST_ITERATOR_RESYNC:
1495         gst_iterator_resync (iter);
1496         break;
1497       case GST_ITERATOR_DONE:
1498         done = TRUE;
1499         break;
1500       case GST_ITERATOR_ERROR:
1501         g_assert_not_reached ();
1502         break;
1503     }
1504   }
1505   g_value_unset (&data);
1506   gst_iterator_free (iter);
1507
1508   return res;
1509 }
1510
1511 static gboolean
1512 create_sink (GstSplitMuxSink * splitmux)
1513 {
1514   GstElement *provided_sink = NULL;
1515
1516   if (splitmux->active_sink == NULL) {
1517
1518     GST_OBJECT_LOCK (splitmux);
1519     if (splitmux->provided_sink != NULL)
1520       provided_sink = gst_object_ref (splitmux->provided_sink);
1521     GST_OBJECT_UNLOCK (splitmux);
1522
1523     if (provided_sink == NULL) {
1524       if ((splitmux->sink =
1525               create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1526         goto fail;
1527       splitmux->active_sink = splitmux->sink;
1528     } else {
1529       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1530         g_warning ("Could not add sink elements - splitmuxsink will not work");
1531         gst_object_unref (provided_sink);
1532         goto fail;
1533       }
1534
1535       splitmux->active_sink = provided_sink;
1536
1537       /* The bin holds a ref now, we can drop our tmp ref */
1538       gst_object_unref (provided_sink);
1539
1540       /* Find the sink element */
1541       splitmux->sink = find_sink (splitmux->active_sink);
1542       if (splitmux->sink == NULL) {
1543         g_warning
1544             ("Could not locate sink element in provided sink - splitmuxsink will not work");
1545         goto fail;
1546       }
1547     }
1548
1549     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1550       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1551       goto fail;
1552     }
1553   }
1554
1555   return TRUE;
1556 fail:
1557   return FALSE;
1558 }
1559
1560 #ifdef __GNUC__
1561 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
1562 #endif
1563 static void
1564 set_next_filename (GstSplitMuxSink * splitmux)
1565 {
1566   gchar *fname = NULL;
1567   gst_splitmux_sink_ensure_max_files (splitmux);
1568
1569   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
1570       splitmux->fragment_id, &fname);
1571
1572   if (!fname)
1573     fname = splitmux->location ?
1574         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
1575
1576   if (fname) {
1577     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1578     g_object_set (splitmux->sink, "location", fname, NULL);
1579     g_free (fname);
1580
1581     splitmux->fragment_id++;
1582   }
1583 }
1584
1585 static GstStateChangeReturn
1586 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1587 {
1588   GstStateChangeReturn ret;
1589   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1590
1591   switch (transition) {
1592     case GST_STATE_CHANGE_NULL_TO_READY:{
1593       GST_SPLITMUX_LOCK (splitmux);
1594       if (!create_elements (splitmux) || !create_sink (splitmux)) {
1595         ret = GST_STATE_CHANGE_FAILURE;
1596         GST_SPLITMUX_UNLOCK (splitmux);
1597         goto beach;
1598       }
1599       GST_SPLITMUX_UNLOCK (splitmux);
1600       splitmux->fragment_id = 0;
1601       set_next_filename (splitmux);
1602       break;
1603     }
1604     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1605       GST_SPLITMUX_LOCK (splitmux);
1606       /* Start by collecting one input on each pad */
1607       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1608       splitmux->max_in_running_time = 0;
1609       splitmux->muxed_out_time = splitmux->mux_start_time = 0;
1610       splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1611       splitmux->opening_first_fragment = TRUE;
1612       GST_SPLITMUX_UNLOCK (splitmux);
1613       break;
1614     }
1615     case GST_STATE_CHANGE_PAUSED_TO_READY:
1616     case GST_STATE_CHANGE_READY_TO_NULL:
1617       GST_SPLITMUX_LOCK (splitmux);
1618       splitmux->state = SPLITMUX_STATE_STOPPED;
1619       /* Wake up any blocked threads */
1620       GST_LOG_OBJECT (splitmux,
1621           "State change -> NULL or READY. Waking threads");
1622       GST_SPLITMUX_BROADCAST (splitmux);
1623       GST_SPLITMUX_UNLOCK (splitmux);
1624       break;
1625     default:
1626       break;
1627   }
1628
1629   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1630   if (ret == GST_STATE_CHANGE_FAILURE)
1631     goto beach;
1632
1633   switch (transition) {
1634     case GST_STATE_CHANGE_READY_TO_NULL:
1635       GST_SPLITMUX_LOCK (splitmux);
1636       splitmux->fragment_id = 0;
1637       /* Reset internal elements only if no pad contexts are using them */
1638       if (splitmux->contexts == NULL)
1639         gst_splitmux_reset (splitmux);
1640       GST_SPLITMUX_UNLOCK (splitmux);
1641       break;
1642     default:
1643       break;
1644   }
1645
1646 beach:
1647
1648   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1649       ret == GST_STATE_CHANGE_FAILURE) {
1650     /* Cleanup elements on failed transition out of NULL */
1651     gst_splitmux_reset (splitmux);
1652   }
1653   return ret;
1654 }
1655
1656 gboolean
1657 register_splitmuxsink (GstPlugin * plugin)
1658 {
1659   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1660       "Split File Muxing Sink");
1661
1662   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1663       GST_TYPE_SPLITMUX_SINK);
1664 }
1665
1666 static void
1667 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
1668 {
1669   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
1670     splitmux->fragment_id = 0;
1671   }
1672 }