splitmuxsink: Fix deadlock case when source reaches EOS
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * can be muxed, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * If a video stream is available, the splitting process is driven by the video
37  * stream contents, and the video stream must contain closed GOPs for the output
38  * file parts to be played individually correctly. In the absence of a video
39  * stream, the first available stream is used as reference for synchronization.
40  *
41  * <refsect2>
42  * <title>Example pipelines</title>
43  * |[
44  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
45  * ]|
46  * Records a video stream captured from a v4l2 device and muxes it into
47  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48  * and 1MB maximum size.
49  * </refsect2>
50  */
51
52 #ifdef HAVE_CONFIG_H
53 #include "config.h"
54 #endif
55
56 #include <string.h>
57 #include <glib/gstdio.h>
58 #include "gstsplitmuxsink.h"
59
60 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
61 #define GST_CAT_DEFAULT splitmux_debug
62
63 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
64 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
65 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
66 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
67
68 enum
69 {
70   PROP_0,
71   PROP_LOCATION,
72   PROP_MAX_SIZE_TIME,
73   PROP_MAX_SIZE_BYTES,
74   PROP_MAX_FILES,
75   PROP_MUXER_OVERHEAD,
76   PROP_MUXER,
77   PROP_SINK
78 };
79
80 #define DEFAULT_MAX_SIZE_TIME       0
81 #define DEFAULT_MAX_SIZE_BYTES      0
82 #define DEFAULT_MAX_FILES           0
83 #define DEFAULT_MUXER_OVERHEAD      0.02
84 #define DEFAULT_MUXER "mp4mux"
85 #define DEFAULT_SINK "filesink"
86
87 enum
88 {
89   SIGNAL_FORMAT_LOCATION,
90   SIGNAL_LAST
91 };
92
93 static guint signals[SIGNAL_LAST];
94
95 static GstStaticPadTemplate video_sink_template =
96 GST_STATIC_PAD_TEMPLATE ("video",
97     GST_PAD_SINK,
98     GST_PAD_REQUEST,
99     GST_STATIC_CAPS_ANY);
100 static GstStaticPadTemplate audio_sink_template =
101 GST_STATIC_PAD_TEMPLATE ("audio_%u",
102     GST_PAD_SINK,
103     GST_PAD_REQUEST,
104     GST_STATIC_CAPS_ANY);
105 static GstStaticPadTemplate subtitle_sink_template =
106 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
107     GST_PAD_SINK,
108     GST_PAD_REQUEST,
109     GST_STATIC_CAPS_ANY);
110
111 static GQuark PAD_CONTEXT;
112
113 static void
114 _do_init (void)
115 {
116   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
117 }
118
119 #define gst_splitmux_sink_parent_class parent_class
120 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
121     _do_init ());
122
123 static gboolean create_elements (GstSplitMuxSink * splitmux);
124 static gboolean create_sink (GstSplitMuxSink * splitmux);
125 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
126     const GValue * value, GParamSpec * pspec);
127 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
128     GValue * value, GParamSpec * pspec);
129 static void gst_splitmux_sink_dispose (GObject * object);
130 static void gst_splitmux_sink_finalize (GObject * object);
131
132 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
133     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
134 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
135
136 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
137     element, GstStateChange transition);
138
139 static void bus_handler (GstBin * bin, GstMessage * msg);
140 static void set_next_filename (GstSplitMuxSink * splitmux);
141 static void start_next_fragment (GstSplitMuxSink * splitmux);
142 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
143 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
144
145 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
146
147 static MqStreamBuf *
148 mq_stream_buf_new (void)
149 {
150   return g_slice_new0 (MqStreamBuf);
151 }
152
153 static void
154 mq_stream_buf_free (MqStreamBuf * data)
155 {
156   g_slice_free (MqStreamBuf, data);
157 }
158
159 static void
160 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
161 {
162   GObjectClass *gobject_class = (GObjectClass *) klass;
163   GstElementClass *gstelement_class = (GstElementClass *) klass;
164   GstBinClass *gstbin_class = (GstBinClass *) klass;
165
166   gobject_class->set_property = gst_splitmux_sink_set_property;
167   gobject_class->get_property = gst_splitmux_sink_get_property;
168   gobject_class->dispose = gst_splitmux_sink_dispose;
169   gobject_class->finalize = gst_splitmux_sink_finalize;
170
171   gst_element_class_set_static_metadata (gstelement_class,
172       "Split Muxing Bin", "Generic/Bin/Muxer",
173       "Convenience bin that muxes incoming streams into multiple time/size limited files",
174       "Jan Schmidt <jan@centricular.com>");
175
176   gst_element_class_add_static_pad_template (gstelement_class,
177       &video_sink_template);
178   gst_element_class_add_static_pad_template (gstelement_class,
179       &audio_sink_template);
180   gst_element_class_add_static_pad_template (gstelement_class,
181       &subtitle_sink_template);
182
183   gstelement_class->change_state =
184       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
185   gstelement_class->request_new_pad =
186       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
187   gstelement_class->release_pad =
188       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
189
190   gstbin_class->handle_message = bus_handler;
191
192   g_object_class_install_property (gobject_class, PROP_LOCATION,
193       g_param_spec_string ("location", "File Output Pattern",
194           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
195           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
197       g_param_spec_double ("mux-overhead", "Muxing Overhead",
198           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
199           DEFAULT_MUXER_OVERHEAD,
200           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
201
202   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
203       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
204           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
205           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
206   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
207       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
208           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
209           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
210   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
211       g_param_spec_uint ("max-files", "Max files",
212           "Maximum number of files to keep on disk. Once the maximum is reached,"
213           "old files start to be deleted to make room for new ones.",
214           0, G_MAXUINT, DEFAULT_MAX_FILES,
215           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
216
217
218   g_object_class_install_property (gobject_class, PROP_MUXER,
219       g_param_spec_object ("muxer", "Muxer",
220           "The muxer element to use (NULL = default mp4mux)",
221           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222   g_object_class_install_property (gobject_class, PROP_SINK,
223       g_param_spec_object ("sink", "Sink",
224           "The sink element (or element chain) to use (NULL = default filesink)",
225           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
226
227   /**
228    * GstSplitMuxSink::format-location:
229    * @splitmux: the #GstSplitMuxSink
230    * @fragment_id: the sequence number of the file to be created
231    *
232    * Returns: the location to be used for the next output file
233    */
234   signals[SIGNAL_FORMAT_LOCATION] =
235       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
236       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
237 }
238
239 static void
240 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
241 {
242   g_mutex_init (&splitmux->lock);
243   g_cond_init (&splitmux->data_cond);
244
245   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
246   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
247   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
248   splitmux->max_files = DEFAULT_MAX_FILES;
249
250   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
251 }
252
253 static void
254 gst_splitmux_reset (GstSplitMuxSink * splitmux)
255 {
256   if (splitmux->mq)
257     gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
258   if (splitmux->muxer)
259     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
260   if (splitmux->active_sink)
261     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
262
263   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
264       NULL;
265 }
266
267 static void
268 gst_splitmux_sink_dispose (GObject * object)
269 {
270   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
271
272   G_OBJECT_CLASS (parent_class)->dispose (object);
273
274   /* Calling parent dispose invalidates all child pointers */
275   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
276       NULL;
277 }
278
279 static void
280 gst_splitmux_sink_finalize (GObject * object)
281 {
282   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
283   g_cond_clear (&splitmux->data_cond);
284   g_mutex_clear (&splitmux->lock);
285   if (splitmux->provided_sink)
286     gst_object_unref (splitmux->provided_sink);
287   if (splitmux->provided_muxer)
288     gst_object_unref (splitmux->provided_muxer);
289
290   g_free (splitmux->location);
291
292   /* Make sure to free any un-released contexts */
293   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
294   g_list_free (splitmux->contexts);
295
296   G_OBJECT_CLASS (parent_class)->finalize (object);
297 }
298
299 static void
300 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
301     const GValue * value, GParamSpec * pspec)
302 {
303   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
304
305   switch (prop_id) {
306     case PROP_LOCATION:{
307       GST_OBJECT_LOCK (splitmux);
308       g_free (splitmux->location);
309       splitmux->location = g_value_dup_string (value);
310       GST_OBJECT_UNLOCK (splitmux);
311       break;
312     }
313     case PROP_MAX_SIZE_BYTES:
314       GST_OBJECT_LOCK (splitmux);
315       splitmux->threshold_bytes = g_value_get_uint64 (value);
316       GST_OBJECT_UNLOCK (splitmux);
317       break;
318     case PROP_MAX_SIZE_TIME:
319       GST_OBJECT_LOCK (splitmux);
320       splitmux->threshold_time = g_value_get_uint64 (value);
321       GST_OBJECT_UNLOCK (splitmux);
322       break;
323     case PROP_MAX_FILES:
324       GST_OBJECT_LOCK (splitmux);
325       splitmux->max_files = g_value_get_uint (value);
326       GST_OBJECT_UNLOCK (splitmux);
327       break;
328     case PROP_MUXER_OVERHEAD:
329       GST_OBJECT_LOCK (splitmux);
330       splitmux->mux_overhead = g_value_get_double (value);
331       GST_OBJECT_UNLOCK (splitmux);
332       break;
333     case PROP_SINK:
334       GST_OBJECT_LOCK (splitmux);
335       if (splitmux->provided_sink)
336         gst_object_unref (splitmux->provided_sink);
337       splitmux->provided_sink = g_value_dup_object (value);
338       GST_OBJECT_UNLOCK (splitmux);
339       break;
340     case PROP_MUXER:
341       GST_OBJECT_LOCK (splitmux);
342       if (splitmux->provided_muxer)
343         gst_object_unref (splitmux->provided_muxer);
344       splitmux->provided_muxer = g_value_dup_object (value);
345       GST_OBJECT_UNLOCK (splitmux);
346       break;
347     default:
348       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
349       break;
350   }
351 }
352
353 static void
354 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
355     GValue * value, GParamSpec * pspec)
356 {
357   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
358
359   switch (prop_id) {
360     case PROP_LOCATION:
361       GST_OBJECT_LOCK (splitmux);
362       g_value_set_string (value, splitmux->location);
363       GST_OBJECT_UNLOCK (splitmux);
364       break;
365     case PROP_MAX_SIZE_BYTES:
366       GST_OBJECT_LOCK (splitmux);
367       g_value_set_uint64 (value, splitmux->threshold_bytes);
368       GST_OBJECT_UNLOCK (splitmux);
369       break;
370     case PROP_MAX_SIZE_TIME:
371       GST_OBJECT_LOCK (splitmux);
372       g_value_set_uint64 (value, splitmux->threshold_time);
373       GST_OBJECT_UNLOCK (splitmux);
374       break;
375     case PROP_MAX_FILES:
376       GST_OBJECT_LOCK (splitmux);
377       g_value_set_uint (value, splitmux->max_files);
378       GST_OBJECT_UNLOCK (splitmux);
379       break;
380     case PROP_MUXER_OVERHEAD:
381       GST_OBJECT_LOCK (splitmux);
382       g_value_set_double (value, splitmux->mux_overhead);
383       GST_OBJECT_UNLOCK (splitmux);
384       break;
385     case PROP_SINK:
386       GST_OBJECT_LOCK (splitmux);
387       g_value_set_object (value, splitmux->provided_sink);
388       GST_OBJECT_UNLOCK (splitmux);
389       break;
390     case PROP_MUXER:
391       GST_OBJECT_LOCK (splitmux);
392       g_value_set_object (value, splitmux->provided_muxer);
393       GST_OBJECT_UNLOCK (splitmux);
394       break;
395     default:
396       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
397       break;
398   }
399 }
400
401 static GstPad *
402 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
403 {
404   gchar *tmp, *sinkname, *srcname;
405   GstPad *mq_src;
406
407   sinkname = gst_pad_get_name (sink_pad);
408   tmp = sinkname + 5;
409   srcname = g_strdup_printf ("src_%s", tmp);
410
411   mq_src = gst_element_get_static_pad (mq, srcname);
412
413   g_free (sinkname);
414   g_free (srcname);
415
416   return mq_src;
417 }
418
419 static gboolean
420 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
421     GstPad ** src_pad)
422 {
423   GstPad *mq_sink;
424   GstPad *mq_src;
425
426   /* Request a pad from multiqueue, then connect this one, then
427    * discover the corresponding output pad and return both */
428   mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
429   if (mq_sink == NULL)
430     return FALSE;
431
432   mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
433   if (mq_src == NULL)
434     goto fail;
435
436   *sink_pad = mq_sink;
437   *src_pad = mq_src;
438
439   return TRUE;
440
441 fail:
442   gst_element_release_request_pad (splitmux->mq, mq_sink);
443   return FALSE;
444 }
445
446 static MqStreamCtx *
447 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
448 {
449   MqStreamCtx *ctx;
450
451   ctx = g_new0 (MqStreamCtx, 1);
452   g_atomic_int_set (&ctx->refcount, 1);
453   ctx->splitmux = splitmux;
454   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
455   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
456   ctx->in_running_time = ctx->out_running_time = 0;
457   g_queue_init (&ctx->queued_bufs);
458   return ctx;
459 }
460
461 static void
462 mq_stream_ctx_free (MqStreamCtx * ctx)
463 {
464   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
465   g_queue_clear (&ctx->queued_bufs);
466   g_free (ctx);
467 }
468
469 static void
470 mq_stream_ctx_unref (MqStreamCtx * ctx)
471 {
472   if (g_atomic_int_dec_and_test (&ctx->refcount))
473     mq_stream_ctx_free (ctx);
474 }
475
476 static void
477 mq_stream_ctx_ref (MqStreamCtx * ctx)
478 {
479   g_atomic_int_inc (&ctx->refcount);
480 }
481
482 static void
483 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
484 {
485   ctx->sink_pad_block_id = 0;
486   mq_stream_ctx_unref (ctx);
487 }
488
489 static void
490 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
491 {
492   ctx->src_pad_block_id = 0;
493   mq_stream_ctx_unref (ctx);
494 }
495
496 static void
497 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
498 {
499   gchar *location = NULL;
500   GstMessage *msg;
501   const gchar *msg_name = opened ?
502       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
503
504   g_object_get (splitmux->sink, "location", &location, NULL);
505
506   msg = gst_message_new_element (GST_OBJECT (splitmux),
507       gst_structure_new (msg_name,
508           "location", G_TYPE_STRING, location,
509           "running-time", GST_TYPE_CLOCK_TIME,
510           splitmux->reference_ctx->out_running_time, NULL));
511   gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
512
513   g_free (location);
514 }
515
516 /* Called with lock held, drops the lock to send EOS to the
517  * pad
518  */
519 static void
520 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
521 {
522   GstEvent *eos;
523   GstPad *pad;
524
525   eos = gst_event_new_eos ();
526   pad = gst_pad_get_peer (ctx->srcpad);
527
528   ctx->out_eos = TRUE;
529
530   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
531   GST_SPLITMUX_UNLOCK (splitmux);
532   gst_pad_send_event (pad, eos);
533   GST_SPLITMUX_LOCK (splitmux);
534
535   gst_object_unref (pad);
536 }
537
538 /* Called with splitmux lock held to check if this output
539  * context needs to sleep to wait for the release of the
540  * next GOP, or to send EOS to close out the current file
541  */
542 static void
543 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
544 {
545   do {
546
547     GST_LOG_OBJECT (ctx->srcpad,
548         "Checking running time %" GST_TIME_FORMAT " against max %"
549         GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time),
550         GST_TIME_ARGS (splitmux->max_out_running_time));
551
552     if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE ||
553         ctx->out_running_time < splitmux->max_out_running_time) {
554       splitmux->have_muxed_something = TRUE;
555       return;
556     }
557
558     if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
559       return;
560
561     if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
562       if (ctx->out_eos == FALSE) {
563         send_eos (splitmux, ctx);
564         continue;
565       }
566     } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
567       start_next_fragment (splitmux);
568       continue;
569     }
570
571     GST_INFO_OBJECT (ctx->srcpad,
572         "Sleeping for running time %"
573         GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")",
574         GST_TIME_ARGS (ctx->out_running_time),
575         GST_TIME_ARGS (splitmux->max_out_running_time));
576     ctx->out_blocked = TRUE;
577     /* Expand the mq if needed before sleeping */
578     check_queue_length (splitmux, ctx);
579     GST_SPLITMUX_WAIT (splitmux);
580     ctx->out_blocked = FALSE;
581     GST_INFO_OBJECT (ctx->srcpad,
582         "Woken for new max running time %" GST_TIME_FORMAT,
583         GST_TIME_ARGS (splitmux->max_out_running_time));
584   } while (1);
585 }
586
587 static GstPadProbeReturn
588 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
589 {
590   GstSplitMuxSink *splitmux = ctx->splitmux;
591   MqStreamBuf *buf_info = NULL;
592
593   GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
594
595   /* FIXME: Handle buffer lists, until then make it clear they won't work */
596   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
597     g_warning ("Buffer list handling not implemented");
598     return GST_PAD_PROBE_DROP;
599   }
600   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
601     GstEvent *event = gst_pad_probe_info_get_event (info);
602
603     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
604
605     switch (GST_EVENT_TYPE (event)) {
606       case GST_EVENT_SEGMENT:
607         gst_event_copy_segment (event, &ctx->out_segment);
608         break;
609       case GST_EVENT_FLUSH_STOP:
610         GST_SPLITMUX_LOCK (splitmux);
611         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
612         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
613         g_queue_clear (&ctx->queued_bufs);
614         ctx->flushing = FALSE;
615         GST_SPLITMUX_UNLOCK (splitmux);
616         break;
617       case GST_EVENT_FLUSH_START:
618         GST_SPLITMUX_LOCK (splitmux);
619         GST_LOG_OBJECT (pad, "Flush start");
620         ctx->flushing = TRUE;
621         GST_SPLITMUX_BROADCAST (splitmux);
622         GST_SPLITMUX_UNLOCK (splitmux);
623         break;
624       case GST_EVENT_EOS:
625         GST_SPLITMUX_LOCK (splitmux);
626         if (splitmux->state == SPLITMUX_STATE_STOPPED)
627           goto beach;
628         ctx->out_eos = TRUE;
629         GST_SPLITMUX_UNLOCK (splitmux);
630         break;
631       case GST_EVENT_GAP:{
632         GstClockTime gap_ts;
633
634         gst_event_parse_gap (event, &gap_ts, NULL);
635         if (gap_ts == GST_CLOCK_TIME_NONE)
636           break;
637
638         GST_SPLITMUX_LOCK (splitmux);
639
640         gap_ts = gst_segment_to_running_time (&ctx->out_segment,
641             GST_FORMAT_TIME, gap_ts);
642
643         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT,
644             GST_TIME_ARGS (gap_ts));
645
646         if (splitmux->state == SPLITMUX_STATE_STOPPED)
647           goto beach;
648         ctx->out_running_time = gap_ts;
649         complete_or_wait_on_out (splitmux, ctx);
650         GST_SPLITMUX_UNLOCK (splitmux);
651         break;
652       }
653       case GST_EVENT_CUSTOM_DOWNSTREAM:{
654         const GstStructure *s;
655         GstClockTime ts = 0;
656
657         s = gst_event_get_structure (event);
658         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
659           break;
660
661         gst_structure_get_uint64 (s, "timestamp", &ts);
662
663         GST_SPLITMUX_LOCK (splitmux);
664
665         if (splitmux->state == SPLITMUX_STATE_STOPPED)
666           goto beach;
667         ctx->out_running_time = ts;
668         complete_or_wait_on_out (splitmux, ctx);
669         GST_SPLITMUX_UNLOCK (splitmux);
670         return GST_PAD_PROBE_DROP;
671       }
672       default:
673         break;
674     }
675     return GST_PAD_PROBE_PASS;
676   }
677
678   /* Allow everything through until the configured next stopping point */
679   GST_SPLITMUX_LOCK (splitmux);
680
681   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
682   if (buf_info == NULL)
683     /* Can only happen due to a poorly timed flush */
684     goto beach;
685
686   /* If we have popped a keyframe, decrement the queued_gop count */
687   if (buf_info->keyframe && splitmux->queued_gops > 0)
688     splitmux->queued_gops--;
689
690   ctx->out_running_time = buf_info->run_ts;
691
692   GST_LOG_OBJECT (splitmux,
693       "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT
694       " size %" G_GSIZE_FORMAT,
695       pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size);
696
697   if (splitmux->opening_first_fragment) {
698     send_fragment_opened_closed_msg (splitmux, TRUE);
699     splitmux->opening_first_fragment = FALSE;
700   }
701
702   complete_or_wait_on_out (splitmux, ctx);
703
704   if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE ||
705       splitmux->muxed_out_time < buf_info->run_ts)
706     splitmux->muxed_out_time = buf_info->run_ts;
707
708   splitmux->muxed_out_bytes += buf_info->buf_size;
709
710 #ifndef GST_DISABLE_GST_DEBUG
711   {
712     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
713     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
714         " run ts %" GST_TIME_FORMAT, buf,
715         GST_TIME_ARGS (ctx->out_running_time));
716   }
717 #endif
718
719   GST_SPLITMUX_UNLOCK (splitmux);
720
721   mq_stream_buf_free (buf_info);
722
723   return GST_PAD_PROBE_PASS;
724
725 beach:
726   GST_SPLITMUX_UNLOCK (splitmux);
727   return GST_PAD_PROBE_DROP;
728 }
729
730 static gboolean
731 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
732 {
733   return gst_pad_send_event (peer, gst_event_ref (*event));
734 }
735
736 static void
737 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
738 {
739   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
740
741   gst_pad_sticky_events_foreach (ctx->srcpad,
742       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
743
744   /* Clear EOS flag if not actually EOS */
745   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
746
747   gst_object_unref (peer);
748 }
749
750 /* Called with lock held when a fragment
751  * reaches EOS and it is time to restart
752  * a new fragment
753  */
754 static void
755 start_next_fragment (GstSplitMuxSink * splitmux)
756 {
757   /* 1 change to new file */
758   gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
759   gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
760
761   set_next_filename (splitmux);
762
763   gst_element_sync_state_with_parent (splitmux->active_sink);
764   gst_element_sync_state_with_parent (splitmux->muxer);
765
766   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
767
768   /* Switch state and go back to processing */
769   if (!splitmux->reference_ctx->in_eos) {
770     splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
771     splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
772   } else {
773     splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
774     splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
775     splitmux->have_muxed_something = FALSE;
776   }
777   splitmux->have_muxed_something =
778       (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
779
780   /* Store the overflow parameters as the basis for the next fragment */
781   splitmux->mux_start_time = splitmux->muxed_out_time;
782   splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
783
784   GST_DEBUG_OBJECT (splitmux,
785       "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT,
786       GST_TIME_ARGS (splitmux->max_out_running_time));
787
788   send_fragment_opened_closed_msg (splitmux, TRUE);
789
790   GST_SPLITMUX_BROADCAST (splitmux);
791 }
792
793 static void
794 bus_handler (GstBin * bin, GstMessage * message)
795 {
796   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
797
798   switch (GST_MESSAGE_TYPE (message)) {
799     case GST_MESSAGE_EOS:
800       /* If the state is draining out the current file, drop this EOS */
801       GST_SPLITMUX_LOCK (splitmux);
802
803       send_fragment_opened_closed_msg (splitmux, FALSE);
804
805       if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
806           splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) {
807         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
808         splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
809         GST_SPLITMUX_BROADCAST (splitmux);
810
811         gst_message_unref (message);
812         GST_SPLITMUX_UNLOCK (splitmux);
813         return;
814       }
815       GST_SPLITMUX_UNLOCK (splitmux);
816       break;
817     default:
818       break;
819   }
820
821   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
822 }
823
824 /* Called with splitmux lock held */
825 /* Called when entering ProcessingCompleteGop state
826  * Assess if mq contents overflowed the current file
827  *   -> If yes, need to switch to new file
828  *   -> if no, set max_out_running_time to let this GOP in and
829  *      go to COLLECTING_GOP_START state
830  */
831 static void
832 handle_gathered_gop (GstSplitMuxSink * splitmux)
833 {
834   GList *cur;
835   gsize queued_bytes = 0;
836   GstClockTime queued_time = 0;
837
838   /* Assess if the multiqueue contents overflowed the current file */
839   for (cur = g_list_first (splitmux->contexts);
840       cur != NULL; cur = g_list_next (cur)) {
841     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
842     if (tmpctx->in_running_time > queued_time)
843       queued_time = tmpctx->in_running_time;
844     queued_bytes += tmpctx->in_bytes;
845   }
846
847   g_assert (queued_bytes >= splitmux->mux_start_bytes);
848   g_assert (queued_time >= splitmux->mux_start_time);
849
850   queued_bytes -= splitmux->mux_start_bytes;
851   queued_time -= splitmux->mux_start_time;
852
853   /* Expand queued bytes estimate by muxer overhead */
854   queued_bytes += (queued_bytes * splitmux->mux_overhead);
855
856   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT
857       " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes);
858
859   /* Check for overrun - have we output at least one byte and overrun
860    * either threshold? */
861   if ((splitmux->have_muxed_something &&
862           ((splitmux->threshold_bytes > 0 &&
863                   queued_bytes >= splitmux->threshold_bytes) ||
864               (splitmux->threshold_time > 0 &&
865                   queued_time >= splitmux->threshold_time)))) {
866
867     splitmux->state = SPLITMUX_STATE_ENDING_FILE;
868
869     GST_INFO_OBJECT (splitmux,
870         "mq overflowed since last, draining out. max out TS is %"
871         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
872     GST_SPLITMUX_BROADCAST (splitmux);
873
874   } else {
875     /* No overflow */
876     GST_LOG_OBJECT (splitmux,
877         "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
878         " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.",
879         splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
880         queued_bytes, GST_TIME_ARGS (queued_time));
881
882     /* Wake everyone up to push this one GOP, then sleep */
883     splitmux->have_muxed_something = TRUE;
884
885     if (!splitmux->reference_ctx->in_eos) {
886       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
887       splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
888     } else {
889       splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
890       splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
891     }
892
893     GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
894         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
895     GST_SPLITMUX_BROADCAST (splitmux);
896   }
897
898 }
899
900 /* Called with splitmux lock held */
901 /* Called from each input pad when it is has all the pieces
902  * for a GOP or EOS, starting with the reference pad which has set the
903  * splitmux->max_in_running_time
904  */
905 static void
906 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
907 {
908   GList *cur;
909   gboolean ready = TRUE;
910   GstClockTime current_max_in_running_time;
911
912   if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
913     /* Iterate each pad, and check that the input running time is at least
914      * up to the reference running time, and if so handle the collected GOP */
915     GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
916         GST_TIME_FORMAT " ctx %p",
917         GST_TIME_ARGS (splitmux->max_in_running_time), ctx);
918     for (cur = g_list_first (splitmux->contexts); cur != NULL;
919         cur = g_list_next (cur)) {
920       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
921
922       GST_LOG_OBJECT (splitmux,
923           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT
924           " EOS %d", tmpctx, tmpctx->srcpad,
925           GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
926
927       if (splitmux->max_in_running_time != GST_CLOCK_TIME_NONE &&
928           tmpctx->in_running_time < splitmux->max_in_running_time &&
929           !tmpctx->in_eos) {
930         GST_LOG_OBJECT (splitmux,
931             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
932             tmpctx, tmpctx->srcpad);
933         ready = FALSE;
934         break;
935       }
936     }
937     if (ready) {
938       GST_DEBUG_OBJECT (splitmux,
939           "Collected GOP is complete. Processing (ctx %p)", ctx);
940       /* All pads have a complete GOP, release it into the multiqueue */
941       handle_gathered_gop (splitmux);
942     }
943   }
944
945   /* If upstream reached EOS we are not expecting more data, no need to wait
946    * here. */
947   if (ctx->in_eos)
948     return;
949
950   /* Some pad is not yet ready, or GOP is being pushed
951    * either way, sleep and wait to get woken */
952   current_max_in_running_time = splitmux->max_in_running_time;
953   while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
954           splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
955       !ctx->flushing &&
956       (current_max_in_running_time == splitmux->max_in_running_time)) {
957
958     GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
959         splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
960         "GOP complete" : "EOF draining", ctx);
961     GST_SPLITMUX_WAIT (splitmux);
962
963     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
964   }
965 }
966
967 static void
968 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
969 {
970   GList *cur;
971   guint cur_len = g_queue_get_length (&ctx->queued_bufs);
972
973   GST_DEBUG_OBJECT (ctx->sinkpad,
974       "Checking queue length len %u cur_max %u queued gops %u",
975       cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
976
977   if (cur_len >= splitmux->mq_max_buffers) {
978     gboolean allow_grow = FALSE;
979
980     /* If collecting a GOP and this pad might block,
981      * and there isn't already a pending GOP in the queue
982      * then grow
983      */
984     if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
985         ctx->in_running_time < splitmux->max_in_running_time &&
986         splitmux->queued_gops <= 1) {
987       allow_grow = TRUE;
988     } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
989         ctx->is_reference && splitmux->queued_gops <= 1) {
990       allow_grow = TRUE;
991     }
992
993     if (!allow_grow) {
994       for (cur = g_list_first (splitmux->contexts);
995           cur != NULL; cur = g_list_next (cur)) {
996         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
997         GST_DEBUG_OBJECT (tmpctx->sinkpad,
998             " len %u out_blocked %d",
999             g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
1000         /* If another stream is starving, grow */
1001         if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
1002           allow_grow = TRUE;
1003         }
1004       }
1005     }
1006
1007     if (allow_grow) {
1008       splitmux->mq_max_buffers = cur_len + 1;
1009
1010       GST_INFO_OBJECT (splitmux,
1011           "Multiqueue overrun - enlarging to %u buffers ctx %p",
1012           splitmux->mq_max_buffers, ctx);
1013
1014       g_object_set (splitmux->mq, "max-size-buffers",
1015           splitmux->mq_max_buffers, NULL);
1016     }
1017   }
1018 }
1019
1020 static GstPadProbeReturn
1021 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1022 {
1023   GstSplitMuxSink *splitmux = ctx->splitmux;
1024   GstBuffer *buf;
1025   MqStreamBuf *buf_info = NULL;
1026   GstClockTime ts;
1027   gboolean loop_again;
1028   gboolean keyframe = FALSE;
1029
1030   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1031
1032   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1033   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1034     g_warning ("Buffer list handling not implemented");
1035     return GST_PAD_PROBE_DROP;
1036   }
1037   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
1038     GstEvent *event = gst_pad_probe_info_get_event (info);
1039     switch (GST_EVENT_TYPE (event)) {
1040       case GST_EVENT_SEGMENT:
1041         gst_event_copy_segment (event, &ctx->in_segment);
1042         break;
1043       case GST_EVENT_FLUSH_STOP:
1044         GST_SPLITMUX_LOCK (splitmux);
1045         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1046         ctx->in_eos = FALSE;
1047         ctx->in_bytes = 0;
1048         ctx->in_running_time = 0;
1049         GST_SPLITMUX_UNLOCK (splitmux);
1050         break;
1051       case GST_EVENT_EOS:
1052         GST_SPLITMUX_LOCK (splitmux);
1053         ctx->in_eos = TRUE;
1054
1055         if (splitmux->state == SPLITMUX_STATE_STOPPED)
1056           goto beach;
1057
1058         if (ctx->is_reference) {
1059           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
1060           /* Act as if this is a new keyframe with infinite timestamp */
1061           splitmux->max_in_running_time = GST_CLOCK_TIME_NONE;
1062           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1063           /* Wake up other input pads to collect this GOP */
1064           GST_SPLITMUX_BROADCAST (splitmux);
1065           check_completed_gop (splitmux, ctx);
1066         } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
1067           /* If we are waiting for a GOP to be completed (ie, for aux
1068            * pads to catch up), then this pad is complete, so check
1069            * if the whole GOP is.
1070            */
1071           check_completed_gop (splitmux, ctx);
1072         }
1073         GST_SPLITMUX_UNLOCK (splitmux);
1074         break;
1075       default:
1076         break;
1077     }
1078     return GST_PAD_PROBE_PASS;
1079   }
1080
1081   buf = gst_pad_probe_info_get_buffer (info);
1082   buf_info = mq_stream_buf_new ();
1083
1084   if (GST_BUFFER_PTS_IS_VALID (buf))
1085     ts = GST_BUFFER_PTS (buf);
1086   else
1087     ts = GST_BUFFER_DTS (buf);
1088
1089   GST_SPLITMUX_LOCK (splitmux);
1090
1091   if (splitmux->state == SPLITMUX_STATE_STOPPED)
1092     goto beach;
1093
1094   /* If this buffer has a timestamp, advance the input timestamp of the
1095    * stream */
1096   if (GST_CLOCK_TIME_IS_VALID (ts)) {
1097     GstClockTime running_time =
1098         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1099         GST_BUFFER_TIMESTAMP (buf));
1100
1101     if (GST_CLOCK_TIME_IS_VALID (running_time) &&
1102         (ctx->in_running_time == GST_CLOCK_TIME_NONE
1103             || running_time > ctx->in_running_time))
1104       ctx->in_running_time = running_time;
1105   }
1106
1107   /* Try to make sure we have a valid running time */
1108   if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) {
1109     ctx->in_running_time =
1110         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1111         ctx->in_segment.start);
1112   }
1113
1114   buf_info->run_ts = ctx->in_running_time;
1115   buf_info->buf_size = gst_buffer_get_size (buf);
1116
1117   /* Update total input byte counter for overflow detect */
1118   ctx->in_bytes += buf_info->buf_size;
1119
1120   /* initialize mux_start_time */
1121   if (ctx->is_reference && splitmux->mux_start_time == 0)
1122     splitmux->mux_start_time = buf_info->run_ts;
1123
1124   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT
1125       " total in_bytes %" G_GSIZE_FORMAT,
1126       GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes);
1127
1128   loop_again = TRUE;
1129   do {
1130     if (ctx->flushing)
1131       break;
1132
1133     switch (splitmux->state) {
1134       case SPLITMUX_STATE_COLLECTING_GOP_START:
1135         if (ctx->is_reference) {
1136           /* If a keyframe, we have a complete GOP */
1137           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1138               !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) ||
1139               splitmux->max_in_running_time >= ctx->in_running_time) {
1140             /* Pass this buffer through */
1141             loop_again = FALSE;
1142             break;
1143           }
1144           GST_INFO_OBJECT (pad,
1145               "Have keyframe with running time %" GST_TIME_FORMAT,
1146               GST_TIME_ARGS (ctx->in_running_time));
1147           keyframe = TRUE;
1148           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1149           splitmux->max_in_running_time = ctx->in_running_time;
1150           /* Wake up other input pads to collect this GOP */
1151           GST_SPLITMUX_BROADCAST (splitmux);
1152           check_completed_gop (splitmux, ctx);
1153         } else {
1154           /* We're still waiting for a keyframe on the reference pad, sleep */
1155           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1156           GST_SPLITMUX_WAIT (splitmux);
1157           GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1158               splitmux->state);
1159         }
1160         break;
1161       case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1162         /* After a GOP start is found, this buffer might complete the GOP */
1163         /* If we overran the target timestamp, it might be time to process
1164          * the GOP, otherwise bail out for more data
1165          */
1166         GST_LOG_OBJECT (pad,
1167             "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT,
1168             GST_TIME_ARGS (ctx->in_running_time),
1169             GST_TIME_ARGS (splitmux->max_in_running_time));
1170
1171         if (ctx->in_running_time < splitmux->max_in_running_time) {
1172           loop_again = FALSE;
1173           break;
1174         }
1175
1176         GST_LOG_OBJECT (pad,
1177             "Collected last packet of GOP. Checking other pads");
1178         check_completed_gop (splitmux, ctx);
1179         break;
1180       case SPLITMUX_STATE_ENDING_FILE:{
1181         GstEvent *event;
1182
1183         /* If somes streams received no buffer during the last GOP that overran,
1184          * because its next buffer has a timestamp bigger than
1185          * ctx->max_in_running_time, its queue is empty. In that case the only
1186          * way to wakeup the output thread is by injecting an event in the
1187          * queue. This usually happen with subtitle streams.
1188          * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
1189         GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event");
1190         event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
1191             GST_EVENT_TYPE_SERIALIZED,
1192             gst_structure_new ("splitmuxsink-unblock", "timestamp",
1193                 G_TYPE_UINT64, splitmux->max_in_running_time, NULL));
1194         gst_pad_send_event (ctx->sinkpad, event);
1195         /* fallthrough */
1196       }
1197       case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1198         /* A fragment is ending, wait until that's done before continuing */
1199         GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1200         GST_SPLITMUX_WAIT (splitmux);
1201         GST_DEBUG_OBJECT (pad,
1202             "Done sleeping for fragment restart state now %d", splitmux->state);
1203         break;
1204       default:
1205         loop_again = FALSE;
1206         break;
1207     }
1208   } while (loop_again);
1209
1210   if (keyframe) {
1211     splitmux->queued_gops++;
1212     buf_info->keyframe = TRUE;
1213   }
1214
1215   /* Now add this buffer to the queue just before returning */
1216   g_queue_push_head (&ctx->queued_bufs, buf_info);
1217
1218   /* Check the buffer will fit in the mq */
1219   check_queue_length (splitmux, ctx);
1220
1221   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1222       " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time));
1223
1224   GST_SPLITMUX_UNLOCK (splitmux);
1225   return GST_PAD_PROBE_PASS;
1226
1227 beach:
1228   GST_SPLITMUX_UNLOCK (splitmux);
1229   if (buf_info)
1230     mq_stream_buf_free (buf_info);
1231   return GST_PAD_PROBE_PASS;
1232 }
1233
1234 static GstPad *
1235 gst_splitmux_sink_request_new_pad (GstElement * element,
1236     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1237 {
1238   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1239   GstPadTemplate *mux_template = NULL;
1240   GstPad *res = NULL;
1241   GstPad *mq_sink, *mq_src;
1242   gchar *gname;
1243   gboolean is_video = FALSE;
1244   MqStreamCtx *ctx;
1245
1246   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1247
1248   GST_SPLITMUX_LOCK (splitmux);
1249   if (!create_elements (splitmux))
1250     goto fail;
1251
1252   if (templ->name_template) {
1253     if (g_str_equal (templ->name_template, "video")) {
1254       /* FIXME: Look for a pad template with matching caps, rather than by name */
1255       mux_template =
1256           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1257           (splitmux->muxer), "video_%u");
1258       is_video = TRUE;
1259       name = NULL;
1260     } else {
1261       mux_template =
1262           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1263           (splitmux->muxer), templ->name_template);
1264     }
1265     if (mux_template == NULL) {
1266       /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1267       mux_template =
1268           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1269           (splitmux->muxer), "sink_%d");
1270     }
1271   }
1272
1273   res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1274   if (res == NULL)
1275     goto fail;
1276
1277   if (is_video)
1278     gname = g_strdup ("video");
1279   else if (name == NULL)
1280     gname = gst_pad_get_name (res);
1281   else
1282     gname = g_strdup (name);
1283
1284   if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1285     gst_element_release_request_pad (splitmux->muxer, res);
1286     gst_object_unref (GST_OBJECT (res));
1287     goto fail;
1288   }
1289
1290   if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1291     gst_element_release_request_pad (splitmux->muxer, res);
1292     gst_object_unref (GST_OBJECT (res));
1293     gst_element_release_request_pad (splitmux->mq, mq_sink);
1294     gst_object_unref (GST_OBJECT (mq_sink));
1295     goto fail;
1296   }
1297
1298   gst_object_unref (GST_OBJECT (res));
1299
1300   ctx = mq_stream_ctx_new (splitmux);
1301   ctx->srcpad = mq_src;
1302   ctx->sinkpad = mq_sink;
1303
1304   mq_stream_ctx_ref (ctx);
1305   ctx->src_pad_block_id =
1306       gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1307       (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1308       _pad_block_destroy_src_notify);
1309   if (is_video && splitmux->reference_ctx != NULL) {
1310     splitmux->reference_ctx->is_reference = FALSE;
1311     splitmux->reference_ctx = NULL;
1312   }
1313   if (splitmux->reference_ctx == NULL) {
1314     splitmux->reference_ctx = ctx;
1315     ctx->is_reference = TRUE;
1316   }
1317
1318   res = gst_ghost_pad_new (gname, mq_sink);
1319   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1320
1321   mq_stream_ctx_ref (ctx);
1322   ctx->sink_pad_block_id =
1323       gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1324       (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1325       _pad_block_destroy_sink_notify);
1326
1327   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1328       " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1329
1330   splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1331
1332   g_free (gname);
1333
1334   gst_object_unref (mq_sink);
1335   gst_object_unref (mq_src);
1336
1337   gst_pad_set_active (res, TRUE);
1338   gst_element_add_pad (element, res);
1339   GST_SPLITMUX_UNLOCK (splitmux);
1340
1341   return res;
1342 fail:
1343   GST_SPLITMUX_UNLOCK (splitmux);
1344   return NULL;
1345 }
1346
1347 static void
1348 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1349 {
1350   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1351   GstPad *mqsink, *mqsrc, *muxpad;
1352   MqStreamCtx *ctx =
1353       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1354
1355   GST_SPLITMUX_LOCK (splitmux);
1356
1357   if (splitmux->muxer == NULL || splitmux->mq == NULL)
1358     goto fail;                  /* Elements don't exist yet - nothing to release */
1359
1360   GST_INFO_OBJECT (pad, "releasing request pad");
1361
1362   mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1363   mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1364   muxpad = gst_pad_get_peer (mqsrc);
1365
1366   /* Remove the context from our consideration */
1367   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1368
1369   if (ctx->sink_pad_block_id)
1370     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1371
1372   if (ctx->src_pad_block_id)
1373     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1374
1375   /* Can release the context now */
1376   mq_stream_ctx_unref (ctx);
1377
1378   /* Release and free the mq input */
1379   gst_element_release_request_pad (splitmux->mq, mqsink);
1380
1381   /* Release and free the muxer input */
1382   gst_element_release_request_pad (splitmux->muxer, muxpad);
1383
1384   gst_object_unref (mqsink);
1385   gst_object_unref (mqsrc);
1386   gst_object_unref (muxpad);
1387
1388   gst_element_remove_pad (element, pad);
1389
1390   /* Reset the internal elements only after all request pads are released */
1391   if (splitmux->contexts == NULL)
1392     gst_splitmux_reset (splitmux);
1393
1394 fail:
1395   GST_SPLITMUX_UNLOCK (splitmux);
1396 }
1397
1398 static GstElement *
1399 create_element (GstSplitMuxSink * splitmux,
1400     const gchar * factory, const gchar * name)
1401 {
1402   GstElement *ret = gst_element_factory_make (factory, name);
1403   if (ret == NULL) {
1404     g_warning ("Failed to create %s - splitmuxsink will not work", name);
1405     return NULL;
1406   }
1407
1408   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1409     g_warning ("Could not add %s element - splitmuxsink will not work", name);
1410     gst_object_unref (ret);
1411     return NULL;
1412   }
1413
1414   return ret;
1415 }
1416
1417 static gboolean
1418 create_elements (GstSplitMuxSink * splitmux)
1419 {
1420   /* Create internal elements */
1421   if (splitmux->mq == NULL) {
1422     if ((splitmux->mq =
1423             create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1424       goto fail;
1425
1426     splitmux->mq_max_buffers = 5;
1427     /* No bytes or time limit, we limit buffers manually */
1428     g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1429         (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1430   }
1431
1432   if (splitmux->muxer == NULL) {
1433     GstElement *provided_muxer = NULL;
1434
1435     GST_OBJECT_LOCK (splitmux);
1436     if (splitmux->provided_muxer != NULL)
1437       provided_muxer = gst_object_ref (splitmux->provided_muxer);
1438     GST_OBJECT_UNLOCK (splitmux);
1439
1440     if (provided_muxer == NULL) {
1441       if ((splitmux->muxer =
1442               create_element (splitmux, "mp4mux", "muxer")) == NULL)
1443         goto fail;
1444     } else {
1445       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1446         g_warning ("Could not add muxer element - splitmuxsink will not work");
1447         gst_object_unref (provided_muxer);
1448         goto fail;
1449       }
1450
1451       splitmux->muxer = provided_muxer;
1452       gst_object_unref (provided_muxer);
1453     }
1454   }
1455
1456   return TRUE;
1457 fail:
1458   return FALSE;
1459 }
1460
1461 static GstElement *
1462 find_sink (GstElement * e)
1463 {
1464   GstElement *res = NULL;
1465   GstIterator *iter;
1466   gboolean done = FALSE;
1467   GValue data = { 0, };
1468
1469   if (!GST_IS_BIN (e))
1470     return e;
1471
1472   iter = gst_bin_iterate_sinks (GST_BIN (e));
1473   while (!done) {
1474     switch (gst_iterator_next (iter, &data)) {
1475       case GST_ITERATOR_OK:
1476       {
1477         GstElement *child = g_value_get_object (&data);
1478         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1479                 "location") != NULL) {
1480           res = child;
1481           done = TRUE;
1482         }
1483         g_value_reset (&data);
1484         break;
1485       }
1486       case GST_ITERATOR_RESYNC:
1487         gst_iterator_resync (iter);
1488         break;
1489       case GST_ITERATOR_DONE:
1490         done = TRUE;
1491         break;
1492       case GST_ITERATOR_ERROR:
1493         g_assert_not_reached ();
1494         break;
1495     }
1496   }
1497   g_value_unset (&data);
1498   gst_iterator_free (iter);
1499
1500   return res;
1501 }
1502
1503 static gboolean
1504 create_sink (GstSplitMuxSink * splitmux)
1505 {
1506   GstElement *provided_sink = NULL;
1507
1508   if (splitmux->active_sink == NULL) {
1509
1510     GST_OBJECT_LOCK (splitmux);
1511     if (splitmux->provided_sink != NULL)
1512       provided_sink = gst_object_ref (splitmux->provided_sink);
1513     GST_OBJECT_UNLOCK (splitmux);
1514
1515     if (provided_sink == NULL) {
1516       if ((splitmux->sink =
1517               create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1518         goto fail;
1519       splitmux->active_sink = splitmux->sink;
1520     } else {
1521       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1522         g_warning ("Could not add sink elements - splitmuxsink will not work");
1523         gst_object_unref (provided_sink);
1524         goto fail;
1525       }
1526
1527       splitmux->active_sink = provided_sink;
1528
1529       /* The bin holds a ref now, we can drop our tmp ref */
1530       gst_object_unref (provided_sink);
1531
1532       /* Find the sink element */
1533       splitmux->sink = find_sink (splitmux->active_sink);
1534       if (splitmux->sink == NULL) {
1535         g_warning
1536             ("Could not locate sink element in provided sink - splitmuxsink will not work");
1537         goto fail;
1538       }
1539     }
1540
1541     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1542       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1543       goto fail;
1544     }
1545   }
1546
1547   return TRUE;
1548 fail:
1549   return FALSE;
1550 }
1551
1552 #ifdef __GNUC__
1553 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
1554 #endif
1555 static void
1556 set_next_filename (GstSplitMuxSink * splitmux)
1557 {
1558   gchar *fname = NULL;
1559   gst_splitmux_sink_ensure_max_files (splitmux);
1560
1561   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
1562       splitmux->fragment_id, &fname);
1563
1564   if (!fname)
1565     fname = splitmux->location ?
1566         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
1567
1568   if (fname) {
1569     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1570     g_object_set (splitmux->sink, "location", fname, NULL);
1571     g_free (fname);
1572
1573     splitmux->fragment_id++;
1574   }
1575 }
1576
1577 static GstStateChangeReturn
1578 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1579 {
1580   GstStateChangeReturn ret;
1581   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1582
1583   switch (transition) {
1584     case GST_STATE_CHANGE_NULL_TO_READY:{
1585       GST_SPLITMUX_LOCK (splitmux);
1586       if (!create_elements (splitmux) || !create_sink (splitmux)) {
1587         ret = GST_STATE_CHANGE_FAILURE;
1588         GST_SPLITMUX_UNLOCK (splitmux);
1589         goto beach;
1590       }
1591       GST_SPLITMUX_UNLOCK (splitmux);
1592       splitmux->fragment_id = 0;
1593       set_next_filename (splitmux);
1594       break;
1595     }
1596     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1597       GST_SPLITMUX_LOCK (splitmux);
1598       /* Start by collecting one input on each pad */
1599       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1600       splitmux->max_in_running_time = 0;
1601       splitmux->muxed_out_time = splitmux->mux_start_time = 0;
1602       splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1603       splitmux->opening_first_fragment = TRUE;
1604       GST_SPLITMUX_UNLOCK (splitmux);
1605       break;
1606     }
1607     case GST_STATE_CHANGE_PAUSED_TO_READY:
1608     case GST_STATE_CHANGE_READY_TO_NULL:
1609       GST_SPLITMUX_LOCK (splitmux);
1610       splitmux->state = SPLITMUX_STATE_STOPPED;
1611       /* Wake up any blocked threads */
1612       GST_LOG_OBJECT (splitmux,
1613           "State change -> NULL or READY. Waking threads");
1614       GST_SPLITMUX_BROADCAST (splitmux);
1615       GST_SPLITMUX_UNLOCK (splitmux);
1616       break;
1617     default:
1618       break;
1619   }
1620
1621   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1622   if (ret == GST_STATE_CHANGE_FAILURE)
1623     goto beach;
1624
1625   switch (transition) {
1626     case GST_STATE_CHANGE_READY_TO_NULL:
1627       GST_SPLITMUX_LOCK (splitmux);
1628       splitmux->fragment_id = 0;
1629       /* Reset internal elements only if no pad contexts are using them */
1630       if (splitmux->contexts == NULL)
1631         gst_splitmux_reset (splitmux);
1632       GST_SPLITMUX_UNLOCK (splitmux);
1633       break;
1634     default:
1635       break;
1636   }
1637
1638 beach:
1639
1640   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1641       ret == GST_STATE_CHANGE_FAILURE) {
1642     /* Cleanup elements on failed transition out of NULL */
1643     gst_splitmux_reset (splitmux);
1644   }
1645   return ret;
1646 }
1647
1648 gboolean
1649 register_splitmuxsink (GstPlugin * plugin)
1650 {
1651   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1652       "Split File Muxing Sink");
1653
1654   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1655       GST_TYPE_SPLITMUX_SINK);
1656 }
1657
1658 static void
1659 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
1660 {
1661   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
1662     splitmux->fragment_id = 0;
1663   }
1664 }