splitmux: Handle a hang draining out at EOS
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * can be muxed, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * If a video stream is available, the splitting process is driven by the video
37  * stream contents, and the video stream must contain closed GOPs for the output
38  * file parts to be played individually correctly. In the absence of a video
39  * stream, the first available stream is used as reference for synchronization.
40  *
41  * <refsect2>
42  * <title>Example pipelines</title>
43  * |[
44  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
45  * ]|
46  * Records a video stream captured from a v4l2 device and muxes it into
47  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48  * and 1MB maximum size.
49  * </refsect2>
50  */
51
52 #ifdef HAVE_CONFIG_H
53 #include "config.h"
54 #endif
55
56 #include <string.h>
57 #include "gstsplitmuxsink.h"
58
59 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
60 #define GST_CAT_DEFAULT splitmux_debug
61
62 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
63 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
64 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
65 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
66
67 enum
68 {
69   PROP_0,
70   PROP_LOCATION,
71   PROP_MAX_SIZE_TIME,
72   PROP_MAX_SIZE_BYTES,
73   PROP_MUXER_OVERHEAD,
74   PROP_MUXER,
75   PROP_SINK
76 };
77
78 #define DEFAULT_MAX_SIZE_TIME       0
79 #define DEFAULT_MAX_SIZE_BYTES      0
80 #define DEFAULT_MUXER_OVERHEAD      0.02
81 #define DEFAULT_MUXER "mp4mux"
82 #define DEFAULT_SINK "filesink"
83
84 enum
85 {
86   SIGNAL_FORMAT_LOCATION,
87   SIGNAL_LAST
88 };
89
90 static guint signals[SIGNAL_LAST];
91
92 static GstStaticPadTemplate video_sink_template =
93 GST_STATIC_PAD_TEMPLATE ("video",
94     GST_PAD_SINK,
95     GST_PAD_REQUEST,
96     GST_STATIC_CAPS_ANY);
97 static GstStaticPadTemplate audio_sink_template =
98 GST_STATIC_PAD_TEMPLATE ("audio_%u",
99     GST_PAD_SINK,
100     GST_PAD_REQUEST,
101     GST_STATIC_CAPS_ANY);
102 static GstStaticPadTemplate subtitle_sink_template =
103 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
104     GST_PAD_SINK,
105     GST_PAD_REQUEST,
106     GST_STATIC_CAPS_ANY);
107
108 static GQuark PAD_CONTEXT;
109
110 static void
111 _do_init (void)
112 {
113   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
114 }
115
116 #define gst_splitmux_sink_parent_class parent_class
117 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
118     _do_init ());
119
120 static gboolean create_elements (GstSplitMuxSink * splitmux);
121 static gboolean create_sink (GstSplitMuxSink * splitmux);
122 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
123     const GValue * value, GParamSpec * pspec);
124 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
125     GValue * value, GParamSpec * pspec);
126 static void gst_splitmux_sink_dispose (GObject * object);
127 static void gst_splitmux_sink_finalize (GObject * object);
128
129 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
130     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
131 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
132
133 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
134     element, GstStateChange transition);
135
136 static void bus_handler (GstBin * bin, GstMessage * msg);
137 static void set_next_filename (GstSplitMuxSink * splitmux);
138 static void start_next_fragment (GstSplitMuxSink * splitmux);
139 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
140 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
141
142 static MqStreamBuf *
143 mq_stream_buf_new (void)
144 {
145   return g_slice_new0 (MqStreamBuf);
146 }
147
148 static void
149 mq_stream_buf_free (MqStreamBuf * data)
150 {
151   g_slice_free (MqStreamBuf, data);
152 }
153
154 static void
155 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
156 {
157   GObjectClass *gobject_class = (GObjectClass *) klass;
158   GstElementClass *gstelement_class = (GstElementClass *) klass;
159   GstBinClass *gstbin_class = (GstBinClass *) klass;
160
161   gobject_class->set_property = gst_splitmux_sink_set_property;
162   gobject_class->get_property = gst_splitmux_sink_get_property;
163   gobject_class->dispose = gst_splitmux_sink_dispose;
164   gobject_class->finalize = gst_splitmux_sink_finalize;
165
166   gst_element_class_set_static_metadata (gstelement_class,
167       "Split Muxing Bin", "Generic/Bin/Muxer",
168       "Convenience bin that muxes incoming streams into multiple time/size limited files",
169       "Jan Schmidt <jan@centricular.com>");
170
171   gst_element_class_add_static_pad_template (gstelement_class,
172       &video_sink_template);
173   gst_element_class_add_static_pad_template (gstelement_class,
174       &audio_sink_template);
175   gst_element_class_add_static_pad_template (gstelement_class,
176       &subtitle_sink_template);
177
178   gstelement_class->change_state =
179       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
180   gstelement_class->request_new_pad =
181       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
182   gstelement_class->release_pad =
183       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
184
185   gstbin_class->handle_message = bus_handler;
186
187   g_object_class_install_property (gobject_class, PROP_LOCATION,
188       g_param_spec_string ("location", "File Output Pattern",
189           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
190           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
191   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
192       g_param_spec_double ("mux-overhead", "Muxing Overhead",
193           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
194           DEFAULT_MUXER_OVERHEAD,
195           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
196
197   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
198       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
199           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
200           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
201   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
202       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
203           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
204           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
205
206   g_object_class_install_property (gobject_class, PROP_MUXER,
207       g_param_spec_object ("muxer", "Muxer",
208           "The muxer element to use (NULL = default mp4mux)",
209           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
210   g_object_class_install_property (gobject_class, PROP_SINK,
211       g_param_spec_object ("sink", "Sink",
212           "The sink element (or element chain) to use (NULL = default filesink)",
213           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
214
215   /**
216    * GstSplitMuxSink::format-location:
217    * @splitmux: the #GstSplitMuxSink
218    * @fragment_id: the sequence number of the file to be created
219    *
220    * Returns: the location to be used for the next output file
221    */
222   signals[SIGNAL_FORMAT_LOCATION] =
223       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
224       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
225 }
226
227 static void
228 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
229 {
230   g_mutex_init (&splitmux->lock);
231   g_cond_init (&splitmux->data_cond);
232
233   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
234   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
235   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
236
237   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
238 }
239
240 static void
241 gst_splitmux_reset (GstSplitMuxSink * splitmux)
242 {
243   if (splitmux->mq)
244     gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
245   if (splitmux->muxer)
246     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
247   if (splitmux->active_sink)
248     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
249
250   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
251       NULL;
252 }
253
254 static void
255 gst_splitmux_sink_dispose (GObject * object)
256 {
257   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
258
259   G_OBJECT_CLASS (parent_class)->dispose (object);
260
261   /* Calling parent dispose invalidates all child pointers */
262   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
263       NULL;
264 }
265
266 static void
267 gst_splitmux_sink_finalize (GObject * object)
268 {
269   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
270   g_cond_clear (&splitmux->data_cond);
271   g_mutex_clear (&splitmux->lock);
272   if (splitmux->provided_sink)
273     gst_object_unref (splitmux->provided_sink);
274   if (splitmux->provided_muxer)
275     gst_object_unref (splitmux->provided_muxer);
276
277   g_free (splitmux->location);
278
279   /* Make sure to free any un-released contexts */
280   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
281   g_list_free (splitmux->contexts);
282
283   G_OBJECT_CLASS (parent_class)->finalize (object);
284 }
285
286 static void
287 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
288     const GValue * value, GParamSpec * pspec)
289 {
290   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
291
292   switch (prop_id) {
293     case PROP_LOCATION:{
294       GST_OBJECT_LOCK (splitmux);
295       g_free (splitmux->location);
296       splitmux->location = g_value_dup_string (value);
297       GST_OBJECT_UNLOCK (splitmux);
298       break;
299     }
300     case PROP_MAX_SIZE_BYTES:
301       GST_OBJECT_LOCK (splitmux);
302       splitmux->threshold_bytes = g_value_get_uint64 (value);
303       GST_OBJECT_UNLOCK (splitmux);
304       break;
305     case PROP_MAX_SIZE_TIME:
306       GST_OBJECT_LOCK (splitmux);
307       splitmux->threshold_time = g_value_get_uint64 (value);
308       GST_OBJECT_UNLOCK (splitmux);
309       break;
310     case PROP_MUXER_OVERHEAD:
311       GST_OBJECT_LOCK (splitmux);
312       splitmux->mux_overhead = g_value_get_double (value);
313       GST_OBJECT_UNLOCK (splitmux);
314       break;
315     case PROP_SINK:
316       GST_OBJECT_LOCK (splitmux);
317       if (splitmux->provided_sink)
318         gst_object_unref (splitmux->provided_sink);
319       splitmux->provided_sink = g_value_dup_object (value);
320       GST_OBJECT_UNLOCK (splitmux);
321       break;
322     case PROP_MUXER:
323       GST_OBJECT_LOCK (splitmux);
324       if (splitmux->provided_muxer)
325         gst_object_unref (splitmux->provided_muxer);
326       splitmux->provided_muxer = g_value_dup_object (value);
327       GST_OBJECT_UNLOCK (splitmux);
328       break;
329     default:
330       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
331       break;
332   }
333 }
334
335 static void
336 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
337     GValue * value, GParamSpec * pspec)
338 {
339   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
340
341   switch (prop_id) {
342     case PROP_LOCATION:
343       GST_OBJECT_LOCK (splitmux);
344       g_value_set_string (value, splitmux->location);
345       GST_OBJECT_UNLOCK (splitmux);
346       break;
347     case PROP_MAX_SIZE_BYTES:
348       GST_OBJECT_LOCK (splitmux);
349       g_value_set_uint64 (value, splitmux->threshold_bytes);
350       GST_OBJECT_UNLOCK (splitmux);
351       break;
352     case PROP_MAX_SIZE_TIME:
353       GST_OBJECT_LOCK (splitmux);
354       g_value_set_uint64 (value, splitmux->threshold_time);
355       GST_OBJECT_UNLOCK (splitmux);
356       break;
357     case PROP_MUXER_OVERHEAD:
358       GST_OBJECT_LOCK (splitmux);
359       g_value_set_double (value, splitmux->mux_overhead);
360       GST_OBJECT_UNLOCK (splitmux);
361       break;
362     case PROP_SINK:
363       GST_OBJECT_LOCK (splitmux);
364       g_value_set_object (value, splitmux->provided_sink);
365       GST_OBJECT_UNLOCK (splitmux);
366       break;
367     case PROP_MUXER:
368       GST_OBJECT_LOCK (splitmux);
369       g_value_set_object (value, splitmux->provided_muxer);
370       GST_OBJECT_UNLOCK (splitmux);
371       break;
372     default:
373       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
374       break;
375   }
376 }
377
378 static GstPad *
379 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
380 {
381   gchar *tmp, *sinkname, *srcname;
382   GstPad *mq_src;
383
384   sinkname = gst_pad_get_name (sink_pad);
385   tmp = sinkname + 5;
386   srcname = g_strdup_printf ("src_%s", tmp);
387
388   mq_src = gst_element_get_static_pad (mq, srcname);
389
390   g_free (sinkname);
391   g_free (srcname);
392
393   return mq_src;
394 }
395
396 static gboolean
397 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
398     GstPad ** src_pad)
399 {
400   GstPad *mq_sink;
401   GstPad *mq_src;
402
403   /* Request a pad from multiqueue, then connect this one, then
404    * discover the corresponding output pad and return both */
405   mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
406   if (mq_sink == NULL)
407     return FALSE;
408
409   mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
410   if (mq_src == NULL)
411     goto fail;
412
413   *sink_pad = mq_sink;
414   *src_pad = mq_src;
415
416   return TRUE;
417
418 fail:
419   gst_element_release_request_pad (splitmux->mq, mq_sink);
420   return FALSE;
421 }
422
423 static MqStreamCtx *
424 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
425 {
426   MqStreamCtx *ctx;
427
428   ctx = g_new0 (MqStreamCtx, 1);
429   g_atomic_int_set (&ctx->refcount, 1);
430   ctx->splitmux = splitmux;
431   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
432   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
433   ctx->in_running_time = ctx->out_running_time = 0;
434   g_queue_init (&ctx->queued_bufs);
435   return ctx;
436 }
437
438 static void
439 mq_stream_ctx_free (MqStreamCtx * ctx)
440 {
441   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
442   g_queue_clear (&ctx->queued_bufs);
443   g_free (ctx);
444 }
445
446 static void
447 mq_stream_ctx_unref (MqStreamCtx * ctx)
448 {
449   if (g_atomic_int_dec_and_test (&ctx->refcount))
450     mq_stream_ctx_free (ctx);
451 }
452
453 static void
454 mq_stream_ctx_ref (MqStreamCtx * ctx)
455 {
456   g_atomic_int_inc (&ctx->refcount);
457 }
458
459 static void
460 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
461 {
462   ctx->sink_pad_block_id = 0;
463   mq_stream_ctx_unref (ctx);
464 }
465
466 static void
467 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
468 {
469   ctx->src_pad_block_id = 0;
470   mq_stream_ctx_unref (ctx);
471 }
472
473 static void
474 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
475 {
476   gchar *location = NULL;
477   GstMessage *msg;
478   const gchar *msg_name = opened ?
479       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
480
481   g_object_get (splitmux->sink, "location", &location, NULL);
482
483   msg = gst_message_new_element (GST_OBJECT (splitmux),
484       gst_structure_new (msg_name,
485           "location", G_TYPE_STRING, location,
486           "running-time", GST_TYPE_CLOCK_TIME,
487           splitmux->reference_ctx->out_running_time, NULL));
488   gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
489
490   g_free (location);
491 }
492
493 /* Called with lock held, drops the lock to send EOS to the
494  * pad
495  */
496 static void
497 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
498 {
499   GstEvent *eos;
500   GstPad *pad;
501
502   eos = gst_event_new_eos ();
503   pad = gst_pad_get_peer (ctx->srcpad);
504
505   ctx->out_eos = TRUE;
506
507   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
508   GST_SPLITMUX_UNLOCK (splitmux);
509   gst_pad_send_event (pad, eos);
510   GST_SPLITMUX_LOCK (splitmux);
511
512   gst_object_unref (pad);
513 }
514
515 /* Called with splitmux lock held to check if this output
516  * context needs to sleep to wait for the release of the
517  * next GOP, or to send EOS to close out the current file
518  */
519 static void
520 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
521 {
522   do {
523
524     GST_LOG_OBJECT (ctx->srcpad,
525         "Checking running time %" GST_TIME_FORMAT " against max %"
526         GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time),
527         GST_TIME_ARGS (splitmux->max_out_running_time));
528
529     if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE ||
530         ctx->out_running_time < splitmux->max_out_running_time) {
531       splitmux->have_muxed_something = TRUE;
532       return;
533     }
534
535     if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
536       return;
537
538     if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
539       if (ctx->out_eos == FALSE) {
540         send_eos (splitmux, ctx);
541         continue;
542       }
543     } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
544       start_next_fragment (splitmux);
545       continue;
546     }
547
548     GST_INFO_OBJECT (ctx->srcpad,
549         "Sleeping for running time %"
550         GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")",
551         GST_TIME_ARGS (ctx->out_running_time),
552         GST_TIME_ARGS (splitmux->max_out_running_time));
553     ctx->out_blocked = TRUE;
554     /* Expand the mq if needed before sleeping */
555     check_queue_length (splitmux, ctx);
556     GST_SPLITMUX_WAIT (splitmux);
557     ctx->out_blocked = FALSE;
558     GST_INFO_OBJECT (ctx->srcpad,
559         "Woken for new max running time %" GST_TIME_FORMAT,
560         GST_TIME_ARGS (splitmux->max_out_running_time));
561   } while (1);
562 }
563
564 static GstPadProbeReturn
565 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
566 {
567   GstSplitMuxSink *splitmux = ctx->splitmux;
568   MqStreamBuf *buf_info = NULL;
569
570   GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
571
572   /* FIXME: Handle buffer lists, until then make it clear they won't work */
573   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
574     g_warning ("Buffer list handling not implemented");
575     return GST_PAD_PROBE_DROP;
576   }
577   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
578     GstEvent *event = gst_pad_probe_info_get_event (info);
579
580     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
581
582     switch (GST_EVENT_TYPE (event)) {
583       case GST_EVENT_SEGMENT:
584         gst_event_copy_segment (event, &ctx->out_segment);
585         break;
586       case GST_EVENT_FLUSH_STOP:
587         GST_SPLITMUX_LOCK (splitmux);
588         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
589         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
590         g_queue_clear (&ctx->queued_bufs);
591         ctx->flushing = FALSE;
592         GST_SPLITMUX_UNLOCK (splitmux);
593         break;
594       case GST_EVENT_FLUSH_START:
595         GST_SPLITMUX_LOCK (splitmux);
596         GST_LOG_OBJECT (pad, "Flush start");
597         ctx->flushing = TRUE;
598         GST_SPLITMUX_BROADCAST (splitmux);
599         GST_SPLITMUX_UNLOCK (splitmux);
600         break;
601       case GST_EVENT_EOS:
602         GST_SPLITMUX_LOCK (splitmux);
603         if (splitmux->state == SPLITMUX_STATE_STOPPED)
604           goto beach;
605         ctx->out_eos = TRUE;
606         GST_SPLITMUX_UNLOCK (splitmux);
607         break;
608       case GST_EVENT_GAP:{
609         GstClockTime gap_ts;
610
611         gst_event_parse_gap (event, &gap_ts, NULL);
612         if (gap_ts == GST_CLOCK_TIME_NONE)
613           break;
614
615         GST_SPLITMUX_LOCK (splitmux);
616
617         gap_ts = gst_segment_to_running_time (&ctx->out_segment,
618             GST_FORMAT_TIME, gap_ts);
619
620         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT,
621             GST_TIME_ARGS (gap_ts));
622
623         if (splitmux->state == SPLITMUX_STATE_STOPPED)
624           goto beach;
625         ctx->out_running_time = gap_ts;
626         complete_or_wait_on_out (splitmux, ctx);
627         GST_SPLITMUX_UNLOCK (splitmux);
628         break;
629       }
630       case GST_EVENT_CUSTOM_DOWNSTREAM:{
631         const GstStructure *s;
632         GstClockTime ts = 0;
633
634         s = gst_event_get_structure (event);
635         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
636           break;
637
638         gst_structure_get_uint64 (s, "timestamp", &ts);
639
640         GST_SPLITMUX_LOCK (splitmux);
641
642         if (splitmux->state == SPLITMUX_STATE_STOPPED)
643           goto beach;
644         ctx->out_running_time = ts;
645         complete_or_wait_on_out (splitmux, ctx);
646         GST_SPLITMUX_UNLOCK (splitmux);
647         return GST_PAD_PROBE_DROP;
648       }
649       default:
650         break;
651     }
652     return GST_PAD_PROBE_PASS;
653   }
654
655   /* Allow everything through until the configured next stopping point */
656   GST_SPLITMUX_LOCK (splitmux);
657
658   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
659   if (buf_info == NULL)
660     /* Can only happen due to a poorly timed flush */
661     goto beach;
662
663   /* If we have popped a keyframe, decrement the queued_gop count */
664   if (buf_info->keyframe && splitmux->queued_gops > 0)
665     splitmux->queued_gops--;
666
667   ctx->out_running_time = buf_info->run_ts;
668
669   GST_LOG_OBJECT (splitmux,
670       "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT
671       " size %" G_GSIZE_FORMAT,
672       pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size);
673
674   if (splitmux->opening_first_fragment) {
675     send_fragment_opened_closed_msg (splitmux, TRUE);
676     splitmux->opening_first_fragment = FALSE;
677   }
678
679   complete_or_wait_on_out (splitmux, ctx);
680
681   if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE ||
682       splitmux->muxed_out_time < buf_info->run_ts)
683     splitmux->muxed_out_time = buf_info->run_ts;
684
685   splitmux->muxed_out_bytes += buf_info->buf_size;
686
687 #ifndef GST_DISABLE_GST_DEBUG
688   {
689     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
690     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
691         " run ts %" GST_TIME_FORMAT, buf,
692         GST_TIME_ARGS (ctx->out_running_time));
693   }
694 #endif
695
696   GST_SPLITMUX_UNLOCK (splitmux);
697
698   mq_stream_buf_free (buf_info);
699
700   return GST_PAD_PROBE_PASS;
701
702 beach:
703   GST_SPLITMUX_UNLOCK (splitmux);
704   return GST_PAD_PROBE_DROP;
705 }
706
707 static gboolean
708 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
709 {
710   return gst_pad_send_event (peer, gst_event_ref (*event));
711 }
712
713 static void
714 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
715 {
716   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
717
718   gst_pad_sticky_events_foreach (ctx->srcpad,
719       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
720
721   /* Clear EOS flag if not actually EOS */
722   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
723
724   gst_object_unref (peer);
725 }
726
727 /* Called with lock held when a fragment
728  * reaches EOS and it is time to restart
729  * a new fragment
730  */
731 static void
732 start_next_fragment (GstSplitMuxSink * splitmux)
733 {
734   /* 1 change to new file */
735   gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
736   gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
737
738   set_next_filename (splitmux);
739
740   gst_element_sync_state_with_parent (splitmux->active_sink);
741   gst_element_sync_state_with_parent (splitmux->muxer);
742
743   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
744
745   /* Switch state and go back to processing */
746   if (!splitmux->reference_ctx->in_eos) {
747     splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
748     splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
749   } else {
750     splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
751     splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
752     splitmux->have_muxed_something = FALSE;
753   }
754   splitmux->have_muxed_something =
755       (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
756
757   /* Store the overflow parameters as the basis for the next fragment */
758   splitmux->mux_start_time = splitmux->muxed_out_time;
759   splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
760
761   GST_DEBUG_OBJECT (splitmux,
762       "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT,
763       GST_TIME_ARGS (splitmux->max_out_running_time));
764
765   send_fragment_opened_closed_msg (splitmux, TRUE);
766
767   GST_SPLITMUX_BROADCAST (splitmux);
768 }
769
770 static void
771 bus_handler (GstBin * bin, GstMessage * message)
772 {
773   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
774
775   switch (GST_MESSAGE_TYPE (message)) {
776     case GST_MESSAGE_EOS:
777       /* If the state is draining out the current file, drop this EOS */
778       GST_SPLITMUX_LOCK (splitmux);
779
780       send_fragment_opened_closed_msg (splitmux, FALSE);
781
782       if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
783           splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) {
784         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
785         splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
786         GST_SPLITMUX_BROADCAST (splitmux);
787
788         gst_message_unref (message);
789         GST_SPLITMUX_UNLOCK (splitmux);
790         return;
791       }
792       GST_SPLITMUX_UNLOCK (splitmux);
793       break;
794     default:
795       break;
796   }
797
798   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
799 }
800
801 /* Called with splitmux lock held */
802 /* Called when entering ProcessingCompleteGop state
803  * Assess if mq contents overflowed the current file
804  *   -> If yes, need to switch to new file
805  *   -> if no, set max_out_running_time to let this GOP in and
806  *      go to COLLECTING_GOP_START state
807  */
808 static void
809 handle_gathered_gop (GstSplitMuxSink * splitmux)
810 {
811   GList *cur;
812   gsize queued_bytes = 0;
813   GstClockTime queued_time = 0;
814   gboolean at_eos = TRUE;
815
816   /* Assess if the multiqueue contents overflowed the current file */
817   for (cur = g_list_first (splitmux->contexts);
818       cur != NULL; cur = g_list_next (cur)) {
819     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
820     if (tmpctx->in_running_time > queued_time)
821       queued_time = tmpctx->in_running_time;
822     queued_bytes += tmpctx->in_bytes;
823     at_eos &= tmpctx->in_eos;
824   }
825
826   g_assert (queued_bytes >= splitmux->mux_start_bytes);
827   g_assert (queued_time >= splitmux->mux_start_time);
828
829   queued_bytes -= splitmux->mux_start_bytes;
830   queued_time -= splitmux->mux_start_time;
831
832   /* Expand queued bytes estimate by muxer overhead */
833   queued_bytes += (queued_bytes * splitmux->mux_overhead);
834
835   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT
836       " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes);
837
838   /* Check for overrun - have we output at least one byte and overrun
839    * either threshold? */
840   if ((splitmux->have_muxed_something &&
841           ((splitmux->threshold_bytes > 0 &&
842                   queued_bytes >= splitmux->threshold_bytes) ||
843               (splitmux->threshold_time > 0 &&
844                   queued_time >= splitmux->threshold_time))) || at_eos) {
845
846     splitmux->state = SPLITMUX_STATE_ENDING_FILE;
847
848     GST_INFO_OBJECT (splitmux,
849         "mq overflowed since last, draining out. max out TS is %"
850         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
851     GST_SPLITMUX_BROADCAST (splitmux);
852
853   } else {
854     /* No overflow */
855     GST_LOG_OBJECT (splitmux,
856         "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
857         " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.",
858         splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
859         queued_bytes, GST_TIME_ARGS (queued_time));
860
861     /* Wake everyone up to push this one GOP, then sleep */
862     splitmux->have_muxed_something = TRUE;
863
864     if (!splitmux->reference_ctx->in_eos) {
865       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
866       splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
867     } else {
868       splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
869       splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
870     }
871
872     GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
873         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
874     GST_SPLITMUX_BROADCAST (splitmux);
875   }
876
877 }
878
879 /* Called with splitmux lock held */
880 /* Called from each input pad when it is has all the pieces
881  * for a GOP or EOS, starting with the reference pad which has set the
882  * splitmux->max_in_running_time
883  */
884 static void
885 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
886 {
887   GList *cur;
888   gboolean ready = TRUE;
889   GstClockTime current_max_in_running_time;
890
891   if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
892     /* Iterate each pad, and check that the input running time is at least
893      * up to the reference running time, and if so handle the collected GOP */
894     GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
895         GST_TIME_FORMAT " ctx %p",
896         GST_TIME_ARGS (splitmux->max_in_running_time), ctx);
897     for (cur = g_list_first (splitmux->contexts); cur != NULL;
898         cur = g_list_next (cur)) {
899       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
900
901       GST_LOG_OBJECT (splitmux,
902           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT
903           " EOS %d", tmpctx, tmpctx->srcpad,
904           GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
905
906       if (splitmux->max_in_running_time != GST_CLOCK_TIME_NONE &&
907           tmpctx->in_running_time < splitmux->max_in_running_time &&
908           !tmpctx->in_eos) {
909         GST_LOG_OBJECT (splitmux,
910             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
911             tmpctx, tmpctx->srcpad);
912         ready = FALSE;
913         break;
914       }
915     }
916     if (ready) {
917       GST_DEBUG_OBJECT (splitmux,
918           "Collected GOP is complete. Processing (ctx %p)", ctx);
919       /* All pads have a complete GOP, release it into the multiqueue */
920       handle_gathered_gop (splitmux);
921     }
922   }
923
924   /* Some pad is not yet ready, or GOP is being pushed
925    * either way, sleep and wait to get woken */
926   current_max_in_running_time = splitmux->max_in_running_time;
927   while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
928           splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
929       !ctx->flushing &&
930       (current_max_in_running_time == splitmux->max_in_running_time)) {
931
932     GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
933         splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
934         "GOP complete" : "EOF draining", ctx);
935     GST_SPLITMUX_WAIT (splitmux);
936
937     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
938   }
939 }
940
941 static void
942 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
943 {
944   GList *cur;
945   guint cur_len = g_queue_get_length (&ctx->queued_bufs);
946
947   GST_DEBUG_OBJECT (ctx->sinkpad,
948       "Checking queue length len %u cur_max %u queued gops %u",
949       cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
950
951   if (cur_len >= splitmux->mq_max_buffers) {
952     gboolean allow_grow = FALSE;
953
954     /* If collecting a GOP and this pad might block,
955      * and there isn't already a pending GOP in the queue
956      * then grow
957      */
958     if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
959         ctx->in_running_time < splitmux->max_in_running_time &&
960         splitmux->queued_gops <= 1) {
961       allow_grow = TRUE;
962     } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
963         ctx->is_reference && splitmux->queued_gops <= 1) {
964       allow_grow = TRUE;
965     }
966
967     if (!allow_grow) {
968       for (cur = g_list_first (splitmux->contexts);
969           cur != NULL; cur = g_list_next (cur)) {
970         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
971         GST_DEBUG_OBJECT (tmpctx->sinkpad,
972             " len %u out_blocked %d",
973             g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
974         /* If another stream is starving, grow */
975         if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
976           allow_grow = TRUE;
977         }
978       }
979     }
980
981     if (allow_grow) {
982       splitmux->mq_max_buffers = cur_len + 1;
983
984       GST_INFO_OBJECT (splitmux,
985           "Multiqueue overrun - enlarging to %u buffers ctx %p",
986           splitmux->mq_max_buffers, ctx);
987
988       g_object_set (splitmux->mq, "max-size-buffers",
989           splitmux->mq_max_buffers, NULL);
990     }
991   }
992 }
993
994 static GstPadProbeReturn
995 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
996 {
997   GstSplitMuxSink *splitmux = ctx->splitmux;
998   GstBuffer *buf;
999   MqStreamBuf *buf_info = NULL;
1000   GstClockTime ts;
1001   gboolean loop_again;
1002   gboolean keyframe = FALSE;
1003
1004   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1005
1006   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1007   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1008     g_warning ("Buffer list handling not implemented");
1009     return GST_PAD_PROBE_DROP;
1010   }
1011   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
1012     GstEvent *event = gst_pad_probe_info_get_event (info);
1013     switch (GST_EVENT_TYPE (event)) {
1014       case GST_EVENT_SEGMENT:
1015         gst_event_copy_segment (event, &ctx->in_segment);
1016         break;
1017       case GST_EVENT_FLUSH_STOP:
1018         GST_SPLITMUX_LOCK (splitmux);
1019         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1020         ctx->in_eos = FALSE;
1021         ctx->in_bytes = 0;
1022         ctx->in_running_time = 0;
1023         GST_SPLITMUX_UNLOCK (splitmux);
1024         break;
1025       case GST_EVENT_EOS:
1026         GST_SPLITMUX_LOCK (splitmux);
1027         ctx->in_eos = TRUE;
1028
1029         if (splitmux->state == SPLITMUX_STATE_STOPPED)
1030           goto beach;
1031
1032         if (ctx->is_reference) {
1033           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
1034           /* Act as if this is a new keyframe with infinite timestamp */
1035           splitmux->max_in_running_time = GST_CLOCK_TIME_NONE;
1036           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1037           /* Wake up other input pads to collect this GOP */
1038           GST_SPLITMUX_BROADCAST (splitmux);
1039           check_completed_gop (splitmux, ctx);
1040         } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
1041           /* If we are waiting for a GOP to be completed (ie, for aux
1042            * pads to catch up), then this pad is complete, so check
1043            * if the whole GOP is.
1044            */
1045           check_completed_gop (splitmux, ctx);
1046         }
1047         GST_SPLITMUX_UNLOCK (splitmux);
1048         break;
1049       default:
1050         break;
1051     }
1052     return GST_PAD_PROBE_PASS;
1053   }
1054
1055   buf = gst_pad_probe_info_get_buffer (info);
1056   buf_info = mq_stream_buf_new ();
1057
1058   if (GST_BUFFER_PTS_IS_VALID (buf))
1059     ts = GST_BUFFER_PTS (buf);
1060   else
1061     ts = GST_BUFFER_DTS (buf);
1062
1063   GST_SPLITMUX_LOCK (splitmux);
1064
1065   if (splitmux->state == SPLITMUX_STATE_STOPPED)
1066     goto beach;
1067
1068   /* If this buffer has a timestamp, advance the input timestamp of the
1069    * stream */
1070   if (GST_CLOCK_TIME_IS_VALID (ts)) {
1071     GstClockTime running_time =
1072         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1073         GST_BUFFER_TIMESTAMP (buf));
1074
1075     if (GST_CLOCK_TIME_IS_VALID (running_time) &&
1076         (ctx->in_running_time == GST_CLOCK_TIME_NONE
1077             || running_time > ctx->in_running_time))
1078       ctx->in_running_time = running_time;
1079   }
1080
1081   /* Try to make sure we have a valid running time */
1082   if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) {
1083     ctx->in_running_time =
1084         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1085         ctx->in_segment.start);
1086   }
1087
1088   buf_info->run_ts = ctx->in_running_time;
1089   buf_info->buf_size = gst_buffer_get_size (buf);
1090
1091   /* Update total input byte counter for overflow detect */
1092   ctx->in_bytes += buf_info->buf_size;
1093
1094   /* initialize mux_start_time */
1095   if (ctx->is_reference && splitmux->mux_start_time == 0)
1096     splitmux->mux_start_time = buf_info->run_ts;
1097
1098   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT
1099       " total in_bytes %" G_GSIZE_FORMAT,
1100       GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes);
1101
1102   loop_again = TRUE;
1103   do {
1104     if (ctx->flushing)
1105       break;
1106
1107     switch (splitmux->state) {
1108       case SPLITMUX_STATE_COLLECTING_GOP_START:
1109         if (ctx->is_reference) {
1110           /* If a keyframe, we have a complete GOP */
1111           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1112               !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) ||
1113               splitmux->max_in_running_time >= ctx->in_running_time) {
1114             /* Pass this buffer through */
1115             loop_again = FALSE;
1116             break;
1117           }
1118           GST_INFO_OBJECT (pad,
1119               "Have keyframe with running time %" GST_TIME_FORMAT,
1120               GST_TIME_ARGS (ctx->in_running_time));
1121           keyframe = TRUE;
1122           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1123           splitmux->max_in_running_time = ctx->in_running_time;
1124           /* Wake up other input pads to collect this GOP */
1125           GST_SPLITMUX_BROADCAST (splitmux);
1126           check_completed_gop (splitmux, ctx);
1127         } else {
1128           /* We're still waiting for a keyframe on the reference pad, sleep */
1129           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1130           GST_SPLITMUX_WAIT (splitmux);
1131           GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1132               splitmux->state);
1133         }
1134         break;
1135       case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1136         /* After a GOP start is found, this buffer might complete the GOP */
1137         /* If we overran the target timestamp, it might be time to process
1138          * the GOP, otherwise bail out for more data
1139          */
1140         GST_LOG_OBJECT (pad,
1141             "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT,
1142             GST_TIME_ARGS (ctx->in_running_time),
1143             GST_TIME_ARGS (splitmux->max_in_running_time));
1144
1145         if (ctx->in_running_time < splitmux->max_in_running_time) {
1146           loop_again = FALSE;
1147           break;
1148         }
1149
1150         GST_LOG_OBJECT (pad,
1151             "Collected last packet of GOP. Checking other pads");
1152         check_completed_gop (splitmux, ctx);
1153         break;
1154       case SPLITMUX_STATE_ENDING_FILE:{
1155         GstEvent *event;
1156
1157         /* If somes streams received no buffer during the last GOP that overran,
1158          * because its next buffer has a timestamp bigger than
1159          * ctx->max_in_running_time, its queue is empty. In that case the only
1160          * way to wakeup the output thread is by injecting an event in the
1161          * queue. This usually happen with subtitle streams.
1162          * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
1163         GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event");
1164         event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
1165             GST_EVENT_TYPE_SERIALIZED,
1166             gst_structure_new ("splitmuxsink-unblock", "timestamp",
1167                 G_TYPE_UINT64, splitmux->max_in_running_time, NULL));
1168         gst_pad_send_event (ctx->sinkpad, event);
1169         /* fallthrough */
1170       }
1171       case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1172         /* A fragment is ending, wait until that's done before continuing */
1173         GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1174         GST_SPLITMUX_WAIT (splitmux);
1175         GST_DEBUG_OBJECT (pad,
1176             "Done sleeping for fragment restart state now %d", splitmux->state);
1177         break;
1178       default:
1179         loop_again = FALSE;
1180         break;
1181     }
1182   } while (loop_again);
1183
1184   if (keyframe) {
1185     splitmux->queued_gops++;
1186     buf_info->keyframe = TRUE;
1187   }
1188
1189   /* Now add this buffer to the queue just before returning */
1190   g_queue_push_head (&ctx->queued_bufs, buf_info);
1191
1192   /* Check the buffer will fit in the mq */
1193   check_queue_length (splitmux, ctx);
1194
1195   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1196       " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time));
1197
1198   GST_SPLITMUX_UNLOCK (splitmux);
1199   return GST_PAD_PROBE_PASS;
1200
1201 beach:
1202   GST_SPLITMUX_UNLOCK (splitmux);
1203   if (buf_info)
1204     mq_stream_buf_free (buf_info);
1205   return GST_PAD_PROBE_PASS;
1206 }
1207
1208 static GstPad *
1209 gst_splitmux_sink_request_new_pad (GstElement * element,
1210     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1211 {
1212   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1213   GstPadTemplate *mux_template = NULL;
1214   GstPad *res = NULL;
1215   GstPad *mq_sink, *mq_src;
1216   gchar *gname;
1217   gboolean is_video = FALSE;
1218   MqStreamCtx *ctx;
1219
1220   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1221
1222   GST_SPLITMUX_LOCK (splitmux);
1223   if (!create_elements (splitmux))
1224     goto fail;
1225
1226   if (templ->name_template) {
1227     if (g_str_equal (templ->name_template, "video")) {
1228       /* FIXME: Look for a pad template with matching caps, rather than by name */
1229       mux_template =
1230           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1231           (splitmux->muxer), "video_%u");
1232       is_video = TRUE;
1233       name = NULL;
1234     } else {
1235       mux_template =
1236           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1237           (splitmux->muxer), templ->name_template);
1238     }
1239     if (mux_template == NULL) {
1240       /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1241       mux_template =
1242           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1243           (splitmux->muxer), "sink_%d");
1244     }
1245   }
1246
1247   res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1248   if (res == NULL)
1249     goto fail;
1250
1251   if (is_video)
1252     gname = g_strdup ("video");
1253   else if (name == NULL)
1254     gname = gst_pad_get_name (res);
1255   else
1256     gname = g_strdup (name);
1257
1258   if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1259     gst_element_release_request_pad (splitmux->muxer, res);
1260     gst_object_unref (GST_OBJECT (res));
1261     goto fail;
1262   }
1263
1264   if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1265     gst_element_release_request_pad (splitmux->muxer, res);
1266     gst_object_unref (GST_OBJECT (res));
1267     gst_element_release_request_pad (splitmux->mq, mq_sink);
1268     gst_object_unref (GST_OBJECT (mq_sink));
1269     goto fail;
1270   }
1271
1272   gst_object_unref (GST_OBJECT (res));
1273
1274   ctx = mq_stream_ctx_new (splitmux);
1275   ctx->srcpad = mq_src;
1276   ctx->sinkpad = mq_sink;
1277
1278   mq_stream_ctx_ref (ctx);
1279   ctx->src_pad_block_id =
1280       gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1281       (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1282       _pad_block_destroy_src_notify);
1283   if (is_video && splitmux->reference_ctx != NULL) {
1284     splitmux->reference_ctx->is_reference = FALSE;
1285     splitmux->reference_ctx = NULL;
1286   }
1287   if (splitmux->reference_ctx == NULL) {
1288     splitmux->reference_ctx = ctx;
1289     ctx->is_reference = TRUE;
1290   }
1291
1292   res = gst_ghost_pad_new (gname, mq_sink);
1293   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1294
1295   mq_stream_ctx_ref (ctx);
1296   ctx->sink_pad_block_id =
1297       gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1298       (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1299       _pad_block_destroy_sink_notify);
1300
1301   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1302       " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1303
1304   splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1305
1306   g_free (gname);
1307
1308   gst_object_unref (mq_sink);
1309   gst_object_unref (mq_src);
1310
1311   gst_pad_set_active (res, TRUE);
1312   gst_element_add_pad (element, res);
1313   GST_SPLITMUX_UNLOCK (splitmux);
1314
1315   return res;
1316 fail:
1317   GST_SPLITMUX_UNLOCK (splitmux);
1318   return NULL;
1319 }
1320
1321 static void
1322 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1323 {
1324   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1325   GstPad *mqsink, *mqsrc, *muxpad;
1326   MqStreamCtx *ctx =
1327       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1328
1329   GST_SPLITMUX_LOCK (splitmux);
1330
1331   if (splitmux->muxer == NULL || splitmux->mq == NULL)
1332     goto fail;                  /* Elements don't exist yet - nothing to release */
1333
1334   GST_INFO_OBJECT (pad, "releasing request pad");
1335
1336   mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1337   mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1338   muxpad = gst_pad_get_peer (mqsrc);
1339
1340   /* Remove the context from our consideration */
1341   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1342
1343   if (ctx->sink_pad_block_id)
1344     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1345
1346   if (ctx->src_pad_block_id)
1347     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1348
1349   /* Can release the context now */
1350   mq_stream_ctx_unref (ctx);
1351
1352   /* Release and free the mq input */
1353   gst_element_release_request_pad (splitmux->mq, mqsink);
1354
1355   /* Release and free the muxer input */
1356   gst_element_release_request_pad (splitmux->muxer, muxpad);
1357
1358   gst_object_unref (mqsink);
1359   gst_object_unref (mqsrc);
1360   gst_object_unref (muxpad);
1361
1362   gst_element_remove_pad (element, pad);
1363
1364   /* Reset the internal elements only after all request pads are released */
1365   if (splitmux->contexts == NULL)
1366     gst_splitmux_reset (splitmux);
1367
1368 fail:
1369   GST_SPLITMUX_UNLOCK (splitmux);
1370 }
1371
1372 static GstElement *
1373 create_element (GstSplitMuxSink * splitmux,
1374     const gchar * factory, const gchar * name)
1375 {
1376   GstElement *ret = gst_element_factory_make (factory, name);
1377   if (ret == NULL) {
1378     g_warning ("Failed to create %s - splitmuxsink will not work", name);
1379     return NULL;
1380   }
1381
1382   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1383     g_warning ("Could not add %s element - splitmuxsink will not work", name);
1384     gst_object_unref (ret);
1385     return NULL;
1386   }
1387
1388   return ret;
1389 }
1390
1391 static gboolean
1392 create_elements (GstSplitMuxSink * splitmux)
1393 {
1394   /* Create internal elements */
1395   if (splitmux->mq == NULL) {
1396     if ((splitmux->mq =
1397             create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1398       goto fail;
1399
1400     splitmux->mq_max_buffers = 5;
1401     /* No bytes or time limit, we limit buffers manually */
1402     g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1403         (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1404   }
1405
1406   if (splitmux->muxer == NULL) {
1407     GstElement *provided_muxer = NULL;
1408
1409     GST_OBJECT_LOCK (splitmux);
1410     if (splitmux->provided_muxer != NULL)
1411       provided_muxer = gst_object_ref (splitmux->provided_muxer);
1412     GST_OBJECT_UNLOCK (splitmux);
1413
1414     if (provided_muxer == NULL) {
1415       if ((splitmux->muxer =
1416               create_element (splitmux, "mp4mux", "muxer")) == NULL)
1417         goto fail;
1418     } else {
1419       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1420         g_warning ("Could not add muxer element - splitmuxsink will not work");
1421         gst_object_unref (provided_muxer);
1422         goto fail;
1423       }
1424
1425       splitmux->muxer = provided_muxer;
1426       gst_object_unref (provided_muxer);
1427     }
1428   }
1429
1430   return TRUE;
1431 fail:
1432   return FALSE;
1433 }
1434
1435 static GstElement *
1436 find_sink (GstElement * e)
1437 {
1438   GstElement *res = NULL;
1439   GstIterator *iter;
1440   gboolean done = FALSE;
1441   GValue data = { 0, };
1442
1443   if (!GST_IS_BIN (e))
1444     return e;
1445
1446   iter = gst_bin_iterate_sinks (GST_BIN (e));
1447   while (!done) {
1448     switch (gst_iterator_next (iter, &data)) {
1449       case GST_ITERATOR_OK:
1450       {
1451         GstElement *child = g_value_get_object (&data);
1452         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1453                 "location") != NULL) {
1454           res = child;
1455           done = TRUE;
1456         }
1457         g_value_reset (&data);
1458         break;
1459       }
1460       case GST_ITERATOR_RESYNC:
1461         gst_iterator_resync (iter);
1462         break;
1463       case GST_ITERATOR_DONE:
1464         done = TRUE;
1465         break;
1466       case GST_ITERATOR_ERROR:
1467         g_assert_not_reached ();
1468         break;
1469     }
1470   }
1471   g_value_unset (&data);
1472   gst_iterator_free (iter);
1473
1474   return res;
1475 }
1476
1477 static gboolean
1478 create_sink (GstSplitMuxSink * splitmux)
1479 {
1480   GstElement *provided_sink = NULL;
1481
1482   if (splitmux->active_sink == NULL) {
1483
1484     GST_OBJECT_LOCK (splitmux);
1485     if (splitmux->provided_sink != NULL)
1486       provided_sink = gst_object_ref (splitmux->provided_sink);
1487     GST_OBJECT_UNLOCK (splitmux);
1488
1489     if (provided_sink == NULL) {
1490       if ((splitmux->sink =
1491               create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1492         goto fail;
1493       splitmux->active_sink = splitmux->sink;
1494     } else {
1495       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1496         g_warning ("Could not add sink elements - splitmuxsink will not work");
1497         gst_object_unref (provided_sink);
1498         goto fail;
1499       }
1500
1501       splitmux->active_sink = provided_sink;
1502
1503       /* The bin holds a ref now, we can drop our tmp ref */
1504       gst_object_unref (provided_sink);
1505
1506       /* Find the sink element */
1507       splitmux->sink = find_sink (splitmux->active_sink);
1508       if (splitmux->sink == NULL) {
1509         g_warning
1510             ("Could not locate sink element in provided sink - splitmuxsink will not work");
1511         goto fail;
1512       }
1513     }
1514
1515     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1516       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1517       goto fail;
1518     }
1519   }
1520
1521   return TRUE;
1522 fail:
1523   return FALSE;
1524 }
1525
1526 #ifdef __GNUC__
1527 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
1528 #endif
1529 static void
1530 set_next_filename (GstSplitMuxSink * splitmux)
1531 {
1532   gchar *fname = NULL;
1533
1534   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
1535       splitmux->fragment_id, &fname);
1536
1537   if (!fname)
1538     fname = splitmux->location ?
1539         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
1540
1541   if (fname) {
1542     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1543     g_object_set (splitmux->sink, "location", fname, NULL);
1544     g_free (fname);
1545
1546     splitmux->fragment_id++;
1547   }
1548 }
1549
1550 static GstStateChangeReturn
1551 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1552 {
1553   GstStateChangeReturn ret;
1554   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1555
1556   switch (transition) {
1557     case GST_STATE_CHANGE_NULL_TO_READY:{
1558       GST_SPLITMUX_LOCK (splitmux);
1559       if (!create_elements (splitmux) || !create_sink (splitmux)) {
1560         ret = GST_STATE_CHANGE_FAILURE;
1561         GST_SPLITMUX_UNLOCK (splitmux);
1562         goto beach;
1563       }
1564       GST_SPLITMUX_UNLOCK (splitmux);
1565       splitmux->fragment_id = 0;
1566       set_next_filename (splitmux);
1567       break;
1568     }
1569     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1570       GST_SPLITMUX_LOCK (splitmux);
1571       /* Start by collecting one input on each pad */
1572       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1573       splitmux->max_in_running_time = 0;
1574       splitmux->muxed_out_time = splitmux->mux_start_time = 0;
1575       splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1576       splitmux->opening_first_fragment = TRUE;
1577       GST_SPLITMUX_UNLOCK (splitmux);
1578       break;
1579     }
1580     case GST_STATE_CHANGE_PAUSED_TO_READY:
1581     case GST_STATE_CHANGE_READY_TO_NULL:
1582       GST_SPLITMUX_LOCK (splitmux);
1583       splitmux->state = SPLITMUX_STATE_STOPPED;
1584       /* Wake up any blocked threads */
1585       GST_LOG_OBJECT (splitmux,
1586           "State change -> NULL or READY. Waking threads");
1587       GST_SPLITMUX_BROADCAST (splitmux);
1588       GST_SPLITMUX_UNLOCK (splitmux);
1589       break;
1590     default:
1591       break;
1592   }
1593
1594   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1595   if (ret == GST_STATE_CHANGE_FAILURE)
1596     goto beach;
1597
1598   switch (transition) {
1599     case GST_STATE_CHANGE_READY_TO_NULL:
1600       GST_SPLITMUX_LOCK (splitmux);
1601       splitmux->fragment_id = 0;
1602       /* Reset internal elements only if no pad contexts are using them */
1603       if (splitmux->contexts == NULL)
1604         gst_splitmux_reset (splitmux);
1605       GST_SPLITMUX_UNLOCK (splitmux);
1606       break;
1607     default:
1608       break;
1609   }
1610
1611 beach:
1612
1613   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1614       ret == GST_STATE_CHANGE_FAILURE) {
1615     /* Cleanup elements on failed transition out of NULL */
1616     gst_splitmux_reset (splitmux);
1617   }
1618   return ret;
1619 }
1620
1621 gboolean
1622 register_splitmuxsink (GstPlugin * plugin)
1623 {
1624   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1625       "Split File Muxing Sink");
1626
1627   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1628       GST_TYPE_SPLITMUX_SINK);
1629 }