splitmux: Recheck state after unlocking mutex.
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * can be muxed, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * If a video stream is available, the splitting process is driven by the video
37  * stream contents, and the video stream must contain closed GOPs for the output
38  * file parts to be played individually correctly. In the absence of a video
39  * stream, the first available stream is used as reference for synchronization.
40  *
41  * <refsect2>
42  * <title>Example pipelines</title>
43  * |[
44  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
45  * ]|
46  * Records a video stream captured from a v4l2 device and muxes it into
47  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48  * and 1MB maximum size.
49  * </refsect2>
50  */
51
52 #ifdef HAVE_CONFIG_H
53 #include "config.h"
54 #endif
55
56 #include <string.h>
57 #include <glib/gstdio.h>
58 #include "gstsplitmuxsink.h"
59
60 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
61 #define GST_CAT_DEFAULT splitmux_debug
62
63 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
64 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
65 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
66 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
67
68 enum
69 {
70   PROP_0,
71   PROP_LOCATION,
72   PROP_MAX_SIZE_TIME,
73   PROP_MAX_SIZE_BYTES,
74   PROP_MAX_FILES,
75   PROP_MUXER_OVERHEAD,
76   PROP_MUXER,
77   PROP_SINK
78 };
79
80 #define DEFAULT_MAX_SIZE_TIME       0
81 #define DEFAULT_MAX_SIZE_BYTES      0
82 #define DEFAULT_MAX_FILES           0
83 #define DEFAULT_MUXER_OVERHEAD      0.02
84 #define DEFAULT_MUXER "mp4mux"
85 #define DEFAULT_SINK "filesink"
86
87 enum
88 {
89   SIGNAL_FORMAT_LOCATION,
90   SIGNAL_LAST
91 };
92
93 static guint signals[SIGNAL_LAST];
94
95 static GstStaticPadTemplate video_sink_template =
96 GST_STATIC_PAD_TEMPLATE ("video",
97     GST_PAD_SINK,
98     GST_PAD_REQUEST,
99     GST_STATIC_CAPS_ANY);
100 static GstStaticPadTemplate audio_sink_template =
101 GST_STATIC_PAD_TEMPLATE ("audio_%u",
102     GST_PAD_SINK,
103     GST_PAD_REQUEST,
104     GST_STATIC_CAPS_ANY);
105 static GstStaticPadTemplate subtitle_sink_template =
106 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
107     GST_PAD_SINK,
108     GST_PAD_REQUEST,
109     GST_STATIC_CAPS_ANY);
110
111 static GQuark PAD_CONTEXT;
112
113 static void
114 _do_init (void)
115 {
116   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
117 }
118
119 #define gst_splitmux_sink_parent_class parent_class
120 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
121     _do_init ());
122
123 static gboolean create_elements (GstSplitMuxSink * splitmux);
124 static gboolean create_sink (GstSplitMuxSink * splitmux);
125 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
126     const GValue * value, GParamSpec * pspec);
127 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
128     GValue * value, GParamSpec * pspec);
129 static void gst_splitmux_sink_dispose (GObject * object);
130 static void gst_splitmux_sink_finalize (GObject * object);
131
132 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
133     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
134 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
135
136 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
137     element, GstStateChange transition);
138
139 static void bus_handler (GstBin * bin, GstMessage * msg);
140 static void set_next_filename (GstSplitMuxSink * splitmux);
141 static void start_next_fragment (GstSplitMuxSink * splitmux);
142 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
143 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
144
145 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
146
147 static MqStreamBuf *
148 mq_stream_buf_new (void)
149 {
150   return g_slice_new0 (MqStreamBuf);
151 }
152
153 static void
154 mq_stream_buf_free (MqStreamBuf * data)
155 {
156   g_slice_free (MqStreamBuf, data);
157 }
158
159 static void
160 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
161 {
162   GObjectClass *gobject_class = (GObjectClass *) klass;
163   GstElementClass *gstelement_class = (GstElementClass *) klass;
164   GstBinClass *gstbin_class = (GstBinClass *) klass;
165
166   gobject_class->set_property = gst_splitmux_sink_set_property;
167   gobject_class->get_property = gst_splitmux_sink_get_property;
168   gobject_class->dispose = gst_splitmux_sink_dispose;
169   gobject_class->finalize = gst_splitmux_sink_finalize;
170
171   gst_element_class_set_static_metadata (gstelement_class,
172       "Split Muxing Bin", "Generic/Bin/Muxer",
173       "Convenience bin that muxes incoming streams into multiple time/size limited files",
174       "Jan Schmidt <jan@centricular.com>");
175
176   gst_element_class_add_static_pad_template (gstelement_class,
177       &video_sink_template);
178   gst_element_class_add_static_pad_template (gstelement_class,
179       &audio_sink_template);
180   gst_element_class_add_static_pad_template (gstelement_class,
181       &subtitle_sink_template);
182
183   gstelement_class->change_state =
184       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
185   gstelement_class->request_new_pad =
186       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
187   gstelement_class->release_pad =
188       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
189
190   gstbin_class->handle_message = bus_handler;
191
192   g_object_class_install_property (gobject_class, PROP_LOCATION,
193       g_param_spec_string ("location", "File Output Pattern",
194           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
195           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
197       g_param_spec_double ("mux-overhead", "Muxing Overhead",
198           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
199           DEFAULT_MUXER_OVERHEAD,
200           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
201
202   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
203       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
204           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
205           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
206   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
207       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
208           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
209           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
210   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
211       g_param_spec_uint ("max-files", "Max files",
212           "Maximum number of files to keep on disk. Once the maximum is reached,"
213           "old files start to be deleted to make room for new ones.",
214           0, G_MAXUINT, DEFAULT_MAX_FILES,
215           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
216
217
218   g_object_class_install_property (gobject_class, PROP_MUXER,
219       g_param_spec_object ("muxer", "Muxer",
220           "The muxer element to use (NULL = default mp4mux)",
221           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222   g_object_class_install_property (gobject_class, PROP_SINK,
223       g_param_spec_object ("sink", "Sink",
224           "The sink element (or element chain) to use (NULL = default filesink)",
225           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
226
227   /**
228    * GstSplitMuxSink::format-location:
229    * @splitmux: the #GstSplitMuxSink
230    * @fragment_id: the sequence number of the file to be created
231    *
232    * Returns: the location to be used for the next output file
233    */
234   signals[SIGNAL_FORMAT_LOCATION] =
235       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
236       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
237 }
238
239 static void
240 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
241 {
242   g_mutex_init (&splitmux->lock);
243   g_cond_init (&splitmux->data_cond);
244
245   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
246   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
247   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
248   splitmux->max_files = DEFAULT_MAX_FILES;
249
250   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
251   g_object_set (splitmux, "async-handling", TRUE, NULL);
252 }
253
254 static void
255 gst_splitmux_reset (GstSplitMuxSink * splitmux)
256 {
257   if (splitmux->mq)
258     gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
259   if (splitmux->muxer)
260     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
261   if (splitmux->active_sink)
262     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
263
264   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
265       NULL;
266 }
267
268 static void
269 gst_splitmux_sink_dispose (GObject * object)
270 {
271   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
272
273   G_OBJECT_CLASS (parent_class)->dispose (object);
274
275   /* Calling parent dispose invalidates all child pointers */
276   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
277       NULL;
278 }
279
280 static void
281 gst_splitmux_sink_finalize (GObject * object)
282 {
283   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
284   g_cond_clear (&splitmux->data_cond);
285   g_mutex_clear (&splitmux->lock);
286   if (splitmux->provided_sink)
287     gst_object_unref (splitmux->provided_sink);
288   if (splitmux->provided_muxer)
289     gst_object_unref (splitmux->provided_muxer);
290
291   g_free (splitmux->location);
292
293   /* Make sure to free any un-released contexts */
294   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
295   g_list_free (splitmux->contexts);
296
297   G_OBJECT_CLASS (parent_class)->finalize (object);
298 }
299
300 static void
301 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
302     const GValue * value, GParamSpec * pspec)
303 {
304   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
305
306   switch (prop_id) {
307     case PROP_LOCATION:{
308       GST_OBJECT_LOCK (splitmux);
309       g_free (splitmux->location);
310       splitmux->location = g_value_dup_string (value);
311       GST_OBJECT_UNLOCK (splitmux);
312       break;
313     }
314     case PROP_MAX_SIZE_BYTES:
315       GST_OBJECT_LOCK (splitmux);
316       splitmux->threshold_bytes = g_value_get_uint64 (value);
317       GST_OBJECT_UNLOCK (splitmux);
318       break;
319     case PROP_MAX_SIZE_TIME:
320       GST_OBJECT_LOCK (splitmux);
321       splitmux->threshold_time = g_value_get_uint64 (value);
322       GST_OBJECT_UNLOCK (splitmux);
323       break;
324     case PROP_MAX_FILES:
325       GST_OBJECT_LOCK (splitmux);
326       splitmux->max_files = g_value_get_uint (value);
327       GST_OBJECT_UNLOCK (splitmux);
328       break;
329     case PROP_MUXER_OVERHEAD:
330       GST_OBJECT_LOCK (splitmux);
331       splitmux->mux_overhead = g_value_get_double (value);
332       GST_OBJECT_UNLOCK (splitmux);
333       break;
334     case PROP_SINK:
335       GST_OBJECT_LOCK (splitmux);
336       if (splitmux->provided_sink)
337         gst_object_unref (splitmux->provided_sink);
338       splitmux->provided_sink = g_value_dup_object (value);
339       GST_OBJECT_UNLOCK (splitmux);
340       break;
341     case PROP_MUXER:
342       GST_OBJECT_LOCK (splitmux);
343       if (splitmux->provided_muxer)
344         gst_object_unref (splitmux->provided_muxer);
345       splitmux->provided_muxer = g_value_dup_object (value);
346       GST_OBJECT_UNLOCK (splitmux);
347       break;
348     default:
349       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
350       break;
351   }
352 }
353
354 static void
355 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
356     GValue * value, GParamSpec * pspec)
357 {
358   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
359
360   switch (prop_id) {
361     case PROP_LOCATION:
362       GST_OBJECT_LOCK (splitmux);
363       g_value_set_string (value, splitmux->location);
364       GST_OBJECT_UNLOCK (splitmux);
365       break;
366     case PROP_MAX_SIZE_BYTES:
367       GST_OBJECT_LOCK (splitmux);
368       g_value_set_uint64 (value, splitmux->threshold_bytes);
369       GST_OBJECT_UNLOCK (splitmux);
370       break;
371     case PROP_MAX_SIZE_TIME:
372       GST_OBJECT_LOCK (splitmux);
373       g_value_set_uint64 (value, splitmux->threshold_time);
374       GST_OBJECT_UNLOCK (splitmux);
375       break;
376     case PROP_MAX_FILES:
377       GST_OBJECT_LOCK (splitmux);
378       g_value_set_uint (value, splitmux->max_files);
379       GST_OBJECT_UNLOCK (splitmux);
380       break;
381     case PROP_MUXER_OVERHEAD:
382       GST_OBJECT_LOCK (splitmux);
383       g_value_set_double (value, splitmux->mux_overhead);
384       GST_OBJECT_UNLOCK (splitmux);
385       break;
386     case PROP_SINK:
387       GST_OBJECT_LOCK (splitmux);
388       g_value_set_object (value, splitmux->provided_sink);
389       GST_OBJECT_UNLOCK (splitmux);
390       break;
391     case PROP_MUXER:
392       GST_OBJECT_LOCK (splitmux);
393       g_value_set_object (value, splitmux->provided_muxer);
394       GST_OBJECT_UNLOCK (splitmux);
395       break;
396     default:
397       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
398       break;
399   }
400 }
401
402 /* Convenience function */
403 static inline GstClockTimeDiff
404 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
405 {
406   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
407
408   if (GST_CLOCK_TIME_IS_VALID (val)) {
409     gboolean sign =
410         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
411     if (sign > 0)
412       res = val;
413     else if (sign < 0)
414       res = -val;
415   }
416   return res;
417 }
418
419 static GstPad *
420 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
421 {
422   gchar *tmp, *sinkname, *srcname;
423   GstPad *mq_src;
424
425   sinkname = gst_pad_get_name (sink_pad);
426   tmp = sinkname + 5;
427   srcname = g_strdup_printf ("src_%s", tmp);
428
429   mq_src = gst_element_get_static_pad (mq, srcname);
430
431   g_free (sinkname);
432   g_free (srcname);
433
434   return mq_src;
435 }
436
437 static gboolean
438 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
439     GstPad ** src_pad)
440 {
441   GstPad *mq_sink;
442   GstPad *mq_src;
443
444   /* Request a pad from multiqueue, then connect this one, then
445    * discover the corresponding output pad and return both */
446   mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
447   if (mq_sink == NULL)
448     return FALSE;
449
450   mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
451   if (mq_src == NULL)
452     goto fail;
453
454   *sink_pad = mq_sink;
455   *src_pad = mq_src;
456
457   return TRUE;
458
459 fail:
460   gst_element_release_request_pad (splitmux->mq, mq_sink);
461   return FALSE;
462 }
463
464 static MqStreamCtx *
465 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
466 {
467   MqStreamCtx *ctx;
468
469   ctx = g_new0 (MqStreamCtx, 1);
470   g_atomic_int_set (&ctx->refcount, 1);
471   ctx->splitmux = splitmux;
472   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
473   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
474   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
475   g_queue_init (&ctx->queued_bufs);
476   return ctx;
477 }
478
479 static void
480 mq_stream_ctx_free (MqStreamCtx * ctx)
481 {
482   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
483   g_queue_clear (&ctx->queued_bufs);
484   g_free (ctx);
485 }
486
487 static void
488 mq_stream_ctx_unref (MqStreamCtx * ctx)
489 {
490   if (g_atomic_int_dec_and_test (&ctx->refcount))
491     mq_stream_ctx_free (ctx);
492 }
493
494 static void
495 mq_stream_ctx_ref (MqStreamCtx * ctx)
496 {
497   g_atomic_int_inc (&ctx->refcount);
498 }
499
500 static void
501 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
502 {
503   ctx->sink_pad_block_id = 0;
504   mq_stream_ctx_unref (ctx);
505 }
506
507 static void
508 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
509 {
510   ctx->src_pad_block_id = 0;
511   mq_stream_ctx_unref (ctx);
512 }
513
514 static void
515 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
516 {
517   gchar *location = NULL;
518   GstMessage *msg;
519   const gchar *msg_name = opened ?
520       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
521
522   g_object_get (splitmux->sink, "location", &location, NULL);
523
524   msg = gst_message_new_element (GST_OBJECT (splitmux),
525       gst_structure_new (msg_name,
526           "location", G_TYPE_STRING, location,
527           "running-time", GST_TYPE_CLOCK_TIME,
528           splitmux->reference_ctx->out_running_time, NULL));
529   gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
530
531   g_free (location);
532 }
533
534 /* Called with lock held, drops the lock to send EOS to the
535  * pad
536  */
537 static void
538 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
539 {
540   GstEvent *eos;
541   GstPad *pad;
542
543   eos = gst_event_new_eos ();
544   pad = gst_pad_get_peer (ctx->srcpad);
545
546   ctx->out_eos = TRUE;
547
548   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
549   GST_SPLITMUX_UNLOCK (splitmux);
550   gst_pad_send_event (pad, eos);
551   GST_SPLITMUX_LOCK (splitmux);
552
553   gst_object_unref (pad);
554 }
555
556 /* Called with splitmux lock held to check if this output
557  * context needs to sleep to wait for the release of the
558  * next GOP, or to send EOS to close out the current file
559  */
560 static void
561 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
562 {
563   do {
564
565     GST_LOG_OBJECT (ctx->srcpad,
566         "Checking running time %" GST_STIME_FORMAT " against max %"
567         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
568         GST_STIME_ARGS (splitmux->max_out_running_time));
569
570     if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
571         ctx->out_running_time < splitmux->max_out_running_time) {
572       splitmux->have_muxed_something = TRUE;
573       return;
574     }
575
576     if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
577       return;
578
579     if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
580       if (ctx->out_eos == FALSE) {
581         send_eos (splitmux, ctx);
582         continue;
583       }
584     } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
585       start_next_fragment (splitmux);
586       continue;
587     }
588
589     GST_INFO_OBJECT (ctx->srcpad,
590         "Sleeping for running time %"
591         GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ")",
592         GST_STIME_ARGS (ctx->out_running_time),
593         GST_STIME_ARGS (splitmux->max_out_running_time));
594     ctx->out_blocked = TRUE;
595     /* Expand the mq if needed before sleeping */
596     check_queue_length (splitmux, ctx);
597     GST_SPLITMUX_WAIT (splitmux);
598     ctx->out_blocked = FALSE;
599     GST_INFO_OBJECT (ctx->srcpad,
600         "Woken for new max running time %" GST_STIME_FORMAT,
601         GST_STIME_ARGS (splitmux->max_out_running_time));
602   } while (1);
603 }
604
605 static GstPadProbeReturn
606 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
607 {
608   GstSplitMuxSink *splitmux = ctx->splitmux;
609   MqStreamBuf *buf_info = NULL;
610
611   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
612
613   /* FIXME: Handle buffer lists, until then make it clear they won't work */
614   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
615     g_warning ("Buffer list handling not implemented");
616     return GST_PAD_PROBE_DROP;
617   }
618   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
619     GstEvent *event = gst_pad_probe_info_get_event (info);
620
621     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
622
623     switch (GST_EVENT_TYPE (event)) {
624       case GST_EVENT_SEGMENT:
625         gst_event_copy_segment (event, &ctx->out_segment);
626         break;
627       case GST_EVENT_FLUSH_STOP:
628         GST_SPLITMUX_LOCK (splitmux);
629         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
630         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
631         g_queue_clear (&ctx->queued_bufs);
632         ctx->flushing = FALSE;
633         GST_SPLITMUX_UNLOCK (splitmux);
634         break;
635       case GST_EVENT_FLUSH_START:
636         GST_SPLITMUX_LOCK (splitmux);
637         GST_LOG_OBJECT (pad, "Flush start");
638         ctx->flushing = TRUE;
639         GST_SPLITMUX_BROADCAST (splitmux);
640         GST_SPLITMUX_UNLOCK (splitmux);
641         break;
642       case GST_EVENT_EOS:
643         GST_SPLITMUX_LOCK (splitmux);
644         if (splitmux->state == SPLITMUX_STATE_STOPPED)
645           goto beach;
646         ctx->out_eos = TRUE;
647         GST_SPLITMUX_UNLOCK (splitmux);
648         break;
649       case GST_EVENT_GAP:{
650         GstClockTime gap_ts;
651         GstClockTimeDiff rtime;
652
653         gst_event_parse_gap (event, &gap_ts, NULL);
654         if (gap_ts == GST_CLOCK_TIME_NONE)
655           break;
656
657         GST_SPLITMUX_LOCK (splitmux);
658
659         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
660
661         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
662             GST_STIME_ARGS (rtime));
663
664         if (splitmux->state == SPLITMUX_STATE_STOPPED)
665           goto beach;
666
667         if (rtime != GST_CLOCK_STIME_NONE) {
668           ctx->out_running_time = rtime;
669           complete_or_wait_on_out (splitmux, ctx);
670         }
671         GST_SPLITMUX_UNLOCK (splitmux);
672         break;
673       }
674       case GST_EVENT_CUSTOM_DOWNSTREAM:{
675         const GstStructure *s;
676         GstClockTimeDiff ts = 0;
677
678         s = gst_event_get_structure (event);
679         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
680           break;
681
682         gst_structure_get_int64 (s, "timestamp", &ts);
683
684         GST_SPLITMUX_LOCK (splitmux);
685
686         if (splitmux->state == SPLITMUX_STATE_STOPPED)
687           goto beach;
688         ctx->out_running_time = ts;
689         complete_or_wait_on_out (splitmux, ctx);
690         GST_SPLITMUX_UNLOCK (splitmux);
691         return GST_PAD_PROBE_DROP;
692       }
693       default:
694         break;
695     }
696     return GST_PAD_PROBE_PASS;
697   }
698
699   /* Allow everything through until the configured next stopping point */
700   GST_SPLITMUX_LOCK (splitmux);
701
702   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
703   if (buf_info == NULL)
704     /* Can only happen due to a poorly timed flush */
705     goto beach;
706
707   /* If we have popped a keyframe, decrement the queued_gop count */
708   if (buf_info->keyframe && splitmux->queued_gops > 0)
709     splitmux->queued_gops--;
710
711   ctx->out_running_time = buf_info->run_ts;
712
713   GST_LOG_OBJECT (splitmux,
714       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
715       " size %" G_GSIZE_FORMAT,
716       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
717
718   if (splitmux->opening_first_fragment) {
719     send_fragment_opened_closed_msg (splitmux, TRUE);
720     splitmux->opening_first_fragment = FALSE;
721   }
722
723   complete_or_wait_on_out (splitmux, ctx);
724
725   if (splitmux->muxed_out_time == GST_CLOCK_STIME_NONE ||
726       splitmux->muxed_out_time < buf_info->run_ts)
727     splitmux->muxed_out_time = buf_info->run_ts;
728
729   splitmux->muxed_out_bytes += buf_info->buf_size;
730
731 #ifndef GST_DISABLE_GST_DEBUG
732   {
733     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
734     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
735         " run ts %" GST_STIME_FORMAT, buf,
736         GST_STIME_ARGS (ctx->out_running_time));
737   }
738 #endif
739
740   GST_SPLITMUX_UNLOCK (splitmux);
741
742   mq_stream_buf_free (buf_info);
743
744   return GST_PAD_PROBE_PASS;
745
746 beach:
747   GST_SPLITMUX_UNLOCK (splitmux);
748   return GST_PAD_PROBE_DROP;
749 }
750
751 static gboolean
752 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
753 {
754   return gst_pad_send_event (peer, gst_event_ref (*event));
755 }
756
757 static void
758 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
759 {
760   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
761
762   gst_pad_sticky_events_foreach (ctx->srcpad,
763       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
764
765   /* Clear EOS flag if not actually EOS */
766   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
767
768   gst_object_unref (peer);
769 }
770
771 /* Called with lock held when a fragment
772  * reaches EOS and it is time to restart
773  * a new fragment
774  */
775 static void
776 start_next_fragment (GstSplitMuxSink * splitmux)
777 {
778   /* 1 change to new file */
779   gst_element_set_locked_state (splitmux->muxer, TRUE);
780   gst_element_set_locked_state (splitmux->active_sink, TRUE);
781   gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
782   gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
783
784   set_next_filename (splitmux);
785
786   gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux));
787   gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux));
788   gst_element_set_locked_state (splitmux->muxer, FALSE);
789   gst_element_set_locked_state (splitmux->active_sink, FALSE);
790
791   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
792
793   /* Switch state and go back to processing */
794   if (!splitmux->reference_ctx->in_eos) {
795     splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
796     splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
797   } else {
798     splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
799     splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
800     splitmux->have_muxed_something = FALSE;
801   }
802   splitmux->have_muxed_something =
803       (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
804
805   /* Store the overflow parameters as the basis for the next fragment */
806   splitmux->mux_start_time = splitmux->muxed_out_time;
807   splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
808
809   GST_DEBUG_OBJECT (splitmux,
810       "Restarting flow for new fragment. New running time %" GST_STIME_FORMAT,
811       GST_STIME_ARGS (splitmux->max_out_running_time));
812
813   send_fragment_opened_closed_msg (splitmux, TRUE);
814
815   GST_SPLITMUX_BROADCAST (splitmux);
816 }
817
818 static void
819 bus_handler (GstBin * bin, GstMessage * message)
820 {
821   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
822
823   switch (GST_MESSAGE_TYPE (message)) {
824     case GST_MESSAGE_EOS:
825       /* If the state is draining out the current file, drop this EOS */
826       GST_SPLITMUX_LOCK (splitmux);
827
828       send_fragment_opened_closed_msg (splitmux, FALSE);
829
830       if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
831           splitmux->max_out_running_time != GST_CLOCK_STIME_NONE) {
832         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
833         splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
834         GST_SPLITMUX_BROADCAST (splitmux);
835
836         gst_message_unref (message);
837         GST_SPLITMUX_UNLOCK (splitmux);
838         return;
839       }
840       GST_SPLITMUX_UNLOCK (splitmux);
841       break;
842     default:
843       break;
844   }
845
846   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
847 }
848
849 /* Called with splitmux lock held */
850 /* Called when entering ProcessingCompleteGop state
851  * Assess if mq contents overflowed the current file
852  *   -> If yes, need to switch to new file
853  *   -> if no, set max_out_running_time to let this GOP in and
854  *      go to COLLECTING_GOP_START state
855  */
856 static void
857 handle_gathered_gop (GstSplitMuxSink * splitmux)
858 {
859   GList *cur;
860   gsize queued_bytes = 0;
861   GstClockTimeDiff queued_time = 0;
862
863   /* Assess if the multiqueue contents overflowed the current file */
864   for (cur = g_list_first (splitmux->contexts);
865       cur != NULL; cur = g_list_next (cur)) {
866     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
867     if (tmpctx->in_running_time > queued_time)
868       queued_time = tmpctx->in_running_time;
869     queued_bytes += tmpctx->in_bytes;
870   }
871
872   g_assert (queued_bytes >= splitmux->mux_start_bytes);
873   g_assert (queued_time >= splitmux->mux_start_time);
874
875   queued_bytes -= splitmux->mux_start_bytes;
876   queued_time -= splitmux->mux_start_time;
877
878   /* Expand queued bytes estimate by muxer overhead */
879   queued_bytes += (queued_bytes * splitmux->mux_overhead);
880
881   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
882       " bytes %" G_GSIZE_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
883
884   /* Check for overrun - have we output at least one byte and overrun
885    * either threshold? */
886   if ((splitmux->have_muxed_something &&
887           ((splitmux->threshold_bytes > 0 &&
888                   queued_bytes >= splitmux->threshold_bytes) ||
889               (splitmux->threshold_time > 0 &&
890                   queued_time >= splitmux->threshold_time)))) {
891
892     splitmux->state = SPLITMUX_STATE_ENDING_FILE;
893
894     GST_INFO_OBJECT (splitmux,
895         "mq overflowed since last, draining out. max out TS is %"
896         GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
897     GST_SPLITMUX_BROADCAST (splitmux);
898
899   } else {
900     /* No overflow */
901     GST_LOG_OBJECT (splitmux,
902         "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
903         " queued %" G_GSIZE_FORMAT " time %" GST_STIME_FORMAT " Continuing.",
904         splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
905         queued_bytes, GST_STIME_ARGS (queued_time));
906
907     /* Wake everyone up to push this one GOP, then sleep */
908     splitmux->have_muxed_something = TRUE;
909
910     if (!splitmux->reference_ctx->in_eos) {
911       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
912       splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
913     } else {
914       splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
915       splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
916     }
917
918     GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
919         GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
920     GST_SPLITMUX_BROADCAST (splitmux);
921   }
922
923 }
924
925 /* Called with splitmux lock held */
926 /* Called from each input pad when it is has all the pieces
927  * for a GOP or EOS, starting with the reference pad which has set the
928  * splitmux->max_in_running_time
929  */
930 static void
931 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
932 {
933   GList *cur;
934   gboolean ready = TRUE;
935   GstClockTimeDiff current_max_in_running_time;
936
937   if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
938     /* Iterate each pad, and check that the input running time is at least
939      * up to the reference running time, and if so handle the collected GOP */
940     GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
941         GST_STIME_FORMAT " ctx %p",
942         GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
943     for (cur = g_list_first (splitmux->contexts); cur != NULL;
944         cur = g_list_next (cur)) {
945       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
946
947       GST_LOG_OBJECT (splitmux,
948           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
949           " EOS %d", tmpctx, tmpctx->srcpad,
950           GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
951
952       if (splitmux->max_in_running_time != G_MAXINT64 &&
953           tmpctx->in_running_time < splitmux->max_in_running_time &&
954           !tmpctx->in_eos) {
955         GST_LOG_OBJECT (splitmux,
956             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
957             tmpctx, tmpctx->srcpad);
958         ready = FALSE;
959         break;
960       }
961     }
962     if (ready) {
963       GST_DEBUG_OBJECT (splitmux,
964           "Collected GOP is complete. Processing (ctx %p)", ctx);
965       /* All pads have a complete GOP, release it into the multiqueue */
966       handle_gathered_gop (splitmux);
967     }
968   }
969
970   /* If upstream reached EOS we are not expecting more data, no need to wait
971    * here. */
972   if (ctx->in_eos)
973     return;
974
975   /* Some pad is not yet ready, or GOP is being pushed
976    * either way, sleep and wait to get woken */
977   current_max_in_running_time = splitmux->max_in_running_time;
978   while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
979           splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
980       !ctx->flushing &&
981       (current_max_in_running_time == splitmux->max_in_running_time)) {
982
983     GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
984         splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
985         "GOP complete" : "EOF draining", ctx);
986     GST_SPLITMUX_WAIT (splitmux);
987
988     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
989   }
990 }
991
992 static void
993 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
994 {
995   GList *cur;
996   guint cur_len = g_queue_get_length (&ctx->queued_bufs);
997
998   GST_DEBUG_OBJECT (ctx->sinkpad,
999       "Checking queue length len %u cur_max %u queued gops %u",
1000       cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
1001
1002   if (cur_len >= splitmux->mq_max_buffers) {
1003     gboolean allow_grow = FALSE;
1004
1005     /* If collecting a GOP and this pad might block,
1006      * and there isn't already a pending GOP in the queue
1007      * then grow
1008      */
1009     if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
1010         ctx->in_running_time < splitmux->max_in_running_time &&
1011         splitmux->queued_gops <= 1) {
1012       allow_grow = TRUE;
1013     } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
1014         ctx->is_reference && splitmux->queued_gops <= 1) {
1015       allow_grow = TRUE;
1016     }
1017
1018     if (!allow_grow) {
1019       for (cur = g_list_first (splitmux->contexts);
1020           cur != NULL; cur = g_list_next (cur)) {
1021         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1022         GST_DEBUG_OBJECT (tmpctx->sinkpad,
1023             " len %u out_blocked %d",
1024             g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
1025         /* If another stream is starving, grow */
1026         if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
1027           allow_grow = TRUE;
1028         }
1029       }
1030     }
1031
1032     if (allow_grow) {
1033       splitmux->mq_max_buffers = cur_len + 1;
1034
1035       GST_INFO_OBJECT (splitmux,
1036           "Multiqueue overrun - enlarging to %u buffers ctx %p",
1037           splitmux->mq_max_buffers, ctx);
1038
1039       g_object_set (splitmux->mq, "max-size-buffers",
1040           splitmux->mq_max_buffers, NULL);
1041     }
1042   }
1043 }
1044
1045 static GstPadProbeReturn
1046 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1047 {
1048   GstSplitMuxSink *splitmux = ctx->splitmux;
1049   GstBuffer *buf;
1050   MqStreamBuf *buf_info = NULL;
1051   GstClockTime ts;
1052   gboolean loop_again;
1053   gboolean keyframe = FALSE;
1054
1055   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1056
1057   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1058   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1059     g_warning ("Buffer list handling not implemented");
1060     return GST_PAD_PROBE_DROP;
1061   }
1062   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
1063     GstEvent *event = gst_pad_probe_info_get_event (info);
1064     switch (GST_EVENT_TYPE (event)) {
1065       case GST_EVENT_SEGMENT:
1066         gst_event_copy_segment (event, &ctx->in_segment);
1067         break;
1068       case GST_EVENT_FLUSH_STOP:
1069         GST_SPLITMUX_LOCK (splitmux);
1070         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1071         ctx->in_eos = FALSE;
1072         ctx->in_bytes = 0;
1073         ctx->in_running_time = GST_CLOCK_STIME_NONE;
1074         GST_SPLITMUX_UNLOCK (splitmux);
1075         break;
1076       case GST_EVENT_EOS:
1077         GST_SPLITMUX_LOCK (splitmux);
1078         ctx->in_eos = TRUE;
1079
1080         if (splitmux->state == SPLITMUX_STATE_STOPPED)
1081           goto beach;
1082
1083         if (ctx->is_reference) {
1084           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
1085           /* Act as if this is a new keyframe with infinite timestamp */
1086           splitmux->max_in_running_time = G_MAXINT64;
1087           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1088           /* Wake up other input pads to collect this GOP */
1089           GST_SPLITMUX_BROADCAST (splitmux);
1090           check_completed_gop (splitmux, ctx);
1091         } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
1092           /* If we are waiting for a GOP to be completed (ie, for aux
1093            * pads to catch up), then this pad is complete, so check
1094            * if the whole GOP is.
1095            */
1096           check_completed_gop (splitmux, ctx);
1097         }
1098         GST_SPLITMUX_UNLOCK (splitmux);
1099         break;
1100       default:
1101         break;
1102     }
1103     return GST_PAD_PROBE_PASS;
1104   }
1105
1106   buf = gst_pad_probe_info_get_buffer (info);
1107   buf_info = mq_stream_buf_new ();
1108
1109   if (GST_BUFFER_PTS_IS_VALID (buf))
1110     ts = GST_BUFFER_PTS (buf);
1111   else
1112     ts = GST_BUFFER_DTS (buf);
1113
1114   GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
1115
1116   GST_SPLITMUX_LOCK (splitmux);
1117
1118   if (splitmux->state == SPLITMUX_STATE_STOPPED)
1119     goto beach;
1120
1121   /* If this buffer has a timestamp, advance the input timestamp of the
1122    * stream */
1123   if (GST_CLOCK_TIME_IS_VALID (ts)) {
1124     GstClockTimeDiff running_time =
1125         my_segment_to_running_time (&ctx->in_segment,
1126         GST_BUFFER_TIMESTAMP (buf));
1127
1128     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
1129         GST_STIME_ARGS (running_time));
1130
1131     if (GST_CLOCK_STIME_IS_VALID (running_time)
1132         && running_time > ctx->in_running_time)
1133       ctx->in_running_time = running_time;
1134   }
1135
1136   /* Try to make sure we have a valid running time */
1137   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
1138     ctx->in_running_time =
1139         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
1140   }
1141
1142   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
1143       GST_STIME_ARGS (ctx->in_running_time));
1144
1145   buf_info->run_ts = ctx->in_running_time;
1146   buf_info->buf_size = gst_buffer_get_size (buf);
1147
1148   /* Update total input byte counter for overflow detect */
1149   ctx->in_bytes += buf_info->buf_size;
1150
1151   /* initialize mux_start_time */
1152   if (ctx->is_reference && splitmux->mux_start_time == GST_CLOCK_STIME_NONE) {
1153     splitmux->mux_start_time = buf_info->run_ts;
1154     GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
1155         GST_STIME_ARGS (splitmux->mux_start_time));
1156     /* Also take this as the first start time when starting up,
1157      * so that we start counting overflow from the first frame */
1158     if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
1159       splitmux->max_in_running_time = splitmux->mux_start_time;
1160   }
1161
1162   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
1163       " total in_bytes %" G_GSIZE_FORMAT,
1164       GST_STIME_ARGS (buf_info->run_ts), ctx->in_bytes);
1165
1166   loop_again = TRUE;
1167   do {
1168     if (ctx->flushing)
1169       break;
1170
1171     switch (splitmux->state) {
1172       case SPLITMUX_STATE_COLLECTING_GOP_START:
1173         if (ctx->is_reference) {
1174           /* If a keyframe, we have a complete GOP */
1175           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1176               !GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) ||
1177               splitmux->max_in_running_time >= ctx->in_running_time) {
1178             /* Pass this buffer through */
1179             loop_again = FALSE;
1180             break;
1181           }
1182           GST_INFO_OBJECT (pad,
1183               "Have keyframe with running time %" GST_STIME_FORMAT,
1184               GST_STIME_ARGS (ctx->in_running_time));
1185           keyframe = TRUE;
1186           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1187           splitmux->max_in_running_time = ctx->in_running_time;
1188           /* Wake up other input pads to collect this GOP */
1189           GST_SPLITMUX_BROADCAST (splitmux);
1190           check_completed_gop (splitmux, ctx);
1191         } else {
1192           /* We're still waiting for a keyframe on the reference pad, sleep */
1193           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1194           GST_SPLITMUX_WAIT (splitmux);
1195           GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1196               splitmux->state);
1197         }
1198         break;
1199       case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1200
1201         /* If we overran the target timestamp, it might be time to process
1202          * the GOP, otherwise bail out for more data
1203          */
1204         GST_LOG_OBJECT (pad,
1205             "Checking TS %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT,
1206             GST_STIME_ARGS (ctx->in_running_time),
1207             GST_STIME_ARGS (splitmux->max_in_running_time));
1208
1209         if (ctx->in_running_time < splitmux->max_in_running_time) {
1210           loop_again = FALSE;
1211           break;
1212         }
1213
1214         GST_LOG_OBJECT (pad,
1215             "Collected last packet of GOP. Checking other pads");
1216         check_completed_gop (splitmux, ctx);
1217         break;
1218       case SPLITMUX_STATE_ENDING_FILE:{
1219         GstEvent *event;
1220
1221         /* If somes streams received no buffer during the last GOP that overran,
1222          * because its next buffer has a timestamp bigger than
1223          * ctx->max_in_running_time, its queue is empty. In that case the only
1224          * way to wakeup the output thread is by injecting an event in the
1225          * queue. This usually happen with subtitle streams.
1226          * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
1227         GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event");
1228         event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
1229             GST_EVENT_TYPE_SERIALIZED,
1230             gst_structure_new ("splitmuxsink-unblock", "timestamp",
1231                 G_TYPE_INT64, splitmux->max_in_running_time, NULL));
1232
1233         GST_SPLITMUX_UNLOCK (splitmux);
1234         gst_pad_send_event (ctx->sinkpad, event);
1235         GST_SPLITMUX_LOCK (splitmux);
1236         /* state may have changed while we were unlocked. Loop again if so */
1237         if (splitmux->state != SPLITMUX_STATE_ENDING_FILE)
1238           break;
1239         /* fallthrough */
1240       }
1241       case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1242         /* A fragment is ending, wait until that's done before continuing */
1243         GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1244         GST_SPLITMUX_WAIT (splitmux);
1245         GST_DEBUG_OBJECT (pad,
1246             "Done sleeping for fragment restart state now %d", splitmux->state);
1247         break;
1248       default:
1249         loop_again = FALSE;
1250         break;
1251     }
1252   } while (loop_again);
1253
1254   if (keyframe) {
1255     splitmux->queued_gops++;
1256     buf_info->keyframe = TRUE;
1257   }
1258
1259   /* Now add this buffer to the queue just before returning */
1260   g_queue_push_head (&ctx->queued_bufs, buf_info);
1261
1262   /* Check the buffer will fit in the mq */
1263   check_queue_length (splitmux, ctx);
1264
1265   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1266       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
1267
1268   GST_SPLITMUX_UNLOCK (splitmux);
1269   return GST_PAD_PROBE_PASS;
1270
1271 beach:
1272   GST_SPLITMUX_UNLOCK (splitmux);
1273   if (buf_info)
1274     mq_stream_buf_free (buf_info);
1275   return GST_PAD_PROBE_PASS;
1276 }
1277
1278 static GstPad *
1279 gst_splitmux_sink_request_new_pad (GstElement * element,
1280     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1281 {
1282   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1283   GstPadTemplate *mux_template = NULL;
1284   GstPad *res = NULL;
1285   GstPad *mq_sink, *mq_src;
1286   gchar *gname;
1287   gboolean is_video = FALSE;
1288   MqStreamCtx *ctx;
1289
1290   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1291
1292   GST_SPLITMUX_LOCK (splitmux);
1293   if (!create_elements (splitmux))
1294     goto fail;
1295
1296   if (templ->name_template) {
1297     if (g_str_equal (templ->name_template, "video")) {
1298       /* FIXME: Look for a pad template with matching caps, rather than by name */
1299       mux_template =
1300           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1301           (splitmux->muxer), "video_%u");
1302       is_video = TRUE;
1303       name = NULL;
1304     } else {
1305       mux_template =
1306           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1307           (splitmux->muxer), templ->name_template);
1308     }
1309     if (mux_template == NULL) {
1310       /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1311       mux_template =
1312           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1313           (splitmux->muxer), "sink_%d");
1314     }
1315   }
1316
1317   res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1318   if (res == NULL)
1319     goto fail;
1320
1321   if (is_video)
1322     gname = g_strdup ("video");
1323   else if (name == NULL)
1324     gname = gst_pad_get_name (res);
1325   else
1326     gname = g_strdup (name);
1327
1328   if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1329     gst_element_release_request_pad (splitmux->muxer, res);
1330     gst_object_unref (GST_OBJECT (res));
1331     goto fail;
1332   }
1333
1334   if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1335     gst_element_release_request_pad (splitmux->muxer, res);
1336     gst_object_unref (GST_OBJECT (res));
1337     gst_element_release_request_pad (splitmux->mq, mq_sink);
1338     gst_object_unref (GST_OBJECT (mq_sink));
1339     goto fail;
1340   }
1341
1342   gst_object_unref (GST_OBJECT (res));
1343
1344   ctx = mq_stream_ctx_new (splitmux);
1345   ctx->srcpad = mq_src;
1346   ctx->sinkpad = mq_sink;
1347
1348   mq_stream_ctx_ref (ctx);
1349   ctx->src_pad_block_id =
1350       gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1351       (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1352       _pad_block_destroy_src_notify);
1353   if (is_video && splitmux->reference_ctx != NULL) {
1354     splitmux->reference_ctx->is_reference = FALSE;
1355     splitmux->reference_ctx = NULL;
1356   }
1357   if (splitmux->reference_ctx == NULL) {
1358     splitmux->reference_ctx = ctx;
1359     ctx->is_reference = TRUE;
1360   }
1361
1362   res = gst_ghost_pad_new (gname, mq_sink);
1363   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1364
1365   mq_stream_ctx_ref (ctx);
1366   ctx->sink_pad_block_id =
1367       gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1368       (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1369       _pad_block_destroy_sink_notify);
1370
1371   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1372       " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1373
1374   splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1375
1376   g_free (gname);
1377
1378   gst_object_unref (mq_sink);
1379   gst_object_unref (mq_src);
1380
1381   gst_pad_set_active (res, TRUE);
1382   gst_element_add_pad (element, res);
1383   GST_SPLITMUX_UNLOCK (splitmux);
1384
1385   return res;
1386 fail:
1387   GST_SPLITMUX_UNLOCK (splitmux);
1388   return NULL;
1389 }
1390
1391 static void
1392 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1393 {
1394   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1395   GstPad *mqsink, *mqsrc, *muxpad;
1396   MqStreamCtx *ctx =
1397       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1398
1399   GST_SPLITMUX_LOCK (splitmux);
1400
1401   if (splitmux->muxer == NULL || splitmux->mq == NULL)
1402     goto fail;                  /* Elements don't exist yet - nothing to release */
1403
1404   GST_INFO_OBJECT (pad, "releasing request pad");
1405
1406   mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1407   mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1408   muxpad = gst_pad_get_peer (mqsrc);
1409
1410   /* Remove the context from our consideration */
1411   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1412
1413   if (ctx->sink_pad_block_id)
1414     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1415
1416   if (ctx->src_pad_block_id)
1417     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1418
1419   /* Can release the context now */
1420   mq_stream_ctx_unref (ctx);
1421
1422   /* Release and free the mq input */
1423   gst_element_release_request_pad (splitmux->mq, mqsink);
1424
1425   /* Release and free the muxer input */
1426   gst_element_release_request_pad (splitmux->muxer, muxpad);
1427
1428   gst_object_unref (mqsink);
1429   gst_object_unref (mqsrc);
1430   gst_object_unref (muxpad);
1431
1432   gst_element_remove_pad (element, pad);
1433
1434   /* Reset the internal elements only after all request pads are released */
1435   if (splitmux->contexts == NULL)
1436     gst_splitmux_reset (splitmux);
1437
1438 fail:
1439   GST_SPLITMUX_UNLOCK (splitmux);
1440 }
1441
1442 static GstElement *
1443 create_element (GstSplitMuxSink * splitmux,
1444     const gchar * factory, const gchar * name)
1445 {
1446   GstElement *ret = gst_element_factory_make (factory, name);
1447   if (ret == NULL) {
1448     g_warning ("Failed to create %s - splitmuxsink will not work", name);
1449     return NULL;
1450   }
1451
1452   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1453     g_warning ("Could not add %s element - splitmuxsink will not work", name);
1454     gst_object_unref (ret);
1455     return NULL;
1456   }
1457
1458   return ret;
1459 }
1460
1461 static gboolean
1462 create_elements (GstSplitMuxSink * splitmux)
1463 {
1464   /* Create internal elements */
1465   if (splitmux->mq == NULL) {
1466     if ((splitmux->mq =
1467             create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1468       goto fail;
1469
1470     splitmux->mq_max_buffers = 5;
1471     /* No bytes or time limit, we limit buffers manually */
1472     g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1473         (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1474   }
1475
1476   if (splitmux->muxer == NULL) {
1477     GstElement *provided_muxer = NULL;
1478
1479     GST_OBJECT_LOCK (splitmux);
1480     if (splitmux->provided_muxer != NULL)
1481       provided_muxer = gst_object_ref (splitmux->provided_muxer);
1482     GST_OBJECT_UNLOCK (splitmux);
1483
1484     if (provided_muxer == NULL) {
1485       if ((splitmux->muxer =
1486               create_element (splitmux, "mp4mux", "muxer")) == NULL)
1487         goto fail;
1488     } else {
1489       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1490         g_warning ("Could not add muxer element - splitmuxsink will not work");
1491         gst_object_unref (provided_muxer);
1492         goto fail;
1493       }
1494
1495       splitmux->muxer = provided_muxer;
1496       gst_object_unref (provided_muxer);
1497     }
1498   }
1499
1500   return TRUE;
1501 fail:
1502   return FALSE;
1503 }
1504
1505 static GstElement *
1506 find_sink (GstElement * e)
1507 {
1508   GstElement *res = NULL;
1509   GstIterator *iter;
1510   gboolean done = FALSE;
1511   GValue data = { 0, };
1512
1513   if (!GST_IS_BIN (e))
1514     return e;
1515
1516   iter = gst_bin_iterate_sinks (GST_BIN (e));
1517   while (!done) {
1518     switch (gst_iterator_next (iter, &data)) {
1519       case GST_ITERATOR_OK:
1520       {
1521         GstElement *child = g_value_get_object (&data);
1522         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1523                 "location") != NULL) {
1524           res = child;
1525           done = TRUE;
1526         }
1527         g_value_reset (&data);
1528         break;
1529       }
1530       case GST_ITERATOR_RESYNC:
1531         gst_iterator_resync (iter);
1532         break;
1533       case GST_ITERATOR_DONE:
1534         done = TRUE;
1535         break;
1536       case GST_ITERATOR_ERROR:
1537         g_assert_not_reached ();
1538         break;
1539     }
1540   }
1541   g_value_unset (&data);
1542   gst_iterator_free (iter);
1543
1544   return res;
1545 }
1546
1547 static gboolean
1548 create_sink (GstSplitMuxSink * splitmux)
1549 {
1550   GstElement *provided_sink = NULL;
1551
1552   if (splitmux->active_sink == NULL) {
1553
1554     GST_OBJECT_LOCK (splitmux);
1555     if (splitmux->provided_sink != NULL)
1556       provided_sink = gst_object_ref (splitmux->provided_sink);
1557     GST_OBJECT_UNLOCK (splitmux);
1558
1559     if (provided_sink == NULL) {
1560       if ((splitmux->sink =
1561               create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1562         goto fail;
1563       splitmux->active_sink = splitmux->sink;
1564     } else {
1565       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1566         g_warning ("Could not add sink elements - splitmuxsink will not work");
1567         gst_object_unref (provided_sink);
1568         goto fail;
1569       }
1570
1571       splitmux->active_sink = provided_sink;
1572
1573       /* The bin holds a ref now, we can drop our tmp ref */
1574       gst_object_unref (provided_sink);
1575
1576       /* Find the sink element */
1577       splitmux->sink = find_sink (splitmux->active_sink);
1578       if (splitmux->sink == NULL) {
1579         g_warning
1580             ("Could not locate sink element in provided sink - splitmuxsink will not work");
1581         goto fail;
1582       }
1583     }
1584
1585     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1586       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1587       goto fail;
1588     }
1589   }
1590
1591   return TRUE;
1592 fail:
1593   return FALSE;
1594 }
1595
1596 #ifdef __GNUC__
1597 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
1598 #endif
1599 static void
1600 set_next_filename (GstSplitMuxSink * splitmux)
1601 {
1602   gchar *fname = NULL;
1603   gst_splitmux_sink_ensure_max_files (splitmux);
1604
1605   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
1606       splitmux->fragment_id, &fname);
1607
1608   if (!fname)
1609     fname = splitmux->location ?
1610         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
1611
1612   if (fname) {
1613     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1614     g_object_set (splitmux->sink, "location", fname, NULL);
1615     g_free (fname);
1616
1617     splitmux->fragment_id++;
1618   }
1619 }
1620
1621 static GstStateChangeReturn
1622 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1623 {
1624   GstStateChangeReturn ret;
1625   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1626
1627   switch (transition) {
1628     case GST_STATE_CHANGE_NULL_TO_READY:{
1629       GST_SPLITMUX_LOCK (splitmux);
1630       if (!create_elements (splitmux) || !create_sink (splitmux)) {
1631         ret = GST_STATE_CHANGE_FAILURE;
1632         GST_SPLITMUX_UNLOCK (splitmux);
1633         goto beach;
1634       }
1635       GST_SPLITMUX_UNLOCK (splitmux);
1636       splitmux->fragment_id = 0;
1637       set_next_filename (splitmux);
1638       break;
1639     }
1640     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1641       GST_SPLITMUX_LOCK (splitmux);
1642       /* Start by collecting one input on each pad */
1643       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1644       splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
1645       splitmux->muxed_out_time = splitmux->mux_start_time =
1646           GST_CLOCK_STIME_NONE;
1647       splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1648       splitmux->opening_first_fragment = TRUE;
1649       GST_SPLITMUX_UNLOCK (splitmux);
1650       break;
1651     }
1652     case GST_STATE_CHANGE_PAUSED_TO_READY:
1653     case GST_STATE_CHANGE_READY_TO_NULL:
1654       GST_SPLITMUX_LOCK (splitmux);
1655       splitmux->state = SPLITMUX_STATE_STOPPED;
1656       /* Wake up any blocked threads */
1657       GST_LOG_OBJECT (splitmux,
1658           "State change -> NULL or READY. Waking threads");
1659       GST_SPLITMUX_BROADCAST (splitmux);
1660       GST_SPLITMUX_UNLOCK (splitmux);
1661       break;
1662     default:
1663       break;
1664   }
1665
1666   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1667   if (ret == GST_STATE_CHANGE_FAILURE)
1668     goto beach;
1669
1670   switch (transition) {
1671     case GST_STATE_CHANGE_READY_TO_NULL:
1672       GST_SPLITMUX_LOCK (splitmux);
1673       splitmux->fragment_id = 0;
1674       /* Reset internal elements only if no pad contexts are using them */
1675       if (splitmux->contexts == NULL)
1676         gst_splitmux_reset (splitmux);
1677       GST_SPLITMUX_UNLOCK (splitmux);
1678       break;
1679     default:
1680       break;
1681   }
1682
1683 beach:
1684
1685   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1686       ret == GST_STATE_CHANGE_FAILURE) {
1687     /* Cleanup elements on failed transition out of NULL */
1688     gst_splitmux_reset (splitmux);
1689   }
1690   return ret;
1691 }
1692
1693 gboolean
1694 register_splitmuxsink (GstPlugin * plugin)
1695 {
1696   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1697       "Split File Muxing Sink");
1698
1699   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1700       GST_TYPE_SPLITMUX_SINK);
1701 }
1702
1703 static void
1704 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
1705 {
1706   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
1707     splitmux->fragment_id = 0;
1708   }
1709 }