splitmuxsink: When flushing, exit handle_mq_input quickly
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014-2019> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @title: splitmuxsink
23  * @short_description: Muxer wrapper for splitting output stream by size or time
24  *
25  * This element wraps a muxer and a sink, and starts a new file when the mux
26  * contents are about to cross a threshold of maximum size of maximum time,
27  * splitting at video keyframe boundaries. Exactly one input video stream
28  * can be muxed, with as many accompanying audio and subtitle streams as
29  * desired.
30  *
31  * By default, it uses mp4mux and filesink, but they can be changed via
32  * the 'muxer' and 'sink' properties.
33  *
34  * The minimum file size is 1 GOP, however - so limits may be overrun if the
35  * distance between any 2 keyframes is larger than the limits.
36  *
37  * If a video stream is available, the splitting process is driven by the video
38  * stream contents, and the video stream must contain closed GOPs for the output
39  * file parts to be played individually correctly. In the absence of a video
40  * stream, the first available stream is used as reference for synchronization.
41  *
42  * In the async-finalize mode, when the threshold is crossed, the old muxer
43  * and sink is disconnected from the pipeline and left to finish the file
44  * asynchronously, and a new muxer and sink is created to continue with the
45  * next fragment. For that reason, instead of muxer and sink objects, the
46  * muxer-factory and sink-factory properties are used to construct the new
47  * objects, together with muxer-properties and sink-properties.
48  *
49  * ## Example pipelines
50  * |[
51  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
52  * ]|
53  * Records a video stream captured from a v4l2 device and muxes it into
54  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55  * and 1MB maximum size.
56  *
57  * |[
58  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
59  * ]|
60  * Records a video stream captured from a v4l2 device and muxer it into
61  * streamable Matroska files, splitting as needed to limit size/duration to 10
62  * seconds. Each file will finalize asynchronously.
63  *
64  * |[
65  * gst-launch-1.0 videotestsrc num-buffers=10 ! jpegenc ! .video splitmuxsink muxer=qtmux muxer-pad-map=x-pad-map,video=video_1 location=test%05d.mp4 -v
66  * ]|
67  * Records 10 frames to an mp4 file, using a muxer-pad-map to make explicit mappings between the splitmuxsink sink pad and the corresponding muxer pad
68  * it will deliver to.
69  */
70
71 #ifdef HAVE_CONFIG_H
72 #include "config.h"
73 #endif
74
75 #include <string.h>
76 #include <glib/gstdio.h>
77 #include <gst/video/video.h>
78 #include "gstsplitmuxsink.h"
79
80 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
81 #define GST_CAT_DEFAULT splitmux_debug
82
83 #define GST_SPLITMUX_STATE_LOCK(s) g_mutex_lock(&(s)->state_lock)
84 #define GST_SPLITMUX_STATE_UNLOCK(s) g_mutex_unlock(&(s)->state_lock)
85
86 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
87 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
88 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
89 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
90
91 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
92 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
93
94 static void split_now (GstSplitMuxSink * splitmux);
95 static void split_after (GstSplitMuxSink * splitmux);
96 static void split_at_running_time (GstSplitMuxSink * splitmux,
97     GstClockTime split_time);
98
99 enum
100 {
101   PROP_0,
102   PROP_LOCATION,
103   PROP_START_INDEX,
104   PROP_MAX_SIZE_TIME,
105   PROP_MAX_SIZE_BYTES,
106   PROP_MAX_SIZE_TIMECODE,
107   PROP_SEND_KEYFRAME_REQUESTS,
108   PROP_MAX_FILES,
109   PROP_MUXER_OVERHEAD,
110   PROP_USE_ROBUST_MUXING,
111   PROP_ALIGNMENT_THRESHOLD,
112   PROP_MUXER,
113   PROP_SINK,
114   PROP_RESET_MUXER,
115   PROP_ASYNC_FINALIZE,
116   PROP_MUXER_FACTORY,
117   PROP_MUXER_PRESET,
118   PROP_MUXER_PROPERTIES,
119   PROP_SINK_FACTORY,
120   PROP_SINK_PRESET,
121   PROP_SINK_PROPERTIES,
122   PROP_MUXERPAD_MAP
123 };
124
125 #define DEFAULT_MAX_SIZE_TIME       0
126 #define DEFAULT_MAX_SIZE_BYTES      0
127 #define DEFAULT_MAX_FILES           0
128 #define DEFAULT_MUXER_OVERHEAD      0.02
129 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
130 #define DEFAULT_ALIGNMENT_THRESHOLD 0
131 #define DEFAULT_MUXER "mp4mux"
132 #define DEFAULT_SINK "filesink"
133 #define DEFAULT_USE_ROBUST_MUXING FALSE
134 #define DEFAULT_RESET_MUXER TRUE
135 #define DEFAULT_ASYNC_FINALIZE FALSE
136 #define DEFAULT_START_INDEX 0
137
138 typedef struct _AsyncEosHelper
139 {
140   MqStreamCtx *ctx;
141   GstPad *pad;
142 } AsyncEosHelper;
143
144 enum
145 {
146   SIGNAL_FORMAT_LOCATION,
147   SIGNAL_FORMAT_LOCATION_FULL,
148   SIGNAL_SPLIT_NOW,
149   SIGNAL_SPLIT_AFTER,
150   SIGNAL_SPLIT_AT_RUNNING_TIME,
151   SIGNAL_MUXER_ADDED,
152   SIGNAL_SINK_ADDED,
153   SIGNAL_LAST
154 };
155
156 static guint signals[SIGNAL_LAST];
157
158 static GstStaticPadTemplate video_sink_template =
159 GST_STATIC_PAD_TEMPLATE ("video",
160     GST_PAD_SINK,
161     GST_PAD_REQUEST,
162     GST_STATIC_CAPS_ANY);
163 static GstStaticPadTemplate video_aux_sink_template =
164 GST_STATIC_PAD_TEMPLATE ("video_aux_%u",
165     GST_PAD_SINK,
166     GST_PAD_REQUEST,
167     GST_STATIC_CAPS_ANY);
168 static GstStaticPadTemplate audio_sink_template =
169 GST_STATIC_PAD_TEMPLATE ("audio_%u",
170     GST_PAD_SINK,
171     GST_PAD_REQUEST,
172     GST_STATIC_CAPS_ANY);
173 static GstStaticPadTemplate subtitle_sink_template =
174 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
175     GST_PAD_SINK,
176     GST_PAD_REQUEST,
177     GST_STATIC_CAPS_ANY);
178 static GstStaticPadTemplate caption_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("caption_%u",
180     GST_PAD_SINK,
181     GST_PAD_REQUEST,
182     GST_STATIC_CAPS_ANY);
183
184 static GQuark PAD_CONTEXT;
185 static GQuark EOS_FROM_US;
186 static GQuark RUNNING_TIME;
187 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
188  * to forward an incoming EOS message, but we cannot rely on the state of the
189  * splitmux anymore, so we set this qdata on the sink instead.
190  * The muxer and sink must be destroyed after both of these things have
191  * finished:
192  * 1) The EOS message has been sent when the fragment is ending
193  * 2) The muxer has been unlinked and relinked
194  * Therefore, EOS_FROM_US can have these two values:
195  * 0: EOS was not requested from us. Forward the message. The muxer and the
196  * sink will be destroyed together with the rest of the bin.
197  * 1: EOS was requested from us, but the other of the two tasks hasn't
198  * finished. Set EOS_FROM_US to 2 and do your stuff.
199  * 2: EOS was requested from us and the other of the two tasks has finished.
200  * Now we can destroy the muxer and the sink.
201  */
202
203 static void
204 _do_init (void)
205 {
206   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
207   EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
208   RUNNING_TIME = g_quark_from_static_string ("running-time");
209   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
210       "Split File Muxing Sink");
211 }
212
213 #define gst_splitmux_sink_parent_class parent_class
214 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
215     _do_init ());
216 GST_ELEMENT_REGISTER_DEFINE (splitmuxsink, "splitmuxsink", GST_RANK_NONE,
217     GST_TYPE_SPLITMUX_SINK);
218
219 static gboolean create_muxer (GstSplitMuxSink * splitmux);
220 static gboolean create_sink (GstSplitMuxSink * splitmux);
221 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
222     const GValue * value, GParamSpec * pspec);
223 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
224     GValue * value, GParamSpec * pspec);
225 static void gst_splitmux_sink_dispose (GObject * object);
226 static void gst_splitmux_sink_finalize (GObject * object);
227
228 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
229     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
230 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
231
232 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
233     element, GstStateChange transition);
234
235 static void bus_handler (GstBin * bin, GstMessage * msg);
236 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
237 static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux,
238     MqStreamCtx * ctx);
239 static void mq_stream_ctx_free (MqStreamCtx * ctx);
240 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
241
242 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
243 static GstElement *create_element (GstSplitMuxSink * splitmux,
244     const gchar * factory, const gchar * name, gboolean locked);
245
246 static void do_async_done (GstSplitMuxSink * splitmux);
247
248 static GstClockTime calculate_next_max_timecode (GstSplitMuxSink * splitmux,
249     const GstVideoTimeCode * cur_tc, GstClockTime running_time,
250     GstVideoTimeCode ** next_tc);
251
252 static MqStreamBuf *
253 mq_stream_buf_new (void)
254 {
255   return g_slice_new0 (MqStreamBuf);
256 }
257
258 static void
259 mq_stream_buf_free (MqStreamBuf * data)
260 {
261   g_slice_free (MqStreamBuf, data);
262 }
263
264 static SplitMuxOutputCommand *
265 out_cmd_buf_new (void)
266 {
267   return g_slice_new0 (SplitMuxOutputCommand);
268 }
269
270 static void
271 out_cmd_buf_free (SplitMuxOutputCommand * data)
272 {
273   g_slice_free (SplitMuxOutputCommand, data);
274 }
275
276 static void
277 input_gop_free (InputGop * gop)
278 {
279   g_clear_pointer (&gop->start_tc, gst_video_time_code_free);
280   g_slice_free (InputGop, gop);
281 }
282
283 static void
284 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
285 {
286   GObjectClass *gobject_class = (GObjectClass *) klass;
287   GstElementClass *gstelement_class = (GstElementClass *) klass;
288   GstBinClass *gstbin_class = (GstBinClass *) klass;
289
290   gobject_class->set_property = gst_splitmux_sink_set_property;
291   gobject_class->get_property = gst_splitmux_sink_get_property;
292   gobject_class->dispose = gst_splitmux_sink_dispose;
293   gobject_class->finalize = gst_splitmux_sink_finalize;
294
295   gst_element_class_set_static_metadata (gstelement_class,
296       "Split Muxing Bin", "Generic/Bin/Muxer",
297       "Convenience bin that muxes incoming streams into multiple time/size limited files",
298       "Jan Schmidt <jan@centricular.com>");
299
300   gst_element_class_add_static_pad_template (gstelement_class,
301       &video_sink_template);
302   gst_element_class_add_static_pad_template (gstelement_class,
303       &video_aux_sink_template);
304   gst_element_class_add_static_pad_template (gstelement_class,
305       &audio_sink_template);
306   gst_element_class_add_static_pad_template (gstelement_class,
307       &subtitle_sink_template);
308   gst_element_class_add_static_pad_template (gstelement_class,
309       &caption_sink_template);
310
311   gstelement_class->change_state =
312       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
313   gstelement_class->request_new_pad =
314       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
315   gstelement_class->release_pad =
316       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
317
318   gstbin_class->handle_message = bus_handler;
319
320   g_object_class_install_property (gobject_class, PROP_LOCATION,
321       g_param_spec_string ("location", "File Output Pattern",
322           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
323           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
324   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
325       g_param_spec_double ("mux-overhead", "Muxing Overhead",
326           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
327           DEFAULT_MUXER_OVERHEAD,
328           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
329
330   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
331       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
332           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
333           DEFAULT_MAX_SIZE_TIME,
334           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
335           G_PARAM_STATIC_STRINGS));
336   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
337       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
338           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
339           DEFAULT_MAX_SIZE_BYTES,
340           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
341           G_PARAM_STATIC_STRINGS));
342   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
343       g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
344           "Maximum difference in timecode between first and last frame. "
345           "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
346           "Will only be effective if a timecode track is present.", NULL,
347           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
348           G_PARAM_STATIC_STRINGS));
349   g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
350       g_param_spec_boolean ("send-keyframe-requests",
351           "Request keyframes at max-size-time",
352           "Request a keyframe every max-size-time ns to try splitting at that point. "
353           "Needs max-size-bytes to be 0 in order to be effective.",
354           DEFAULT_SEND_KEYFRAME_REQUESTS,
355           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
356           G_PARAM_STATIC_STRINGS));
357   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
358       g_param_spec_uint ("max-files", "Max files",
359           "Maximum number of files to keep on disk. Once the maximum is reached,"
360           "old files start to be deleted to make room for new ones.", 0,
361           G_MAXUINT, DEFAULT_MAX_FILES,
362           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
363   g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
364       g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
365           "Allow non-reference streams to be that many ns before the reference"
366           " stream", 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
367           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
368           G_PARAM_STATIC_STRINGS));
369
370   g_object_class_install_property (gobject_class, PROP_MUXER,
371       g_param_spec_object ("muxer", "Muxer",
372           "The muxer element to use (NULL = default mp4mux). "
373           "Valid only for async-finalize = FALSE",
374           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375   g_object_class_install_property (gobject_class, PROP_SINK,
376       g_param_spec_object ("sink", "Sink",
377           "The sink element (or element chain) to use (NULL = default filesink). "
378           "Valid only for async-finalize = FALSE",
379           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
380
381   g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
382       g_param_spec_boolean ("use-robust-muxing",
383           "Support robust-muxing mode of some muxers",
384           "Check if muxers support robust muxing via the reserved-max-duration and "
385           "reserved-duration-remaining properties and use them if so. "
386           "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
387           " create new fragments if the reserved header space is about to overflow. "
388           "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
389           "manually by the app to a non-zero value for robust muxing to have an effect.",
390           DEFAULT_USE_ROBUST_MUXING,
391           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
392
393   g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
394       g_param_spec_boolean ("reset-muxer",
395           "Reset Muxer",
396           "Reset the muxer after each segment. Disabling this will not work for most muxers.",
397           DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
398
399   g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
400       g_param_spec_boolean ("async-finalize",
401           "Finalize fragments asynchronously",
402           "Finalize each fragment asynchronously and start a new one",
403           DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
404   g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
405       g_param_spec_string ("muxer-factory", "Muxer factory",
406           "The muxer element factory to use (default = mp4mux). "
407           "Valid only for async-finalize = TRUE",
408           "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
409   /**
410    * GstSplitMuxSink:muxer-preset
411    *
412    * An optional #GstPreset name to use for the muxer. This only has an effect
413    * in `async-finalize=TRUE` mode.
414    *
415    * Since: 1.18
416    */
417   g_object_class_install_property (gobject_class, PROP_MUXER_PRESET,
418       g_param_spec_string ("muxer-preset", "Muxer preset",
419           "The muxer preset to use. "
420           "Valid only for async-finalize = TRUE",
421           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
422   g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
423       g_param_spec_boxed ("muxer-properties", "Muxer properties",
424           "The muxer element properties to use. "
425           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
426           "Valid only for async-finalize = TRUE",
427           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
428   g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
429       g_param_spec_string ("sink-factory", "Sink factory",
430           "The sink element factory to use (default = filesink). "
431           "Valid only for async-finalize = TRUE",
432           "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
433   /**
434    * GstSplitMuxSink:sink-preset
435    *
436    * An optional #GstPreset name to use for the sink. This only has an effect
437    * in `async-finalize=TRUE` mode.
438    *
439    * Since: 1.18
440    */
441   g_object_class_install_property (gobject_class, PROP_SINK_PRESET,
442       g_param_spec_string ("sink-preset", "Sink preset",
443           "The sink preset to use. "
444           "Valid only for async-finalize = TRUE",
445           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
446   g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
447       g_param_spec_boxed ("sink-properties", "Sink properties",
448           "The sink element properties to use. "
449           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
450           "Valid only for async-finalize = TRUE",
451           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
452   g_object_class_install_property (gobject_class, PROP_START_INDEX,
453       g_param_spec_int ("start-index", "Start Index",
454           "Start value of fragment index.",
455           0, G_MAXINT, DEFAULT_START_INDEX,
456           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
457
458   /**
459    * GstSplitMuxSink::muxer-pad-map
460    *
461    * An optional GstStructure that provides a map from splitmuxsink sinkpad
462    * names to muxer pad names they should feed. Splitmuxsink has some default
463    * mapping behaviour to link video to video pads and audio to audio pads
464    * that usually works fine. This property is useful if you need to ensure
465    * a particular mapping to muxed streams.
466    *
467    * The GstStructure contains string fields like so:
468    *   splitmuxsink muxer-pad-map=x-pad-map,video=video_1
469    *
470    * Since: 1.18
471    */
472   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_MUXERPAD_MAP,
473       g_param_spec_boxed ("muxer-pad-map", "Muxer pad map",
474           "A GstStructure specifies the mapping from splitmuxsink sink pads to muxer pads",
475           GST_TYPE_STRUCTURE,
476           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
477
478   /**
479    * GstSplitMuxSink::format-location:
480    * @splitmux: the #GstSplitMuxSink
481    * @fragment_id: the sequence number of the file to be created
482    *
483    * Returns: the location to be used for the next output file. This must be
484    *    a newly-allocated string which will be freed with g_free() by the
485    *    splitmuxsink element when it no longer needs it, so use g_strdup() or
486    *    g_strdup_printf() or similar functions to allocate it.
487    */
488   signals[SIGNAL_FORMAT_LOCATION] =
489       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
490       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
491
492   /**
493    * GstSplitMuxSink::format-location-full:
494    * @splitmux: the #GstSplitMuxSink
495    * @fragment_id: the sequence number of the file to be created
496    * @first_sample: A #GstSample containing the first buffer
497    *   from the reference stream in the new file
498    *
499    * Returns: the location to be used for the next output file. This must be
500    *    a newly-allocated string which will be freed with g_free() by the
501    *    splitmuxsink element when it no longer needs it, so use g_strdup() or
502    *    g_strdup_printf() or similar functions to allocate it.
503    *
504    * Since: 1.12
505    */
506   signals[SIGNAL_FORMAT_LOCATION_FULL] =
507       g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
508       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
509       GST_TYPE_SAMPLE);
510
511   /**
512    * GstSplitMuxSink::split-now:
513    * @splitmux: the #GstSplitMuxSink
514    *
515    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
516    * The current GOP will be output to the new file.
517    *
518    * Since: 1.14
519    */
520   signals[SIGNAL_SPLIT_NOW] =
521       g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
522       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
523       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_now), NULL, NULL, NULL,
524       G_TYPE_NONE, 0);
525
526   /**
527    * GstSplitMuxSink::split-after:
528    * @splitmux: the #GstSplitMuxSink
529    *
530    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
531    * Unlike the 'split-now' signal, with 'split-after', the current GOP will be output to the old file.
532    *
533    * Since: 1.16
534    */
535   signals[SIGNAL_SPLIT_AFTER] =
536       g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
537       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
538       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_after), NULL, NULL, NULL,
539       G_TYPE_NONE, 0);
540
541   /**
542    * GstSplitMuxSink::split-at-running-time:
543    * @splitmux: the #GstSplitMuxSink
544    *
545    * When called by the user, this action signal splits the video file (and
546    * begins a new one) as soon as the given running time is reached. If this
547    * action signal is called multiple times, running times are queued up and
548    * processed in the order they were given.
549    *
550    * Note that this is prone to race conditions, where said running time is
551    * reached and surpassed before we had a chance to split. The file will
552    * still split immediately, but in order to make sure that the split doesn't
553    * happen too late, it is recommended to call this action signal from
554    * something that will prevent further buffers from flowing into
555    * splitmuxsink before the split is completed, such as a pad probe before
556    * splitmuxsink.
557    *
558    *
559    * Since: 1.16
560    */
561   signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
562       g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
563       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
564       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_at_running_time), NULL, NULL,
565       NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
566
567   /**
568    * GstSplitMuxSink::muxer-added:
569    * @splitmux: the #GstSplitMuxSink
570    * @muxer: the newly added muxer element
571    *
572    * Since: 1.14
573    */
574   signals[SIGNAL_MUXER_ADDED] =
575       g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
576       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
577
578   /**
579    * GstSplitMuxSink::sink-added:
580    * @splitmux: the #GstSplitMuxSink
581    * @sink: the newly added sink element
582    *
583    * Since: 1.14
584    */
585   signals[SIGNAL_SINK_ADDED] =
586       g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
587       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
588
589   klass->split_now = split_now;
590   klass->split_after = split_after;
591   klass->split_at_running_time = split_at_running_time;
592 }
593
594 static void
595 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
596 {
597   g_mutex_init (&splitmux->lock);
598   g_mutex_init (&splitmux->state_lock);
599   g_cond_init (&splitmux->input_cond);
600   g_cond_init (&splitmux->output_cond);
601   g_queue_init (&splitmux->out_cmd_q);
602
603   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
604   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
605   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
606   splitmux->max_files = DEFAULT_MAX_FILES;
607   splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
608   splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
609   splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
610   splitmux->reset_muxer = DEFAULT_RESET_MUXER;
611
612   splitmux->threshold_timecode_str = NULL;
613
614   splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
615   splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
616   splitmux->muxer_properties = NULL;
617   splitmux->sink_factory = g_strdup (DEFAULT_SINK);
618   splitmux->sink_properties = NULL;
619
620   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
621   splitmux->split_requested = FALSE;
622   splitmux->do_split_next_gop = FALSE;
623   splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
624   splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
625
626   g_queue_init (&splitmux->pending_input_gops);
627 }
628
629 static void
630 gst_splitmux_reset_elements (GstSplitMuxSink * splitmux)
631 {
632   if (splitmux->muxer) {
633     gst_element_set_locked_state (splitmux->muxer, TRUE);
634     gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
635     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
636   }
637   if (splitmux->active_sink) {
638     gst_element_set_locked_state (splitmux->active_sink, TRUE);
639     gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
640     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
641   }
642
643   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
644 }
645
646 static void
647 gst_splitmux_sink_dispose (GObject * object)
648 {
649   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
650
651   /* Calling parent dispose invalidates all child pointers */
652   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
653
654   G_OBJECT_CLASS (parent_class)->dispose (object);
655 }
656
657 static void
658 gst_splitmux_sink_finalize (GObject * object)
659 {
660   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
661
662   g_cond_clear (&splitmux->input_cond);
663   g_cond_clear (&splitmux->output_cond);
664   g_mutex_clear (&splitmux->lock);
665   g_mutex_clear (&splitmux->state_lock);
666   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
667   g_queue_clear (&splitmux->out_cmd_q);
668   g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
669   g_queue_clear (&splitmux->pending_input_gops);
670
671   g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
672
673   if (splitmux->muxerpad_map)
674     gst_structure_free (splitmux->muxerpad_map);
675
676   if (splitmux->provided_sink)
677     gst_object_unref (splitmux->provided_sink);
678   if (splitmux->provided_muxer)
679     gst_object_unref (splitmux->provided_muxer);
680
681   if (splitmux->muxer_factory)
682     g_free (splitmux->muxer_factory);
683   if (splitmux->muxer_preset)
684     g_free (splitmux->muxer_preset);
685   if (splitmux->muxer_properties)
686     gst_structure_free (splitmux->muxer_properties);
687   if (splitmux->sink_factory)
688     g_free (splitmux->sink_factory);
689   if (splitmux->sink_preset)
690     g_free (splitmux->sink_preset);
691   if (splitmux->sink_properties)
692     gst_structure_free (splitmux->sink_properties);
693
694   if (splitmux->threshold_timecode_str)
695     g_free (splitmux->threshold_timecode_str);
696   if (splitmux->tc_interval)
697     gst_video_time_code_interval_free (splitmux->tc_interval);
698
699   if (splitmux->times_to_split)
700     gst_queue_array_free (splitmux->times_to_split);
701
702   g_free (splitmux->location);
703
704   /* Make sure to free any un-released contexts. There should not be any,
705    * because the dispose will have freed all request pads though */
706   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
707   g_list_free (splitmux->contexts);
708
709   G_OBJECT_CLASS (parent_class)->finalize (object);
710 }
711
712 /*
713  * Set any time threshold to the muxer, if it has
714  * reserved-max-duration and reserved-duration-remaining
715  * properties. Called when creating/claiming the muxer
716  * in create_elements() */
717 static void
718 update_muxer_properties (GstSplitMuxSink * sink)
719 {
720   GObjectClass *klass;
721   GstClockTime threshold_time;
722
723   sink->muxer_has_reserved_props = FALSE;
724   if (sink->muxer == NULL)
725     return;
726   klass = G_OBJECT_GET_CLASS (sink->muxer);
727   if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
728     return;
729   if (g_object_class_find_property (klass,
730           "reserved-duration-remaining") == NULL)
731     return;
732   sink->muxer_has_reserved_props = TRUE;
733
734   GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
735       GST_TIME_ARGS (sink->threshold_time));
736   GST_OBJECT_LOCK (sink);
737   threshold_time = sink->threshold_time;
738   GST_OBJECT_UNLOCK (sink);
739
740   if (threshold_time > 0) {
741     /* Tell the muxer how much space to reserve */
742     GstClockTime muxer_threshold = threshold_time;
743     g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
744   }
745 }
746
747 static void
748 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
749     const GValue * value, GParamSpec * pspec)
750 {
751   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
752
753   switch (prop_id) {
754     case PROP_LOCATION:{
755       GST_OBJECT_LOCK (splitmux);
756       g_free (splitmux->location);
757       splitmux->location = g_value_dup_string (value);
758       GST_OBJECT_UNLOCK (splitmux);
759       break;
760     }
761     case PROP_START_INDEX:
762       GST_OBJECT_LOCK (splitmux);
763       splitmux->start_index = g_value_get_int (value);
764       GST_OBJECT_UNLOCK (splitmux);
765       break;
766     case PROP_MAX_SIZE_BYTES:
767       GST_OBJECT_LOCK (splitmux);
768       splitmux->threshold_bytes = g_value_get_uint64 (value);
769       GST_OBJECT_UNLOCK (splitmux);
770       break;
771     case PROP_MAX_SIZE_TIME:
772       GST_OBJECT_LOCK (splitmux);
773       splitmux->threshold_time = g_value_get_uint64 (value);
774       GST_OBJECT_UNLOCK (splitmux);
775       break;
776     case PROP_MAX_SIZE_TIMECODE:
777       GST_OBJECT_LOCK (splitmux);
778       g_free (splitmux->threshold_timecode_str);
779       /* will be calculated later */
780       g_clear_pointer (&splitmux->tc_interval,
781           gst_video_time_code_interval_free);
782
783       splitmux->threshold_timecode_str = g_value_dup_string (value);
784       if (splitmux->threshold_timecode_str) {
785         splitmux->tc_interval =
786             gst_video_time_code_interval_new_from_string
787             (splitmux->threshold_timecode_str);
788         if (!splitmux->tc_interval) {
789           g_warning ("Wrong timecode string %s",
790               splitmux->threshold_timecode_str);
791           g_free (splitmux->threshold_timecode_str);
792           splitmux->threshold_timecode_str = NULL;
793         }
794       }
795       splitmux->next_fragment_start_tc_time =
796           calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
797           splitmux->fragment_start_time, NULL);
798       if (splitmux->tc_interval && splitmux->fragment_start_tc
799           && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
800         GST_WARNING_OBJECT (splitmux,
801             "Couldn't calculate next fragment start time for timecode mode");
802       }
803       GST_OBJECT_UNLOCK (splitmux);
804       break;
805     case PROP_SEND_KEYFRAME_REQUESTS:
806       GST_OBJECT_LOCK (splitmux);
807       splitmux->send_keyframe_requests = g_value_get_boolean (value);
808       GST_OBJECT_UNLOCK (splitmux);
809       break;
810     case PROP_MAX_FILES:
811       GST_OBJECT_LOCK (splitmux);
812       splitmux->max_files = g_value_get_uint (value);
813       GST_OBJECT_UNLOCK (splitmux);
814       break;
815     case PROP_MUXER_OVERHEAD:
816       GST_OBJECT_LOCK (splitmux);
817       splitmux->mux_overhead = g_value_get_double (value);
818       GST_OBJECT_UNLOCK (splitmux);
819       break;
820     case PROP_USE_ROBUST_MUXING:
821       GST_OBJECT_LOCK (splitmux);
822       splitmux->use_robust_muxing = g_value_get_boolean (value);
823       GST_OBJECT_UNLOCK (splitmux);
824       if (splitmux->use_robust_muxing)
825         update_muxer_properties (splitmux);
826       break;
827     case PROP_ALIGNMENT_THRESHOLD:
828       GST_OBJECT_LOCK (splitmux);
829       splitmux->alignment_threshold = g_value_get_uint64 (value);
830       GST_OBJECT_UNLOCK (splitmux);
831       break;
832     case PROP_SINK:
833       GST_OBJECT_LOCK (splitmux);
834       gst_clear_object (&splitmux->provided_sink);
835       splitmux->provided_sink = g_value_get_object (value);
836       if (splitmux->provided_sink)
837         gst_object_ref_sink (splitmux->provided_sink);
838       GST_OBJECT_UNLOCK (splitmux);
839       break;
840     case PROP_MUXER:
841       GST_OBJECT_LOCK (splitmux);
842       gst_clear_object (&splitmux->provided_muxer);
843       splitmux->provided_muxer = g_value_get_object (value);
844       if (splitmux->provided_muxer)
845         gst_object_ref_sink (splitmux->provided_muxer);
846       GST_OBJECT_UNLOCK (splitmux);
847       break;
848     case PROP_RESET_MUXER:
849       GST_OBJECT_LOCK (splitmux);
850       splitmux->reset_muxer = g_value_get_boolean (value);
851       GST_OBJECT_UNLOCK (splitmux);
852       break;
853     case PROP_ASYNC_FINALIZE:
854       GST_OBJECT_LOCK (splitmux);
855       splitmux->async_finalize = g_value_get_boolean (value);
856       GST_OBJECT_UNLOCK (splitmux);
857       break;
858     case PROP_MUXER_FACTORY:
859       GST_OBJECT_LOCK (splitmux);
860       if (splitmux->muxer_factory)
861         g_free (splitmux->muxer_factory);
862       splitmux->muxer_factory = g_value_dup_string (value);
863       GST_OBJECT_UNLOCK (splitmux);
864       break;
865     case PROP_MUXER_PRESET:
866       GST_OBJECT_LOCK (splitmux);
867       if (splitmux->muxer_preset)
868         g_free (splitmux->muxer_preset);
869       splitmux->muxer_preset = g_value_dup_string (value);
870       GST_OBJECT_UNLOCK (splitmux);
871       break;
872     case PROP_MUXER_PROPERTIES:
873       GST_OBJECT_LOCK (splitmux);
874       if (splitmux->muxer_properties)
875         gst_structure_free (splitmux->muxer_properties);
876       if (gst_value_get_structure (value))
877         splitmux->muxer_properties =
878             gst_structure_copy (gst_value_get_structure (value));
879       else
880         splitmux->muxer_properties = NULL;
881       GST_OBJECT_UNLOCK (splitmux);
882       break;
883     case PROP_SINK_FACTORY:
884       GST_OBJECT_LOCK (splitmux);
885       if (splitmux->sink_factory)
886         g_free (splitmux->sink_factory);
887       splitmux->sink_factory = g_value_dup_string (value);
888       GST_OBJECT_UNLOCK (splitmux);
889       break;
890     case PROP_SINK_PRESET:
891       GST_OBJECT_LOCK (splitmux);
892       if (splitmux->sink_preset)
893         g_free (splitmux->sink_preset);
894       splitmux->sink_preset = g_value_dup_string (value);
895       GST_OBJECT_UNLOCK (splitmux);
896       break;
897     case PROP_SINK_PROPERTIES:
898       GST_OBJECT_LOCK (splitmux);
899       if (splitmux->sink_properties)
900         gst_structure_free (splitmux->sink_properties);
901       if (gst_value_get_structure (value))
902         splitmux->sink_properties =
903             gst_structure_copy (gst_value_get_structure (value));
904       else
905         splitmux->sink_properties = NULL;
906       GST_OBJECT_UNLOCK (splitmux);
907       break;
908     case PROP_MUXERPAD_MAP:
909     {
910       const GstStructure *s = gst_value_get_structure (value);
911       GST_SPLITMUX_LOCK (splitmux);
912       if (splitmux->muxerpad_map) {
913         gst_structure_free (splitmux->muxerpad_map);
914       }
915       if (s)
916         splitmux->muxerpad_map = gst_structure_copy (s);
917       else
918         splitmux->muxerpad_map = NULL;
919       GST_SPLITMUX_UNLOCK (splitmux);
920       break;
921     }
922     default:
923       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
924       break;
925   }
926 }
927
928 static void
929 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
930     GValue * value, GParamSpec * pspec)
931 {
932   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
933
934   switch (prop_id) {
935     case PROP_LOCATION:
936       GST_OBJECT_LOCK (splitmux);
937       g_value_set_string (value, splitmux->location);
938       GST_OBJECT_UNLOCK (splitmux);
939       break;
940     case PROP_START_INDEX:
941       GST_OBJECT_LOCK (splitmux);
942       g_value_set_int (value, splitmux->start_index);
943       GST_OBJECT_UNLOCK (splitmux);
944       break;
945     case PROP_MAX_SIZE_BYTES:
946       GST_OBJECT_LOCK (splitmux);
947       g_value_set_uint64 (value, splitmux->threshold_bytes);
948       GST_OBJECT_UNLOCK (splitmux);
949       break;
950     case PROP_MAX_SIZE_TIME:
951       GST_OBJECT_LOCK (splitmux);
952       g_value_set_uint64 (value, splitmux->threshold_time);
953       GST_OBJECT_UNLOCK (splitmux);
954       break;
955     case PROP_MAX_SIZE_TIMECODE:
956       GST_OBJECT_LOCK (splitmux);
957       g_value_set_string (value, splitmux->threshold_timecode_str);
958       GST_OBJECT_UNLOCK (splitmux);
959       break;
960     case PROP_SEND_KEYFRAME_REQUESTS:
961       GST_OBJECT_LOCK (splitmux);
962       g_value_set_boolean (value, splitmux->send_keyframe_requests);
963       GST_OBJECT_UNLOCK (splitmux);
964       break;
965     case PROP_MAX_FILES:
966       GST_OBJECT_LOCK (splitmux);
967       g_value_set_uint (value, splitmux->max_files);
968       GST_OBJECT_UNLOCK (splitmux);
969       break;
970     case PROP_MUXER_OVERHEAD:
971       GST_OBJECT_LOCK (splitmux);
972       g_value_set_double (value, splitmux->mux_overhead);
973       GST_OBJECT_UNLOCK (splitmux);
974       break;
975     case PROP_USE_ROBUST_MUXING:
976       GST_OBJECT_LOCK (splitmux);
977       g_value_set_boolean (value, splitmux->use_robust_muxing);
978       GST_OBJECT_UNLOCK (splitmux);
979       break;
980     case PROP_ALIGNMENT_THRESHOLD:
981       GST_OBJECT_LOCK (splitmux);
982       g_value_set_uint64 (value, splitmux->alignment_threshold);
983       GST_OBJECT_UNLOCK (splitmux);
984       break;
985     case PROP_SINK:
986       GST_OBJECT_LOCK (splitmux);
987       g_value_set_object (value, splitmux->provided_sink);
988       GST_OBJECT_UNLOCK (splitmux);
989       break;
990     case PROP_MUXER:
991       GST_OBJECT_LOCK (splitmux);
992       g_value_set_object (value, splitmux->provided_muxer);
993       GST_OBJECT_UNLOCK (splitmux);
994       break;
995     case PROP_RESET_MUXER:
996       GST_OBJECT_LOCK (splitmux);
997       g_value_set_boolean (value, splitmux->reset_muxer);
998       GST_OBJECT_UNLOCK (splitmux);
999       break;
1000     case PROP_ASYNC_FINALIZE:
1001       GST_OBJECT_LOCK (splitmux);
1002       g_value_set_boolean (value, splitmux->async_finalize);
1003       GST_OBJECT_UNLOCK (splitmux);
1004       break;
1005     case PROP_MUXER_FACTORY:
1006       GST_OBJECT_LOCK (splitmux);
1007       g_value_set_string (value, splitmux->muxer_factory);
1008       GST_OBJECT_UNLOCK (splitmux);
1009       break;
1010     case PROP_MUXER_PRESET:
1011       GST_OBJECT_LOCK (splitmux);
1012       g_value_set_string (value, splitmux->muxer_preset);
1013       GST_OBJECT_UNLOCK (splitmux);
1014       break;
1015     case PROP_MUXER_PROPERTIES:
1016       GST_OBJECT_LOCK (splitmux);
1017       gst_value_set_structure (value, splitmux->muxer_properties);
1018       GST_OBJECT_UNLOCK (splitmux);
1019       break;
1020     case PROP_SINK_FACTORY:
1021       GST_OBJECT_LOCK (splitmux);
1022       g_value_set_string (value, splitmux->sink_factory);
1023       GST_OBJECT_UNLOCK (splitmux);
1024       break;
1025     case PROP_SINK_PRESET:
1026       GST_OBJECT_LOCK (splitmux);
1027       g_value_set_string (value, splitmux->sink_preset);
1028       GST_OBJECT_UNLOCK (splitmux);
1029       break;
1030     case PROP_SINK_PROPERTIES:
1031       GST_OBJECT_LOCK (splitmux);
1032       gst_value_set_structure (value, splitmux->sink_properties);
1033       GST_OBJECT_UNLOCK (splitmux);
1034       break;
1035     case PROP_MUXERPAD_MAP:
1036       GST_SPLITMUX_LOCK (splitmux);
1037       gst_value_set_structure (value, splitmux->muxerpad_map);
1038       GST_SPLITMUX_UNLOCK (splitmux);
1039       break;
1040     default:
1041       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1042       break;
1043   }
1044 }
1045
1046 /* Convenience function */
1047 static inline GstClockTimeDiff
1048 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
1049 {
1050   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
1051
1052   if (GST_CLOCK_TIME_IS_VALID (val)) {
1053     gboolean sign =
1054         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
1055     if (sign > 0)
1056       res = val;
1057     else if (sign < 0)
1058       res = -val;
1059   }
1060   return res;
1061 }
1062
1063 static void
1064 mq_stream_ctx_reset (MqStreamCtx * ctx)
1065 {
1066   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1067   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1068   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
1069   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1070   g_queue_clear (&ctx->queued_bufs);
1071 }
1072
1073 static MqStreamCtx *
1074 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
1075 {
1076   MqStreamCtx *ctx;
1077
1078   ctx = g_new0 (MqStreamCtx, 1);
1079   ctx->splitmux = splitmux;
1080   g_queue_init (&ctx->queued_bufs);
1081   mq_stream_ctx_reset (ctx);
1082
1083   return ctx;
1084 }
1085
1086 static void
1087 mq_stream_ctx_free (MqStreamCtx * ctx)
1088 {
1089   if (ctx->q) {
1090     GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
1091
1092     g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
1093
1094     if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
1095       gst_element_set_locked_state (ctx->q, TRUE);
1096       gst_element_set_state (ctx->q, GST_STATE_NULL);
1097       gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
1098       gst_object_unref (parent);
1099     }
1100     gst_object_unref (ctx->q);
1101   }
1102   gst_object_unref (ctx->sinkpad);
1103   gst_object_unref (ctx->srcpad);
1104   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1105   g_queue_clear (&ctx->queued_bufs);
1106   g_free (ctx);
1107 }
1108
1109 static void
1110 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
1111     GstElement * sink)
1112 {
1113   gchar *location = NULL;
1114   GstMessage *msg;
1115   const gchar *msg_name = opened ?
1116       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
1117   GstClockTime running_time = splitmux->reference_ctx->out_running_time;
1118
1119   if (!opened) {
1120     GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
1121     if (rtime)
1122       running_time = *rtime;
1123   }
1124
1125   if (g_object_class_find_property (G_OBJECT_GET_CLASS (sink),
1126           "location") != NULL)
1127     g_object_get (sink, "location", &location, NULL);
1128
1129   GST_DEBUG_OBJECT (splitmux,
1130       "Sending %s message. Running time %" GST_TIME_FORMAT " location %s",
1131       msg_name, GST_TIME_ARGS (running_time), GST_STR_NULL (location));
1132
1133   /* If it's in the middle of a teardown, the reference_ctc might have become
1134    * NULL */
1135   if (splitmux->reference_ctx) {
1136     msg = gst_message_new_element (GST_OBJECT (splitmux),
1137         gst_structure_new (msg_name,
1138             "location", G_TYPE_STRING, location,
1139             "running-time", GST_TYPE_CLOCK_TIME, running_time,
1140             "sink", GST_TYPE_ELEMENT, sink, NULL));
1141     gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
1142   }
1143
1144   g_free (location);
1145 }
1146
1147 static void
1148 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
1149 {
1150   GstEvent *eos;
1151   GstPad *pad;
1152   MqStreamCtx *ctx;
1153
1154   eos = gst_event_new_eos ();
1155   pad = helper->pad;
1156   ctx = helper->ctx;
1157
1158   GST_SPLITMUX_LOCK (splitmux);
1159   if (!pad)
1160     pad = gst_pad_get_peer (ctx->srcpad);
1161   GST_SPLITMUX_UNLOCK (splitmux);
1162
1163   gst_pad_send_event (pad, eos);
1164   GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
1165
1166   gst_object_unref (pad);
1167   g_free (helper);
1168 }
1169
1170 /* Called with lock held, drops the lock to send EOS to the
1171  * pad
1172  */
1173 static void
1174 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1175 {
1176   GstEvent *eos;
1177   GstPad *pad;
1178
1179   eos = gst_event_new_eos ();
1180   pad = gst_pad_get_peer (ctx->srcpad);
1181
1182   ctx->out_eos = TRUE;
1183
1184   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
1185   GST_SPLITMUX_UNLOCK (splitmux);
1186   gst_pad_send_event (pad, eos);
1187   GST_SPLITMUX_LOCK (splitmux);
1188
1189   gst_object_unref (pad);
1190 }
1191
1192 /* Called with lock held. Schedules an EOS event to the ctx pad
1193  * to happen in another thread */
1194 static void
1195 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1196 {
1197   AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
1198   GstPad *srcpad, *sinkpad;
1199
1200   srcpad = ctx->srcpad;
1201   sinkpad = gst_pad_get_peer (srcpad);
1202
1203   helper->ctx = ctx;
1204   helper->pad = sinkpad;        /* Takes the reference */
1205
1206   ctx->out_eos_async_done = TRUE;
1207
1208   /* There used to be a bug here, where we had to explicitly remove
1209    * the SINK flag so that GstBin would ignore it for EOS purposes.
1210    * That fixed a race where if splitmuxsink really reaches EOS
1211    * before an asynchronous background element has finished, then
1212    * the bin wouldn't actually send EOS to the pipeline. Even after
1213    * finishing and removing the old element, the bin didn't re-check
1214    * EOS status on removing a SINK element. That bug was fixed
1215    * in core. */
1216   GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1217       sinkpad, ctx);
1218
1219   g_assert_nonnull (helper->pad);
1220   gst_element_call_async (GST_ELEMENT (splitmux),
1221       (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
1222 }
1223
1224 /* Called with lock held. TRUE iff all contexts have a
1225  * pending (or delivered) async eos event */
1226 static gboolean
1227 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1228 {
1229   gboolean ret = TRUE;
1230   GList *item;
1231
1232   for (item = splitmux->contexts; item; item = item->next) {
1233     MqStreamCtx *ctx = item->data;
1234     ret &= ctx->out_eos_async_done;
1235   }
1236   return ret;
1237 }
1238
1239 /* Called with splitmux lock held to check if this output
1240  * context needs to sleep to wait for the release of the
1241  * next GOP, or to send EOS to close out the current file
1242  */
1243 static GstFlowReturn
1244 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1245 {
1246   if (ctx->caps_change)
1247     return GST_FLOW_OK;
1248
1249   do {
1250     /* When first starting up, the reference stream has to output
1251      * the first buffer to prepare the muxer and sink */
1252     gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
1253     GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
1254
1255     if (my_max_out_running_time != GST_CLOCK_STIME_NONE
1256         && my_max_out_running_time != G_MAXINT64) {
1257       my_max_out_running_time -= splitmux->alignment_threshold;
1258       GST_LOG_OBJECT (ctx->srcpad,
1259           "Max out running time currently %" GST_STIME_FORMAT
1260           ", with threshold applied it is %" GST_STIME_FORMAT,
1261           GST_STIME_ARGS (splitmux->max_out_running_time),
1262           GST_STIME_ARGS (my_max_out_running_time));
1263     }
1264
1265     if (ctx->flushing
1266         || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1267       return GST_FLOW_FLUSHING;
1268
1269     GST_LOG_OBJECT (ctx->srcpad,
1270         "Checking running time %" GST_STIME_FORMAT " against max %"
1271         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1272         GST_STIME_ARGS (my_max_out_running_time));
1273
1274     if (can_output) {
1275       if (splitmux->max_out_running_time != GST_CLOCK_STIME_NONE &&
1276           ctx->out_running_time < my_max_out_running_time) {
1277         return GST_FLOW_OK;
1278       }
1279
1280       switch (splitmux->output_state) {
1281         case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1282           /* We only get here if we've finished outputting a GOP and need to know
1283            * what to do next */
1284           splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1285           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1286           continue;
1287
1288         case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1289         case SPLITMUX_OUTPUT_STATE_ENDING_STREAM:
1290           /* We've reached the max out running_time to get here, so end this file now */
1291           if (ctx->out_eos == FALSE) {
1292             if (splitmux->async_finalize) {
1293               /* We must set EOS asynchronously at this point. We cannot defer
1294                * it, because we need all contexts to wake up, for the
1295                * reference context to eventually give us something at
1296                * START_NEXT_FILE. Otherwise, collectpads might choose another
1297                * context to give us the first buffer, and format-location-full
1298                * will not contain a valid sample. */
1299               g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1300                   GINT_TO_POINTER (1));
1301               eos_context_async (ctx, splitmux);
1302               if (all_contexts_are_async_eos (splitmux)) {
1303                 GST_INFO_OBJECT (splitmux,
1304                     "All contexts are async_eos. Moving to the next file.");
1305                 /* We can start the next file once we've asked each pad to go EOS */
1306                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1307                 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1308                 continue;
1309               }
1310             } else {
1311               send_eos (splitmux, ctx);
1312               continue;
1313             }
1314           } else {
1315             GST_INFO_OBJECT (splitmux,
1316                 "At end-of-file state, but context %p is already EOS", ctx);
1317           }
1318           break;
1319         case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1320           if (ctx->is_reference) {
1321             GstFlowReturn ret = GST_FLOW_OK;
1322
1323             /* Special handling on the reference ctx to start new fragments
1324              * and collect commands from the command queue */
1325             /* drops the splitmux lock briefly: */
1326             /* We must have reference ctx in order for format-location-full to
1327              * have a sample */
1328             ret = start_next_fragment (splitmux, ctx);
1329             if (ret != GST_FLOW_OK)
1330               return ret;
1331
1332             continue;
1333           }
1334           break;
1335         case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1336           do {
1337             SplitMuxOutputCommand *cmd =
1338                 g_queue_pop_tail (&splitmux->out_cmd_q);
1339             if (cmd != NULL) {
1340               /* If we pop the last command, we need to make our queues bigger */
1341               if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1342                 grow_blocked_queues (splitmux);
1343
1344               if (cmd->start_new_fragment) {
1345                 if (splitmux->muxed_out_bytes > 0) {
1346                   GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1347                   splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1348                 } else {
1349                   GST_DEBUG_OBJECT (splitmux,
1350                       "Got cmd to start new fragment, but fragment is empty - ignoring.");
1351                 }
1352               } else {
1353                 GST_DEBUG_OBJECT (splitmux,
1354                     "Got new output cmd for time %" GST_STIME_FORMAT,
1355                     GST_STIME_ARGS (cmd->max_output_ts));
1356
1357                 /* Extend the output range immediately */
1358                 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE
1359                     || cmd->max_output_ts > splitmux->max_out_running_time)
1360                   splitmux->max_out_running_time = cmd->max_output_ts;
1361                 GST_DEBUG_OBJECT (splitmux,
1362                     "Max out running time now %" GST_STIME_FORMAT,
1363                     GST_STIME_ARGS (splitmux->max_out_running_time));
1364                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1365               }
1366               GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1367
1368               out_cmd_buf_free (cmd);
1369               break;
1370             } else {
1371               GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1372             }
1373           } while (!ctx->flushing && splitmux->output_state ==
1374               SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1375           /* loop and re-check the state */
1376           continue;
1377         }
1378         case SPLITMUX_OUTPUT_STATE_STOPPED:
1379           return GST_FLOW_FLUSHING;
1380       }
1381     } else {
1382       GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output");
1383     }
1384
1385     GST_INFO_OBJECT (ctx->srcpad,
1386         "Sleeping for running time %"
1387         GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1388         GST_STIME_ARGS (ctx->out_running_time),
1389         GST_STIME_ARGS (splitmux->max_out_running_time));
1390     GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1391     GST_INFO_OBJECT (ctx->srcpad,
1392         "Woken for new max running time %" GST_STIME_FORMAT,
1393         GST_STIME_ARGS (splitmux->max_out_running_time));
1394   }
1395   while (1);
1396
1397   return GST_FLOW_OK;
1398 }
1399
1400 static GstClockTime
1401 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1402     const GstVideoTimeCode * cur_tc, GstClockTime running_time,
1403     GstVideoTimeCode ** next_tc)
1404 {
1405   GstVideoTimeCode *target_tc;
1406   GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1407
1408   if (cur_tc == NULL || splitmux->tc_interval == NULL)
1409     return GST_CLOCK_TIME_NONE;
1410
1411   target_tc = gst_video_time_code_add_interval (cur_tc, splitmux->tc_interval);
1412   if (!target_tc) {
1413     GST_ELEMENT_ERROR (splitmux,
1414         STREAM, FAILED, (NULL), ("Couldn't calculate target timecode"));
1415     return GST_CLOCK_TIME_NONE;
1416   }
1417
1418   /* Convert to ns */
1419   target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1420   cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1421
1422   /* Add running_time, accounting for wraparound. */
1423   if (target_tc_time >= cur_tc_time) {
1424     next_max_tc_time = target_tc_time - cur_tc_time + running_time;
1425   } else {
1426     GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1427
1428     if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1429         (cur_tc->config.fps_d == 1001)) {
1430       /* Checking fps_d is probably unneeded, but better safe than sorry
1431        * (e.g. someone accidentally set a flag) */
1432       GstVideoTimeCode *tc_for_offset;
1433
1434       /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1435        * but slightly less. Calculate that duration from a fake timecode. The
1436        * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1437        * is to add one frame to 23:59:59;29 */
1438       tc_for_offset =
1439           gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1440           NULL, cur_tc->config.flags, 23, 59, 59,
1441           cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
1442       day_in_ns =
1443           gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1444           gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1445           cur_tc->config.fps_n);
1446       gst_video_time_code_free (tc_for_offset);
1447     }
1448     next_max_tc_time = day_in_ns - cur_tc_time + target_tc_time + running_time;
1449   }
1450
1451 #ifndef GST_DISABLE_GST_DEBUG
1452   {
1453     gchar *next_max_tc_str, *cur_tc_str;
1454
1455     cur_tc_str = gst_video_time_code_to_string (cur_tc);
1456     next_max_tc_str = gst_video_time_code_to_string (target_tc);
1457
1458     GST_INFO_OBJECT (splitmux, "Next max timecode %s time: %" GST_TIME_FORMAT
1459         " from ref timecode %s time: %" GST_TIME_FORMAT,
1460         next_max_tc_str,
1461         GST_TIME_ARGS (next_max_tc_time),
1462         cur_tc_str, GST_TIME_ARGS (cur_tc_time));
1463
1464     g_free (next_max_tc_str);
1465     g_free (cur_tc_str);
1466   }
1467 #endif
1468
1469   if (next_tc)
1470     *next_tc = target_tc;
1471   else
1472     gst_video_time_code_free (target_tc);
1473
1474   return next_max_tc_time;
1475 }
1476
1477 static gboolean
1478 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer,
1479     GstClockTimeDiff running_time_dts)
1480 {
1481   GstEvent *ev;
1482   GstClockTime target_time;
1483   gboolean timecode_based = FALSE;
1484   GstClockTime max_tc_time = GST_CLOCK_TIME_NONE;
1485   GstClockTime next_max_tc_time = GST_CLOCK_TIME_NONE;
1486   GstClockTime next_fku_time = GST_CLOCK_TIME_NONE;
1487   GstClockTime tc_rounding_error = 5 * GST_USECOND;
1488   InputGop *newest_gop = NULL;
1489   GList *l;
1490
1491   if (!splitmux->send_keyframe_requests)
1492     return TRUE;
1493
1494   /* Find the newest GOP where we passed in DTS the start PTS */
1495   for (l = splitmux->pending_input_gops.tail; l; l = l->prev) {
1496     InputGop *tmp = l->data;
1497
1498     GST_TRACE_OBJECT (splitmux,
1499         "Having pending input GOP with start PTS %" GST_STIME_FORMAT
1500         " and start time %" GST_STIME_FORMAT,
1501         GST_STIME_ARGS (tmp->start_time_pts), GST_STIME_ARGS (tmp->start_time));
1502
1503     if (tmp->sent_fku) {
1504       GST_DEBUG_OBJECT (splitmux,
1505           "Already checked for a keyframe request for this GOP");
1506       return TRUE;
1507     }
1508
1509     if (running_time_dts == GST_CLOCK_STIME_NONE ||
1510         tmp->start_time_pts == GST_CLOCK_STIME_NONE ||
1511         running_time_dts >= tmp->start_time_pts) {
1512       GST_DEBUG_OBJECT (splitmux,
1513           "Using GOP with start PTS %" GST_STIME_FORMAT " and start time %"
1514           GST_STIME_FORMAT, GST_STIME_ARGS (tmp->start_time_pts),
1515           GST_STIME_ARGS (tmp->start_time));
1516       newest_gop = tmp;
1517       break;
1518     }
1519   }
1520
1521   if (!newest_gop) {
1522     GST_DEBUG_OBJECT (splitmux, "Have no complete enough pending input GOP");
1523     return TRUE;
1524   }
1525
1526   if (splitmux->tc_interval) {
1527     if (newest_gop->start_tc
1528         && gst_video_time_code_is_valid (newest_gop->start_tc)) {
1529       GstVideoTimeCode *next_tc = NULL;
1530       max_tc_time =
1531           calculate_next_max_timecode (splitmux, newest_gop->start_tc,
1532           newest_gop->start_time, &next_tc);
1533
1534       /* calculate the next expected keyframe time to prevent too early fku
1535        * event */
1536       if (GST_CLOCK_TIME_IS_VALID (max_tc_time) && next_tc) {
1537         next_max_tc_time =
1538             calculate_next_max_timecode (splitmux, next_tc, max_tc_time, NULL);
1539       }
1540       if (next_tc)
1541         gst_video_time_code_free (next_tc);
1542
1543       timecode_based = GST_CLOCK_TIME_IS_VALID (max_tc_time) &&
1544           GST_CLOCK_TIME_IS_VALID (next_max_tc_time);
1545
1546       if (!timecode_based) {
1547         GST_WARNING_OBJECT (splitmux,
1548             "Couldn't calculate maximum fragment time for timecode mode");
1549       }
1550     } else {
1551       /* This can happen in the presence of GAP events that trigger
1552        * a new fragment start */
1553       GST_WARNING_OBJECT (splitmux,
1554           "No buffer available to calculate next timecode");
1555     }
1556   }
1557
1558   if ((splitmux->threshold_time == 0 && !timecode_based)
1559       || splitmux->threshold_bytes != 0)
1560     return TRUE;
1561
1562   if (timecode_based) {
1563     /* We might have rounding errors: aim slightly earlier */
1564     if (max_tc_time >= tc_rounding_error) {
1565       target_time = max_tc_time - tc_rounding_error;
1566     } else {
1567       /* unreliable target time */
1568       GST_DEBUG_OBJECT (splitmux, "tc time %" GST_TIME_FORMAT
1569           " is smaller than allowed rounding error, set it to zero",
1570           GST_TIME_ARGS (max_tc_time));
1571       target_time = 0;
1572     }
1573
1574     if (next_max_tc_time >= tc_rounding_error) {
1575       next_fku_time = next_max_tc_time - tc_rounding_error;
1576     } else {
1577       /* unreliable target time */
1578       GST_DEBUG_OBJECT (splitmux, "next tc time %" GST_TIME_FORMAT
1579           " is smaller than allowed rounding error, set it to zero",
1580           GST_TIME_ARGS (next_max_tc_time));
1581       next_fku_time = 0;
1582     }
1583   } else {
1584     target_time = newest_gop->start_time + splitmux->threshold_time;
1585   }
1586
1587   if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fku_time)) {
1588     GstClockTime allowed_time = splitmux->next_fku_time;
1589
1590     if (timecode_based) {
1591       if (allowed_time >= tc_rounding_error) {
1592         allowed_time -= tc_rounding_error;
1593       } else {
1594         /* unreliable next force key unit time */
1595         GST_DEBUG_OBJECT (splitmux, "expected next force key unit time %"
1596             GST_TIME_FORMAT
1597             " is smaller than allowed rounding error, set it to zero",
1598             GST_TIME_ARGS (splitmux->next_fku_time));
1599         allowed_time = 0;
1600       }
1601     }
1602
1603     if (target_time < allowed_time) {
1604       GST_LOG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1605           " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1606           ", rounding error compensated next keyframe time %" GST_TIME_FORMAT,
1607           GST_TIME_ARGS (target_time),
1608           GST_TIME_ARGS (splitmux->next_fku_time),
1609           GST_TIME_ARGS (allowed_time));
1610
1611       return TRUE;
1612     } else if (allowed_time != splitmux->next_fku_time &&
1613         target_time < splitmux->next_fku_time) {
1614       GST_DEBUG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1615           " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1616           ", but the difference is smaller than allowed rounding error",
1617           GST_TIME_ARGS (target_time), GST_TIME_ARGS (splitmux->next_fku_time));
1618     }
1619   }
1620
1621   if (!timecode_based) {
1622     next_fku_time = target_time + splitmux->threshold_time;
1623   }
1624
1625   GST_INFO_OBJECT (splitmux, "Requesting keyframe at %" GST_TIME_FORMAT
1626       ", the next expected keyframe request time is %" GST_TIME_FORMAT,
1627       GST_TIME_ARGS (target_time), GST_TIME_ARGS (next_fku_time));
1628
1629   newest_gop->sent_fku = TRUE;
1630
1631   splitmux->next_fku_time = next_fku_time;
1632   ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1633
1634   return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
1635 }
1636
1637 static GstPadProbeReturn
1638 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1639 {
1640   GstSplitMuxSink *splitmux = ctx->splitmux;
1641   MqStreamBuf *buf_info = NULL;
1642   GstFlowReturn ret = GST_FLOW_OK;
1643
1644   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1645
1646   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1647   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1648     g_warning ("Buffer list handling not implemented");
1649     return GST_PAD_PROBE_DROP;
1650   }
1651   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1652       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1653     GstEvent *event = gst_pad_probe_info_get_event (info);
1654     gboolean locked = FALSE, wait = !ctx->is_reference;
1655
1656     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1657
1658     switch (GST_EVENT_TYPE (event)) {
1659       case GST_EVENT_SEGMENT:
1660         gst_event_copy_segment (event, &ctx->out_segment);
1661         break;
1662       case GST_EVENT_FLUSH_STOP:
1663         GST_SPLITMUX_LOCK (splitmux);
1664         locked = TRUE;
1665         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1666         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1667         g_queue_clear (&ctx->queued_bufs);
1668         g_queue_clear (&ctx->queued_bufs);
1669         /* If this is the reference context, we just threw away any queued keyframes */
1670         if (ctx->is_reference)
1671           splitmux->queued_keyframes = 0;
1672         ctx->flushing = FALSE;
1673         wait = FALSE;
1674         break;
1675       case GST_EVENT_FLUSH_START:
1676         GST_SPLITMUX_LOCK (splitmux);
1677         locked = TRUE;
1678         GST_LOG_OBJECT (pad, "Flush start");
1679         ctx->flushing = TRUE;
1680         GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1681         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1682         break;
1683       case GST_EVENT_EOS:
1684         GST_SPLITMUX_LOCK (splitmux);
1685         locked = TRUE;
1686         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1687           goto beach;
1688         ctx->out_eos = TRUE;
1689
1690         if (ctx == splitmux->reference_ctx) {
1691           splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_STREAM;
1692           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1693         }
1694
1695         GST_INFO_OBJECT (splitmux,
1696             "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1697         break;
1698       case GST_EVENT_GAP:{
1699         GstClockTime gap_ts;
1700         GstClockTimeDiff rtime;
1701
1702         gst_event_parse_gap (event, &gap_ts, NULL);
1703         if (gap_ts == GST_CLOCK_TIME_NONE)
1704           break;
1705
1706         GST_SPLITMUX_LOCK (splitmux);
1707         locked = TRUE;
1708
1709         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1710           goto beach;
1711
1712         /* When we get a gap event on the
1713          * reference stream and we're trying to open a
1714          * new file, we need to store it until we get
1715          * the buffer afterwards
1716          */
1717         if (ctx->is_reference &&
1718             (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1719           GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1720           gst_event_replace (&ctx->pending_gap, event);
1721           GST_SPLITMUX_UNLOCK (splitmux);
1722           return GST_PAD_PROBE_HANDLED;
1723         }
1724
1725         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1726
1727         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1728             GST_STIME_ARGS (rtime));
1729
1730         if (rtime != GST_CLOCK_STIME_NONE) {
1731           ctx->out_running_time = rtime;
1732           complete_or_wait_on_out (splitmux, ctx);
1733         }
1734         break;
1735       }
1736       case GST_EVENT_CUSTOM_DOWNSTREAM:{
1737         const GstStructure *s;
1738         GstClockTimeDiff ts = 0;
1739
1740         s = gst_event_get_structure (event);
1741         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1742           break;
1743
1744         gst_structure_get_int64 (s, "timestamp", &ts);
1745
1746         GST_SPLITMUX_LOCK (splitmux);
1747         locked = TRUE;
1748
1749         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1750           goto beach;
1751         ctx->out_running_time = ts;
1752         if (!ctx->is_reference)
1753           ret = complete_or_wait_on_out (splitmux, ctx);
1754         GST_SPLITMUX_UNLOCK (splitmux);
1755         GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1756         return GST_PAD_PROBE_DROP;
1757       }
1758       case GST_EVENT_CAPS:{
1759         GstPad *peer;
1760
1761         if (!ctx->is_reference)
1762           break;
1763
1764         peer = gst_pad_get_peer (pad);
1765         if (peer) {
1766           gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1767
1768           gst_object_unref (peer);
1769
1770           if (ok)
1771             break;
1772
1773         } else {
1774           break;
1775         }
1776         /* This is in the case the muxer doesn't allow this change of caps */
1777         GST_SPLITMUX_LOCK (splitmux);
1778         locked = TRUE;
1779         ctx->caps_change = TRUE;
1780
1781         if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1782           GST_DEBUG_OBJECT (splitmux,
1783               "New caps were not accepted. Switching output file");
1784           if (ctx->out_eos == FALSE) {
1785             splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1786             GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1787           }
1788         }
1789
1790         /* Lets it fall through, if it fails again, then the muxer just can't
1791          * support this format, but at least we have a closed file.
1792          */
1793         break;
1794       }
1795       default:
1796         break;
1797     }
1798
1799     /* We need to make sure events aren't passed
1800      * until the muxer / sink are ready for it */
1801     if (!locked)
1802       GST_SPLITMUX_LOCK (splitmux);
1803     if (wait)
1804       ret = complete_or_wait_on_out (splitmux, ctx);
1805     GST_SPLITMUX_UNLOCK (splitmux);
1806
1807     /* Don't try to forward sticky events before the next buffer is there
1808      * because it would cause a new file to be created without the first
1809      * buffer being available.
1810      */
1811     GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1812     if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1813       gst_event_unref (event);
1814       return GST_PAD_PROBE_HANDLED;
1815     } else {
1816       return GST_PAD_PROBE_PASS;
1817     }
1818   }
1819
1820   /* Allow everything through until the configured next stopping point */
1821   GST_SPLITMUX_LOCK (splitmux);
1822
1823   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1824   if (buf_info == NULL) {
1825     /* Can only happen due to a poorly timed flush */
1826     ret = GST_FLOW_FLUSHING;
1827     goto beach;
1828   }
1829
1830   /* If we have popped a keyframe, decrement the queued_gop count */
1831   if (buf_info->keyframe && splitmux->queued_keyframes > 0 && ctx->is_reference)
1832     splitmux->queued_keyframes--;
1833
1834   ctx->out_running_time = buf_info->run_ts;
1835   ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1836
1837   GST_LOG_OBJECT (splitmux,
1838       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1839       " size %" G_GUINT64_FORMAT,
1840       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1841
1842   ctx->caps_change = FALSE;
1843
1844   ret = complete_or_wait_on_out (splitmux, ctx);
1845
1846   splitmux->muxed_out_bytes += buf_info->buf_size;
1847
1848 #ifndef GST_DISABLE_GST_DEBUG
1849   {
1850     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1851     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1852         " run ts %" GST_STIME_FORMAT, buf,
1853         GST_STIME_ARGS (ctx->out_running_time));
1854   }
1855 #endif
1856
1857   ctx->cur_out_buffer = NULL;
1858   GST_SPLITMUX_UNLOCK (splitmux);
1859
1860   /* pending_gap is protected by the STREAM lock */
1861   if (ctx->pending_gap) {
1862     /* If we previously stored a gap event, send it now */
1863     GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1864
1865     GST_DEBUG_OBJECT (splitmux,
1866         "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1867
1868     gst_pad_send_event (peer, ctx->pending_gap);
1869     ctx->pending_gap = NULL;
1870
1871     gst_object_unref (peer);
1872   }
1873
1874   mq_stream_buf_free (buf_info);
1875
1876   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1877   return GST_PAD_PROBE_PASS;
1878
1879 beach:
1880   GST_SPLITMUX_UNLOCK (splitmux);
1881   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1882   return GST_PAD_PROBE_DROP;
1883 }
1884
1885 static gboolean
1886 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1887 {
1888   return gst_pad_send_event (peer, gst_event_ref (*event));
1889 }
1890
1891 static void
1892 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1893 {
1894   if (ctx->fragment_block_id > 0) {
1895     gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1896     ctx->fragment_block_id = 0;
1897   }
1898 }
1899
1900 static void
1901 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1902 {
1903   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1904
1905   gst_pad_sticky_events_foreach (ctx->srcpad,
1906       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1907
1908   /* Clear EOS flag if not actually EOS */
1909   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1910   ctx->out_eos_async_done = ctx->out_eos;
1911
1912   gst_object_unref (peer);
1913 }
1914
1915 static void
1916 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1917 {
1918   GstPad *sinkpad, *srcpad, *newpad;
1919   GstPadTemplate *templ;
1920
1921   srcpad = ctx->srcpad;
1922   sinkpad = gst_pad_get_peer (srcpad);
1923
1924   templ = sinkpad->padtemplate;
1925   newpad =
1926       gst_element_request_pad (splitmux->muxer, templ,
1927       GST_PAD_NAME (sinkpad), NULL);
1928
1929   GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1930       newpad);
1931   if (!gst_pad_unlink (srcpad, sinkpad)) {
1932     gst_object_unref (sinkpad);
1933     goto fail;
1934   }
1935   if (gst_pad_link_full (srcpad, newpad,
1936           GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1937     gst_element_release_request_pad (splitmux->muxer, newpad);
1938     gst_object_unref (sinkpad);
1939     gst_object_unref (newpad);
1940     goto fail;
1941   }
1942   gst_object_unref (newpad);
1943   gst_object_unref (sinkpad);
1944   return;
1945
1946 fail:
1947   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1948       ("Could not create the new muxer/sink"), NULL);
1949 }
1950
1951 static GstPadProbeReturn
1952 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1953 {
1954   return GST_PAD_PROBE_OK;
1955 }
1956
1957 static void
1958 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1959 {
1960   ctx->fragment_block_id =
1961       gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1962       NULL, NULL);
1963 }
1964
1965 static gboolean
1966 _set_property_from_structure (GQuark field_id, const GValue * value,
1967     gpointer user_data)
1968 {
1969   const gchar *property_name = g_quark_to_string (field_id);
1970   GObject *element = G_OBJECT (user_data);
1971
1972   g_object_set_property (element, property_name, value);
1973
1974   return TRUE;
1975 }
1976
1977 static void
1978 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1979 {
1980   gst_element_set_locked_state (element, TRUE);
1981   gst_element_set_state (element, GST_STATE_NULL);
1982   GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1983   gst_bin_remove (GST_BIN (splitmux), element);
1984 }
1985
1986
1987 static void
1988 _send_event (const GValue * value, gpointer user_data)
1989 {
1990   GstPad *pad = g_value_get_object (value);
1991   GstEvent *ev = user_data;
1992
1993   gst_pad_send_event (pad, gst_event_ref (ev));
1994 }
1995
1996 /* Called with lock held when a fragment
1997  * reaches EOS and it is time to restart
1998  * a new fragment
1999  */
2000 static GstFlowReturn
2001 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2002 {
2003   GstElement *muxer, *sink;
2004
2005   g_assert (ctx->is_reference);
2006
2007   /* 1 change to new file */
2008   splitmux->switching_fragment = TRUE;
2009
2010   /* We need to drop the splitmux lock to acquire the state lock
2011    * here and ensure there's no racy state change going on elsewhere */
2012   muxer = gst_object_ref (splitmux->muxer);
2013   sink = gst_object_ref (splitmux->active_sink);
2014
2015   GST_SPLITMUX_UNLOCK (splitmux);
2016   GST_SPLITMUX_STATE_LOCK (splitmux);
2017
2018   if (splitmux->shutdown) {
2019     GST_DEBUG_OBJECT (splitmux,
2020         "Shutdown requested. Aborting fragment switch.");
2021     GST_SPLITMUX_LOCK (splitmux);
2022     GST_SPLITMUX_STATE_UNLOCK (splitmux);
2023     gst_object_unref (muxer);
2024     gst_object_unref (sink);
2025     return GST_FLOW_FLUSHING;
2026   }
2027
2028   if (splitmux->async_finalize) {
2029     if (splitmux->muxed_out_bytes > 0
2030         || splitmux->fragment_id != splitmux->start_index) {
2031       gchar *newname;
2032       GstElement *new_sink, *new_muxer;
2033
2034       GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
2035           splitmux->fragment_id);
2036       g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
2037       newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
2038       GST_SPLITMUX_LOCK (splitmux);
2039       if ((splitmux->sink =
2040               create_element (splitmux, splitmux->sink_factory, newname,
2041                   TRUE)) == NULL)
2042         goto fail;
2043       if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
2044         gst_preset_load_preset (GST_PRESET (splitmux->sink),
2045             splitmux->sink_preset);
2046       if (splitmux->sink_properties)
2047         gst_structure_foreach (splitmux->sink_properties,
2048             _set_property_from_structure, splitmux->sink);
2049       splitmux->active_sink = splitmux->sink;
2050       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
2051       g_free (newname);
2052       newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
2053       if ((splitmux->muxer =
2054               create_element (splitmux, splitmux->muxer_factory, newname,
2055                   TRUE)) == NULL)
2056         goto fail;
2057       if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2058               "async") != NULL) {
2059         /* async child elements are causing state change races and weird
2060          * failures, so let's try and turn that off */
2061         g_object_set (splitmux->sink, "async", FALSE, NULL);
2062       }
2063       if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
2064         gst_preset_load_preset (GST_PRESET (splitmux->muxer),
2065             splitmux->muxer_preset);
2066       if (splitmux->muxer_properties)
2067         gst_structure_foreach (splitmux->muxer_properties,
2068             _set_property_from_structure, splitmux->muxer);
2069       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2070       g_free (newname);
2071       new_sink = splitmux->sink;
2072       new_muxer = splitmux->muxer;
2073       GST_SPLITMUX_UNLOCK (splitmux);
2074       g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
2075       gst_element_link (new_muxer, new_sink);
2076
2077       if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2078         if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2079                     EOS_FROM_US)) == 2) {
2080           _lock_and_set_to_null (muxer, splitmux);
2081           _lock_and_set_to_null (sink, splitmux);
2082         } else {
2083           g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2084               GINT_TO_POINTER (2));
2085         }
2086       }
2087       gst_object_unref (muxer);
2088       gst_object_unref (sink);
2089       muxer = new_muxer;
2090       sink = new_sink;
2091       gst_object_ref (muxer);
2092       gst_object_ref (sink);
2093     }
2094   } else {
2095
2096     gst_element_set_locked_state (muxer, TRUE);
2097     gst_element_set_locked_state (sink, TRUE);
2098     gst_element_set_state (sink, GST_STATE_NULL);
2099
2100     if (splitmux->reset_muxer) {
2101       gst_element_set_state (muxer, GST_STATE_NULL);
2102     } else {
2103       GstIterator *it = gst_element_iterate_sink_pads (muxer);
2104       GstEvent *ev;
2105       guint32 seqnum;
2106
2107       ev = gst_event_new_flush_start ();
2108       seqnum = gst_event_get_seqnum (ev);
2109       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2110       gst_event_unref (ev);
2111
2112       gst_iterator_resync (it);
2113
2114       ev = gst_event_new_flush_stop (TRUE);
2115       gst_event_set_seqnum (ev, seqnum);
2116       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2117       gst_event_unref (ev);
2118
2119       gst_iterator_free (it);
2120     }
2121   }
2122
2123   GST_SPLITMUX_LOCK (splitmux);
2124   set_next_filename (splitmux, ctx);
2125   splitmux->muxed_out_bytes = 0;
2126   GST_SPLITMUX_UNLOCK (splitmux);
2127
2128   if (gst_element_set_state (sink,
2129           GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2130     gst_element_set_state (sink, GST_STATE_NULL);
2131     gst_element_set_locked_state (muxer, FALSE);
2132     gst_element_set_locked_state (sink, FALSE);
2133
2134     goto fail_output;
2135   }
2136
2137   if (gst_element_set_state (muxer,
2138           GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2139     gst_element_set_state (muxer, GST_STATE_NULL);
2140     gst_element_set_state (sink, GST_STATE_NULL);
2141     gst_element_set_locked_state (muxer, FALSE);
2142     gst_element_set_locked_state (sink, FALSE);
2143     goto fail_muxer;
2144   }
2145
2146   gst_element_set_locked_state (muxer, FALSE);
2147   gst_element_set_locked_state (sink, FALSE);
2148
2149   gst_object_unref (sink);
2150   gst_object_unref (muxer);
2151
2152   GST_SPLITMUX_LOCK (splitmux);
2153   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2154   splitmux->switching_fragment = FALSE;
2155   do_async_done (splitmux);
2156
2157   splitmux->ready_for_output = TRUE;
2158
2159   g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
2160   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
2161
2162   send_fragment_opened_closed_msg (splitmux, TRUE, sink);
2163
2164   /* FIXME: Is this always the correct next state? */
2165   GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND");
2166   splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
2167   GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2168   return GST_FLOW_OK;
2169
2170 fail:
2171   gst_object_unref (sink);
2172   gst_object_unref (muxer);
2173
2174   GST_SPLITMUX_LOCK (splitmux);
2175   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2176   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2177       ("Could not create the new muxer/sink"), NULL);
2178   return GST_FLOW_ERROR;
2179
2180 fail_output:
2181   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2182       ("Could not start new output sink"), NULL);
2183   gst_object_unref (sink);
2184   gst_object_unref (muxer);
2185
2186   GST_SPLITMUX_LOCK (splitmux);
2187   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2188   splitmux->switching_fragment = FALSE;
2189   return GST_FLOW_ERROR;
2190
2191 fail_muxer:
2192   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2193       ("Could not start new muxer"), NULL);
2194   gst_object_unref (sink);
2195   gst_object_unref (muxer);
2196
2197   GST_SPLITMUX_LOCK (splitmux);
2198   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2199   splitmux->switching_fragment = FALSE;
2200   return GST_FLOW_ERROR;
2201 }
2202
2203 static void
2204 bus_handler (GstBin * bin, GstMessage * message)
2205 {
2206   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
2207
2208   switch (GST_MESSAGE_TYPE (message)) {
2209     case GST_MESSAGE_EOS:{
2210       /* If the state is draining out the current file, drop this EOS */
2211       GstElement *sink;
2212
2213       sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
2214       GST_SPLITMUX_LOCK (splitmux);
2215
2216       send_fragment_opened_closed_msg (splitmux, FALSE, sink);
2217
2218       if (splitmux->async_finalize) {
2219
2220         if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2221           if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2222                       EOS_FROM_US)) == 2) {
2223             GstElement *muxer;
2224             GstPad *sinksink, *muxersrc;
2225
2226             sinksink = gst_element_get_static_pad (sink, "sink");
2227             muxersrc = gst_pad_get_peer (sinksink);
2228             muxer = gst_pad_get_parent_element (muxersrc);
2229             gst_object_unref (sinksink);
2230             gst_object_unref (muxersrc);
2231
2232             gst_element_call_async (muxer,
2233                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2234                 gst_object_ref (splitmux), gst_object_unref);
2235             gst_element_call_async (sink,
2236                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2237                 gst_object_ref (splitmux), gst_object_unref);
2238             gst_object_unref (muxer);
2239           } else {
2240             g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2241                 GINT_TO_POINTER (2));
2242           }
2243           GST_DEBUG_OBJECT (splitmux,
2244               "Caught async EOS from previous muxer+sink. Dropping.");
2245           /* We forward the EOS so that it gets aggregated as normal. If the sink
2246            * finishes and is removed before the end, it will be de-aggregated */
2247           gst_message_unref (message);
2248           GST_SPLITMUX_UNLOCK (splitmux);
2249           return;
2250         }
2251       } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_STREAM) {
2252         GST_DEBUG_OBJECT (splitmux,
2253             "Passing EOS message. Output state %d max_out_running_time %"
2254             GST_STIME_FORMAT, splitmux->output_state,
2255             GST_STIME_ARGS (splitmux->max_out_running_time));
2256       } else {
2257         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
2258         splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2259         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2260
2261         gst_message_unref (message);
2262         GST_SPLITMUX_UNLOCK (splitmux);
2263         return;
2264       }
2265       GST_SPLITMUX_UNLOCK (splitmux);
2266       break;
2267     }
2268     case GST_MESSAGE_ASYNC_START:
2269     case GST_MESSAGE_ASYNC_DONE:
2270       /* Ignore state changes from our children while switching */
2271       GST_SPLITMUX_LOCK (splitmux);
2272       if (splitmux->switching_fragment) {
2273         if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
2274             || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
2275           GST_LOG_OBJECT (splitmux,
2276               "Ignoring state change from child %" GST_PTR_FORMAT
2277               " while switching", GST_MESSAGE_SRC (message));
2278           gst_message_unref (message);
2279           GST_SPLITMUX_UNLOCK (splitmux);
2280           return;
2281         }
2282       }
2283       GST_SPLITMUX_UNLOCK (splitmux);
2284       break;
2285     case GST_MESSAGE_WARNING:
2286     {
2287       GError *gerror = NULL;
2288
2289       gst_message_parse_warning (message, &gerror, NULL);
2290
2291       if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
2292         GList *item;
2293         gboolean caps_change = FALSE;
2294
2295         GST_SPLITMUX_LOCK (splitmux);
2296
2297         for (item = splitmux->contexts; item; item = item->next) {
2298           MqStreamCtx *ctx = item->data;
2299
2300           if (ctx->caps_change) {
2301             caps_change = TRUE;
2302             break;
2303           }
2304         }
2305
2306         GST_SPLITMUX_UNLOCK (splitmux);
2307
2308         if (caps_change) {
2309           GST_LOG_OBJECT (splitmux,
2310               "Ignoring warning change from child %" GST_PTR_FORMAT
2311               " while switching caps", GST_MESSAGE_SRC (message));
2312           gst_message_unref (message);
2313           return;
2314         }
2315       }
2316       break;
2317     }
2318     default:
2319       break;
2320   }
2321
2322   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2323 }
2324
2325 static void
2326 ctx_set_unblock (MqStreamCtx * ctx)
2327 {
2328   ctx->need_unblock = TRUE;
2329 }
2330
2331 static gboolean
2332 need_new_fragment (GstSplitMuxSink * splitmux,
2333     GstClockTime queued_time, GstClockTime queued_gop_time,
2334     guint64 queued_bytes)
2335 {
2336   guint64 thresh_bytes;
2337   GstClockTime thresh_time;
2338   gboolean check_robust_muxing;
2339   GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
2340   GstClockTime *ptr_to_time;
2341   const InputGop *gop, *next_gop;
2342
2343   GST_OBJECT_LOCK (splitmux);
2344   thresh_bytes = splitmux->threshold_bytes;
2345   thresh_time = splitmux->threshold_time;
2346   ptr_to_time = (GstClockTime *)
2347       gst_queue_array_peek_head_struct (splitmux->times_to_split);
2348   if (ptr_to_time)
2349     time_to_split = *ptr_to_time;
2350   check_robust_muxing = splitmux->use_robust_muxing
2351       && splitmux->muxer_has_reserved_props;
2352   GST_OBJECT_UNLOCK (splitmux);
2353
2354   /* Have we muxed at least one thing from the reference
2355    * stream into the file? If not, no other streams can have
2356    * either */
2357   if (splitmux->fragment_reference_bytes <= 0) {
2358     GST_TRACE_OBJECT (splitmux,
2359         "Not ready to split - nothing muxed on the reference stream");
2360     return FALSE;
2361   }
2362
2363   /* User told us to split now */
2364   if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE) {
2365     GST_TRACE_OBJECT (splitmux, "Forcing because split_next_gop is set");
2366     return TRUE;
2367   }
2368
2369   gop = g_queue_peek_head (&splitmux->pending_input_gops);
2370   /* We need a full GOP queued up at this point */
2371   g_assert (gop != NULL);
2372   next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2373   /* And the beginning of the next GOP or otherwise EOS */
2374
2375   /* User told us to split at this running time */
2376   if (gop->start_time >= time_to_split) {
2377     GST_OBJECT_LOCK (splitmux);
2378     /* Dequeue running time */
2379     gst_queue_array_pop_head_struct (splitmux->times_to_split);
2380     /* Empty any running times after this that are past now */
2381     ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2382     while (ptr_to_time) {
2383       time_to_split = *ptr_to_time;
2384       if (gop->start_time < time_to_split) {
2385         break;
2386       }
2387       gst_queue_array_pop_head_struct (splitmux->times_to_split);
2388       ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2389     }
2390     GST_TRACE_OBJECT (splitmux,
2391         "GOP start time %" GST_STIME_FORMAT " is after requested split point %"
2392         GST_STIME_FORMAT, GST_STIME_ARGS (gop->start_time),
2393         GST_STIME_ARGS (time_to_split));
2394     GST_OBJECT_UNLOCK (splitmux);
2395     return TRUE;
2396   }
2397
2398   if (thresh_bytes > 0 && queued_bytes > thresh_bytes) {
2399     GST_TRACE_OBJECT (splitmux,
2400         "queued bytes %" G_GUINT64_FORMAT " overruns byte limit", queued_bytes);
2401     return TRUE;                /* Would overrun byte limit */
2402   }
2403
2404   if (thresh_time > 0 && queued_time > thresh_time) {
2405     GST_TRACE_OBJECT (splitmux,
2406         "queued time %" GST_STIME_FORMAT " overruns time limit",
2407         GST_STIME_ARGS (queued_time));
2408     return TRUE;                /* Would overrun time limit */
2409   }
2410
2411   if (splitmux->tc_interval) {
2412     GstClockTime next_gop_start_time =
2413         next_gop ? next_gop->start_time : splitmux->max_in_running_time;
2414
2415     if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) &&
2416         GST_CLOCK_STIME_IS_VALID (next_gop_start_time) &&
2417         next_gop_start_time >
2418         splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) {
2419       GST_TRACE_OBJECT (splitmux,
2420           "in running time %" GST_STIME_FORMAT " overruns time limit %"
2421           GST_TIME_FORMAT, GST_STIME_ARGS (next_gop_start_time),
2422           GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2423       return TRUE;
2424     }
2425   }
2426
2427   if (check_robust_muxing) {
2428     GstClockTime mux_reserved_remain;
2429
2430     g_object_get (splitmux->muxer,
2431         "reserved-duration-remaining", &mux_reserved_remain, NULL);
2432
2433     GST_LOG_OBJECT (splitmux,
2434         "Muxer robust muxing report - %" G_GUINT64_FORMAT
2435         " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
2436         mux_reserved_remain, queued_gop_time);
2437
2438     if (queued_gop_time >= mux_reserved_remain) {
2439       GST_INFO_OBJECT (splitmux,
2440           "File is about to run out of header room - %" G_GUINT64_FORMAT
2441           " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
2442           ". Switching to new file", mux_reserved_remain, queued_gop_time);
2443       return TRUE;
2444     }
2445   }
2446
2447   /* Continue and mux this GOP */
2448   return FALSE;
2449 }
2450
2451 /* probably we want to add this API? */
2452 static void
2453 video_time_code_replace (GstVideoTimeCode ** old_tc, GstVideoTimeCode * new_tc)
2454 {
2455   GstVideoTimeCode *timecode = NULL;
2456
2457   g_return_if_fail (old_tc != NULL);
2458
2459   if (*old_tc == new_tc)
2460     return;
2461
2462   if (new_tc)
2463     timecode = gst_video_time_code_copy (new_tc);
2464
2465   if (*old_tc)
2466     gst_video_time_code_free (*old_tc);
2467
2468   *old_tc = timecode;
2469 }
2470
2471 /* Called with splitmux lock held */
2472 /* Called when entering ProcessingCompleteGop state
2473  * Assess if mq contents overflowed the current file
2474  *   -> If yes, need to switch to new file
2475  *   -> if no, set max_out_running_time to let this GOP in and
2476  *      go to COLLECTING_GOP_START state
2477  */
2478 static void
2479 handle_gathered_gop (GstSplitMuxSink * splitmux, const InputGop * gop,
2480     GstClockTimeDiff next_gop_start_time, GstClockTimeDiff max_out_running_time)
2481 {
2482   guint64 queued_bytes;
2483   GstClockTimeDiff queued_time = 0;
2484   GstClockTimeDiff queued_gop_time = 0;
2485   SplitMuxOutputCommand *cmd;
2486
2487   /* Assess if the multiqueue contents overflowed the current file */
2488   /* When considering if a newly gathered GOP overflows
2489    * the time limit for the file, only consider the running time of the
2490    * reference stream. Other streams might have run ahead a little bit,
2491    * but extra pieces won't be released to the muxer beyond the reference
2492    * stream cut-off anyway - so it forms the limit. */
2493   queued_bytes = splitmux->fragment_total_bytes + gop->total_bytes;
2494   queued_time = next_gop_start_time;
2495   /* queued_gop_time tracks how much unwritten data there is waiting to
2496    * be written to this fragment including this GOP */
2497   if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
2498     queued_gop_time = queued_time - splitmux->reference_ctx->out_running_time;
2499   else
2500     queued_gop_time = queued_time - gop->start_time;
2501
2502   GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
2503   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2504       " bytes %" G_GUINT64_FORMAT " in next gop start time %" GST_STIME_FORMAT
2505       " gop start time %" GST_STIME_FORMAT,
2506       GST_STIME_ARGS (queued_time), queued_bytes,
2507       GST_STIME_ARGS (next_gop_start_time), GST_STIME_ARGS (gop->start_time));
2508
2509   if (queued_gop_time < 0)
2510     goto error_gop_duration;
2511
2512   if (queued_time < splitmux->fragment_start_time)
2513     goto error_queued_time;
2514
2515   queued_time -= splitmux->fragment_start_time;
2516   if (queued_time < queued_gop_time)
2517     queued_gop_time = queued_time;
2518
2519   /* Expand queued bytes estimate by muxer overhead */
2520   queued_bytes += (queued_bytes * splitmux->mux_overhead);
2521
2522   /* Check for overrun - have we output at least one byte and overrun
2523    * either threshold? */
2524   if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
2525     if (splitmux->async_finalize) {
2526       GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2527       *sink_running_time = splitmux->reference_ctx->out_running_time;
2528       g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2529           RUNNING_TIME, sink_running_time, g_free);
2530     }
2531     g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2532     /* Tell the output side to start a new fragment */
2533     GST_INFO_OBJECT (splitmux,
2534         "This GOP (dur %" GST_STIME_FORMAT
2535         ") would overflow the fragment, Sending start_new_fragment cmd",
2536         GST_STIME_ARGS (queued_gop_time));
2537     cmd = out_cmd_buf_new ();
2538     cmd->start_new_fragment = TRUE;
2539     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2540     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2541
2542     splitmux->fragment_start_time = gop->start_time;
2543     splitmux->fragment_start_time_pts = gop->start_time_pts;
2544     splitmux->fragment_total_bytes = 0;
2545     splitmux->fragment_reference_bytes = 0;
2546
2547     video_time_code_replace (&splitmux->fragment_start_tc, gop->start_tc);
2548     splitmux->next_fragment_start_tc_time =
2549         calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
2550         splitmux->fragment_start_time, NULL);
2551     if (splitmux->tc_interval && splitmux->fragment_start_tc
2552         && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
2553       GST_WARNING_OBJECT (splitmux,
2554           "Couldn't calculate next fragment start time for timecode mode");
2555     }
2556   }
2557
2558   /* And set up to collect the next GOP */
2559   if (max_out_running_time != G_MAXINT64) {
2560     splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2561   } else {
2562     /* This is probably already the current state, but just in case: */
2563     splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2564   }
2565
2566   /* And wake all input contexts to send a wake-up event */
2567   g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2568   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2569
2570   /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2571   splitmux->fragment_total_bytes += gop->total_bytes;
2572   splitmux->fragment_reference_bytes += gop->reference_bytes;
2573
2574   if (gop->total_bytes > 0) {
2575     GST_LOG_OBJECT (splitmux,
2576         "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2577         " time %" GST_STIME_FORMAT,
2578         splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2579
2580     /* Send this GOP to the output command queue */
2581     cmd = out_cmd_buf_new ();
2582     cmd->start_new_fragment = FALSE;
2583     cmd->max_output_ts = max_out_running_time;
2584     GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2585         GST_STIME_FORMAT, GST_STIME_ARGS (max_out_running_time));
2586     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2587
2588     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2589   }
2590
2591   return;
2592
2593 error_gop_duration:
2594   GST_ELEMENT_ERROR (splitmux,
2595       STREAM, FAILED, ("Timestamping error on input streams"),
2596       ("Queued GOP time is negative %" GST_STIME_FORMAT,
2597           GST_STIME_ARGS (queued_gop_time)));
2598   return;
2599 error_queued_time:
2600   GST_ELEMENT_ERROR (splitmux,
2601       STREAM, FAILED, ("Timestamping error on input streams"),
2602       ("Queued time is negative. Input went backwards. queued_time - %"
2603           GST_STIME_FORMAT, GST_STIME_ARGS (queued_time)));
2604   return;
2605 }
2606
2607 /* Called with splitmux lock held */
2608 /* Called from each input pad when it is has all the pieces
2609  * for a GOP or EOS, starting with the reference pad which has set the
2610  * splitmux->max_in_running_time
2611  */
2612 static void
2613 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2614 {
2615   GList *cur;
2616   GstEvent *event;
2617
2618   /* On ENDING_FILE, the reference stream sends a command to start a new
2619    * fragment, then releases the GOP for output in the new fragment.
2620    *  If some streams received no buffer during the last GOP that overran,
2621    * because its next buffer has a timestamp bigger than
2622    * ctx->max_in_running_time, its queue is empty. In that case the only
2623    * way to wakeup the output thread is by injecting an event in the
2624    * queue. This usually happen with subtitle streams.
2625    * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2626   if (ctx->need_unblock) {
2627     GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2628     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2629         GST_EVENT_TYPE_SERIALIZED,
2630         gst_structure_new ("splitmuxsink-unblock", "timestamp",
2631             G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2632
2633     GST_SPLITMUX_UNLOCK (splitmux);
2634     gst_pad_send_event (ctx->sinkpad, event);
2635     GST_SPLITMUX_LOCK (splitmux);
2636
2637     ctx->need_unblock = FALSE;
2638     GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2639     /* state may have changed while we were unlocked. Loop again if so */
2640     if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2641       return;
2642   }
2643
2644   do {
2645     GstClockTimeDiff next_gop_start = GST_CLOCK_STIME_NONE;
2646
2647     if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2648       GstClockTimeDiff max_out_running_time;
2649       gboolean ready = TRUE;
2650       InputGop *gop;
2651       const InputGop *next_gop;
2652
2653       gop = g_queue_peek_head (&splitmux->pending_input_gops);
2654       next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2655
2656       /* If we have no GOP or no next GOP here then the reference context is
2657        * at EOS, otherwise use the start time of the next GOP if we're far
2658        * enough in the GOP to know it */
2659       if (gop && next_gop) {
2660         if (!splitmux->reference_ctx->in_eos
2661             && splitmux->max_in_running_time_dts != GST_CLOCK_STIME_NONE
2662             && splitmux->max_in_running_time_dts < next_gop->start_time_pts) {
2663           GST_LOG_OBJECT (splitmux,
2664               "No further GOPs finished collecting, waiting until current DTS %"
2665               GST_STIME_FORMAT " has passed next GOP start PTS %"
2666               GST_STIME_FORMAT,
2667               GST_STIME_ARGS (splitmux->max_in_running_time_dts),
2668               GST_STIME_ARGS (next_gop->start_time_pts));
2669           break;
2670         }
2671
2672         GST_LOG_OBJECT (splitmux,
2673             "Finished collecting GOP with start time %" GST_STIME_FORMAT
2674             ", next GOP start time %" GST_STIME_FORMAT,
2675             GST_STIME_ARGS (gop->start_time),
2676             GST_STIME_ARGS (next_gop->start_time));
2677         next_gop_start = next_gop->start_time;
2678         max_out_running_time =
2679             splitmux->reference_ctx->in_eos ? G_MAXINT64 : next_gop->start_time;
2680       } else if (!next_gop) {
2681         GST_LOG_OBJECT (splitmux, "Reference context is EOS");
2682         next_gop_start = splitmux->max_in_running_time;
2683         max_out_running_time = G_MAXINT64;
2684       } else if (!gop) {
2685         GST_LOG_OBJECT (splitmux, "No further GOPs finished collecting");
2686         break;
2687       } else {
2688         g_assert_not_reached ();
2689       }
2690
2691       g_assert (gop != NULL);
2692
2693       /* Iterate each pad, and check that the input running time is at least
2694        * up to the start running time of the next GOP or EOS, and if so handle
2695        * the collected GOP */
2696       GST_LOG_OBJECT (splitmux, "Checking GOP collected, next GOP start %"
2697           GST_STIME_FORMAT " ctx %p", GST_STIME_ARGS (next_gop_start), ctx);
2698       for (cur = g_list_first (splitmux->contexts); cur != NULL;
2699           cur = g_list_next (cur)) {
2700         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2701
2702         GST_LOG_OBJECT (splitmux,
2703             "Context %p sink pad %" GST_PTR_FORMAT " @ TS %" GST_STIME_FORMAT
2704             " EOS %d", tmpctx, tmpctx->sinkpad,
2705             GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2706
2707         if (next_gop_start != GST_CLOCK_STIME_NONE &&
2708             tmpctx->in_running_time < next_gop_start && !tmpctx->in_eos) {
2709           GST_LOG_OBJECT (splitmux,
2710               "Context %p sink pad %" GST_PTR_FORMAT " not ready. We'll sleep",
2711               tmpctx, tmpctx->sinkpad);
2712           ready = FALSE;
2713           break;
2714         }
2715       }
2716       if (ready) {
2717         GST_DEBUG_OBJECT (splitmux,
2718             "Collected GOP is complete. Processing (ctx %p)", ctx);
2719         /* All pads have a complete GOP, release it into the multiqueue */
2720         handle_gathered_gop (splitmux, gop, next_gop_start,
2721             max_out_running_time);
2722
2723         g_queue_pop_head (&splitmux->pending_input_gops);
2724         input_gop_free (gop);
2725
2726         /* The user has requested a split, we can split now that the previous GOP
2727          * has been collected to the correct location */
2728         if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested),
2729                 TRUE, FALSE)) {
2730           g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
2731         }
2732       }
2733     }
2734
2735     /* If upstream reached EOS we are not expecting more data, no need to wait
2736      * here. */
2737     if (ctx->in_eos)
2738       return;
2739
2740     if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2741         !ctx->flushing &&
2742         ctx->in_running_time >= next_gop_start &&
2743         next_gop_start != GST_CLOCK_STIME_NONE) {
2744       /* Some pad is not yet ready, or GOP is being pushed
2745        * either way, sleep and wait to get woken */
2746       GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2747       GST_SPLITMUX_WAIT_INPUT (splitmux);
2748       GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2749     } else {
2750       /* This pad is not ready or the state changed - break out and get another
2751        * buffer / event */
2752       break;
2753     }
2754   } while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT);
2755 }
2756
2757 static GstPadProbeReturn
2758 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2759 {
2760   GstSplitMuxSink *splitmux = ctx->splitmux;
2761   GstFlowReturn ret = GST_FLOW_OK;
2762   GstBuffer *buf;
2763   MqStreamBuf *buf_info = NULL;
2764   GstClockTime ts, pts, dts;
2765   GstClockTimeDiff running_time, running_time_pts, running_time_dts;
2766   gboolean loop_again;
2767   gboolean keyframe = FALSE;
2768
2769   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2770
2771   /* FIXME: Handle buffer lists, until then make it clear they won't work */
2772   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2773     g_warning ("Buffer list handling not implemented");
2774     return GST_PAD_PROBE_DROP;
2775   }
2776   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2777       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2778     GstEvent *event = gst_pad_probe_info_get_event (info);
2779
2780     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2781
2782     switch (GST_EVENT_TYPE (event)) {
2783       case GST_EVENT_SEGMENT:
2784         gst_event_copy_segment (event, &ctx->in_segment);
2785         break;
2786       case GST_EVENT_FLUSH_STOP:
2787         GST_SPLITMUX_LOCK (splitmux);
2788         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2789         ctx->in_eos = FALSE;
2790         ctx->in_running_time = GST_CLOCK_STIME_NONE;
2791         GST_SPLITMUX_UNLOCK (splitmux);
2792         break;
2793       case GST_EVENT_EOS:
2794         GST_SPLITMUX_LOCK (splitmux);
2795         ctx->in_eos = TRUE;
2796
2797         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2798           ret = GST_FLOW_FLUSHING;
2799           goto beach;
2800         }
2801
2802         if (ctx->is_reference) {
2803           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2804           /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2805           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2806           /* Wake up other input pads to collect this GOP */
2807           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2808           check_completed_gop (splitmux, ctx);
2809         } else if (splitmux->input_state ==
2810             SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2811           /* If we are waiting for a GOP to be completed (ie, for aux
2812            * pads to catch up), then this pad is complete, so check
2813            * if the whole GOP is.
2814            */
2815           check_completed_gop (splitmux, ctx);
2816         }
2817         GST_SPLITMUX_UNLOCK (splitmux);
2818         break;
2819       case GST_EVENT_GAP:{
2820         GstClockTime gap_ts;
2821         GstClockTimeDiff rtime;
2822
2823         gst_event_parse_gap (event, &gap_ts, NULL);
2824         if (gap_ts == GST_CLOCK_TIME_NONE)
2825           break;
2826
2827         GST_SPLITMUX_LOCK (splitmux);
2828
2829         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2830           ret = GST_FLOW_FLUSHING;
2831           goto beach;
2832         }
2833         rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2834
2835         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2836             GST_STIME_ARGS (rtime));
2837
2838         if (ctx->is_reference && GST_CLOCK_STIME_IS_VALID (rtime)) {
2839           /* If this GAP event happens before the first fragment then
2840            * initialize the fragment start time here. */
2841           if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)) {
2842             splitmux->fragment_start_time = rtime;
2843             GST_LOG_OBJECT (splitmux,
2844                 "Fragment start time now %" GST_STIME_FORMAT,
2845                 GST_STIME_ARGS (splitmux->fragment_start_time));
2846
2847             /* Also take this as the first start time when starting up,
2848              * so that we start counting overflow from the first frame */
2849             if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2850               splitmux->max_in_running_time = rtime;
2851             if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
2852               splitmux->max_in_running_time_dts = rtime;
2853           }
2854
2855           /* Similarly take it as fragment start PTS and GOP start time if
2856            * these are not set */
2857           if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time_pts))
2858             splitmux->fragment_start_time_pts = rtime;
2859
2860           if (g_queue_is_empty (&splitmux->pending_input_gops)) {
2861             InputGop *gop = g_slice_new0 (InputGop);
2862
2863             gop->from_gap = TRUE;
2864             gop->start_time = rtime;
2865             gop->start_time_pts = rtime;
2866
2867             g_queue_push_tail (&splitmux->pending_input_gops, gop);
2868           }
2869         }
2870
2871         GST_SPLITMUX_UNLOCK (splitmux);
2872         break;
2873       }
2874       default:
2875         break;
2876     }
2877     return GST_PAD_PROBE_PASS;
2878   } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2879     switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2880       case GST_QUERY_ALLOCATION:
2881         return GST_PAD_PROBE_DROP;
2882       default:
2883         return GST_PAD_PROBE_PASS;
2884     }
2885   }
2886
2887   buf = gst_pad_probe_info_get_buffer (info);
2888   buf_info = mq_stream_buf_new ();
2889
2890   pts = GST_BUFFER_PTS (buf);
2891   dts = GST_BUFFER_DTS (buf);
2892   if (GST_BUFFER_PTS_IS_VALID (buf))
2893     ts = GST_BUFFER_PTS (buf);
2894   else
2895     ts = GST_BUFFER_DTS (buf);
2896
2897   GST_LOG_OBJECT (pad,
2898       "Buffer TS is %" GST_TIME_FORMAT " (PTS %" GST_TIME_FORMAT ", DTS %"
2899       GST_TIME_FORMAT ")", GST_TIME_ARGS (ts), GST_TIME_ARGS (pts),
2900       GST_TIME_ARGS (dts));
2901
2902   GST_SPLITMUX_LOCK (splitmux);
2903
2904   if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2905     ret = GST_FLOW_FLUSHING;
2906     goto beach;
2907   }
2908
2909   /* If this buffer has a timestamp, advance the input timestamp of the
2910    * stream */
2911   if (GST_CLOCK_TIME_IS_VALID (ts)) {
2912     running_time = my_segment_to_running_time (&ctx->in_segment, ts);
2913
2914     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2915         GST_STIME_ARGS (running_time));
2916
2917     /* in running time is always the maximum PTS (or DTS) that was observed so far */
2918     if (GST_CLOCK_STIME_IS_VALID (running_time)
2919         && running_time > ctx->in_running_time)
2920       ctx->in_running_time = running_time;
2921   } else {
2922     running_time = ctx->in_running_time;
2923   }
2924
2925   if (GST_CLOCK_TIME_IS_VALID (pts))
2926     running_time_pts = my_segment_to_running_time (&ctx->in_segment, pts);
2927   else
2928     running_time_pts = GST_CLOCK_STIME_NONE;
2929
2930   if (GST_CLOCK_TIME_IS_VALID (dts))
2931     running_time_dts = my_segment_to_running_time (&ctx->in_segment, dts);
2932   else
2933     running_time_dts = GST_CLOCK_STIME_NONE;
2934
2935   /* Try to make sure we have a valid running time */
2936   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2937     ctx->in_running_time =
2938         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2939   }
2940
2941   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2942       GST_STIME_ARGS (ctx->in_running_time));
2943
2944   buf_info->run_ts = ctx->in_running_time;
2945   buf_info->buf_size = gst_buffer_get_size (buf);
2946   buf_info->duration = GST_BUFFER_DURATION (buf);
2947
2948   if (ctx->is_reference) {
2949     InputGop *gop = NULL;
2950     GstVideoTimeCodeMeta *tc_meta = gst_buffer_get_video_time_code_meta (buf);
2951
2952     /* initialize fragment_start_time if it was not set yet (i.e. for the
2953      * first fragment), or otherwise set it to the minimum observed time */
2954     if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)
2955         || splitmux->fragment_start_time > running_time) {
2956       if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time))
2957         splitmux->fragment_start_time_pts = running_time_pts;
2958       splitmux->fragment_start_time = running_time;
2959
2960       GST_LOG_OBJECT (splitmux,
2961           "Fragment start time now %" GST_STIME_FORMAT " (initial PTS %"
2962           GST_STIME_FORMAT ")", GST_STIME_ARGS (splitmux->fragment_start_time),
2963           GST_STIME_ARGS (splitmux->fragment_start_time_pts));
2964
2965       /* Also take this as the first start time when starting up,
2966        * so that we start counting overflow from the first frame */
2967       if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)
2968           || splitmux->max_in_running_time < splitmux->fragment_start_time)
2969         splitmux->max_in_running_time = splitmux->fragment_start_time;
2970
2971       if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
2972         splitmux->max_in_running_time_dts = running_time_dts;
2973
2974       if (tc_meta) {
2975         video_time_code_replace (&splitmux->fragment_start_tc, &tc_meta->tc);
2976
2977         splitmux->next_fragment_start_tc_time =
2978             calculate_next_max_timecode (splitmux, &tc_meta->tc,
2979             running_time, NULL);
2980         if (splitmux->tc_interval
2981             && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time))
2982         {
2983           GST_WARNING_OBJECT (splitmux,
2984               "Couldn't calculate next fragment start time for timecode mode");
2985         }
2986 #ifndef GST_DISABLE_GST_DEBUG
2987         {
2988           gchar *tc_str;
2989
2990           tc_str = gst_video_time_code_to_string (&tc_meta->tc);
2991           GST_DEBUG_OBJECT (splitmux,
2992               "Initialize fragment start timecode %s, next fragment start timecode time %"
2993               GST_TIME_FORMAT, tc_str,
2994               GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2995           g_free (tc_str);
2996         }
2997 #endif
2998       }
2999     }
3000
3001
3002     /* First check if we're at the very first GOP and the tracking was created
3003      * from a GAP event. In that case don't start a new GOP on keyframes but
3004      * just updated it as needed */
3005     gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3006
3007     if (!gop || (!gop->from_gap
3008             && !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT))) {
3009       gop = g_slice_new0 (InputGop);
3010
3011       gop->start_time = running_time;
3012       gop->start_time_pts = running_time_pts;
3013
3014       GST_LOG_OBJECT (splitmux,
3015           "Next GOP start time now %" GST_STIME_FORMAT " (initial PTS %"
3016           GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3017           GST_STIME_ARGS (gop->start_time_pts));
3018
3019       if (tc_meta) {
3020         video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3021
3022 #ifndef GST_DISABLE_GST_DEBUG
3023         {
3024           gchar *tc_str;
3025
3026           tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3027           GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode %s", tc_str);
3028           g_free (tc_str);
3029         }
3030 #endif
3031       }
3032
3033       g_queue_push_tail (&splitmux->pending_input_gops, gop);
3034     } else {
3035       gop->from_gap = FALSE;
3036
3037       if (!GST_CLOCK_STIME_IS_VALID (gop->start_time)
3038           || gop->start_time > running_time) {
3039         gop->start_time = running_time;
3040
3041         GST_LOG_OBJECT (splitmux,
3042             "GOP start time updated now %" GST_STIME_FORMAT " (initial PTS %"
3043             GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3044             GST_STIME_ARGS (gop->start_time_pts));
3045
3046         if (tc_meta) {
3047           video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3048
3049 #ifndef GST_DISABLE_GST_DEBUG
3050           {
3051             gchar *tc_str;
3052
3053             tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3054             GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode updated %s",
3055                 tc_str);
3056             g_free (tc_str);
3057           }
3058 #endif
3059         }
3060       }
3061     }
3062
3063     /* Check whether we need to request next keyframe depending on
3064      * current running time */
3065     if (request_next_keyframe (splitmux, buf, running_time_dts) == FALSE) {
3066       GST_WARNING_OBJECT (splitmux,
3067           "Could not request a keyframe. Files may not split at the exact location they should");
3068     }
3069   }
3070
3071   {
3072     InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3073
3074     if (gop) {
3075       GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
3076           " total GOP bytes %" G_GUINT64_FORMAT ", total next GOP bytes %"
3077           G_GUINT64_FORMAT, GST_STIME_ARGS (buf_info->run_ts),
3078           gop->total_bytes, gop->total_bytes);
3079     }
3080   }
3081
3082   loop_again = TRUE;
3083   do {
3084     if (ctx->flushing) {
3085       ret = GST_FLOW_FLUSHING;
3086       goto beach;
3087     }
3088
3089     switch (splitmux->input_state) {
3090       case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
3091         if (ctx->is_reference) {
3092           const InputGop *gop, *next_gop;
3093
3094           /* This is the reference context. If it's a keyframe,
3095            * it marks the start of a new GOP and we should wait in
3096            * check_completed_gop before continuing, but either way
3097            * (keyframe or no, we'll pass this buffer through after
3098            * so set loop_again to FALSE */
3099           loop_again = FALSE;
3100
3101           gop = g_queue_peek_head (&splitmux->pending_input_gops);
3102           g_assert (gop != NULL);
3103           next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
3104
3105           if (ctx->in_running_time > splitmux->max_in_running_time)
3106             splitmux->max_in_running_time = ctx->in_running_time;
3107           if (running_time_dts > splitmux->max_in_running_time_dts)
3108             splitmux->max_in_running_time_dts = running_time_dts;
3109
3110           GST_LOG_OBJECT (splitmux,
3111               "Max in running time now %" GST_STIME_FORMAT ", DTS %"
3112               GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_in_running_time),
3113               GST_STIME_ARGS (splitmux->max_in_running_time_dts));
3114
3115           if (!next_gop) {
3116             GST_DEBUG_OBJECT (pad, "Waiting for end of GOP");
3117             /* Allow other input pads to catch up to here too */
3118             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3119             break;
3120           }
3121
3122           if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
3123             GST_INFO_OBJECT (pad,
3124                 "Have keyframe with running time %" GST_STIME_FORMAT,
3125                 GST_STIME_ARGS (ctx->in_running_time));
3126             keyframe = TRUE;
3127           }
3128
3129           if (running_time_dts != GST_CLOCK_STIME_NONE
3130               && running_time_dts < next_gop->start_time_pts) {
3131             GST_DEBUG_OBJECT (splitmux,
3132                 "Waiting until DTS (%" GST_STIME_FORMAT
3133                 ") has passed next GOP start PTS (%" GST_STIME_FORMAT ")",
3134                 GST_STIME_ARGS (running_time_dts),
3135                 GST_STIME_ARGS (next_gop->start_time_pts));
3136             /* Allow other input pads to catch up to here too */
3137             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3138             break;
3139           }
3140
3141           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
3142           /* Wake up other input pads to collect this GOP */
3143           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3144           check_completed_gop (splitmux, ctx);
3145         } else {
3146           /* Pass this buffer if the reference ctx is far enough ahead */
3147           if (ctx->in_running_time < splitmux->max_in_running_time) {
3148             loop_again = FALSE;
3149             break;
3150           }
3151
3152           /* We're still waiting for a keyframe on the reference pad, sleep */
3153           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
3154           GST_SPLITMUX_WAIT_INPUT (splitmux);
3155           GST_LOG_OBJECT (pad,
3156               "Done sleeping for GOP start input state now %d",
3157               splitmux->input_state);
3158         }
3159         break;
3160       case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
3161         /* We're collecting a GOP, this is only ever called for non-reference
3162          * contexts as the reference context would be waiting inside
3163          * check_completed_gop() */
3164
3165         g_assert (!ctx->is_reference);
3166
3167         /* If we overran the target timestamp, it might be time to process
3168          * the GOP, otherwise bail out for more data. */
3169         GST_LOG_OBJECT (pad,
3170             "Checking TS %" GST_STIME_FORMAT " against max %"
3171             GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
3172             GST_STIME_ARGS (splitmux->max_in_running_time));
3173
3174         if (ctx->in_running_time < splitmux->max_in_running_time) {
3175           loop_again = FALSE;
3176           break;
3177         }
3178
3179         GST_LOG_OBJECT (pad,
3180             "Collected last packet of GOP. Checking other pads");
3181         check_completed_gop (splitmux, ctx);
3182         break;
3183       }
3184       case SPLITMUX_INPUT_STATE_FINISHING_UP:
3185         loop_again = FALSE;
3186         break;
3187       default:
3188         loop_again = FALSE;
3189         break;
3190     }
3191   }
3192   while (loop_again);
3193
3194   if (keyframe && ctx->is_reference)
3195     splitmux->queued_keyframes++;
3196   buf_info->keyframe = keyframe;
3197
3198   /* Update total input byte counter for overflow detect unless we're after
3199    * EOS now */
3200   if (splitmux->input_state != SPLITMUX_INPUT_STATE_FINISHING_UP
3201       && splitmux->input_state != SPLITMUX_INPUT_STATE_STOPPED) {
3202     InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3203
3204     /* We must have a GOP at this point */
3205     g_assert (gop != NULL);
3206
3207     gop->total_bytes += buf_info->buf_size;
3208     if (ctx->is_reference) {
3209       gop->reference_bytes += buf_info->buf_size;
3210     }
3211   }
3212
3213   /* Now add this buffer to the queue just before returning */
3214   g_queue_push_head (&ctx->queued_bufs, buf_info);
3215
3216   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
3217       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
3218
3219   GST_SPLITMUX_UNLOCK (splitmux);
3220   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3221   return GST_PAD_PROBE_PASS;
3222
3223 beach:
3224   GST_SPLITMUX_UNLOCK (splitmux);
3225   if (buf_info)
3226     mq_stream_buf_free (buf_info);
3227   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3228   return GST_PAD_PROBE_PASS;
3229 }
3230
3231 static void
3232 grow_blocked_queues (GstSplitMuxSink * splitmux)
3233 {
3234   GList *cur;
3235
3236   /* Scan other queues for full-ness and grow them */
3237   for (cur = g_list_first (splitmux->contexts);
3238       cur != NULL; cur = g_list_next (cur)) {
3239     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3240     guint cur_limit;
3241     guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
3242
3243     g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
3244     GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
3245
3246     if (cur_len >= cur_limit) {
3247       cur_limit = cur_len + 1;
3248       GST_DEBUG_OBJECT (tmpctx->q,
3249           "Queue overflowed and needs enlarging. Growing to %u buffers",
3250           cur_limit);
3251       g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
3252     }
3253   }
3254 }
3255
3256 static void
3257 handle_q_underrun (GstElement * q, gpointer user_data)
3258 {
3259   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3260   GstSplitMuxSink *splitmux = ctx->splitmux;
3261
3262   GST_SPLITMUX_LOCK (splitmux);
3263   GST_DEBUG_OBJECT (q,
3264       "Queue reported underrun with %d keyframes and %d cmds enqueued",
3265       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3266   grow_blocked_queues (splitmux);
3267   GST_SPLITMUX_UNLOCK (splitmux);
3268 }
3269
3270 static void
3271 handle_q_overrun (GstElement * q, gpointer user_data)
3272 {
3273   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3274   GstSplitMuxSink *splitmux = ctx->splitmux;
3275   gboolean allow_grow = FALSE;
3276
3277   GST_SPLITMUX_LOCK (splitmux);
3278   GST_DEBUG_OBJECT (q,
3279       "Queue reported overrun with %d keyframes and %d cmds enqueued",
3280       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3281
3282   if (splitmux->queued_keyframes < 2) {
3283     /* Less than a full GOP queued, grow the queue */
3284     allow_grow = TRUE;
3285   } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
3286     allow_grow = TRUE;
3287   } else {
3288     /* If another queue is starved, grow */
3289     GList *cur;
3290     for (cur = g_list_first (splitmux->contexts);
3291         cur != NULL; cur = g_list_next (cur)) {
3292       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3293       if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
3294         allow_grow = TRUE;
3295       }
3296     }
3297   }
3298   GST_SPLITMUX_UNLOCK (splitmux);
3299
3300   if (allow_grow) {
3301     guint cur_limit;
3302
3303     g_object_get (q, "max-size-buffers", &cur_limit, NULL);
3304     cur_limit++;
3305
3306     GST_DEBUG_OBJECT (q,
3307         "Queue overflowed and needs enlarging. Growing to %u buffers",
3308         cur_limit);
3309
3310     g_object_set (q, "max-size-buffers", cur_limit, NULL);
3311   }
3312 }
3313
3314 /* Called with SPLITMUX lock held */
3315 static const gchar *
3316 lookup_muxer_pad (GstSplitMuxSink * splitmux, const gchar * sinkpad_name)
3317 {
3318   const gchar *ret = NULL;
3319
3320   if (splitmux->muxerpad_map == NULL)
3321     return NULL;
3322
3323   if (sinkpad_name == NULL) {
3324     GST_WARNING_OBJECT (splitmux,
3325         "Can't look up request pad in pad map without providing a pad name");
3326     return NULL;
3327   }
3328
3329   ret = gst_structure_get_string (splitmux->muxerpad_map, sinkpad_name);
3330   if (ret) {
3331     GST_INFO_OBJECT (splitmux, "Sink pad %s maps to muxer pad %s", sinkpad_name,
3332         ret);
3333     return g_strdup (ret);
3334   }
3335
3336   return NULL;
3337 }
3338
3339 static GstPad *
3340 gst_splitmux_sink_request_new_pad (GstElement * element,
3341     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3342 {
3343   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3344   GstPadTemplate *mux_template = NULL;
3345   GstPad *ret = NULL, *muxpad = NULL;
3346   GstElement *q;
3347   GstPad *q_sink = NULL, *q_src = NULL;
3348   gchar *gname, *qname;
3349   gboolean is_primary_video = FALSE, is_video = FALSE,
3350       muxer_is_requestpad = FALSE;
3351   MqStreamCtx *ctx;
3352   const gchar *muxer_padname = NULL;
3353
3354   GST_DEBUG_OBJECT (splitmux, "templ:%s, name:%s", templ->name_template, name);
3355
3356   GST_SPLITMUX_LOCK (splitmux);
3357   if (!create_muxer (splitmux))
3358     goto fail;
3359   g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3360
3361   if (g_str_equal (templ->name_template, "video") ||
3362       g_str_has_prefix (templ->name_template, "video_aux_")) {
3363     is_primary_video = g_str_equal (templ->name_template, "video");
3364     if (is_primary_video && splitmux->have_video)
3365       goto already_have_video;
3366     is_video = TRUE;
3367   }
3368
3369   /* See if there's a pad map and it lists this pad */
3370   muxer_padname = lookup_muxer_pad (splitmux, name);
3371
3372   if (muxer_padname == NULL) {
3373     if (is_video) {
3374       /* FIXME: Look for a pad template with matching caps, rather than by name */
3375       GST_DEBUG_OBJECT (element,
3376           "searching for pad-template with name 'video_%%u'");
3377       mux_template =
3378           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3379           (splitmux->muxer), "video_%u");
3380
3381       /* Fallback to find sink pad templates named 'video' (flvmux) */
3382       if (!mux_template) {
3383         GST_DEBUG_OBJECT (element,
3384             "searching for pad-template with name 'video'");
3385         mux_template =
3386             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3387             (splitmux->muxer), "video");
3388       }
3389       name = NULL;
3390     } else {
3391       GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
3392           templ->name_template);
3393       mux_template =
3394           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3395           (splitmux->muxer), templ->name_template);
3396
3397       /* Fallback to find sink pad templates named 'audio' (flvmux) */
3398       if (!mux_template && g_str_has_prefix (templ->name_template, "audio_")) {
3399         GST_DEBUG_OBJECT (element,
3400             "searching for pad-template with name 'audio'");
3401         mux_template =
3402             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3403             (splitmux->muxer), "audio");
3404         name = NULL;
3405       }
3406     }
3407
3408     if (mux_template == NULL) {
3409       GST_DEBUG_OBJECT (element,
3410           "searching for pad-template with name 'sink_%%d'");
3411       mux_template =
3412           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3413           (splitmux->muxer), "sink_%d");
3414       name = NULL;
3415     }
3416     if (mux_template == NULL) {
3417       GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
3418       mux_template =
3419           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3420           (splitmux->muxer), "sink");
3421       name = NULL;
3422     }
3423
3424     if (mux_template == NULL) {
3425       GST_ERROR_OBJECT (element,
3426           "unable to find a suitable sink pad-template on the muxer");
3427       goto fail;
3428     }
3429     GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
3430         mux_template->name_template);
3431
3432     if (mux_template->presence == GST_PAD_REQUEST) {
3433       GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
3434
3435       muxpad =
3436           gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
3437       muxer_is_requestpad = TRUE;
3438     } else if (mux_template->presence == GST_PAD_ALWAYS) {
3439       GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
3440
3441       muxpad =
3442           gst_element_get_static_pad (splitmux->muxer,
3443           mux_template->name_template);
3444     } else {
3445       GST_ERROR_OBJECT (element,
3446           "unexpected pad presence %d", mux_template->presence);
3447       goto fail;
3448     }
3449   } else {
3450     /* Have a muxer pad name */
3451     if (!(muxpad = gst_element_get_static_pad (splitmux->muxer, muxer_padname))) {
3452       if ((muxpad =
3453               gst_element_request_pad_simple (splitmux->muxer, muxer_padname)))
3454         muxer_is_requestpad = TRUE;
3455     }
3456     g_free ((gchar *) muxer_padname);
3457     muxer_padname = NULL;
3458   }
3459
3460   /* One way or another, we must have a muxer pad by now */
3461   if (muxpad == NULL)
3462     goto fail;
3463
3464   if (is_primary_video)
3465     gname = g_strdup ("video");
3466   else if (name == NULL)
3467     gname = gst_pad_get_name (muxpad);
3468   else
3469     gname = g_strdup (name);
3470
3471   qname = g_strdup_printf ("queue_%s", gname);
3472   if ((q = create_element (splitmux, "queue", qname, FALSE)) == NULL) {
3473     g_free (qname);
3474     goto fail;
3475   }
3476   g_free (qname);
3477
3478   gst_element_set_state (q, GST_STATE_TARGET (splitmux));
3479
3480   g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
3481       "max-size-buffers", 5, NULL);
3482
3483   q_sink = gst_element_get_static_pad (q, "sink");
3484   q_src = gst_element_get_static_pad (q, "src");
3485
3486   if (gst_pad_link (q_src, muxpad) != GST_PAD_LINK_OK) {
3487     if (muxer_is_requestpad)
3488       gst_element_release_request_pad (splitmux->muxer, muxpad);
3489     gst_object_unref (GST_OBJECT (muxpad));
3490     goto fail;
3491   }
3492
3493   gst_object_unref (GST_OBJECT (muxpad));
3494
3495   ctx = mq_stream_ctx_new (splitmux);
3496   /* Context holds a ref: */
3497   ctx->q = gst_object_ref (q);
3498   ctx->srcpad = q_src;
3499   ctx->sinkpad = q_sink;
3500   ctx->q_overrun_id =
3501       g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
3502   g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
3503
3504   ctx->src_pad_block_id =
3505       gst_pad_add_probe (q_src,
3506       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
3507       (GstPadProbeCallback) handle_mq_output, ctx, NULL);
3508   if (is_primary_video && splitmux->reference_ctx != NULL) {
3509     splitmux->reference_ctx->is_reference = FALSE;
3510     splitmux->reference_ctx = NULL;
3511   }
3512   if (splitmux->reference_ctx == NULL) {
3513     splitmux->reference_ctx = ctx;
3514     ctx->is_reference = TRUE;
3515   }
3516
3517   ret = gst_ghost_pad_new_from_template (gname, q_sink, templ);
3518   g_object_set_qdata ((GObject *) (ret), PAD_CONTEXT, ctx);
3519
3520   ctx->sink_pad_block_id =
3521       gst_pad_add_probe (q_sink,
3522       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
3523       GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
3524       (GstPadProbeCallback) handle_mq_input, ctx, NULL);
3525
3526   GST_DEBUG_OBJECT (splitmux, "splitmuxsink pad %" GST_PTR_FORMAT
3527       " feeds queue pad %" GST_PTR_FORMAT, ret, q_sink);
3528
3529   splitmux->contexts = g_list_append (splitmux->contexts, ctx);
3530
3531   g_free (gname);
3532
3533   if (is_primary_video)
3534     splitmux->have_video = TRUE;
3535
3536   gst_pad_set_active (ret, TRUE);
3537   gst_element_add_pad (GST_ELEMENT (splitmux), ret);
3538
3539   GST_SPLITMUX_UNLOCK (splitmux);
3540
3541   return ret;
3542 fail:
3543   GST_SPLITMUX_UNLOCK (splitmux);
3544
3545   if (q_sink)
3546     gst_object_unref (q_sink);
3547   if (q_src)
3548     gst_object_unref (q_src);
3549   return NULL;
3550 already_have_video:
3551   GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
3552   GST_SPLITMUX_UNLOCK (splitmux);
3553   return NULL;
3554 }
3555
3556 static void
3557 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
3558 {
3559   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3560   GstPad *muxpad = NULL;
3561   MqStreamCtx *ctx =
3562       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
3563
3564   GST_SPLITMUX_LOCK (splitmux);
3565
3566   if (splitmux->muxer == NULL)
3567     goto fail;                  /* Elements don't exist yet - nothing to release */
3568
3569   GST_INFO_OBJECT (pad, "releasing request pad");
3570
3571   muxpad = gst_pad_get_peer (ctx->srcpad);
3572
3573   /* Remove the context from our consideration */
3574   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
3575
3576   ctx->flushing = TRUE;
3577   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3578
3579   GST_SPLITMUX_UNLOCK (splitmux);
3580
3581   if (ctx->sink_pad_block_id) {
3582     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
3583     gst_pad_send_event (ctx->sinkpad, gst_event_new_flush_start ());
3584   }
3585
3586   if (ctx->src_pad_block_id)
3587     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
3588
3589   /* Wait for the pad to be free */
3590   GST_PAD_STREAM_LOCK (pad);
3591   GST_SPLITMUX_LOCK (splitmux);
3592   GST_PAD_STREAM_UNLOCK (pad);
3593
3594   /* Can release the context now */
3595   mq_stream_ctx_free (ctx);
3596   if (ctx == splitmux->reference_ctx)
3597     splitmux->reference_ctx = NULL;
3598
3599   /* Release and free the muxer input */
3600   if (muxpad) {
3601     gst_element_release_request_pad (splitmux->muxer, muxpad);
3602     gst_object_unref (muxpad);
3603   }
3604
3605   if (GST_PAD_PAD_TEMPLATE (pad) &&
3606       g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
3607               (pad)), "video"))
3608     splitmux->have_video = FALSE;
3609
3610   gst_element_remove_pad (element, pad);
3611
3612   /* Reset the internal elements only after all request pads are released */
3613   if (splitmux->contexts == NULL)
3614     gst_splitmux_reset_elements (splitmux);
3615
3616   /* Wake up other input streams to check if the completion conditions have
3617    * changed */
3618   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3619
3620 fail:
3621   GST_SPLITMUX_UNLOCK (splitmux);
3622 }
3623
3624 static GstElement *
3625 create_element (GstSplitMuxSink * splitmux,
3626     const gchar * factory, const gchar * name, gboolean locked)
3627 {
3628   GstElement *ret = gst_element_factory_make (factory, name);
3629   if (ret == NULL) {
3630     g_warning ("Failed to create %s - splitmuxsink will not work", name);
3631     return NULL;
3632   }
3633
3634   if (locked) {
3635     /* Ensure the sink starts in locked state and NULL - it will be changed
3636      * by the filename setting code */
3637     gst_element_set_locked_state (ret, TRUE);
3638     gst_element_set_state (ret, GST_STATE_NULL);
3639   }
3640
3641   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
3642     g_warning ("Could not add %s element - splitmuxsink will not work", name);
3643     gst_object_unref (ret);
3644     return NULL;
3645   }
3646
3647   return ret;
3648 }
3649
3650 static gboolean
3651 create_muxer (GstSplitMuxSink * splitmux)
3652 {
3653   /* Create internal elements */
3654   if (splitmux->muxer == NULL) {
3655     GstElement *provided_muxer = NULL;
3656
3657     GST_OBJECT_LOCK (splitmux);
3658     if (splitmux->provided_muxer != NULL)
3659       provided_muxer = gst_object_ref (splitmux->provided_muxer);
3660     GST_OBJECT_UNLOCK (splitmux);
3661
3662     if ((!splitmux->async_finalize && provided_muxer == NULL) ||
3663         (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
3664       if ((splitmux->muxer =
3665               create_element (splitmux,
3666                   splitmux->muxer_factory ? splitmux->
3667                   muxer_factory : DEFAULT_MUXER, "muxer", FALSE)) == NULL)
3668         goto fail;
3669     } else if (splitmux->async_finalize) {
3670       if ((splitmux->muxer =
3671               create_element (splitmux, splitmux->muxer_factory, "muxer",
3672                   FALSE)) == NULL)
3673         goto fail;
3674       if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
3675         gst_preset_load_preset (GST_PRESET (splitmux->muxer),
3676             splitmux->muxer_preset);
3677       if (splitmux->muxer_properties)
3678         gst_structure_foreach (splitmux->muxer_properties,
3679             _set_property_from_structure, splitmux->muxer);
3680     } else {
3681       /* Ensure it's not in locked state (we might be reusing an old element) */
3682       gst_element_set_locked_state (provided_muxer, FALSE);
3683       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
3684         g_warning ("Could not add muxer element - splitmuxsink will not work");
3685         gst_object_unref (provided_muxer);
3686         goto fail;
3687       }
3688
3689       splitmux->muxer = provided_muxer;
3690       gst_object_unref (provided_muxer);
3691     }
3692
3693     if (splitmux->use_robust_muxing) {
3694       update_muxer_properties (splitmux);
3695     }
3696   }
3697
3698   return TRUE;
3699 fail:
3700   return FALSE;
3701 }
3702
3703 static GstElement *
3704 find_sink (GstElement * e)
3705 {
3706   GstElement *res = NULL;
3707   GstIterator *iter;
3708   gboolean done = FALSE;
3709   GValue data = { 0, };
3710
3711   if (!GST_IS_BIN (e))
3712     return e;
3713
3714   if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
3715     return e;
3716
3717   iter = gst_bin_iterate_sinks (GST_BIN (e));
3718   while (!done) {
3719     switch (gst_iterator_next (iter, &data)) {
3720       case GST_ITERATOR_OK:
3721       {
3722         GstElement *child = g_value_get_object (&data);
3723         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
3724                 "location") != NULL) {
3725           res = child;
3726           done = TRUE;
3727         }
3728         g_value_reset (&data);
3729         break;
3730       }
3731       case GST_ITERATOR_RESYNC:
3732         gst_iterator_resync (iter);
3733         break;
3734       case GST_ITERATOR_DONE:
3735         done = TRUE;
3736         break;
3737       case GST_ITERATOR_ERROR:
3738         g_assert_not_reached ();
3739         break;
3740     }
3741   }
3742   g_value_unset (&data);
3743   gst_iterator_free (iter);
3744
3745   return res;
3746 }
3747
3748 static gboolean
3749 create_sink (GstSplitMuxSink * splitmux)
3750 {
3751   GstElement *provided_sink = NULL;
3752
3753   if (splitmux->active_sink == NULL) {
3754
3755     GST_OBJECT_LOCK (splitmux);
3756     if (splitmux->provided_sink != NULL)
3757       provided_sink = gst_object_ref (splitmux->provided_sink);
3758     GST_OBJECT_UNLOCK (splitmux);
3759
3760     if ((!splitmux->async_finalize && provided_sink == NULL) ||
3761         (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
3762       if ((splitmux->sink =
3763               create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
3764         goto fail;
3765       splitmux->active_sink = splitmux->sink;
3766     } else if (splitmux->async_finalize) {
3767       if ((splitmux->sink =
3768               create_element (splitmux, splitmux->sink_factory, "sink",
3769                   TRUE)) == NULL)
3770         goto fail;
3771       if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
3772         gst_preset_load_preset (GST_PRESET (splitmux->sink),
3773             splitmux->sink_preset);
3774       if (splitmux->sink_properties)
3775         gst_structure_foreach (splitmux->sink_properties,
3776             _set_property_from_structure, splitmux->sink);
3777       splitmux->active_sink = splitmux->sink;
3778     } else {
3779       /* Ensure the sink starts in locked state and NULL - it will be changed
3780        * by the filename setting code */
3781       gst_element_set_locked_state (provided_sink, TRUE);
3782       gst_element_set_state (provided_sink, GST_STATE_NULL);
3783       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
3784         g_warning ("Could not add sink elements - splitmuxsink will not work");
3785         gst_object_unref (provided_sink);
3786         goto fail;
3787       }
3788
3789       splitmux->active_sink = provided_sink;
3790
3791       /* The bin holds a ref now, we can drop our tmp ref */
3792       gst_object_unref (provided_sink);
3793
3794       /* Find the sink element */
3795       splitmux->sink = find_sink (splitmux->active_sink);
3796       if (splitmux->sink == NULL) {
3797         g_warning
3798             ("Could not locate sink element in provided sink - splitmuxsink will not work");
3799         goto fail;
3800       }
3801     }
3802
3803 #if 1
3804     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3805             "async") != NULL) {
3806       /* async child elements are causing state change races and weird
3807        * failures, so let's try and turn that off */
3808       g_object_set (splitmux->sink, "async", FALSE, NULL);
3809     }
3810 #endif
3811
3812     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
3813       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
3814       goto fail;
3815     }
3816   }
3817
3818   return TRUE;
3819 fail:
3820   return FALSE;
3821 }
3822
3823 #ifdef __GNUC__
3824 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
3825 #endif
3826 static void
3827 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
3828 {
3829   gchar *fname = NULL;
3830   GstSample *sample;
3831   GstCaps *caps;
3832
3833   gst_splitmux_sink_ensure_max_files (splitmux);
3834
3835   if (ctx->cur_out_buffer == NULL) {
3836     GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
3837   }
3838
3839   caps = gst_pad_get_current_caps (ctx->srcpad);
3840   sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3841   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3842       splitmux->fragment_id, sample, &fname);
3843   gst_sample_unref (sample);
3844   if (caps)
3845     gst_caps_unref (caps);
3846
3847   if (fname == NULL) {
3848     /* Fallback to the old signal if the new one returned nothing */
3849     g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3850         splitmux->fragment_id, &fname);
3851   }
3852
3853   if (!fname)
3854     fname = splitmux->location ?
3855         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3856
3857   if (fname) {
3858     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3859     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3860             "location") != NULL)
3861       g_object_set (splitmux->sink, "location", fname, NULL);
3862     g_free (fname);
3863   }
3864
3865   splitmux->fragment_id++;
3866 }
3867
3868 /* called with GST_SPLITMUX_LOCK */
3869 static void
3870 do_async_start (GstSplitMuxSink * splitmux)
3871 {
3872   GstMessage *message;
3873
3874   if (!splitmux->need_async_start) {
3875     GST_INFO_OBJECT (splitmux, "no async_start needed");
3876     return;
3877   }
3878
3879   splitmux->async_pending = TRUE;
3880
3881   GST_INFO_OBJECT (splitmux, "Sending async_start message");
3882   message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
3883
3884   GST_SPLITMUX_UNLOCK (splitmux);
3885   GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3886       (splitmux), message);
3887   GST_SPLITMUX_LOCK (splitmux);
3888 }
3889
3890 /* called with GST_SPLITMUX_LOCK */
3891 static void
3892 do_async_done (GstSplitMuxSink * splitmux)
3893 {
3894   GstMessage *message;
3895
3896   if (splitmux->async_pending) {
3897     GST_INFO_OBJECT (splitmux, "Sending async_done message");
3898     splitmux->async_pending = FALSE;
3899     GST_SPLITMUX_UNLOCK (splitmux);
3900
3901     message =
3902         gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3903         GST_CLOCK_TIME_NONE);
3904     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3905         (splitmux), message);
3906     GST_SPLITMUX_LOCK (splitmux);
3907   }
3908
3909   splitmux->need_async_start = FALSE;
3910 }
3911
3912 static void
3913 gst_splitmux_sink_reset (GstSplitMuxSink * splitmux)
3914 {
3915   splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3916   splitmux->max_in_running_time_dts = GST_CLOCK_STIME_NONE;
3917
3918   splitmux->fragment_start_time = GST_CLOCK_STIME_NONE;
3919   splitmux->fragment_start_time_pts = GST_CLOCK_STIME_NONE;
3920   g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
3921
3922   g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
3923   g_queue_clear (&splitmux->pending_input_gops);
3924
3925   splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
3926   splitmux->fragment_total_bytes = 0;
3927   splitmux->fragment_reference_bytes = 0;
3928   splitmux->muxed_out_bytes = 0;
3929   splitmux->ready_for_output = FALSE;
3930
3931   g_atomic_int_set (&(splitmux->split_requested), FALSE);
3932   g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3933
3934   splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
3935   gst_queue_array_clear (splitmux->times_to_split);
3936
3937   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_reset, NULL);
3938   splitmux->queued_keyframes = 0;
3939
3940   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
3941   g_queue_clear (&splitmux->out_cmd_q);
3942 }
3943
3944 static GstStateChangeReturn
3945 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
3946 {
3947   GstStateChangeReturn ret;
3948   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3949
3950   switch (transition) {
3951     case GST_STATE_CHANGE_NULL_TO_READY:{
3952       GST_SPLITMUX_LOCK (splitmux);
3953       if (!create_muxer (splitmux) || !create_sink (splitmux)) {
3954         ret = GST_STATE_CHANGE_FAILURE;
3955         GST_SPLITMUX_UNLOCK (splitmux);
3956         goto beach;
3957       }
3958       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3959       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
3960       GST_SPLITMUX_UNLOCK (splitmux);
3961       splitmux->fragment_id = splitmux->start_index;
3962       break;
3963     }
3964     case GST_STATE_CHANGE_READY_TO_PAUSED:{
3965       GST_SPLITMUX_LOCK (splitmux);
3966       /* Make sure contexts and tracking times are cleared, in case we're being reused */
3967       gst_splitmux_sink_reset (splitmux);
3968       /* Start by collecting one input on each pad */
3969       splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
3970       splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
3971
3972       GST_SPLITMUX_UNLOCK (splitmux);
3973
3974       GST_SPLITMUX_STATE_LOCK (splitmux);
3975       splitmux->shutdown = FALSE;
3976       GST_SPLITMUX_STATE_UNLOCK (splitmux);
3977       break;
3978     }
3979     case GST_STATE_CHANGE_PAUSED_TO_READY:
3980     case GST_STATE_CHANGE_READY_TO_READY:
3981       g_atomic_int_set (&(splitmux->split_requested), FALSE);
3982       g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3983       /* Fall through */
3984     case GST_STATE_CHANGE_READY_TO_NULL:
3985       GST_SPLITMUX_STATE_LOCK (splitmux);
3986       splitmux->shutdown = TRUE;
3987       GST_SPLITMUX_STATE_UNLOCK (splitmux);
3988
3989       GST_SPLITMUX_LOCK (splitmux);
3990       gst_splitmux_sink_reset (splitmux);
3991       splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
3992       splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
3993       /* Wake up any blocked threads */
3994       GST_LOG_OBJECT (splitmux,
3995           "State change -> NULL or READY. Waking threads");
3996       GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3997       GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
3998       GST_SPLITMUX_UNLOCK (splitmux);
3999       break;
4000     default:
4001       break;
4002   }
4003
4004   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4005   if (ret == GST_STATE_CHANGE_FAILURE)
4006     goto beach;
4007
4008   switch (transition) {
4009     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4010       splitmux->need_async_start = TRUE;
4011       break;
4012     case GST_STATE_CHANGE_READY_TO_PAUSED:{
4013       /* Change state async, because our child sink might not
4014        * be ready to do that for us yet if it's state is still locked */
4015
4016       splitmux->need_async_start = TRUE;
4017       /* we want to go async to PAUSED until we managed to configure and add the
4018        * sink */
4019       GST_SPLITMUX_LOCK (splitmux);
4020       do_async_start (splitmux);
4021       GST_SPLITMUX_UNLOCK (splitmux);
4022       ret = GST_STATE_CHANGE_ASYNC;
4023       break;
4024     }
4025     case GST_STATE_CHANGE_READY_TO_NULL:
4026       GST_SPLITMUX_LOCK (splitmux);
4027       splitmux->fragment_id = 0;
4028       /* Reset internal elements only if no pad contexts are using them */
4029       if (splitmux->contexts == NULL)
4030         gst_splitmux_reset_elements (splitmux);
4031       do_async_done (splitmux);
4032       GST_SPLITMUX_UNLOCK (splitmux);
4033       break;
4034     default:
4035       break;
4036   }
4037
4038   return ret;
4039
4040 beach:
4041   if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
4042     /* Cleanup elements on failed transition out of NULL */
4043     gst_splitmux_reset_elements (splitmux);
4044     GST_SPLITMUX_LOCK (splitmux);
4045     do_async_done (splitmux);
4046     GST_SPLITMUX_UNLOCK (splitmux);
4047   }
4048   if (transition == GST_STATE_CHANGE_READY_TO_READY) {
4049     /* READY to READY transition only happens when we're already
4050      * in READY state, but a child element is in NULL, which
4051      * happens when there's an error changing the state of the sink.
4052      * We need to make sure not to fail the state transition, or
4053      * the core won't transition us back to NULL successfully */
4054     ret = GST_STATE_CHANGE_SUCCESS;
4055   }
4056   return ret;
4057 }
4058
4059 static void
4060 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
4061 {
4062   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
4063     splitmux->fragment_id = 0;
4064   }
4065 }
4066
4067 static void
4068 split_now (GstSplitMuxSink * splitmux)
4069 {
4070   g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
4071 }
4072
4073 static void
4074 split_after (GstSplitMuxSink * splitmux)
4075 {
4076   g_atomic_int_set (&(splitmux->split_requested), TRUE);
4077 }
4078
4079 static void
4080 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
4081 {
4082   gboolean send_keyframe_requests;
4083
4084   GST_SPLITMUX_LOCK (splitmux);
4085   gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
4086   send_keyframe_requests = splitmux->send_keyframe_requests;
4087   GST_SPLITMUX_UNLOCK (splitmux);
4088
4089   if (send_keyframe_requests) {
4090     GstEvent *ev =
4091         gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
4092     GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
4093         GST_TIME_ARGS (split_time));
4094     if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
4095       GST_WARNING_OBJECT (splitmux,
4096           "Could not request keyframe at %" GST_TIME_FORMAT,
4097           GST_TIME_ARGS (split_time));
4098     }
4099   }
4100 }