Revert "splitmuxsink: Avoid assertion when WAITING_GOP_COLLECT on reference context"
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014-2019> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @title: splitmuxsink
23  * @short_description: Muxer wrapper for splitting output stream by size or time
24  *
25  * This element wraps a muxer and a sink, and starts a new file when the mux
26  * contents are about to cross a threshold of maximum size of maximum time,
27  * splitting at video keyframe boundaries. Exactly one input video stream
28  * can be muxed, with as many accompanying audio and subtitle streams as
29  * desired.
30  *
31  * By default, it uses mp4mux and filesink, but they can be changed via
32  * the 'muxer' and 'sink' properties.
33  *
34  * The minimum file size is 1 GOP, however - so limits may be overrun if the
35  * distance between any 2 keyframes is larger than the limits.
36  *
37  * If a video stream is available, the splitting process is driven by the video
38  * stream contents, and the video stream must contain closed GOPs for the output
39  * file parts to be played individually correctly. In the absence of a video
40  * stream, the first available stream is used as reference for synchronization.
41  *
42  * In the async-finalize mode, when the threshold is crossed, the old muxer
43  * and sink is disconnected from the pipeline and left to finish the file
44  * asynchronously, and a new muxer and sink is created to continue with the
45  * next fragment. For that reason, instead of muxer and sink objects, the
46  * muxer-factory and sink-factory properties are used to construct the new
47  * objects, together with muxer-properties and sink-properties.
48  *
49  * ## Example pipelines
50  * |[
51  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
52  * ]|
53  * Records a video stream captured from a v4l2 device and muxes it into
54  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55  * and 1MB maximum size.
56  *
57  * |[
58  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
59  * ]|
60  * Records a video stream captured from a v4l2 device and muxer it into
61  * streamable Matroska files, splitting as needed to limit size/duration to 10
62  * seconds. Each file will finalize asynchronously.
63  *
64  * |[
65  * gst-launch-1.0 videotestsrc num-buffers=10 ! jpegenc ! .video splitmuxsink muxer=qtmux muxer-pad-map=x-pad-map,video=video_1 location=test%05d.mp4 -v
66  * ]|
67  * Records 10 frames to an mp4 file, using a muxer-pad-map to make explicit mappings between the splitmuxsink sink pad and the corresponding muxer pad
68  * it will deliver to.
69  */
70
71 #ifdef HAVE_CONFIG_H
72 #include "config.h"
73 #endif
74
75 #include <string.h>
76 #include <glib/gstdio.h>
77 #include <gst/video/video.h>
78 #include "gstsplitmuxsink.h"
79
80 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
81 #define GST_CAT_DEFAULT splitmux_debug
82
83 #define GST_SPLITMUX_STATE_LOCK(s) g_mutex_lock(&(s)->state_lock)
84 #define GST_SPLITMUX_STATE_UNLOCK(s) g_mutex_unlock(&(s)->state_lock)
85
86 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
87 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
88 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
89 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
90
91 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
92 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
93
94 static void split_now (GstSplitMuxSink * splitmux);
95 static void split_after (GstSplitMuxSink * splitmux);
96 static void split_at_running_time (GstSplitMuxSink * splitmux,
97     GstClockTime split_time);
98
99 enum
100 {
101   PROP_0,
102   PROP_LOCATION,
103   PROP_START_INDEX,
104   PROP_MAX_SIZE_TIME,
105   PROP_MAX_SIZE_BYTES,
106   PROP_MAX_SIZE_TIMECODE,
107   PROP_SEND_KEYFRAME_REQUESTS,
108   PROP_MAX_FILES,
109   PROP_MUXER_OVERHEAD,
110   PROP_USE_ROBUST_MUXING,
111   PROP_ALIGNMENT_THRESHOLD,
112   PROP_MUXER,
113   PROP_SINK,
114   PROP_RESET_MUXER,
115   PROP_ASYNC_FINALIZE,
116   PROP_MUXER_FACTORY,
117   PROP_MUXER_PRESET,
118   PROP_MUXER_PROPERTIES,
119   PROP_SINK_FACTORY,
120   PROP_SINK_PRESET,
121   PROP_SINK_PROPERTIES,
122   PROP_MUXERPAD_MAP
123 };
124
125 #define DEFAULT_MAX_SIZE_TIME       0
126 #define DEFAULT_MAX_SIZE_BYTES      0
127 #define DEFAULT_MAX_FILES           0
128 #define DEFAULT_MUXER_OVERHEAD      0.02
129 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
130 #define DEFAULT_ALIGNMENT_THRESHOLD 0
131 #define DEFAULT_MUXER "mp4mux"
132 #define DEFAULT_SINK "filesink"
133 #define DEFAULT_USE_ROBUST_MUXING FALSE
134 #define DEFAULT_RESET_MUXER TRUE
135 #define DEFAULT_ASYNC_FINALIZE FALSE
136 #define DEFAULT_START_INDEX 0
137
138 typedef struct _AsyncEosHelper
139 {
140   MqStreamCtx *ctx;
141   GstPad *pad;
142 } AsyncEosHelper;
143
144 enum
145 {
146   SIGNAL_FORMAT_LOCATION,
147   SIGNAL_FORMAT_LOCATION_FULL,
148   SIGNAL_SPLIT_NOW,
149   SIGNAL_SPLIT_AFTER,
150   SIGNAL_SPLIT_AT_RUNNING_TIME,
151   SIGNAL_MUXER_ADDED,
152   SIGNAL_SINK_ADDED,
153   SIGNAL_LAST
154 };
155
156 static guint signals[SIGNAL_LAST];
157
158 static GstStaticPadTemplate video_sink_template =
159 GST_STATIC_PAD_TEMPLATE ("video",
160     GST_PAD_SINK,
161     GST_PAD_REQUEST,
162     GST_STATIC_CAPS_ANY);
163 static GstStaticPadTemplate video_aux_sink_template =
164 GST_STATIC_PAD_TEMPLATE ("video_aux_%u",
165     GST_PAD_SINK,
166     GST_PAD_REQUEST,
167     GST_STATIC_CAPS_ANY);
168 static GstStaticPadTemplate audio_sink_template =
169 GST_STATIC_PAD_TEMPLATE ("audio_%u",
170     GST_PAD_SINK,
171     GST_PAD_REQUEST,
172     GST_STATIC_CAPS_ANY);
173 static GstStaticPadTemplate subtitle_sink_template =
174 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
175     GST_PAD_SINK,
176     GST_PAD_REQUEST,
177     GST_STATIC_CAPS_ANY);
178 static GstStaticPadTemplate caption_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("caption_%u",
180     GST_PAD_SINK,
181     GST_PAD_REQUEST,
182     GST_STATIC_CAPS_ANY);
183
184 static GQuark PAD_CONTEXT;
185 static GQuark EOS_FROM_US;
186 static GQuark RUNNING_TIME;
187 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
188  * to forward an incoming EOS message, but we cannot rely on the state of the
189  * splitmux anymore, so we set this qdata on the sink instead.
190  * The muxer and sink must be destroyed after both of these things have
191  * finished:
192  * 1) The EOS message has been sent when the fragment is ending
193  * 2) The muxer has been unlinked and relinked
194  * Therefore, EOS_FROM_US can have these two values:
195  * 0: EOS was not requested from us. Forward the message. The muxer and the
196  * sink will be destroyed together with the rest of the bin.
197  * 1: EOS was requested from us, but the other of the two tasks hasn't
198  * finished. Set EOS_FROM_US to 2 and do your stuff.
199  * 2: EOS was requested from us and the other of the two tasks has finished.
200  * Now we can destroy the muxer and the sink.
201  */
202
203 static void
204 _do_init (void)
205 {
206   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
207   EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
208   RUNNING_TIME = g_quark_from_static_string ("running-time");
209   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
210       "Split File Muxing Sink");
211 }
212
213 #define gst_splitmux_sink_parent_class parent_class
214 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
215     _do_init ());
216 GST_ELEMENT_REGISTER_DEFINE (splitmuxsink, "splitmuxsink", GST_RANK_NONE,
217     GST_TYPE_SPLITMUX_SINK);
218
219 static gboolean create_muxer (GstSplitMuxSink * splitmux);
220 static gboolean create_sink (GstSplitMuxSink * splitmux);
221 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
222     const GValue * value, GParamSpec * pspec);
223 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
224     GValue * value, GParamSpec * pspec);
225 static void gst_splitmux_sink_dispose (GObject * object);
226 static void gst_splitmux_sink_finalize (GObject * object);
227
228 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
229     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
230 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
231
232 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
233     element, GstStateChange transition);
234
235 static void bus_handler (GstBin * bin, GstMessage * msg);
236 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
237 static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux,
238     MqStreamCtx * ctx);
239 static void mq_stream_ctx_free (MqStreamCtx * ctx);
240 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
241
242 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
243 static GstElement *create_element (GstSplitMuxSink * splitmux,
244     const gchar * factory, const gchar * name, gboolean locked);
245
246 static void do_async_done (GstSplitMuxSink * splitmux);
247
248 static GstClockTime calculate_next_max_timecode (GstSplitMuxSink * splitmux,
249     const GstVideoTimeCode * cur_tc, GstClockTime running_time,
250     GstVideoTimeCode ** next_tc);
251
252 static MqStreamBuf *
253 mq_stream_buf_new (void)
254 {
255   return g_slice_new0 (MqStreamBuf);
256 }
257
258 static void
259 mq_stream_buf_free (MqStreamBuf * data)
260 {
261   g_slice_free (MqStreamBuf, data);
262 }
263
264 static SplitMuxOutputCommand *
265 out_cmd_buf_new (void)
266 {
267   return g_slice_new0 (SplitMuxOutputCommand);
268 }
269
270 static void
271 out_cmd_buf_free (SplitMuxOutputCommand * data)
272 {
273   g_slice_free (SplitMuxOutputCommand, data);
274 }
275
276 static void
277 input_gop_free (InputGop * gop)
278 {
279   g_clear_pointer (&gop->start_tc, gst_video_time_code_free);
280   g_slice_free (InputGop, gop);
281 }
282
283 static void
284 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
285 {
286   GObjectClass *gobject_class = (GObjectClass *) klass;
287   GstElementClass *gstelement_class = (GstElementClass *) klass;
288   GstBinClass *gstbin_class = (GstBinClass *) klass;
289
290   gobject_class->set_property = gst_splitmux_sink_set_property;
291   gobject_class->get_property = gst_splitmux_sink_get_property;
292   gobject_class->dispose = gst_splitmux_sink_dispose;
293   gobject_class->finalize = gst_splitmux_sink_finalize;
294
295   gst_element_class_set_static_metadata (gstelement_class,
296       "Split Muxing Bin", "Generic/Bin/Muxer",
297       "Convenience bin that muxes incoming streams into multiple time/size limited files",
298       "Jan Schmidt <jan@centricular.com>");
299
300   gst_element_class_add_static_pad_template (gstelement_class,
301       &video_sink_template);
302   gst_element_class_add_static_pad_template (gstelement_class,
303       &video_aux_sink_template);
304   gst_element_class_add_static_pad_template (gstelement_class,
305       &audio_sink_template);
306   gst_element_class_add_static_pad_template (gstelement_class,
307       &subtitle_sink_template);
308   gst_element_class_add_static_pad_template (gstelement_class,
309       &caption_sink_template);
310
311   gstelement_class->change_state =
312       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
313   gstelement_class->request_new_pad =
314       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
315   gstelement_class->release_pad =
316       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
317
318   gstbin_class->handle_message = bus_handler;
319
320   g_object_class_install_property (gobject_class, PROP_LOCATION,
321       g_param_spec_string ("location", "File Output Pattern",
322           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
323           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
324   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
325       g_param_spec_double ("mux-overhead", "Muxing Overhead",
326           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
327           DEFAULT_MUXER_OVERHEAD,
328           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
329
330   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
331       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
332           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
333           DEFAULT_MAX_SIZE_TIME,
334           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
335           G_PARAM_STATIC_STRINGS));
336   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
337       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
338           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
339           DEFAULT_MAX_SIZE_BYTES,
340           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
341           G_PARAM_STATIC_STRINGS));
342   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
343       g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
344           "Maximum difference in timecode between first and last frame. "
345           "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
346           "Will only be effective if a timecode track is present.", NULL,
347           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
348           G_PARAM_STATIC_STRINGS));
349   g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
350       g_param_spec_boolean ("send-keyframe-requests",
351           "Request keyframes at max-size-time",
352           "Request a keyframe every max-size-time ns to try splitting at that point. "
353           "Needs max-size-bytes to be 0 in order to be effective.",
354           DEFAULT_SEND_KEYFRAME_REQUESTS,
355           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
356           G_PARAM_STATIC_STRINGS));
357   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
358       g_param_spec_uint ("max-files", "Max files",
359           "Maximum number of files to keep on disk. Once the maximum is reached,"
360           "old files start to be deleted to make room for new ones.", 0,
361           G_MAXUINT, DEFAULT_MAX_FILES,
362           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
363   g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
364       g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
365           "Allow non-reference streams to be that many ns before the reference"
366           " stream", 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
367           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
368           G_PARAM_STATIC_STRINGS));
369
370   g_object_class_install_property (gobject_class, PROP_MUXER,
371       g_param_spec_object ("muxer", "Muxer",
372           "The muxer element to use (NULL = default mp4mux). "
373           "Valid only for async-finalize = FALSE",
374           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375   g_object_class_install_property (gobject_class, PROP_SINK,
376       g_param_spec_object ("sink", "Sink",
377           "The sink element (or element chain) to use (NULL = default filesink). "
378           "Valid only for async-finalize = FALSE",
379           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
380
381   g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
382       g_param_spec_boolean ("use-robust-muxing",
383           "Support robust-muxing mode of some muxers",
384           "Check if muxers support robust muxing via the reserved-max-duration and "
385           "reserved-duration-remaining properties and use them if so. "
386           "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
387           " create new fragments if the reserved header space is about to overflow. "
388           "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
389           "manually by the app to a non-zero value for robust muxing to have an effect.",
390           DEFAULT_USE_ROBUST_MUXING,
391           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
392
393   g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
394       g_param_spec_boolean ("reset-muxer",
395           "Reset Muxer",
396           "Reset the muxer after each segment. Disabling this will not work for most muxers.",
397           DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
398
399   g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
400       g_param_spec_boolean ("async-finalize",
401           "Finalize fragments asynchronously",
402           "Finalize each fragment asynchronously and start a new one",
403           DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
404   g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
405       g_param_spec_string ("muxer-factory", "Muxer factory",
406           "The muxer element factory to use (default = mp4mux). "
407           "Valid only for async-finalize = TRUE",
408           "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
409   /**
410    * GstSplitMuxSink:muxer-preset
411    *
412    * An optional #GstPreset name to use for the muxer. This only has an effect
413    * in `async-finalize=TRUE` mode.
414    *
415    * Since: 1.18
416    */
417   g_object_class_install_property (gobject_class, PROP_MUXER_PRESET,
418       g_param_spec_string ("muxer-preset", "Muxer preset",
419           "The muxer preset to use. "
420           "Valid only for async-finalize = TRUE",
421           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
422   g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
423       g_param_spec_boxed ("muxer-properties", "Muxer properties",
424           "The muxer element properties to use. "
425           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
426           "Valid only for async-finalize = TRUE",
427           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
428   g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
429       g_param_spec_string ("sink-factory", "Sink factory",
430           "The sink element factory to use (default = filesink). "
431           "Valid only for async-finalize = TRUE",
432           "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
433   /**
434    * GstSplitMuxSink:sink-preset
435    *
436    * An optional #GstPreset name to use for the sink. This only has an effect
437    * in `async-finalize=TRUE` mode.
438    *
439    * Since: 1.18
440    */
441   g_object_class_install_property (gobject_class, PROP_SINK_PRESET,
442       g_param_spec_string ("sink-preset", "Sink preset",
443           "The sink preset to use. "
444           "Valid only for async-finalize = TRUE",
445           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
446   g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
447       g_param_spec_boxed ("sink-properties", "Sink properties",
448           "The sink element properties to use. "
449           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
450           "Valid only for async-finalize = TRUE",
451           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
452   g_object_class_install_property (gobject_class, PROP_START_INDEX,
453       g_param_spec_int ("start-index", "Start Index",
454           "Start value of fragment index.",
455           0, G_MAXINT, DEFAULT_START_INDEX,
456           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
457
458   /**
459    * GstSplitMuxSink::muxer-pad-map
460    *
461    * An optional GstStructure that provides a map from splitmuxsink sinkpad
462    * names to muxer pad names they should feed. Splitmuxsink has some default
463    * mapping behaviour to link video to video pads and audio to audio pads
464    * that usually works fine. This property is useful if you need to ensure
465    * a particular mapping to muxed streams.
466    *
467    * The GstStructure contains string fields like so:
468    *   splitmuxsink muxer-pad-map=x-pad-map,video=video_1
469    *
470    * Since: 1.18
471    */
472   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_MUXERPAD_MAP,
473       g_param_spec_boxed ("muxer-pad-map", "Muxer pad map",
474           "A GstStructure specifies the mapping from splitmuxsink sink pads to muxer pads",
475           GST_TYPE_STRUCTURE,
476           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
477
478   /**
479    * GstSplitMuxSink::format-location:
480    * @splitmux: the #GstSplitMuxSink
481    * @fragment_id: the sequence number of the file to be created
482    *
483    * Returns: the location to be used for the next output file. This must be
484    *    a newly-allocated string which will be freed with g_free() by the
485    *    splitmuxsink element when it no longer needs it, so use g_strdup() or
486    *    g_strdup_printf() or similar functions to allocate it.
487    */
488   signals[SIGNAL_FORMAT_LOCATION] =
489       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
490       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
491
492   /**
493    * GstSplitMuxSink::format-location-full:
494    * @splitmux: the #GstSplitMuxSink
495    * @fragment_id: the sequence number of the file to be created
496    * @first_sample: A #GstSample containing the first buffer
497    *   from the reference stream in the new file
498    *
499    * Returns: the location to be used for the next output file. This must be
500    *    a newly-allocated string which will be freed with g_free() by the
501    *    splitmuxsink element when it no longer needs it, so use g_strdup() or
502    *    g_strdup_printf() or similar functions to allocate it.
503    *
504    * Since: 1.12
505    */
506   signals[SIGNAL_FORMAT_LOCATION_FULL] =
507       g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
508       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
509       GST_TYPE_SAMPLE);
510
511   /**
512    * GstSplitMuxSink::split-now:
513    * @splitmux: the #GstSplitMuxSink
514    *
515    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
516    * The current GOP will be output to the new file.
517    *
518    * Since: 1.14
519    */
520   signals[SIGNAL_SPLIT_NOW] =
521       g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
522       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
523       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_now), NULL, NULL, NULL,
524       G_TYPE_NONE, 0);
525
526   /**
527    * GstSplitMuxSink::split-after:
528    * @splitmux: the #GstSplitMuxSink
529    *
530    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
531    * Unlike the 'split-now' signal, with 'split-after', the current GOP will be output to the old file.
532    *
533    * Since: 1.16
534    */
535   signals[SIGNAL_SPLIT_AFTER] =
536       g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
537       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
538       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_after), NULL, NULL, NULL,
539       G_TYPE_NONE, 0);
540
541   /**
542    * GstSplitMuxSink::split-at-running-time:
543    * @splitmux: the #GstSplitMuxSink
544    *
545    * When called by the user, this action signal splits the video file (and
546    * begins a new one) as soon as the given running time is reached. If this
547    * action signal is called multiple times, running times are queued up and
548    * processed in the order they were given.
549    *
550    * Note that this is prone to race conditions, where said running time is
551    * reached and surpassed before we had a chance to split. The file will
552    * still split immediately, but in order to make sure that the split doesn't
553    * happen too late, it is recommended to call this action signal from
554    * something that will prevent further buffers from flowing into
555    * splitmuxsink before the split is completed, such as a pad probe before
556    * splitmuxsink.
557    *
558    *
559    * Since: 1.16
560    */
561   signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
562       g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
563       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
564       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_at_running_time), NULL, NULL,
565       NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
566
567   /**
568    * GstSplitMuxSink::muxer-added:
569    * @splitmux: the #GstSplitMuxSink
570    * @muxer: the newly added muxer element
571    *
572    * Since: 1.14
573    */
574   signals[SIGNAL_MUXER_ADDED] =
575       g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
576       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
577
578   /**
579    * GstSplitMuxSink::sink-added:
580    * @splitmux: the #GstSplitMuxSink
581    * @sink: the newly added sink element
582    *
583    * Since: 1.14
584    */
585   signals[SIGNAL_SINK_ADDED] =
586       g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
587       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
588
589   klass->split_now = split_now;
590   klass->split_after = split_after;
591   klass->split_at_running_time = split_at_running_time;
592 }
593
594 static void
595 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
596 {
597   g_mutex_init (&splitmux->lock);
598   g_mutex_init (&splitmux->state_lock);
599   g_cond_init (&splitmux->input_cond);
600   g_cond_init (&splitmux->output_cond);
601   g_queue_init (&splitmux->out_cmd_q);
602
603   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
604   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
605   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
606   splitmux->max_files = DEFAULT_MAX_FILES;
607   splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
608   splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
609   splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
610   splitmux->reset_muxer = DEFAULT_RESET_MUXER;
611
612   splitmux->threshold_timecode_str = NULL;
613
614   splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
615   splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
616   splitmux->muxer_properties = NULL;
617   splitmux->sink_factory = g_strdup (DEFAULT_SINK);
618   splitmux->sink_properties = NULL;
619
620   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
621   splitmux->split_requested = FALSE;
622   splitmux->do_split_next_gop = FALSE;
623   splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
624   splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
625
626   g_queue_init (&splitmux->pending_input_gops);
627 }
628
629 static void
630 gst_splitmux_reset_elements (GstSplitMuxSink * splitmux)
631 {
632   if (splitmux->muxer) {
633     gst_element_set_locked_state (splitmux->muxer, TRUE);
634     gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
635     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
636   }
637   if (splitmux->active_sink) {
638     gst_element_set_locked_state (splitmux->active_sink, TRUE);
639     gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
640     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
641   }
642
643   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
644 }
645
646 static void
647 gst_splitmux_sink_dispose (GObject * object)
648 {
649   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
650
651   /* Calling parent dispose invalidates all child pointers */
652   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
653
654   G_OBJECT_CLASS (parent_class)->dispose (object);
655 }
656
657 static void
658 gst_splitmux_sink_finalize (GObject * object)
659 {
660   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
661
662   g_cond_clear (&splitmux->input_cond);
663   g_cond_clear (&splitmux->output_cond);
664   g_mutex_clear (&splitmux->lock);
665   g_mutex_clear (&splitmux->state_lock);
666   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
667   g_queue_clear (&splitmux->out_cmd_q);
668   g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
669   g_queue_clear (&splitmux->pending_input_gops);
670
671   g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
672
673   if (splitmux->muxerpad_map)
674     gst_structure_free (splitmux->muxerpad_map);
675
676   if (splitmux->provided_sink)
677     gst_object_unref (splitmux->provided_sink);
678   if (splitmux->provided_muxer)
679     gst_object_unref (splitmux->provided_muxer);
680
681   if (splitmux->muxer_factory)
682     g_free (splitmux->muxer_factory);
683   if (splitmux->muxer_preset)
684     g_free (splitmux->muxer_preset);
685   if (splitmux->muxer_properties)
686     gst_structure_free (splitmux->muxer_properties);
687   if (splitmux->sink_factory)
688     g_free (splitmux->sink_factory);
689   if (splitmux->sink_preset)
690     g_free (splitmux->sink_preset);
691   if (splitmux->sink_properties)
692     gst_structure_free (splitmux->sink_properties);
693
694   if (splitmux->threshold_timecode_str)
695     g_free (splitmux->threshold_timecode_str);
696   if (splitmux->tc_interval)
697     gst_video_time_code_interval_free (splitmux->tc_interval);
698
699   if (splitmux->times_to_split)
700     gst_queue_array_free (splitmux->times_to_split);
701
702   g_free (splitmux->location);
703
704   /* Make sure to free any un-released contexts. There should not be any,
705    * because the dispose will have freed all request pads though */
706   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
707   g_list_free (splitmux->contexts);
708
709   G_OBJECT_CLASS (parent_class)->finalize (object);
710 }
711
712 /*
713  * Set any time threshold to the muxer, if it has
714  * reserved-max-duration and reserved-duration-remaining
715  * properties. Called when creating/claiming the muxer
716  * in create_elements() */
717 static void
718 update_muxer_properties (GstSplitMuxSink * sink)
719 {
720   GObjectClass *klass;
721   GstClockTime threshold_time;
722
723   sink->muxer_has_reserved_props = FALSE;
724   if (sink->muxer == NULL)
725     return;
726   klass = G_OBJECT_GET_CLASS (sink->muxer);
727   if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
728     return;
729   if (g_object_class_find_property (klass,
730           "reserved-duration-remaining") == NULL)
731     return;
732   sink->muxer_has_reserved_props = TRUE;
733
734   GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
735       GST_TIME_ARGS (sink->threshold_time));
736   GST_OBJECT_LOCK (sink);
737   threshold_time = sink->threshold_time;
738   GST_OBJECT_UNLOCK (sink);
739
740   if (threshold_time > 0) {
741     /* Tell the muxer how much space to reserve */
742     GstClockTime muxer_threshold = threshold_time;
743     g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
744   }
745 }
746
747 static void
748 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
749     const GValue * value, GParamSpec * pspec)
750 {
751   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
752
753   switch (prop_id) {
754     case PROP_LOCATION:{
755       GST_OBJECT_LOCK (splitmux);
756       g_free (splitmux->location);
757       splitmux->location = g_value_dup_string (value);
758       GST_OBJECT_UNLOCK (splitmux);
759       break;
760     }
761     case PROP_START_INDEX:
762       GST_OBJECT_LOCK (splitmux);
763       splitmux->start_index = g_value_get_int (value);
764       GST_OBJECT_UNLOCK (splitmux);
765       break;
766     case PROP_MAX_SIZE_BYTES:
767       GST_OBJECT_LOCK (splitmux);
768       splitmux->threshold_bytes = g_value_get_uint64 (value);
769       GST_OBJECT_UNLOCK (splitmux);
770       break;
771     case PROP_MAX_SIZE_TIME:
772       GST_OBJECT_LOCK (splitmux);
773       splitmux->threshold_time = g_value_get_uint64 (value);
774       GST_OBJECT_UNLOCK (splitmux);
775       break;
776     case PROP_MAX_SIZE_TIMECODE:
777       GST_OBJECT_LOCK (splitmux);
778       g_free (splitmux->threshold_timecode_str);
779       /* will be calculated later */
780       g_clear_pointer (&splitmux->tc_interval,
781           gst_video_time_code_interval_free);
782
783       splitmux->threshold_timecode_str = g_value_dup_string (value);
784       if (splitmux->threshold_timecode_str) {
785         splitmux->tc_interval =
786             gst_video_time_code_interval_new_from_string
787             (splitmux->threshold_timecode_str);
788         if (!splitmux->tc_interval) {
789           g_warning ("Wrong timecode string %s",
790               splitmux->threshold_timecode_str);
791           g_free (splitmux->threshold_timecode_str);
792           splitmux->threshold_timecode_str = NULL;
793         }
794       }
795       splitmux->next_fragment_start_tc_time =
796           calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
797           splitmux->fragment_start_time, NULL);
798       if (splitmux->tc_interval && splitmux->fragment_start_tc
799           && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
800         GST_WARNING_OBJECT (splitmux,
801             "Couldn't calculate next fragment start time for timecode mode");
802       }
803       GST_OBJECT_UNLOCK (splitmux);
804       break;
805     case PROP_SEND_KEYFRAME_REQUESTS:
806       GST_OBJECT_LOCK (splitmux);
807       splitmux->send_keyframe_requests = g_value_get_boolean (value);
808       GST_OBJECT_UNLOCK (splitmux);
809       break;
810     case PROP_MAX_FILES:
811       GST_OBJECT_LOCK (splitmux);
812       splitmux->max_files = g_value_get_uint (value);
813       GST_OBJECT_UNLOCK (splitmux);
814       break;
815     case PROP_MUXER_OVERHEAD:
816       GST_OBJECT_LOCK (splitmux);
817       splitmux->mux_overhead = g_value_get_double (value);
818       GST_OBJECT_UNLOCK (splitmux);
819       break;
820     case PROP_USE_ROBUST_MUXING:
821       GST_OBJECT_LOCK (splitmux);
822       splitmux->use_robust_muxing = g_value_get_boolean (value);
823       GST_OBJECT_UNLOCK (splitmux);
824       if (splitmux->use_robust_muxing)
825         update_muxer_properties (splitmux);
826       break;
827     case PROP_ALIGNMENT_THRESHOLD:
828       GST_OBJECT_LOCK (splitmux);
829       splitmux->alignment_threshold = g_value_get_uint64 (value);
830       GST_OBJECT_UNLOCK (splitmux);
831       break;
832     case PROP_SINK:
833       GST_OBJECT_LOCK (splitmux);
834       gst_clear_object (&splitmux->provided_sink);
835       splitmux->provided_sink = g_value_get_object (value);
836       if (splitmux->provided_sink)
837         gst_object_ref_sink (splitmux->provided_sink);
838       GST_OBJECT_UNLOCK (splitmux);
839       break;
840     case PROP_MUXER:
841       GST_OBJECT_LOCK (splitmux);
842       gst_clear_object (&splitmux->provided_muxer);
843       splitmux->provided_muxer = g_value_get_object (value);
844       if (splitmux->provided_muxer)
845         gst_object_ref_sink (splitmux->provided_muxer);
846       GST_OBJECT_UNLOCK (splitmux);
847       break;
848     case PROP_RESET_MUXER:
849       GST_OBJECT_LOCK (splitmux);
850       splitmux->reset_muxer = g_value_get_boolean (value);
851       GST_OBJECT_UNLOCK (splitmux);
852       break;
853     case PROP_ASYNC_FINALIZE:
854       GST_OBJECT_LOCK (splitmux);
855       splitmux->async_finalize = g_value_get_boolean (value);
856       GST_OBJECT_UNLOCK (splitmux);
857       break;
858     case PROP_MUXER_FACTORY:
859       GST_OBJECT_LOCK (splitmux);
860       if (splitmux->muxer_factory)
861         g_free (splitmux->muxer_factory);
862       splitmux->muxer_factory = g_value_dup_string (value);
863       GST_OBJECT_UNLOCK (splitmux);
864       break;
865     case PROP_MUXER_PRESET:
866       GST_OBJECT_LOCK (splitmux);
867       if (splitmux->muxer_preset)
868         g_free (splitmux->muxer_preset);
869       splitmux->muxer_preset = g_value_dup_string (value);
870       GST_OBJECT_UNLOCK (splitmux);
871       break;
872     case PROP_MUXER_PROPERTIES:
873       GST_OBJECT_LOCK (splitmux);
874       if (splitmux->muxer_properties)
875         gst_structure_free (splitmux->muxer_properties);
876       if (gst_value_get_structure (value))
877         splitmux->muxer_properties =
878             gst_structure_copy (gst_value_get_structure (value));
879       else
880         splitmux->muxer_properties = NULL;
881       GST_OBJECT_UNLOCK (splitmux);
882       break;
883     case PROP_SINK_FACTORY:
884       GST_OBJECT_LOCK (splitmux);
885       if (splitmux->sink_factory)
886         g_free (splitmux->sink_factory);
887       splitmux->sink_factory = g_value_dup_string (value);
888       GST_OBJECT_UNLOCK (splitmux);
889       break;
890     case PROP_SINK_PRESET:
891       GST_OBJECT_LOCK (splitmux);
892       if (splitmux->sink_preset)
893         g_free (splitmux->sink_preset);
894       splitmux->sink_preset = g_value_dup_string (value);
895       GST_OBJECT_UNLOCK (splitmux);
896       break;
897     case PROP_SINK_PROPERTIES:
898       GST_OBJECT_LOCK (splitmux);
899       if (splitmux->sink_properties)
900         gst_structure_free (splitmux->sink_properties);
901       if (gst_value_get_structure (value))
902         splitmux->sink_properties =
903             gst_structure_copy (gst_value_get_structure (value));
904       else
905         splitmux->sink_properties = NULL;
906       GST_OBJECT_UNLOCK (splitmux);
907       break;
908     case PROP_MUXERPAD_MAP:
909     {
910       const GstStructure *s = gst_value_get_structure (value);
911       GST_SPLITMUX_LOCK (splitmux);
912       if (splitmux->muxerpad_map) {
913         gst_structure_free (splitmux->muxerpad_map);
914       }
915       if (s)
916         splitmux->muxerpad_map = gst_structure_copy (s);
917       else
918         splitmux->muxerpad_map = NULL;
919       GST_SPLITMUX_UNLOCK (splitmux);
920       break;
921     }
922     default:
923       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
924       break;
925   }
926 }
927
928 static void
929 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
930     GValue * value, GParamSpec * pspec)
931 {
932   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
933
934   switch (prop_id) {
935     case PROP_LOCATION:
936       GST_OBJECT_LOCK (splitmux);
937       g_value_set_string (value, splitmux->location);
938       GST_OBJECT_UNLOCK (splitmux);
939       break;
940     case PROP_START_INDEX:
941       GST_OBJECT_LOCK (splitmux);
942       g_value_set_int (value, splitmux->start_index);
943       GST_OBJECT_UNLOCK (splitmux);
944       break;
945     case PROP_MAX_SIZE_BYTES:
946       GST_OBJECT_LOCK (splitmux);
947       g_value_set_uint64 (value, splitmux->threshold_bytes);
948       GST_OBJECT_UNLOCK (splitmux);
949       break;
950     case PROP_MAX_SIZE_TIME:
951       GST_OBJECT_LOCK (splitmux);
952       g_value_set_uint64 (value, splitmux->threshold_time);
953       GST_OBJECT_UNLOCK (splitmux);
954       break;
955     case PROP_MAX_SIZE_TIMECODE:
956       GST_OBJECT_LOCK (splitmux);
957       g_value_set_string (value, splitmux->threshold_timecode_str);
958       GST_OBJECT_UNLOCK (splitmux);
959       break;
960     case PROP_SEND_KEYFRAME_REQUESTS:
961       GST_OBJECT_LOCK (splitmux);
962       g_value_set_boolean (value, splitmux->send_keyframe_requests);
963       GST_OBJECT_UNLOCK (splitmux);
964       break;
965     case PROP_MAX_FILES:
966       GST_OBJECT_LOCK (splitmux);
967       g_value_set_uint (value, splitmux->max_files);
968       GST_OBJECT_UNLOCK (splitmux);
969       break;
970     case PROP_MUXER_OVERHEAD:
971       GST_OBJECT_LOCK (splitmux);
972       g_value_set_double (value, splitmux->mux_overhead);
973       GST_OBJECT_UNLOCK (splitmux);
974       break;
975     case PROP_USE_ROBUST_MUXING:
976       GST_OBJECT_LOCK (splitmux);
977       g_value_set_boolean (value, splitmux->use_robust_muxing);
978       GST_OBJECT_UNLOCK (splitmux);
979       break;
980     case PROP_ALIGNMENT_THRESHOLD:
981       GST_OBJECT_LOCK (splitmux);
982       g_value_set_uint64 (value, splitmux->alignment_threshold);
983       GST_OBJECT_UNLOCK (splitmux);
984       break;
985     case PROP_SINK:
986       GST_OBJECT_LOCK (splitmux);
987       g_value_set_object (value, splitmux->provided_sink);
988       GST_OBJECT_UNLOCK (splitmux);
989       break;
990     case PROP_MUXER:
991       GST_OBJECT_LOCK (splitmux);
992       g_value_set_object (value, splitmux->provided_muxer);
993       GST_OBJECT_UNLOCK (splitmux);
994       break;
995     case PROP_RESET_MUXER:
996       GST_OBJECT_LOCK (splitmux);
997       g_value_set_boolean (value, splitmux->reset_muxer);
998       GST_OBJECT_UNLOCK (splitmux);
999       break;
1000     case PROP_ASYNC_FINALIZE:
1001       GST_OBJECT_LOCK (splitmux);
1002       g_value_set_boolean (value, splitmux->async_finalize);
1003       GST_OBJECT_UNLOCK (splitmux);
1004       break;
1005     case PROP_MUXER_FACTORY:
1006       GST_OBJECT_LOCK (splitmux);
1007       g_value_set_string (value, splitmux->muxer_factory);
1008       GST_OBJECT_UNLOCK (splitmux);
1009       break;
1010     case PROP_MUXER_PRESET:
1011       GST_OBJECT_LOCK (splitmux);
1012       g_value_set_string (value, splitmux->muxer_preset);
1013       GST_OBJECT_UNLOCK (splitmux);
1014       break;
1015     case PROP_MUXER_PROPERTIES:
1016       GST_OBJECT_LOCK (splitmux);
1017       gst_value_set_structure (value, splitmux->muxer_properties);
1018       GST_OBJECT_UNLOCK (splitmux);
1019       break;
1020     case PROP_SINK_FACTORY:
1021       GST_OBJECT_LOCK (splitmux);
1022       g_value_set_string (value, splitmux->sink_factory);
1023       GST_OBJECT_UNLOCK (splitmux);
1024       break;
1025     case PROP_SINK_PRESET:
1026       GST_OBJECT_LOCK (splitmux);
1027       g_value_set_string (value, splitmux->sink_preset);
1028       GST_OBJECT_UNLOCK (splitmux);
1029       break;
1030     case PROP_SINK_PROPERTIES:
1031       GST_OBJECT_LOCK (splitmux);
1032       gst_value_set_structure (value, splitmux->sink_properties);
1033       GST_OBJECT_UNLOCK (splitmux);
1034       break;
1035     case PROP_MUXERPAD_MAP:
1036       GST_SPLITMUX_LOCK (splitmux);
1037       gst_value_set_structure (value, splitmux->muxerpad_map);
1038       GST_SPLITMUX_UNLOCK (splitmux);
1039       break;
1040     default:
1041       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1042       break;
1043   }
1044 }
1045
1046 /* Convenience function */
1047 static inline GstClockTimeDiff
1048 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
1049 {
1050   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
1051
1052   if (GST_CLOCK_TIME_IS_VALID (val)) {
1053     gboolean sign =
1054         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
1055     if (sign > 0)
1056       res = val;
1057     else if (sign < 0)
1058       res = -val;
1059   }
1060   return res;
1061 }
1062
1063 static void
1064 mq_stream_ctx_reset (MqStreamCtx * ctx)
1065 {
1066   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1067   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1068   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
1069   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1070   g_queue_clear (&ctx->queued_bufs);
1071 }
1072
1073 static MqStreamCtx *
1074 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
1075 {
1076   MqStreamCtx *ctx;
1077
1078   ctx = g_new0 (MqStreamCtx, 1);
1079   ctx->splitmux = splitmux;
1080   g_queue_init (&ctx->queued_bufs);
1081   mq_stream_ctx_reset (ctx);
1082
1083   return ctx;
1084 }
1085
1086 static void
1087 mq_stream_ctx_free (MqStreamCtx * ctx)
1088 {
1089   if (ctx->q) {
1090     GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
1091
1092     g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
1093
1094     if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
1095       gst_element_set_locked_state (ctx->q, TRUE);
1096       gst_element_set_state (ctx->q, GST_STATE_NULL);
1097       gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
1098       gst_object_unref (parent);
1099     }
1100     gst_object_unref (ctx->q);
1101   }
1102   gst_object_unref (ctx->sinkpad);
1103   gst_object_unref (ctx->srcpad);
1104   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1105   g_queue_clear (&ctx->queued_bufs);
1106   g_free (ctx);
1107 }
1108
1109 static void
1110 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
1111     GstElement * sink)
1112 {
1113   gchar *location = NULL;
1114   GstMessage *msg;
1115   const gchar *msg_name = opened ?
1116       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
1117   GstClockTime running_time = splitmux->reference_ctx->out_running_time;
1118
1119   if (!opened) {
1120     GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
1121     if (rtime)
1122       running_time = *rtime;
1123   }
1124
1125   if (g_object_class_find_property (G_OBJECT_GET_CLASS (sink),
1126           "location") != NULL)
1127     g_object_get (sink, "location", &location, NULL);
1128
1129   GST_DEBUG_OBJECT (splitmux,
1130       "Sending %s message. Running time %" GST_TIME_FORMAT " location %s",
1131       msg_name, GST_TIME_ARGS (running_time), GST_STR_NULL (location));
1132
1133   /* If it's in the middle of a teardown, the reference_ctc might have become
1134    * NULL */
1135   if (splitmux->reference_ctx) {
1136     msg = gst_message_new_element (GST_OBJECT (splitmux),
1137         gst_structure_new (msg_name,
1138             "location", G_TYPE_STRING, location,
1139             "running-time", GST_TYPE_CLOCK_TIME, running_time,
1140             "sink", GST_TYPE_ELEMENT, sink, NULL));
1141     gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
1142   }
1143
1144   g_free (location);
1145 }
1146
1147 static void
1148 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
1149 {
1150   GstEvent *eos;
1151   GstPad *pad;
1152   MqStreamCtx *ctx;
1153
1154   eos = gst_event_new_eos ();
1155   pad = helper->pad;
1156   ctx = helper->ctx;
1157
1158   GST_SPLITMUX_LOCK (splitmux);
1159   if (!pad)
1160     pad = gst_pad_get_peer (ctx->srcpad);
1161   GST_SPLITMUX_UNLOCK (splitmux);
1162
1163   gst_pad_send_event (pad, eos);
1164   GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
1165
1166   gst_object_unref (pad);
1167   g_free (helper);
1168 }
1169
1170 /* Called with lock held, drops the lock to send EOS to the
1171  * pad
1172  */
1173 static void
1174 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1175 {
1176   GstEvent *eos;
1177   GstPad *pad;
1178
1179   eos = gst_event_new_eos ();
1180   pad = gst_pad_get_peer (ctx->srcpad);
1181
1182   ctx->out_eos = TRUE;
1183
1184   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
1185   GST_SPLITMUX_UNLOCK (splitmux);
1186   gst_pad_send_event (pad, eos);
1187   GST_SPLITMUX_LOCK (splitmux);
1188
1189   gst_object_unref (pad);
1190 }
1191
1192 /* Called with lock held. Schedules an EOS event to the ctx pad
1193  * to happen in another thread */
1194 static void
1195 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1196 {
1197   AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
1198   GstPad *srcpad, *sinkpad;
1199
1200   srcpad = ctx->srcpad;
1201   sinkpad = gst_pad_get_peer (srcpad);
1202
1203   helper->ctx = ctx;
1204   helper->pad = sinkpad;        /* Takes the reference */
1205
1206   ctx->out_eos_async_done = TRUE;
1207
1208   /* There used to be a bug here, where we had to explicitly remove
1209    * the SINK flag so that GstBin would ignore it for EOS purposes.
1210    * That fixed a race where if splitmuxsink really reaches EOS
1211    * before an asynchronous background element has finished, then
1212    * the bin wouldn't actually send EOS to the pipeline. Even after
1213    * finishing and removing the old element, the bin didn't re-check
1214    * EOS status on removing a SINK element. That bug was fixed
1215    * in core. */
1216   GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1217       sinkpad, ctx);
1218
1219   g_assert_nonnull (helper->pad);
1220   gst_element_call_async (GST_ELEMENT (splitmux),
1221       (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
1222 }
1223
1224 /* Called with lock held. TRUE iff all contexts have a
1225  * pending (or delivered) async eos event */
1226 static gboolean
1227 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1228 {
1229   gboolean ret = TRUE;
1230   GList *item;
1231
1232   for (item = splitmux->contexts; item; item = item->next) {
1233     MqStreamCtx *ctx = item->data;
1234     ret &= ctx->out_eos_async_done;
1235   }
1236   return ret;
1237 }
1238
1239 /* Called with splitmux lock held to check if this output
1240  * context needs to sleep to wait for the release of the
1241  * next GOP, or to send EOS to close out the current file
1242  */
1243 static GstFlowReturn
1244 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1245 {
1246   if (ctx->caps_change)
1247     return GST_FLOW_OK;
1248
1249   do {
1250     /* When first starting up, the reference stream has to output
1251      * the first buffer to prepare the muxer and sink */
1252     gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
1253     GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
1254
1255     if (my_max_out_running_time != GST_CLOCK_STIME_NONE
1256         && my_max_out_running_time != G_MAXINT64) {
1257       my_max_out_running_time -= splitmux->alignment_threshold;
1258       GST_LOG_OBJECT (ctx->srcpad,
1259           "Max out running time currently %" GST_STIME_FORMAT
1260           ", with threshold applied it is %" GST_STIME_FORMAT,
1261           GST_STIME_ARGS (splitmux->max_out_running_time),
1262           GST_STIME_ARGS (my_max_out_running_time));
1263     }
1264
1265     if (ctx->flushing
1266         || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1267       return GST_FLOW_FLUSHING;
1268
1269     GST_LOG_OBJECT (ctx->srcpad,
1270         "Checking running time %" GST_STIME_FORMAT " against max %"
1271         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1272         GST_STIME_ARGS (my_max_out_running_time));
1273
1274     if (can_output) {
1275       if (splitmux->max_out_running_time != GST_CLOCK_STIME_NONE &&
1276           ctx->out_running_time < my_max_out_running_time) {
1277         return GST_FLOW_OK;
1278       }
1279
1280       switch (splitmux->output_state) {
1281         case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1282           /* We only get here if we've finished outputting a GOP and need to know
1283            * what to do next */
1284           splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1285           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1286           continue;
1287
1288         case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1289         case SPLITMUX_OUTPUT_STATE_ENDING_STREAM:
1290           /* We've reached the max out running_time to get here, so end this file now */
1291           if (ctx->out_eos == FALSE) {
1292             if (splitmux->async_finalize) {
1293               /* We must set EOS asynchronously at this point. We cannot defer
1294                * it, because we need all contexts to wake up, for the
1295                * reference context to eventually give us something at
1296                * START_NEXT_FILE. Otherwise, collectpads might choose another
1297                * context to give us the first buffer, and format-location-full
1298                * will not contain a valid sample. */
1299               g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1300                   GINT_TO_POINTER (1));
1301               eos_context_async (ctx, splitmux);
1302               if (all_contexts_are_async_eos (splitmux)) {
1303                 GST_INFO_OBJECT (splitmux,
1304                     "All contexts are async_eos. Moving to the next file.");
1305                 /* We can start the next file once we've asked each pad to go EOS */
1306                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1307                 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1308                 continue;
1309               }
1310             } else {
1311               send_eos (splitmux, ctx);
1312               continue;
1313             }
1314           } else {
1315             GST_INFO_OBJECT (splitmux,
1316                 "At end-of-file state, but context %p is already EOS", ctx);
1317           }
1318           break;
1319         case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1320           if (ctx->is_reference) {
1321             GstFlowReturn ret = GST_FLOW_OK;
1322
1323             /* Special handling on the reference ctx to start new fragments
1324              * and collect commands from the command queue */
1325             /* drops the splitmux lock briefly: */
1326             /* We must have reference ctx in order for format-location-full to
1327              * have a sample */
1328             ret = start_next_fragment (splitmux, ctx);
1329             if (ret != GST_FLOW_OK)
1330               return ret;
1331
1332             continue;
1333           }
1334           break;
1335         case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1336           do {
1337             SplitMuxOutputCommand *cmd =
1338                 g_queue_pop_tail (&splitmux->out_cmd_q);
1339             if (cmd != NULL) {
1340               /* If we pop the last command, we need to make our queues bigger */
1341               if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1342                 grow_blocked_queues (splitmux);
1343
1344               if (cmd->start_new_fragment) {
1345                 if (splitmux->muxed_out_bytes > 0) {
1346                   GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1347                   splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1348                 } else {
1349                   GST_DEBUG_OBJECT (splitmux,
1350                       "Got cmd to start new fragment, but fragment is empty - ignoring.");
1351                 }
1352               } else {
1353                 GST_DEBUG_OBJECT (splitmux,
1354                     "Got new output cmd for time %" GST_STIME_FORMAT,
1355                     GST_STIME_ARGS (cmd->max_output_ts));
1356
1357                 /* Extend the output range immediately */
1358                 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE
1359                     || cmd->max_output_ts > splitmux->max_out_running_time)
1360                   splitmux->max_out_running_time = cmd->max_output_ts;
1361                 GST_DEBUG_OBJECT (splitmux,
1362                     "Max out running time now %" GST_STIME_FORMAT,
1363                     GST_STIME_ARGS (splitmux->max_out_running_time));
1364                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1365               }
1366               GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1367
1368               out_cmd_buf_free (cmd);
1369               break;
1370             } else {
1371               GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1372             }
1373           } while (!ctx->flushing && splitmux->output_state ==
1374               SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1375           /* loop and re-check the state */
1376           continue;
1377         }
1378         case SPLITMUX_OUTPUT_STATE_STOPPED:
1379           return GST_FLOW_FLUSHING;
1380       }
1381     } else {
1382       GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output");
1383     }
1384
1385     GST_INFO_OBJECT (ctx->srcpad,
1386         "Sleeping for running time %"
1387         GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1388         GST_STIME_ARGS (ctx->out_running_time),
1389         GST_STIME_ARGS (splitmux->max_out_running_time));
1390     GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1391     GST_INFO_OBJECT (ctx->srcpad,
1392         "Woken for new max running time %" GST_STIME_FORMAT,
1393         GST_STIME_ARGS (splitmux->max_out_running_time));
1394   }
1395   while (1);
1396
1397   return GST_FLOW_OK;
1398 }
1399
1400 static GstClockTime
1401 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1402     const GstVideoTimeCode * cur_tc, GstClockTime running_time,
1403     GstVideoTimeCode ** next_tc)
1404 {
1405   GstVideoTimeCode *target_tc;
1406   GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1407
1408   if (cur_tc == NULL || splitmux->tc_interval == NULL)
1409     return GST_CLOCK_TIME_NONE;
1410
1411   target_tc = gst_video_time_code_add_interval (cur_tc, splitmux->tc_interval);
1412   if (!target_tc) {
1413     GST_ELEMENT_ERROR (splitmux,
1414         STREAM, FAILED, (NULL), ("Couldn't calculate target timecode"));
1415     return GST_CLOCK_TIME_NONE;
1416   }
1417
1418   /* Convert to ns */
1419   target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1420   cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1421
1422   /* Add running_time, accounting for wraparound. */
1423   if (target_tc_time >= cur_tc_time) {
1424     next_max_tc_time = target_tc_time - cur_tc_time + running_time;
1425   } else {
1426     GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1427
1428     if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1429         (cur_tc->config.fps_d == 1001)) {
1430       /* Checking fps_d is probably unneeded, but better safe than sorry
1431        * (e.g. someone accidentally set a flag) */
1432       GstVideoTimeCode *tc_for_offset;
1433
1434       /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1435        * but slightly less. Calculate that duration from a fake timecode. The
1436        * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1437        * is to add one frame to 23:59:59;29 */
1438       tc_for_offset =
1439           gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1440           NULL, cur_tc->config.flags, 23, 59, 59,
1441           cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
1442       day_in_ns =
1443           gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1444           gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1445           cur_tc->config.fps_n);
1446       gst_video_time_code_free (tc_for_offset);
1447     }
1448     next_max_tc_time = day_in_ns - cur_tc_time + target_tc_time + running_time;
1449   }
1450
1451 #ifndef GST_DISABLE_GST_DEBUG
1452   {
1453     gchar *next_max_tc_str, *cur_tc_str;
1454
1455     cur_tc_str = gst_video_time_code_to_string (cur_tc);
1456     next_max_tc_str = gst_video_time_code_to_string (target_tc);
1457
1458     GST_INFO_OBJECT (splitmux, "Next max timecode %s time: %" GST_TIME_FORMAT
1459         " from ref timecode %s time: %" GST_TIME_FORMAT,
1460         next_max_tc_str,
1461         GST_TIME_ARGS (next_max_tc_time),
1462         cur_tc_str, GST_TIME_ARGS (cur_tc_time));
1463
1464     g_free (next_max_tc_str);
1465     g_free (cur_tc_str);
1466   }
1467 #endif
1468
1469   if (next_tc)
1470     *next_tc = target_tc;
1471   else
1472     gst_video_time_code_free (target_tc);
1473
1474   return next_max_tc_time;
1475 }
1476
1477 static gboolean
1478 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer,
1479     GstClockTimeDiff running_time_dts)
1480 {
1481   GstEvent *ev;
1482   GstClockTime target_time;
1483   gboolean timecode_based = FALSE;
1484   GstClockTime max_tc_time = GST_CLOCK_TIME_NONE;
1485   GstClockTime next_max_tc_time = GST_CLOCK_TIME_NONE;
1486   GstClockTime next_fku_time = GST_CLOCK_TIME_NONE;
1487   GstClockTime tc_rounding_error = 5 * GST_USECOND;
1488   InputGop *newest_gop = NULL;
1489   GList *l;
1490
1491   if (!splitmux->send_keyframe_requests)
1492     return TRUE;
1493
1494   /* Find the newest GOP where we passed in DTS the start PTS */
1495   for (l = splitmux->pending_input_gops.tail; l; l = l->prev) {
1496     InputGop *tmp = l->data;
1497
1498     GST_TRACE_OBJECT (splitmux,
1499         "Having pending input GOP with start PTS %" GST_STIME_FORMAT
1500         " and start time %" GST_STIME_FORMAT,
1501         GST_STIME_ARGS (tmp->start_time_pts), GST_STIME_ARGS (tmp->start_time));
1502
1503     if (tmp->sent_fku) {
1504       GST_DEBUG_OBJECT (splitmux,
1505           "Already checked for a keyframe request for this GOP");
1506       return TRUE;
1507     }
1508
1509     if (running_time_dts == GST_CLOCK_STIME_NONE ||
1510         tmp->start_time_pts == GST_CLOCK_STIME_NONE ||
1511         running_time_dts >= tmp->start_time_pts) {
1512       GST_DEBUG_OBJECT (splitmux,
1513           "Using GOP with start PTS %" GST_STIME_FORMAT " and start time %"
1514           GST_STIME_FORMAT, GST_STIME_ARGS (tmp->start_time_pts),
1515           GST_STIME_ARGS (tmp->start_time));
1516       newest_gop = tmp;
1517       break;
1518     }
1519   }
1520
1521   if (!newest_gop) {
1522     GST_DEBUG_OBJECT (splitmux, "Have no complete enough pending input GOP");
1523     return TRUE;
1524   }
1525
1526   if (splitmux->tc_interval) {
1527     if (newest_gop->start_tc
1528         && gst_video_time_code_is_valid (newest_gop->start_tc)) {
1529       GstVideoTimeCode *next_tc = NULL;
1530       max_tc_time =
1531           calculate_next_max_timecode (splitmux, newest_gop->start_tc,
1532           newest_gop->start_time, &next_tc);
1533
1534       /* calculate the next expected keyframe time to prevent too early fku
1535        * event */
1536       if (GST_CLOCK_TIME_IS_VALID (max_tc_time) && next_tc) {
1537         next_max_tc_time =
1538             calculate_next_max_timecode (splitmux, next_tc, max_tc_time, NULL);
1539       }
1540       if (next_tc)
1541         gst_video_time_code_free (next_tc);
1542
1543       timecode_based = GST_CLOCK_TIME_IS_VALID (max_tc_time) &&
1544           GST_CLOCK_TIME_IS_VALID (next_max_tc_time);
1545
1546       if (!timecode_based) {
1547         GST_WARNING_OBJECT (splitmux,
1548             "Couldn't calculate maximum fragment time for timecode mode");
1549       }
1550     } else {
1551       /* This can happen in the presence of GAP events that trigger
1552        * a new fragment start */
1553       GST_WARNING_OBJECT (splitmux,
1554           "No buffer available to calculate next timecode");
1555     }
1556   }
1557
1558   if ((splitmux->threshold_time == 0 && !timecode_based)
1559       || splitmux->threshold_bytes != 0)
1560     return TRUE;
1561
1562   if (timecode_based) {
1563     /* We might have rounding errors: aim slightly earlier */
1564     if (max_tc_time >= tc_rounding_error) {
1565       target_time = max_tc_time - tc_rounding_error;
1566     } else {
1567       /* unreliable target time */
1568       GST_DEBUG_OBJECT (splitmux, "tc time %" GST_TIME_FORMAT
1569           " is smaller than allowed rounding error, set it to zero",
1570           GST_TIME_ARGS (max_tc_time));
1571       target_time = 0;
1572     }
1573
1574     if (next_max_tc_time >= tc_rounding_error) {
1575       next_fku_time = next_max_tc_time - tc_rounding_error;
1576     } else {
1577       /* unreliable target time */
1578       GST_DEBUG_OBJECT (splitmux, "next tc time %" GST_TIME_FORMAT
1579           " is smaller than allowed rounding error, set it to zero",
1580           GST_TIME_ARGS (next_max_tc_time));
1581       next_fku_time = 0;
1582     }
1583   } else {
1584     target_time = newest_gop->start_time + splitmux->threshold_time;
1585   }
1586
1587   if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fku_time)) {
1588     GstClockTime allowed_time = splitmux->next_fku_time;
1589
1590     if (timecode_based) {
1591       if (allowed_time >= tc_rounding_error) {
1592         allowed_time -= tc_rounding_error;
1593       } else {
1594         /* unreliable next force key unit time */
1595         GST_DEBUG_OBJECT (splitmux, "expected next force key unit time %"
1596             GST_TIME_FORMAT
1597             " is smaller than allowed rounding error, set it to zero",
1598             GST_TIME_ARGS (splitmux->next_fku_time));
1599         allowed_time = 0;
1600       }
1601     }
1602
1603     if (target_time < allowed_time) {
1604       GST_LOG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1605           " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1606           ", rounding error compensated next keyframe time %" GST_TIME_FORMAT,
1607           GST_TIME_ARGS (target_time),
1608           GST_TIME_ARGS (splitmux->next_fku_time),
1609           GST_TIME_ARGS (allowed_time));
1610
1611       return TRUE;
1612     } else if (allowed_time != splitmux->next_fku_time &&
1613         target_time < splitmux->next_fku_time) {
1614       GST_DEBUG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1615           " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1616           ", but the difference is smaller than allowed rounding error",
1617           GST_TIME_ARGS (target_time), GST_TIME_ARGS (splitmux->next_fku_time));
1618     }
1619   }
1620
1621   if (!timecode_based) {
1622     next_fku_time = target_time + splitmux->threshold_time;
1623   }
1624
1625   GST_INFO_OBJECT (splitmux, "Requesting keyframe at %" GST_TIME_FORMAT
1626       ", the next expected keyframe request time is %" GST_TIME_FORMAT,
1627       GST_TIME_ARGS (target_time), GST_TIME_ARGS (next_fku_time));
1628
1629   newest_gop->sent_fku = TRUE;
1630
1631   splitmux->next_fku_time = next_fku_time;
1632   ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1633
1634   return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
1635 }
1636
1637 static GstPadProbeReturn
1638 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1639 {
1640   GstSplitMuxSink *splitmux = ctx->splitmux;
1641   MqStreamBuf *buf_info = NULL;
1642   GstFlowReturn ret = GST_FLOW_OK;
1643
1644   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1645
1646   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1647   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1648     g_warning ("Buffer list handling not implemented");
1649     return GST_PAD_PROBE_DROP;
1650   }
1651   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1652       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1653     GstEvent *event = gst_pad_probe_info_get_event (info);
1654     gboolean locked = FALSE, wait = !ctx->is_reference;
1655
1656     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1657
1658     switch (GST_EVENT_TYPE (event)) {
1659       case GST_EVENT_SEGMENT:
1660         gst_event_copy_segment (event, &ctx->out_segment);
1661         break;
1662       case GST_EVENT_FLUSH_STOP:
1663         GST_SPLITMUX_LOCK (splitmux);
1664         locked = TRUE;
1665         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1666         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1667         g_queue_clear (&ctx->queued_bufs);
1668         g_queue_clear (&ctx->queued_bufs);
1669         /* If this is the reference context, we just threw away any queued keyframes */
1670         if (ctx->is_reference)
1671           splitmux->queued_keyframes = 0;
1672         ctx->flushing = FALSE;
1673         wait = FALSE;
1674         break;
1675       case GST_EVENT_FLUSH_START:
1676         GST_SPLITMUX_LOCK (splitmux);
1677         locked = TRUE;
1678         GST_LOG_OBJECT (pad, "Flush start");
1679         ctx->flushing = TRUE;
1680         GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1681         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1682         break;
1683       case GST_EVENT_EOS:
1684         GST_SPLITMUX_LOCK (splitmux);
1685         locked = TRUE;
1686         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1687           goto beach;
1688         ctx->out_eos = TRUE;
1689
1690         if (ctx == splitmux->reference_ctx) {
1691           splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_STREAM;
1692           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1693         }
1694
1695         GST_INFO_OBJECT (splitmux,
1696             "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1697         break;
1698       case GST_EVENT_GAP:{
1699         GstClockTime gap_ts;
1700         GstClockTimeDiff rtime;
1701
1702         gst_event_parse_gap (event, &gap_ts, NULL);
1703         if (gap_ts == GST_CLOCK_TIME_NONE)
1704           break;
1705
1706         GST_SPLITMUX_LOCK (splitmux);
1707         locked = TRUE;
1708
1709         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1710           goto beach;
1711
1712         /* When we get a gap event on the
1713          * reference stream and we're trying to open a
1714          * new file, we need to store it until we get
1715          * the buffer afterwards
1716          */
1717         if (ctx->is_reference &&
1718             (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1719           GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1720           gst_event_replace (&ctx->pending_gap, event);
1721           GST_SPLITMUX_UNLOCK (splitmux);
1722           return GST_PAD_PROBE_HANDLED;
1723         }
1724
1725         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1726
1727         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1728             GST_STIME_ARGS (rtime));
1729
1730         if (rtime != GST_CLOCK_STIME_NONE) {
1731           ctx->out_running_time = rtime;
1732           complete_or_wait_on_out (splitmux, ctx);
1733         }
1734         break;
1735       }
1736       case GST_EVENT_CUSTOM_DOWNSTREAM:{
1737         const GstStructure *s;
1738         GstClockTimeDiff ts = 0;
1739
1740         s = gst_event_get_structure (event);
1741         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1742           break;
1743
1744         gst_structure_get_int64 (s, "timestamp", &ts);
1745
1746         GST_SPLITMUX_LOCK (splitmux);
1747         locked = TRUE;
1748
1749         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1750           goto beach;
1751         ctx->out_running_time = ts;
1752         if (!ctx->is_reference)
1753           ret = complete_or_wait_on_out (splitmux, ctx);
1754         GST_SPLITMUX_UNLOCK (splitmux);
1755         GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1756         return GST_PAD_PROBE_DROP;
1757       }
1758       case GST_EVENT_CAPS:{
1759         GstPad *peer;
1760
1761         if (!ctx->is_reference)
1762           break;
1763
1764         peer = gst_pad_get_peer (pad);
1765         if (peer) {
1766           gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1767
1768           gst_object_unref (peer);
1769
1770           if (ok)
1771             break;
1772
1773         } else {
1774           break;
1775         }
1776         /* This is in the case the muxer doesn't allow this change of caps */
1777         GST_SPLITMUX_LOCK (splitmux);
1778         locked = TRUE;
1779         ctx->caps_change = TRUE;
1780
1781         if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1782           GST_DEBUG_OBJECT (splitmux,
1783               "New caps were not accepted. Switching output file");
1784           if (ctx->out_eos == FALSE) {
1785             splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1786             GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1787           }
1788         }
1789
1790         /* Lets it fall through, if it fails again, then the muxer just can't
1791          * support this format, but at least we have a closed file.
1792          */
1793         break;
1794       }
1795       default:
1796         break;
1797     }
1798
1799     /* We need to make sure events aren't passed
1800      * until the muxer / sink are ready for it */
1801     if (!locked)
1802       GST_SPLITMUX_LOCK (splitmux);
1803     if (wait)
1804       ret = complete_or_wait_on_out (splitmux, ctx);
1805     GST_SPLITMUX_UNLOCK (splitmux);
1806
1807     /* Don't try to forward sticky events before the next buffer is there
1808      * because it would cause a new file to be created without the first
1809      * buffer being available.
1810      */
1811     GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1812     if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1813       gst_event_unref (event);
1814       return GST_PAD_PROBE_HANDLED;
1815     } else {
1816       return GST_PAD_PROBE_PASS;
1817     }
1818   }
1819
1820   /* Allow everything through until the configured next stopping point */
1821   GST_SPLITMUX_LOCK (splitmux);
1822
1823   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1824   if (buf_info == NULL) {
1825     /* Can only happen due to a poorly timed flush */
1826     ret = GST_FLOW_FLUSHING;
1827     goto beach;
1828   }
1829
1830   /* If we have popped a keyframe, decrement the queued_gop count */
1831   if (buf_info->keyframe && splitmux->queued_keyframes > 0 && ctx->is_reference)
1832     splitmux->queued_keyframes--;
1833
1834   ctx->out_running_time = buf_info->run_ts;
1835   ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1836
1837   GST_LOG_OBJECT (splitmux,
1838       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1839       " size %" G_GUINT64_FORMAT,
1840       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1841
1842   ctx->caps_change = FALSE;
1843
1844   ret = complete_or_wait_on_out (splitmux, ctx);
1845
1846   splitmux->muxed_out_bytes += buf_info->buf_size;
1847
1848 #ifndef GST_DISABLE_GST_DEBUG
1849   {
1850     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1851     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1852         " run ts %" GST_STIME_FORMAT, buf,
1853         GST_STIME_ARGS (ctx->out_running_time));
1854   }
1855 #endif
1856
1857   ctx->cur_out_buffer = NULL;
1858   GST_SPLITMUX_UNLOCK (splitmux);
1859
1860   /* pending_gap is protected by the STREAM lock */
1861   if (ctx->pending_gap) {
1862     /* If we previously stored a gap event, send it now */
1863     GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1864
1865     GST_DEBUG_OBJECT (splitmux,
1866         "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1867
1868     gst_pad_send_event (peer, ctx->pending_gap);
1869     ctx->pending_gap = NULL;
1870
1871     gst_object_unref (peer);
1872   }
1873
1874   mq_stream_buf_free (buf_info);
1875
1876   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1877   return GST_PAD_PROBE_PASS;
1878
1879 beach:
1880   GST_SPLITMUX_UNLOCK (splitmux);
1881   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1882   return GST_PAD_PROBE_DROP;
1883 }
1884
1885 static gboolean
1886 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1887 {
1888   return gst_pad_send_event (peer, gst_event_ref (*event));
1889 }
1890
1891 static void
1892 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1893 {
1894   if (ctx->fragment_block_id > 0) {
1895     gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1896     ctx->fragment_block_id = 0;
1897   }
1898 }
1899
1900 static void
1901 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1902 {
1903   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1904
1905   gst_pad_sticky_events_foreach (ctx->srcpad,
1906       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1907
1908   /* Clear EOS flag if not actually EOS */
1909   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1910   ctx->out_eos_async_done = ctx->out_eos;
1911
1912   gst_object_unref (peer);
1913 }
1914
1915 static void
1916 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1917 {
1918   GstPad *sinkpad, *srcpad, *newpad;
1919   GstPadTemplate *templ;
1920
1921   srcpad = ctx->srcpad;
1922   sinkpad = gst_pad_get_peer (srcpad);
1923
1924   templ = sinkpad->padtemplate;
1925   newpad =
1926       gst_element_request_pad (splitmux->muxer, templ,
1927       GST_PAD_NAME (sinkpad), NULL);
1928
1929   GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1930       newpad);
1931   if (!gst_pad_unlink (srcpad, sinkpad)) {
1932     gst_object_unref (sinkpad);
1933     goto fail;
1934   }
1935   if (gst_pad_link_full (srcpad, newpad,
1936           GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1937     gst_element_release_request_pad (splitmux->muxer, newpad);
1938     gst_object_unref (sinkpad);
1939     gst_object_unref (newpad);
1940     goto fail;
1941   }
1942   gst_object_unref (newpad);
1943   gst_object_unref (sinkpad);
1944   return;
1945
1946 fail:
1947   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1948       ("Could not create the new muxer/sink"), NULL);
1949 }
1950
1951 static GstPadProbeReturn
1952 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1953 {
1954   return GST_PAD_PROBE_OK;
1955 }
1956
1957 static void
1958 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1959 {
1960   ctx->fragment_block_id =
1961       gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1962       NULL, NULL);
1963 }
1964
1965 static gboolean
1966 _set_property_from_structure (GQuark field_id, const GValue * value,
1967     gpointer user_data)
1968 {
1969   const gchar *property_name = g_quark_to_string (field_id);
1970   GObject *element = G_OBJECT (user_data);
1971
1972   g_object_set_property (element, property_name, value);
1973
1974   return TRUE;
1975 }
1976
1977 static void
1978 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1979 {
1980   gst_element_set_locked_state (element, TRUE);
1981   gst_element_set_state (element, GST_STATE_NULL);
1982   GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1983   gst_bin_remove (GST_BIN (splitmux), element);
1984 }
1985
1986
1987 static void
1988 _send_event (const GValue * value, gpointer user_data)
1989 {
1990   GstPad *pad = g_value_get_object (value);
1991   GstEvent *ev = user_data;
1992
1993   gst_pad_send_event (pad, gst_event_ref (ev));
1994 }
1995
1996 /* Called with lock held when a fragment
1997  * reaches EOS and it is time to restart
1998  * a new fragment
1999  */
2000 static GstFlowReturn
2001 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2002 {
2003   GstElement *muxer, *sink;
2004
2005   g_assert (ctx->is_reference);
2006
2007   /* 1 change to new file */
2008   splitmux->switching_fragment = TRUE;
2009
2010   /* We need to drop the splitmux lock to acquire the state lock
2011    * here and ensure there's no racy state change going on elsewhere */
2012   muxer = gst_object_ref (splitmux->muxer);
2013   sink = gst_object_ref (splitmux->active_sink);
2014
2015   GST_SPLITMUX_UNLOCK (splitmux);
2016   GST_SPLITMUX_STATE_LOCK (splitmux);
2017
2018   if (splitmux->shutdown) {
2019     GST_DEBUG_OBJECT (splitmux,
2020         "Shutdown requested. Aborting fragment switch.");
2021     GST_SPLITMUX_LOCK (splitmux);
2022     GST_SPLITMUX_STATE_UNLOCK (splitmux);
2023     gst_object_unref (muxer);
2024     gst_object_unref (sink);
2025     return GST_FLOW_FLUSHING;
2026   }
2027
2028   if (splitmux->async_finalize) {
2029     if (splitmux->muxed_out_bytes > 0
2030         || splitmux->fragment_id != splitmux->start_index) {
2031       gchar *newname;
2032       GstElement *new_sink, *new_muxer;
2033
2034       GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
2035           splitmux->fragment_id);
2036       g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
2037       newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
2038       GST_SPLITMUX_LOCK (splitmux);
2039       if ((splitmux->sink =
2040               create_element (splitmux, splitmux->sink_factory, newname,
2041                   TRUE)) == NULL)
2042         goto fail;
2043       if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
2044         gst_preset_load_preset (GST_PRESET (splitmux->sink),
2045             splitmux->sink_preset);
2046       if (splitmux->sink_properties)
2047         gst_structure_foreach (splitmux->sink_properties,
2048             _set_property_from_structure, splitmux->sink);
2049       splitmux->active_sink = splitmux->sink;
2050       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
2051       g_free (newname);
2052       newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
2053       if ((splitmux->muxer =
2054               create_element (splitmux, splitmux->muxer_factory, newname,
2055                   TRUE)) == NULL)
2056         goto fail;
2057       if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2058               "async") != NULL) {
2059         /* async child elements are causing state change races and weird
2060          * failures, so let's try and turn that off */
2061         g_object_set (splitmux->sink, "async", FALSE, NULL);
2062       }
2063       if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
2064         gst_preset_load_preset (GST_PRESET (splitmux->muxer),
2065             splitmux->muxer_preset);
2066       if (splitmux->muxer_properties)
2067         gst_structure_foreach (splitmux->muxer_properties,
2068             _set_property_from_structure, splitmux->muxer);
2069       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2070       g_free (newname);
2071       new_sink = splitmux->sink;
2072       new_muxer = splitmux->muxer;
2073       GST_SPLITMUX_UNLOCK (splitmux);
2074       g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
2075       gst_element_link (new_muxer, new_sink);
2076
2077       if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2078         if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2079                     EOS_FROM_US)) == 2) {
2080           _lock_and_set_to_null (muxer, splitmux);
2081           _lock_and_set_to_null (sink, splitmux);
2082         } else {
2083           g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2084               GINT_TO_POINTER (2));
2085         }
2086       }
2087       gst_object_unref (muxer);
2088       gst_object_unref (sink);
2089       muxer = new_muxer;
2090       sink = new_sink;
2091       gst_object_ref (muxer);
2092       gst_object_ref (sink);
2093     }
2094   } else {
2095
2096     gst_element_set_locked_state (muxer, TRUE);
2097     gst_element_set_locked_state (sink, TRUE);
2098     gst_element_set_state (sink, GST_STATE_NULL);
2099
2100     if (splitmux->reset_muxer) {
2101       gst_element_set_state (muxer, GST_STATE_NULL);
2102     } else {
2103       GstIterator *it = gst_element_iterate_sink_pads (muxer);
2104       GstEvent *ev;
2105       guint32 seqnum;
2106
2107       ev = gst_event_new_flush_start ();
2108       seqnum = gst_event_get_seqnum (ev);
2109       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2110       gst_event_unref (ev);
2111
2112       gst_iterator_resync (it);
2113
2114       ev = gst_event_new_flush_stop (TRUE);
2115       gst_event_set_seqnum (ev, seqnum);
2116       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2117       gst_event_unref (ev);
2118
2119       gst_iterator_free (it);
2120     }
2121   }
2122
2123   GST_SPLITMUX_LOCK (splitmux);
2124   set_next_filename (splitmux, ctx);
2125   splitmux->muxed_out_bytes = 0;
2126   GST_SPLITMUX_UNLOCK (splitmux);
2127
2128   if (gst_element_set_state (sink,
2129           GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2130     gst_element_set_state (sink, GST_STATE_NULL);
2131     gst_element_set_locked_state (muxer, FALSE);
2132     gst_element_set_locked_state (sink, FALSE);
2133
2134     goto fail_output;
2135   }
2136
2137   if (gst_element_set_state (muxer,
2138           GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2139     gst_element_set_state (muxer, GST_STATE_NULL);
2140     gst_element_set_state (sink, GST_STATE_NULL);
2141     gst_element_set_locked_state (muxer, FALSE);
2142     gst_element_set_locked_state (sink, FALSE);
2143     goto fail_muxer;
2144   }
2145
2146   gst_element_set_locked_state (muxer, FALSE);
2147   gst_element_set_locked_state (sink, FALSE);
2148
2149   gst_object_unref (sink);
2150   gst_object_unref (muxer);
2151
2152   GST_SPLITMUX_LOCK (splitmux);
2153   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2154   splitmux->switching_fragment = FALSE;
2155   do_async_done (splitmux);
2156
2157   splitmux->ready_for_output = TRUE;
2158
2159   g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
2160   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
2161
2162   send_fragment_opened_closed_msg (splitmux, TRUE, sink);
2163
2164   /* FIXME: Is this always the correct next state? */
2165   GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND");
2166   splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
2167   GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2168   return GST_FLOW_OK;
2169
2170 fail:
2171   gst_object_unref (sink);
2172   gst_object_unref (muxer);
2173
2174   GST_SPLITMUX_LOCK (splitmux);
2175   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2176   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2177       ("Could not create the new muxer/sink"), NULL);
2178   return GST_FLOW_ERROR;
2179
2180 fail_output:
2181   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2182       ("Could not start new output sink"), NULL);
2183   gst_object_unref (sink);
2184   gst_object_unref (muxer);
2185
2186   GST_SPLITMUX_LOCK (splitmux);
2187   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2188   splitmux->switching_fragment = FALSE;
2189   return GST_FLOW_ERROR;
2190
2191 fail_muxer:
2192   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2193       ("Could not start new muxer"), NULL);
2194   gst_object_unref (sink);
2195   gst_object_unref (muxer);
2196
2197   GST_SPLITMUX_LOCK (splitmux);
2198   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2199   splitmux->switching_fragment = FALSE;
2200   return GST_FLOW_ERROR;
2201 }
2202
2203 static void
2204 bus_handler (GstBin * bin, GstMessage * message)
2205 {
2206   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
2207
2208   switch (GST_MESSAGE_TYPE (message)) {
2209     case GST_MESSAGE_EOS:{
2210       /* If the state is draining out the current file, drop this EOS */
2211       GstElement *sink;
2212
2213       sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
2214       GST_SPLITMUX_LOCK (splitmux);
2215
2216       send_fragment_opened_closed_msg (splitmux, FALSE, sink);
2217
2218       if (splitmux->async_finalize) {
2219
2220         if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2221           if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2222                       EOS_FROM_US)) == 2) {
2223             GstElement *muxer;
2224             GstPad *sinksink, *muxersrc;
2225
2226             sinksink = gst_element_get_static_pad (sink, "sink");
2227             muxersrc = gst_pad_get_peer (sinksink);
2228             muxer = gst_pad_get_parent_element (muxersrc);
2229             gst_object_unref (sinksink);
2230             gst_object_unref (muxersrc);
2231
2232             gst_element_call_async (muxer,
2233                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2234                 gst_object_ref (splitmux), gst_object_unref);
2235             gst_element_call_async (sink,
2236                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2237                 gst_object_ref (splitmux), gst_object_unref);
2238             gst_object_unref (muxer);
2239           } else {
2240             g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2241                 GINT_TO_POINTER (2));
2242           }
2243           GST_DEBUG_OBJECT (splitmux,
2244               "Caught async EOS from previous muxer+sink. Dropping.");
2245           /* We forward the EOS so that it gets aggregated as normal. If the sink
2246            * finishes and is removed before the end, it will be de-aggregated */
2247           gst_message_unref (message);
2248           GST_SPLITMUX_UNLOCK (splitmux);
2249           return;
2250         }
2251       } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_STREAM) {
2252         GST_DEBUG_OBJECT (splitmux,
2253             "Passing EOS message. Output state %d max_out_running_time %"
2254             GST_STIME_FORMAT, splitmux->output_state,
2255             GST_STIME_ARGS (splitmux->max_out_running_time));
2256       } else {
2257         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
2258         splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2259         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2260
2261         gst_message_unref (message);
2262         GST_SPLITMUX_UNLOCK (splitmux);
2263         return;
2264       }
2265       GST_SPLITMUX_UNLOCK (splitmux);
2266       break;
2267     }
2268     case GST_MESSAGE_ASYNC_START:
2269     case GST_MESSAGE_ASYNC_DONE:
2270       /* Ignore state changes from our children while switching */
2271       GST_SPLITMUX_LOCK (splitmux);
2272       if (splitmux->switching_fragment) {
2273         if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
2274             || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
2275           GST_LOG_OBJECT (splitmux,
2276               "Ignoring state change from child %" GST_PTR_FORMAT
2277               " while switching", GST_MESSAGE_SRC (message));
2278           gst_message_unref (message);
2279           GST_SPLITMUX_UNLOCK (splitmux);
2280           return;
2281         }
2282       }
2283       GST_SPLITMUX_UNLOCK (splitmux);
2284       break;
2285     case GST_MESSAGE_WARNING:
2286     {
2287       GError *gerror = NULL;
2288
2289       gst_message_parse_warning (message, &gerror, NULL);
2290
2291       if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
2292         GList *item;
2293         gboolean caps_change = FALSE;
2294
2295         GST_SPLITMUX_LOCK (splitmux);
2296
2297         for (item = splitmux->contexts; item; item = item->next) {
2298           MqStreamCtx *ctx = item->data;
2299
2300           if (ctx->caps_change) {
2301             caps_change = TRUE;
2302             break;
2303           }
2304         }
2305
2306         GST_SPLITMUX_UNLOCK (splitmux);
2307
2308         if (caps_change) {
2309           GST_LOG_OBJECT (splitmux,
2310               "Ignoring warning change from child %" GST_PTR_FORMAT
2311               " while switching caps", GST_MESSAGE_SRC (message));
2312           gst_message_unref (message);
2313           return;
2314         }
2315       }
2316       break;
2317     }
2318     default:
2319       break;
2320   }
2321
2322   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2323 }
2324
2325 static void
2326 ctx_set_unblock (MqStreamCtx * ctx)
2327 {
2328   ctx->need_unblock = TRUE;
2329 }
2330
2331 static gboolean
2332 need_new_fragment (GstSplitMuxSink * splitmux,
2333     GstClockTime queued_time, GstClockTime queued_gop_time,
2334     guint64 queued_bytes)
2335 {
2336   guint64 thresh_bytes;
2337   GstClockTime thresh_time;
2338   gboolean check_robust_muxing;
2339   GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
2340   GstClockTime *ptr_to_time;
2341   const InputGop *gop, *next_gop;
2342
2343   GST_OBJECT_LOCK (splitmux);
2344   thresh_bytes = splitmux->threshold_bytes;
2345   thresh_time = splitmux->threshold_time;
2346   ptr_to_time = (GstClockTime *)
2347       gst_queue_array_peek_head_struct (splitmux->times_to_split);
2348   if (ptr_to_time)
2349     time_to_split = *ptr_to_time;
2350   check_robust_muxing = splitmux->use_robust_muxing
2351       && splitmux->muxer_has_reserved_props;
2352   GST_OBJECT_UNLOCK (splitmux);
2353
2354   /* Have we muxed at least one thing from the reference
2355    * stream into the file? If not, no other streams can have
2356    * either */
2357   if (splitmux->fragment_reference_bytes <= 0) {
2358     GST_TRACE_OBJECT (splitmux,
2359         "Not ready to split - nothing muxed on the reference stream");
2360     return FALSE;
2361   }
2362
2363   /* User told us to split now */
2364   if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE) {
2365     GST_TRACE_OBJECT (splitmux, "Forcing because split_next_gop is set");
2366     return TRUE;
2367   }
2368
2369   gop = g_queue_peek_head (&splitmux->pending_input_gops);
2370   /* We need a full GOP queued up at this point */
2371   g_assert (gop != NULL);
2372   next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2373   /* And the beginning of the next GOP or otherwise EOS */
2374
2375   /* User told us to split at this running time */
2376   if (gop->start_time >= time_to_split) {
2377     GST_OBJECT_LOCK (splitmux);
2378     /* Dequeue running time */
2379     gst_queue_array_pop_head_struct (splitmux->times_to_split);
2380     /* Empty any running times after this that are past now */
2381     ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2382     while (ptr_to_time) {
2383       time_to_split = *ptr_to_time;
2384       if (gop->start_time < time_to_split) {
2385         break;
2386       }
2387       gst_queue_array_pop_head_struct (splitmux->times_to_split);
2388       ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2389     }
2390     GST_TRACE_OBJECT (splitmux,
2391         "GOP start time %" GST_STIME_FORMAT " is after requested split point %"
2392         GST_STIME_FORMAT, GST_STIME_ARGS (gop->start_time),
2393         GST_STIME_ARGS (time_to_split));
2394     GST_OBJECT_UNLOCK (splitmux);
2395     return TRUE;
2396   }
2397
2398   if (thresh_bytes > 0 && queued_bytes > thresh_bytes) {
2399     GST_TRACE_OBJECT (splitmux,
2400         "queued bytes %" G_GUINT64_FORMAT " overruns byte limit", queued_bytes);
2401     return TRUE;                /* Would overrun byte limit */
2402   }
2403
2404   if (thresh_time > 0 && queued_time > thresh_time) {
2405     GST_TRACE_OBJECT (splitmux,
2406         "queued time %" GST_STIME_FORMAT " overruns time limit",
2407         GST_STIME_ARGS (queued_time));
2408     return TRUE;                /* Would overrun time limit */
2409   }
2410
2411   if (splitmux->tc_interval) {
2412     GstClockTime next_gop_start_time =
2413         next_gop ? next_gop->start_time : splitmux->max_in_running_time;
2414
2415     if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) &&
2416         GST_CLOCK_STIME_IS_VALID (next_gop_start_time) &&
2417         next_gop_start_time >
2418         splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) {
2419       GST_TRACE_OBJECT (splitmux,
2420           "in running time %" GST_STIME_FORMAT " overruns time limit %"
2421           GST_TIME_FORMAT, GST_STIME_ARGS (next_gop_start_time),
2422           GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2423       return TRUE;
2424     }
2425   }
2426
2427   if (check_robust_muxing) {
2428     GstClockTime mux_reserved_remain;
2429
2430     g_object_get (splitmux->muxer,
2431         "reserved-duration-remaining", &mux_reserved_remain, NULL);
2432
2433     GST_LOG_OBJECT (splitmux,
2434         "Muxer robust muxing report - %" G_GUINT64_FORMAT
2435         " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
2436         mux_reserved_remain, queued_gop_time);
2437
2438     if (queued_gop_time >= mux_reserved_remain) {
2439       GST_INFO_OBJECT (splitmux,
2440           "File is about to run out of header room - %" G_GUINT64_FORMAT
2441           " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
2442           ". Switching to new file", mux_reserved_remain, queued_gop_time);
2443       return TRUE;
2444     }
2445   }
2446
2447   /* Continue and mux this GOP */
2448   return FALSE;
2449 }
2450
2451 /* probably we want to add this API? */
2452 static void
2453 video_time_code_replace (GstVideoTimeCode ** old_tc, GstVideoTimeCode * new_tc)
2454 {
2455   GstVideoTimeCode *timecode = NULL;
2456
2457   g_return_if_fail (old_tc != NULL);
2458
2459   if (*old_tc == new_tc)
2460     return;
2461
2462   if (new_tc)
2463     timecode = gst_video_time_code_copy (new_tc);
2464
2465   if (*old_tc)
2466     gst_video_time_code_free (*old_tc);
2467
2468   *old_tc = timecode;
2469 }
2470
2471 /* Called with splitmux lock held */
2472 /* Called when entering ProcessingCompleteGop state
2473  * Assess if mq contents overflowed the current file
2474  *   -> If yes, need to switch to new file
2475  *   -> if no, set max_out_running_time to let this GOP in and
2476  *      go to COLLECTING_GOP_START state
2477  */
2478 static void
2479 handle_gathered_gop (GstSplitMuxSink * splitmux, const InputGop * gop,
2480     GstClockTimeDiff next_gop_start_time, GstClockTimeDiff max_out_running_time)
2481 {
2482   guint64 queued_bytes;
2483   GstClockTimeDiff queued_time = 0;
2484   GstClockTimeDiff queued_gop_time = 0;
2485   SplitMuxOutputCommand *cmd;
2486
2487   /* Assess if the multiqueue contents overflowed the current file */
2488   /* When considering if a newly gathered GOP overflows
2489    * the time limit for the file, only consider the running time of the
2490    * reference stream. Other streams might have run ahead a little bit,
2491    * but extra pieces won't be released to the muxer beyond the reference
2492    * stream cut-off anyway - so it forms the limit. */
2493   queued_bytes = splitmux->fragment_total_bytes + gop->total_bytes;
2494   queued_time = next_gop_start_time;
2495   /* queued_gop_time tracks how much unwritten data there is waiting to
2496    * be written to this fragment including this GOP */
2497   if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
2498     queued_gop_time = queued_time - splitmux->reference_ctx->out_running_time;
2499   else
2500     queued_gop_time = queued_time - gop->start_time;
2501
2502   GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
2503   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2504       " bytes %" G_GUINT64_FORMAT " in next gop start time %" GST_STIME_FORMAT
2505       " gop start time %" GST_STIME_FORMAT,
2506       GST_STIME_ARGS (queued_time), queued_bytes,
2507       GST_STIME_ARGS (next_gop_start_time), GST_STIME_ARGS (gop->start_time));
2508
2509   if (queued_gop_time < 0)
2510     goto error_gop_duration;
2511
2512   if (queued_time < splitmux->fragment_start_time)
2513     goto error_queued_time;
2514
2515   queued_time -= splitmux->fragment_start_time;
2516   if (queued_time < queued_gop_time)
2517     queued_gop_time = queued_time;
2518
2519   /* Expand queued bytes estimate by muxer overhead */
2520   queued_bytes += (queued_bytes * splitmux->mux_overhead);
2521
2522   /* Check for overrun - have we output at least one byte and overrun
2523    * either threshold? */
2524   if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
2525     if (splitmux->async_finalize) {
2526       GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2527       *sink_running_time = splitmux->reference_ctx->out_running_time;
2528       g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2529           RUNNING_TIME, sink_running_time, g_free);
2530     }
2531     g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2532     /* Tell the output side to start a new fragment */
2533     GST_INFO_OBJECT (splitmux,
2534         "This GOP (dur %" GST_STIME_FORMAT
2535         ") would overflow the fragment, Sending start_new_fragment cmd",
2536         GST_STIME_ARGS (queued_gop_time));
2537     cmd = out_cmd_buf_new ();
2538     cmd->start_new_fragment = TRUE;
2539     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2540     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2541
2542     splitmux->fragment_start_time = gop->start_time;
2543     splitmux->fragment_start_time_pts = gop->start_time_pts;
2544     splitmux->fragment_total_bytes = 0;
2545     splitmux->fragment_reference_bytes = 0;
2546
2547     video_time_code_replace (&splitmux->fragment_start_tc, gop->start_tc);
2548     splitmux->next_fragment_start_tc_time =
2549         calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
2550         splitmux->fragment_start_time, NULL);
2551     if (splitmux->tc_interval && splitmux->fragment_start_tc
2552         && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
2553       GST_WARNING_OBJECT (splitmux,
2554           "Couldn't calculate next fragment start time for timecode mode");
2555     }
2556   }
2557
2558   /* And set up to collect the next GOP */
2559   if (max_out_running_time != G_MAXINT64) {
2560     splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2561   } else {
2562     /* This is probably already the current state, but just in case: */
2563     splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2564   }
2565
2566   /* And wake all input contexts to send a wake-up event */
2567   g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2568   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2569
2570   /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2571   splitmux->fragment_total_bytes += gop->total_bytes;
2572   splitmux->fragment_reference_bytes += gop->reference_bytes;
2573
2574   if (gop->total_bytes > 0) {
2575     GST_LOG_OBJECT (splitmux,
2576         "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2577         " time %" GST_STIME_FORMAT,
2578         splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2579
2580     /* Send this GOP to the output command queue */
2581     cmd = out_cmd_buf_new ();
2582     cmd->start_new_fragment = FALSE;
2583     cmd->max_output_ts = max_out_running_time;
2584     GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2585         GST_STIME_FORMAT, GST_STIME_ARGS (max_out_running_time));
2586     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2587
2588     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2589   }
2590
2591   return;
2592
2593 error_gop_duration:
2594   GST_ELEMENT_ERROR (splitmux,
2595       STREAM, FAILED, ("Timestamping error on input streams"),
2596       ("Queued GOP time is negative %" GST_STIME_FORMAT,
2597           GST_STIME_ARGS (queued_gop_time)));
2598   return;
2599 error_queued_time:
2600   GST_ELEMENT_ERROR (splitmux,
2601       STREAM, FAILED, ("Timestamping error on input streams"),
2602       ("Queued time is negative. Input went backwards. queued_time - %"
2603           GST_STIME_FORMAT, GST_STIME_ARGS (queued_time)));
2604   return;
2605 }
2606
2607 /* Called with splitmux lock held */
2608 /* Called from each input pad when it is has all the pieces
2609  * for a GOP or EOS, starting with the reference pad which has set the
2610  * splitmux->max_in_running_time
2611  */
2612 static void
2613 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2614 {
2615   GList *cur;
2616   GstEvent *event;
2617
2618   /* On ENDING_FILE, the reference stream sends a command to start a new
2619    * fragment, then releases the GOP for output in the new fragment.
2620    *  If some streams received no buffer during the last GOP that overran,
2621    * because its next buffer has a timestamp bigger than
2622    * ctx->max_in_running_time, its queue is empty. In that case the only
2623    * way to wakeup the output thread is by injecting an event in the
2624    * queue. This usually happen with subtitle streams.
2625    * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2626   if (ctx->need_unblock) {
2627     GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2628     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2629         GST_EVENT_TYPE_SERIALIZED,
2630         gst_structure_new ("splitmuxsink-unblock", "timestamp",
2631             G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2632
2633     GST_SPLITMUX_UNLOCK (splitmux);
2634     gst_pad_send_event (ctx->sinkpad, event);
2635     GST_SPLITMUX_LOCK (splitmux);
2636
2637     ctx->need_unblock = FALSE;
2638     GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2639     /* state may have changed while we were unlocked. Loop again if so */
2640     if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2641       return;
2642   }
2643
2644   do {
2645     GstClockTimeDiff next_gop_start = GST_CLOCK_STIME_NONE;
2646
2647     if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2648       GstClockTimeDiff max_out_running_time;
2649       gboolean ready = TRUE;
2650       InputGop *gop;
2651       const InputGop *next_gop;
2652
2653       gop = g_queue_peek_head (&splitmux->pending_input_gops);
2654       next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2655
2656       /* If we have no GOP or no next GOP here then the reference context is
2657        * at EOS, otherwise use the start time of the next GOP if we're far
2658        * enough in the GOP to know it */
2659       if (gop && next_gop) {
2660         if (!splitmux->reference_ctx->in_eos
2661             && splitmux->max_in_running_time_dts != GST_CLOCK_STIME_NONE
2662             && splitmux->max_in_running_time_dts < next_gop->start_time_pts) {
2663           GST_LOG_OBJECT (splitmux,
2664               "No further GOPs finished collecting, waiting until current DTS %"
2665               GST_STIME_FORMAT " has passed next GOP start PTS %"
2666               GST_STIME_FORMAT,
2667               GST_STIME_ARGS (splitmux->max_in_running_time_dts),
2668               GST_STIME_ARGS (next_gop->start_time_pts));
2669           break;
2670         }
2671
2672         GST_LOG_OBJECT (splitmux,
2673             "Finished collecting GOP with start time %" GST_STIME_FORMAT
2674             ", next GOP start time %" GST_STIME_FORMAT,
2675             GST_STIME_ARGS (gop->start_time),
2676             GST_STIME_ARGS (next_gop->start_time));
2677         next_gop_start = next_gop->start_time;
2678         max_out_running_time =
2679             splitmux->reference_ctx->in_eos ? G_MAXINT64 : next_gop->start_time;
2680       } else if (!next_gop) {
2681         GST_LOG_OBJECT (splitmux, "Reference context is EOS");
2682         next_gop_start = splitmux->max_in_running_time;
2683         max_out_running_time = G_MAXINT64;
2684       } else if (!gop) {
2685         GST_LOG_OBJECT (splitmux, "No further GOPs finished collecting");
2686         break;
2687       } else {
2688         g_assert_not_reached ();
2689       }
2690
2691       g_assert (gop != NULL);
2692
2693       /* Iterate each pad, and check that the input running time is at least
2694        * up to the start running time of the next GOP or EOS, and if so handle
2695        * the collected GOP */
2696       GST_LOG_OBJECT (splitmux, "Checking GOP collected, next GOP start %"
2697           GST_STIME_FORMAT " ctx %p", GST_STIME_ARGS (next_gop_start), ctx);
2698       for (cur = g_list_first (splitmux->contexts); cur != NULL;
2699           cur = g_list_next (cur)) {
2700         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2701
2702         GST_LOG_OBJECT (splitmux,
2703             "Context %p sink pad %" GST_PTR_FORMAT " @ TS %" GST_STIME_FORMAT
2704             " EOS %d", tmpctx, tmpctx->sinkpad,
2705             GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2706
2707         if (next_gop_start != GST_CLOCK_STIME_NONE &&
2708             tmpctx->in_running_time < next_gop_start && !tmpctx->in_eos) {
2709           GST_LOG_OBJECT (splitmux,
2710               "Context %p sink pad %" GST_PTR_FORMAT " not ready. We'll sleep",
2711               tmpctx, tmpctx->sinkpad);
2712           ready = FALSE;
2713           break;
2714         }
2715       }
2716       if (ready) {
2717         GST_DEBUG_OBJECT (splitmux,
2718             "Collected GOP is complete. Processing (ctx %p)", ctx);
2719         /* All pads have a complete GOP, release it into the multiqueue */
2720         handle_gathered_gop (splitmux, gop, next_gop_start,
2721             max_out_running_time);
2722
2723         g_queue_pop_head (&splitmux->pending_input_gops);
2724         input_gop_free (gop);
2725
2726         /* The user has requested a split, we can split now that the previous GOP
2727          * has been collected to the correct location */
2728         if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested),
2729                 TRUE, FALSE)) {
2730           g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
2731         }
2732       }
2733     }
2734
2735     /* If upstream reached EOS we are not expecting more data, no need to wait
2736      * here. */
2737     if (ctx->in_eos)
2738       return;
2739
2740     if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2741         !ctx->flushing &&
2742         ctx->in_running_time >= next_gop_start &&
2743         next_gop_start != GST_CLOCK_STIME_NONE) {
2744       /* Some pad is not yet ready, or GOP is being pushed
2745        * either way, sleep and wait to get woken */
2746       GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2747       GST_SPLITMUX_WAIT_INPUT (splitmux);
2748       GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2749     } else {
2750       /* This pad is not ready or the state changed - break out and get another
2751        * buffer / event */
2752       break;
2753     }
2754   } while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT);
2755 }
2756
2757 static GstPadProbeReturn
2758 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2759 {
2760   GstSplitMuxSink *splitmux = ctx->splitmux;
2761   GstFlowReturn ret = GST_FLOW_OK;
2762   GstBuffer *buf;
2763   MqStreamBuf *buf_info = NULL;
2764   GstClockTime ts, pts, dts;
2765   GstClockTimeDiff running_time, running_time_pts, running_time_dts;
2766   gboolean loop_again;
2767   gboolean keyframe = FALSE;
2768
2769   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2770
2771   /* FIXME: Handle buffer lists, until then make it clear they won't work */
2772   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2773     g_warning ("Buffer list handling not implemented");
2774     return GST_PAD_PROBE_DROP;
2775   }
2776   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2777       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2778     GstEvent *event = gst_pad_probe_info_get_event (info);
2779
2780     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2781
2782     switch (GST_EVENT_TYPE (event)) {
2783       case GST_EVENT_SEGMENT:
2784         gst_event_copy_segment (event, &ctx->in_segment);
2785         break;
2786       case GST_EVENT_FLUSH_STOP:
2787         GST_SPLITMUX_LOCK (splitmux);
2788         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2789         ctx->in_eos = FALSE;
2790         ctx->in_running_time = GST_CLOCK_STIME_NONE;
2791         GST_SPLITMUX_UNLOCK (splitmux);
2792         break;
2793       case GST_EVENT_EOS:
2794         GST_SPLITMUX_LOCK (splitmux);
2795         ctx->in_eos = TRUE;
2796
2797         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2798           ret = GST_FLOW_FLUSHING;
2799           goto beach;
2800         }
2801
2802         if (ctx->is_reference) {
2803           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2804           /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2805           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2806           /* Wake up other input pads to collect this GOP */
2807           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2808           if (g_queue_is_empty (&splitmux->pending_input_gops)) {
2809             GST_WARNING_OBJECT (splitmux,
2810                 "EOS with no buffers received on the reference pad");
2811
2812             /* - child muxer and sink might be still locked state
2813              *   (see gst_splitmux_reset_elements()) so should be unlocked
2814              *   for state change of splitmuxsink to be applied to child
2815              * - would need to post async done message
2816              * - location on sink element is still null then it will post
2817              *   error message on bus (muxer will produce something, header
2818              *   data for example)
2819              *
2820              * Calls start_next_fragment() here, the method will address
2821              * everything the above mentioned one */
2822             ret = start_next_fragment (splitmux, ctx);
2823             if (ret != GST_FLOW_OK)
2824               goto beach;
2825           } else {
2826             check_completed_gop (splitmux, ctx);
2827           }
2828         } else if (splitmux->input_state ==
2829             SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2830           /* If we are waiting for a GOP to be completed (ie, for aux
2831            * pads to catch up), then this pad is complete, so check
2832            * if the whole GOP is.
2833            */
2834           if (!g_queue_is_empty (&splitmux->pending_input_gops))
2835             check_completed_gop (splitmux, ctx);
2836         }
2837         GST_SPLITMUX_UNLOCK (splitmux);
2838         break;
2839       case GST_EVENT_GAP:{
2840         GstClockTime gap_ts;
2841         GstClockTimeDiff rtime;
2842
2843         gst_event_parse_gap (event, &gap_ts, NULL);
2844         if (gap_ts == GST_CLOCK_TIME_NONE)
2845           break;
2846
2847         GST_SPLITMUX_LOCK (splitmux);
2848
2849         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2850           ret = GST_FLOW_FLUSHING;
2851           goto beach;
2852         }
2853         rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2854
2855         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2856             GST_STIME_ARGS (rtime));
2857
2858         if (ctx->is_reference && GST_CLOCK_STIME_IS_VALID (rtime)) {
2859           /* If this GAP event happens before the first fragment then
2860            * initialize the fragment start time here. */
2861           if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)) {
2862             splitmux->fragment_start_time = rtime;
2863             GST_LOG_OBJECT (splitmux,
2864                 "Fragment start time now %" GST_STIME_FORMAT,
2865                 GST_STIME_ARGS (splitmux->fragment_start_time));
2866
2867             /* Also take this as the first start time when starting up,
2868              * so that we start counting overflow from the first frame */
2869             if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2870               splitmux->max_in_running_time = rtime;
2871             if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
2872               splitmux->max_in_running_time_dts = rtime;
2873           }
2874
2875           /* Similarly take it as fragment start PTS and GOP start time if
2876            * these are not set */
2877           if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time_pts))
2878             splitmux->fragment_start_time_pts = rtime;
2879
2880           if (g_queue_is_empty (&splitmux->pending_input_gops)) {
2881             InputGop *gop = g_slice_new0 (InputGop);
2882
2883             gop->from_gap = TRUE;
2884             gop->start_time = rtime;
2885             gop->start_time_pts = rtime;
2886
2887             g_queue_push_tail (&splitmux->pending_input_gops, gop);
2888           }
2889         }
2890
2891         GST_SPLITMUX_UNLOCK (splitmux);
2892         break;
2893       }
2894       default:
2895         break;
2896     }
2897     return GST_PAD_PROBE_PASS;
2898   } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2899     switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2900       case GST_QUERY_ALLOCATION:
2901         return GST_PAD_PROBE_DROP;
2902       default:
2903         return GST_PAD_PROBE_PASS;
2904     }
2905   }
2906
2907   buf = gst_pad_probe_info_get_buffer (info);
2908   buf_info = mq_stream_buf_new ();
2909
2910   pts = GST_BUFFER_PTS (buf);
2911   dts = GST_BUFFER_DTS (buf);
2912   if (GST_BUFFER_PTS_IS_VALID (buf))
2913     ts = GST_BUFFER_PTS (buf);
2914   else
2915     ts = GST_BUFFER_DTS (buf);
2916
2917   GST_LOG_OBJECT (pad,
2918       "Buffer TS is %" GST_TIME_FORMAT " (PTS %" GST_TIME_FORMAT ", DTS %"
2919       GST_TIME_FORMAT ")", GST_TIME_ARGS (ts), GST_TIME_ARGS (pts),
2920       GST_TIME_ARGS (dts));
2921
2922   GST_SPLITMUX_LOCK (splitmux);
2923
2924   if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2925     ret = GST_FLOW_FLUSHING;
2926     goto beach;
2927   }
2928
2929   /* If this buffer has a timestamp, advance the input timestamp of the
2930    * stream */
2931   if (GST_CLOCK_TIME_IS_VALID (ts)) {
2932     running_time = my_segment_to_running_time (&ctx->in_segment, ts);
2933
2934     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2935         GST_STIME_ARGS (running_time));
2936
2937     /* in running time is always the maximum PTS (or DTS) that was observed so far */
2938     if (GST_CLOCK_STIME_IS_VALID (running_time)
2939         && running_time > ctx->in_running_time)
2940       ctx->in_running_time = running_time;
2941   } else {
2942     running_time = ctx->in_running_time;
2943   }
2944
2945   if (GST_CLOCK_TIME_IS_VALID (pts))
2946     running_time_pts = my_segment_to_running_time (&ctx->in_segment, pts);
2947   else
2948     running_time_pts = GST_CLOCK_STIME_NONE;
2949
2950   if (GST_CLOCK_TIME_IS_VALID (dts)) {
2951     running_time_dts = my_segment_to_running_time (&ctx->in_segment, dts);
2952
2953     /* DTS > PTS makes conceptually no sense so catch such invalid DTS here
2954      * by clamping to the PTS */
2955     running_time_dts = MIN (running_time_pts, running_time_dts);
2956   } else {
2957     /* If there is no DTS then assume PTS=DTS */
2958     running_time_dts = running_time_pts;
2959   }
2960
2961   /* Try to make sure we have a valid running time */
2962   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2963     ctx->in_running_time =
2964         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2965   }
2966
2967   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2968       GST_STIME_ARGS (ctx->in_running_time));
2969
2970   buf_info->run_ts = ctx->in_running_time;
2971   buf_info->buf_size = gst_buffer_get_size (buf);
2972   buf_info->duration = GST_BUFFER_DURATION (buf);
2973
2974   if (ctx->is_reference) {
2975     InputGop *gop = NULL;
2976     GstVideoTimeCodeMeta *tc_meta = gst_buffer_get_video_time_code_meta (buf);
2977
2978     /* initialize fragment_start_time if it was not set yet (i.e. for the
2979      * first fragment), or otherwise set it to the minimum observed time */
2980     if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)
2981         || splitmux->fragment_start_time > running_time) {
2982       if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time))
2983         splitmux->fragment_start_time_pts = running_time_pts;
2984       splitmux->fragment_start_time = running_time;
2985
2986       GST_LOG_OBJECT (splitmux,
2987           "Fragment start time now %" GST_STIME_FORMAT " (initial PTS %"
2988           GST_STIME_FORMAT ")", GST_STIME_ARGS (splitmux->fragment_start_time),
2989           GST_STIME_ARGS (splitmux->fragment_start_time_pts));
2990
2991       /* Also take this as the first start time when starting up,
2992        * so that we start counting overflow from the first frame */
2993       if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)
2994           || splitmux->max_in_running_time < splitmux->fragment_start_time)
2995         splitmux->max_in_running_time = splitmux->fragment_start_time;
2996
2997       if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
2998         splitmux->max_in_running_time_dts = running_time_dts;
2999
3000       if (tc_meta) {
3001         video_time_code_replace (&splitmux->fragment_start_tc, &tc_meta->tc);
3002
3003         splitmux->next_fragment_start_tc_time =
3004             calculate_next_max_timecode (splitmux, &tc_meta->tc,
3005             running_time, NULL);
3006         if (splitmux->tc_interval
3007             && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time))
3008         {
3009           GST_WARNING_OBJECT (splitmux,
3010               "Couldn't calculate next fragment start time for timecode mode");
3011         }
3012 #ifndef GST_DISABLE_GST_DEBUG
3013         {
3014           gchar *tc_str;
3015
3016           tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3017           GST_DEBUG_OBJECT (splitmux,
3018               "Initialize fragment start timecode %s, next fragment start timecode time %"
3019               GST_TIME_FORMAT, tc_str,
3020               GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
3021           g_free (tc_str);
3022         }
3023 #endif
3024       }
3025     }
3026
3027
3028     /* First check if we're at the very first GOP and the tracking was created
3029      * from a GAP event. In that case don't start a new GOP on keyframes but
3030      * just updated it as needed */
3031     gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3032
3033     if (!gop || (!gop->from_gap
3034             && !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT))) {
3035       gop = g_slice_new0 (InputGop);
3036
3037       gop->start_time = running_time;
3038       gop->start_time_pts = running_time_pts;
3039
3040       GST_LOG_OBJECT (splitmux,
3041           "Next GOP start time now %" GST_STIME_FORMAT " (initial PTS %"
3042           GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3043           GST_STIME_ARGS (gop->start_time_pts));
3044
3045       if (tc_meta) {
3046         video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3047
3048 #ifndef GST_DISABLE_GST_DEBUG
3049         {
3050           gchar *tc_str;
3051
3052           tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3053           GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode %s", tc_str);
3054           g_free (tc_str);
3055         }
3056 #endif
3057       }
3058
3059       g_queue_push_tail (&splitmux->pending_input_gops, gop);
3060     } else {
3061       gop->from_gap = FALSE;
3062
3063       if (!GST_CLOCK_STIME_IS_VALID (gop->start_time)
3064           || gop->start_time > running_time) {
3065         gop->start_time = running_time;
3066
3067         GST_LOG_OBJECT (splitmux,
3068             "GOP start time updated now %" GST_STIME_FORMAT " (initial PTS %"
3069             GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3070             GST_STIME_ARGS (gop->start_time_pts));
3071
3072         if (tc_meta) {
3073           video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3074
3075 #ifndef GST_DISABLE_GST_DEBUG
3076           {
3077             gchar *tc_str;
3078
3079             tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3080             GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode updated %s",
3081                 tc_str);
3082             g_free (tc_str);
3083           }
3084 #endif
3085         }
3086       }
3087     }
3088
3089     /* Check whether we need to request next keyframe depending on
3090      * current running time */
3091     if (request_next_keyframe (splitmux, buf, running_time_dts) == FALSE) {
3092       GST_WARNING_OBJECT (splitmux,
3093           "Could not request a keyframe. Files may not split at the exact location they should");
3094     }
3095   }
3096
3097   {
3098     InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3099
3100     if (gop) {
3101       GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
3102           " total GOP bytes %" G_GUINT64_FORMAT ", total next GOP bytes %"
3103           G_GUINT64_FORMAT, GST_STIME_ARGS (buf_info->run_ts),
3104           gop->total_bytes, gop->total_bytes);
3105     }
3106   }
3107
3108   loop_again = TRUE;
3109   do {
3110     if (ctx->flushing) {
3111       ret = GST_FLOW_FLUSHING;
3112       goto beach;
3113     }
3114
3115     switch (splitmux->input_state) {
3116       case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
3117         if (ctx->is_reference) {
3118           const InputGop *gop, *next_gop;
3119
3120           /* This is the reference context. If it's a keyframe,
3121            * it marks the start of a new GOP and we should wait in
3122            * check_completed_gop before continuing, but either way
3123            * (keyframe or no, we'll pass this buffer through after
3124            * so set loop_again to FALSE */
3125           loop_again = FALSE;
3126
3127           gop = g_queue_peek_head (&splitmux->pending_input_gops);
3128           g_assert (gop != NULL);
3129           next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
3130
3131           if (ctx->in_running_time > splitmux->max_in_running_time)
3132             splitmux->max_in_running_time = ctx->in_running_time;
3133           if (running_time_dts > splitmux->max_in_running_time_dts)
3134             splitmux->max_in_running_time_dts = running_time_dts;
3135
3136           GST_LOG_OBJECT (splitmux,
3137               "Max in running time now %" GST_STIME_FORMAT ", DTS %"
3138               GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_in_running_time),
3139               GST_STIME_ARGS (splitmux->max_in_running_time_dts));
3140
3141           if (!next_gop) {
3142             GST_DEBUG_OBJECT (pad, "Waiting for end of GOP");
3143             /* Allow other input pads to catch up to here too */
3144             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3145             break;
3146           }
3147
3148           if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
3149             GST_INFO_OBJECT (pad,
3150                 "Have keyframe with running time %" GST_STIME_FORMAT,
3151                 GST_STIME_ARGS (ctx->in_running_time));
3152             keyframe = TRUE;
3153           }
3154
3155           if (running_time_dts != GST_CLOCK_STIME_NONE
3156               && running_time_dts < next_gop->start_time_pts) {
3157             GST_DEBUG_OBJECT (splitmux,
3158                 "Waiting until DTS (%" GST_STIME_FORMAT
3159                 ") has passed next GOP start PTS (%" GST_STIME_FORMAT ")",
3160                 GST_STIME_ARGS (running_time_dts),
3161                 GST_STIME_ARGS (next_gop->start_time_pts));
3162             /* Allow other input pads to catch up to here too */
3163             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3164             break;
3165           }
3166
3167           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
3168           /* Wake up other input pads to collect this GOP */
3169           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3170           check_completed_gop (splitmux, ctx);
3171         } else {
3172           /* Pass this buffer if the reference ctx is far enough ahead */
3173           if (ctx->in_running_time < splitmux->max_in_running_time) {
3174             loop_again = FALSE;
3175             break;
3176           }
3177
3178           /* We're still waiting for a keyframe on the reference pad, sleep */
3179           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
3180           GST_SPLITMUX_WAIT_INPUT (splitmux);
3181           GST_LOG_OBJECT (pad,
3182               "Done sleeping for GOP start input state now %d",
3183               splitmux->input_state);
3184         }
3185         break;
3186       case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
3187         /* We're collecting a GOP, this is only ever called for non-reference
3188          * contexts as the reference context would be waiting inside
3189          * check_completed_gop() */
3190
3191         g_assert (!ctx->is_reference);
3192
3193         /* If we overran the target timestamp, it might be time to process
3194          * the GOP, otherwise bail out for more data. */
3195         GST_LOG_OBJECT (pad,
3196             "Checking TS %" GST_STIME_FORMAT " against max %"
3197             GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
3198             GST_STIME_ARGS (splitmux->max_in_running_time));
3199
3200         if (ctx->in_running_time < splitmux->max_in_running_time) {
3201           loop_again = FALSE;
3202           break;
3203         }
3204
3205         GST_LOG_OBJECT (pad,
3206             "Collected last packet of GOP. Checking other pads");
3207
3208         if (g_queue_is_empty (&splitmux->pending_input_gops)) {
3209           GST_WARNING_OBJECT (pad,
3210               "Reference was closed without GOP, dropping");
3211           goto drop;
3212         }
3213
3214         check_completed_gop (splitmux, ctx);
3215         break;
3216       }
3217       case SPLITMUX_INPUT_STATE_FINISHING_UP:
3218         loop_again = FALSE;
3219         break;
3220       default:
3221         loop_again = FALSE;
3222         break;
3223     }
3224   }
3225   while (loop_again);
3226
3227   if (keyframe && ctx->is_reference)
3228     splitmux->queued_keyframes++;
3229   buf_info->keyframe = keyframe;
3230
3231   /* Update total input byte counter for overflow detect unless we're after
3232    * EOS now */
3233   if (splitmux->input_state != SPLITMUX_INPUT_STATE_FINISHING_UP
3234       && splitmux->input_state != SPLITMUX_INPUT_STATE_STOPPED) {
3235     InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3236
3237     /* We must have a GOP at this point */
3238     g_assert (gop != NULL);
3239
3240     gop->total_bytes += buf_info->buf_size;
3241     if (ctx->is_reference) {
3242       gop->reference_bytes += buf_info->buf_size;
3243     }
3244   }
3245
3246   /* Now add this buffer to the queue just before returning */
3247   g_queue_push_head (&ctx->queued_bufs, buf_info);
3248
3249   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
3250       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
3251
3252   GST_SPLITMUX_UNLOCK (splitmux);
3253   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3254   return GST_PAD_PROBE_PASS;
3255
3256 beach:
3257   GST_SPLITMUX_UNLOCK (splitmux);
3258   if (buf_info)
3259     mq_stream_buf_free (buf_info);
3260   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3261   return GST_PAD_PROBE_PASS;
3262 drop:
3263   GST_SPLITMUX_UNLOCK (splitmux);
3264   if (buf_info)
3265     mq_stream_buf_free (buf_info);
3266   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = GST_FLOW_EOS;
3267   return GST_PAD_PROBE_DROP;
3268 }
3269
3270 static void
3271 grow_blocked_queues (GstSplitMuxSink * splitmux)
3272 {
3273   GList *cur;
3274
3275   /* Scan other queues for full-ness and grow them */
3276   for (cur = g_list_first (splitmux->contexts);
3277       cur != NULL; cur = g_list_next (cur)) {
3278     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3279     guint cur_limit;
3280     guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
3281
3282     g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
3283     GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
3284
3285     if (cur_len >= cur_limit) {
3286       cur_limit = cur_len + 1;
3287       GST_DEBUG_OBJECT (tmpctx->q,
3288           "Queue overflowed and needs enlarging. Growing to %u buffers",
3289           cur_limit);
3290       g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
3291     }
3292   }
3293 }
3294
3295 static void
3296 handle_q_underrun (GstElement * q, gpointer user_data)
3297 {
3298   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3299   GstSplitMuxSink *splitmux = ctx->splitmux;
3300
3301   GST_SPLITMUX_LOCK (splitmux);
3302   GST_DEBUG_OBJECT (q,
3303       "Queue reported underrun with %d keyframes and %d cmds enqueued",
3304       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3305   grow_blocked_queues (splitmux);
3306   GST_SPLITMUX_UNLOCK (splitmux);
3307 }
3308
3309 static void
3310 handle_q_overrun (GstElement * q, gpointer user_data)
3311 {
3312   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3313   GstSplitMuxSink *splitmux = ctx->splitmux;
3314   gboolean allow_grow = FALSE;
3315
3316   GST_SPLITMUX_LOCK (splitmux);
3317   GST_DEBUG_OBJECT (q,
3318       "Queue reported overrun with %d keyframes and %d cmds enqueued",
3319       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3320
3321   if (splitmux->queued_keyframes < 2) {
3322     /* Less than a full GOP queued, grow the queue */
3323     allow_grow = TRUE;
3324   } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
3325     allow_grow = TRUE;
3326   } else {
3327     /* If another queue is starved, grow */
3328     GList *cur;
3329     for (cur = g_list_first (splitmux->contexts);
3330         cur != NULL; cur = g_list_next (cur)) {
3331       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3332       if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
3333         allow_grow = TRUE;
3334       }
3335     }
3336   }
3337   GST_SPLITMUX_UNLOCK (splitmux);
3338
3339   if (allow_grow) {
3340     guint cur_limit;
3341
3342     g_object_get (q, "max-size-buffers", &cur_limit, NULL);
3343     cur_limit++;
3344
3345     GST_DEBUG_OBJECT (q,
3346         "Queue overflowed and needs enlarging. Growing to %u buffers",
3347         cur_limit);
3348
3349     g_object_set (q, "max-size-buffers", cur_limit, NULL);
3350   }
3351 }
3352
3353 /* Called with SPLITMUX lock held */
3354 static const gchar *
3355 lookup_muxer_pad (GstSplitMuxSink * splitmux, const gchar * sinkpad_name)
3356 {
3357   const gchar *ret = NULL;
3358
3359   if (splitmux->muxerpad_map == NULL)
3360     return NULL;
3361
3362   if (sinkpad_name == NULL) {
3363     GST_WARNING_OBJECT (splitmux,
3364         "Can't look up request pad in pad map without providing a pad name");
3365     return NULL;
3366   }
3367
3368   ret = gst_structure_get_string (splitmux->muxerpad_map, sinkpad_name);
3369   if (ret) {
3370     GST_INFO_OBJECT (splitmux, "Sink pad %s maps to muxer pad %s", sinkpad_name,
3371         ret);
3372     return g_strdup (ret);
3373   }
3374
3375   return NULL;
3376 }
3377
3378 static GstPad *
3379 gst_splitmux_sink_request_new_pad (GstElement * element,
3380     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3381 {
3382   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3383   GstPadTemplate *mux_template = NULL;
3384   GstPad *ret = NULL, *muxpad = NULL;
3385   GstElement *q;
3386   GstPad *q_sink = NULL, *q_src = NULL;
3387   gchar *gname, *qname;
3388   gboolean is_primary_video = FALSE, is_video = FALSE,
3389       muxer_is_requestpad = FALSE;
3390   MqStreamCtx *ctx;
3391   const gchar *muxer_padname = NULL;
3392
3393   GST_DEBUG_OBJECT (splitmux, "templ:%s, name:%s", templ->name_template, name);
3394
3395   GST_SPLITMUX_LOCK (splitmux);
3396   if (!create_muxer (splitmux))
3397     goto fail;
3398   g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3399
3400   if (g_str_equal (templ->name_template, "video") ||
3401       g_str_has_prefix (templ->name_template, "video_aux_")) {
3402     is_primary_video = g_str_equal (templ->name_template, "video");
3403     if (is_primary_video && splitmux->have_video)
3404       goto already_have_video;
3405     is_video = TRUE;
3406   }
3407
3408   /* See if there's a pad map and it lists this pad */
3409   muxer_padname = lookup_muxer_pad (splitmux, name);
3410
3411   if (muxer_padname == NULL) {
3412     if (is_video) {
3413       /* FIXME: Look for a pad template with matching caps, rather than by name */
3414       GST_DEBUG_OBJECT (element,
3415           "searching for pad-template with name 'video_%%u'");
3416       mux_template =
3417           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3418           (splitmux->muxer), "video_%u");
3419
3420       /* Fallback to find sink pad templates named 'video' (flvmux) */
3421       if (!mux_template) {
3422         GST_DEBUG_OBJECT (element,
3423             "searching for pad-template with name 'video'");
3424         mux_template =
3425             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3426             (splitmux->muxer), "video");
3427       }
3428       name = NULL;
3429     } else {
3430       GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
3431           templ->name_template);
3432       mux_template =
3433           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3434           (splitmux->muxer), templ->name_template);
3435
3436       /* Fallback to find sink pad templates named 'audio' (flvmux) */
3437       if (!mux_template && g_str_has_prefix (templ->name_template, "audio_")) {
3438         GST_DEBUG_OBJECT (element,
3439             "searching for pad-template with name 'audio'");
3440         mux_template =
3441             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3442             (splitmux->muxer), "audio");
3443         name = NULL;
3444       }
3445     }
3446
3447     if (mux_template == NULL) {
3448       GST_DEBUG_OBJECT (element,
3449           "searching for pad-template with name 'sink_%%d'");
3450       mux_template =
3451           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3452           (splitmux->muxer), "sink_%d");
3453       name = NULL;
3454     }
3455     if (mux_template == NULL) {
3456       GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
3457       mux_template =
3458           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3459           (splitmux->muxer), "sink");
3460       name = NULL;
3461     }
3462
3463     if (mux_template == NULL) {
3464       GST_ERROR_OBJECT (element,
3465           "unable to find a suitable sink pad-template on the muxer");
3466       goto fail;
3467     }
3468     GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
3469         mux_template->name_template);
3470
3471     if (mux_template->presence == GST_PAD_REQUEST) {
3472       GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
3473
3474       muxpad =
3475           gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
3476       muxer_is_requestpad = TRUE;
3477     } else if (mux_template->presence == GST_PAD_ALWAYS) {
3478       GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
3479
3480       muxpad =
3481           gst_element_get_static_pad (splitmux->muxer,
3482           mux_template->name_template);
3483     } else {
3484       GST_ERROR_OBJECT (element,
3485           "unexpected pad presence %d", mux_template->presence);
3486       goto fail;
3487     }
3488   } else {
3489     /* Have a muxer pad name */
3490     if (!(muxpad = gst_element_get_static_pad (splitmux->muxer, muxer_padname))) {
3491       if ((muxpad =
3492               gst_element_request_pad_simple (splitmux->muxer, muxer_padname)))
3493         muxer_is_requestpad = TRUE;
3494     }
3495     g_free ((gchar *) muxer_padname);
3496     muxer_padname = NULL;
3497   }
3498
3499   /* One way or another, we must have a muxer pad by now */
3500   if (muxpad == NULL)
3501     goto fail;
3502
3503   if (is_primary_video)
3504     gname = g_strdup ("video");
3505   else if (name == NULL)
3506     gname = gst_pad_get_name (muxpad);
3507   else
3508     gname = g_strdup (name);
3509
3510   qname = g_strdup_printf ("queue_%s", gname);
3511   if ((q = create_element (splitmux, "queue", qname, FALSE)) == NULL) {
3512     g_free (qname);
3513     goto fail;
3514   }
3515   g_free (qname);
3516
3517   gst_element_set_state (q, GST_STATE_TARGET (splitmux));
3518
3519   g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
3520       "max-size-buffers", 5, NULL);
3521
3522   q_sink = gst_element_get_static_pad (q, "sink");
3523   q_src = gst_element_get_static_pad (q, "src");
3524
3525   if (gst_pad_link (q_src, muxpad) != GST_PAD_LINK_OK) {
3526     if (muxer_is_requestpad)
3527       gst_element_release_request_pad (splitmux->muxer, muxpad);
3528     gst_object_unref (GST_OBJECT (muxpad));
3529     goto fail;
3530   }
3531
3532   gst_object_unref (GST_OBJECT (muxpad));
3533
3534   ctx = mq_stream_ctx_new (splitmux);
3535   /* Context holds a ref: */
3536   ctx->q = gst_object_ref (q);
3537   ctx->srcpad = q_src;
3538   ctx->sinkpad = q_sink;
3539   ctx->q_overrun_id =
3540       g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
3541   g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
3542
3543   ctx->src_pad_block_id =
3544       gst_pad_add_probe (q_src,
3545       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
3546       (GstPadProbeCallback) handle_mq_output, ctx, NULL);
3547   if (is_primary_video && splitmux->reference_ctx != NULL) {
3548     splitmux->reference_ctx->is_reference = FALSE;
3549     splitmux->reference_ctx = NULL;
3550   }
3551   if (splitmux->reference_ctx == NULL) {
3552     splitmux->reference_ctx = ctx;
3553     ctx->is_reference = TRUE;
3554   }
3555
3556   ret = gst_ghost_pad_new_from_template (gname, q_sink, templ);
3557   g_object_set_qdata ((GObject *) (ret), PAD_CONTEXT, ctx);
3558
3559   ctx->sink_pad_block_id =
3560       gst_pad_add_probe (q_sink,
3561       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
3562       GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
3563       (GstPadProbeCallback) handle_mq_input, ctx, NULL);
3564
3565   GST_DEBUG_OBJECT (splitmux, "splitmuxsink pad %" GST_PTR_FORMAT
3566       " feeds queue pad %" GST_PTR_FORMAT, ret, q_sink);
3567
3568   splitmux->contexts = g_list_append (splitmux->contexts, ctx);
3569
3570   g_free (gname);
3571
3572   if (is_primary_video)
3573     splitmux->have_video = TRUE;
3574
3575   gst_pad_set_active (ret, TRUE);
3576   gst_element_add_pad (GST_ELEMENT (splitmux), ret);
3577
3578   GST_SPLITMUX_UNLOCK (splitmux);
3579
3580   return ret;
3581 fail:
3582   GST_SPLITMUX_UNLOCK (splitmux);
3583
3584   if (q_sink)
3585     gst_object_unref (q_sink);
3586   if (q_src)
3587     gst_object_unref (q_src);
3588   return NULL;
3589 already_have_video:
3590   GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
3591   GST_SPLITMUX_UNLOCK (splitmux);
3592   return NULL;
3593 }
3594
3595 static void
3596 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
3597 {
3598   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3599   GstPad *muxpad = NULL;
3600   MqStreamCtx *ctx =
3601       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
3602
3603   GST_SPLITMUX_LOCK (splitmux);
3604
3605   if (splitmux->muxer == NULL)
3606     goto fail;                  /* Elements don't exist yet - nothing to release */
3607
3608   GST_INFO_OBJECT (pad, "releasing request pad");
3609
3610   muxpad = gst_pad_get_peer (ctx->srcpad);
3611
3612   /* Remove the context from our consideration */
3613   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
3614
3615   ctx->flushing = TRUE;
3616   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3617
3618   GST_SPLITMUX_UNLOCK (splitmux);
3619
3620   if (ctx->sink_pad_block_id) {
3621     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
3622     gst_pad_send_event (ctx->sinkpad, gst_event_new_flush_start ());
3623   }
3624
3625   if (ctx->src_pad_block_id)
3626     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
3627
3628   /* Wait for the pad to be free */
3629   GST_PAD_STREAM_LOCK (pad);
3630   GST_SPLITMUX_LOCK (splitmux);
3631   GST_PAD_STREAM_UNLOCK (pad);
3632
3633   /* Can release the context now */
3634   mq_stream_ctx_free (ctx);
3635   if (ctx == splitmux->reference_ctx)
3636     splitmux->reference_ctx = NULL;
3637
3638   /* Release and free the muxer input */
3639   if (muxpad) {
3640     gst_element_release_request_pad (splitmux->muxer, muxpad);
3641     gst_object_unref (muxpad);
3642   }
3643
3644   if (GST_PAD_PAD_TEMPLATE (pad) &&
3645       g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
3646               (pad)), "video"))
3647     splitmux->have_video = FALSE;
3648
3649   gst_element_remove_pad (element, pad);
3650
3651   /* Reset the internal elements only after all request pads are released */
3652   if (splitmux->contexts == NULL)
3653     gst_splitmux_reset_elements (splitmux);
3654
3655   /* Wake up other input streams to check if the completion conditions have
3656    * changed */
3657   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3658
3659 fail:
3660   GST_SPLITMUX_UNLOCK (splitmux);
3661 }
3662
3663 static GstElement *
3664 create_element (GstSplitMuxSink * splitmux,
3665     const gchar * factory, const gchar * name, gboolean locked)
3666 {
3667   GstElement *ret = gst_element_factory_make (factory, name);
3668   if (ret == NULL) {
3669     g_warning ("Failed to create %s - splitmuxsink will not work", name);
3670     return NULL;
3671   }
3672
3673   if (locked) {
3674     /* Ensure the sink starts in locked state and NULL - it will be changed
3675      * by the filename setting code */
3676     gst_element_set_locked_state (ret, TRUE);
3677     gst_element_set_state (ret, GST_STATE_NULL);
3678   }
3679
3680   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
3681     g_warning ("Could not add %s element - splitmuxsink will not work", name);
3682     gst_object_unref (ret);
3683     return NULL;
3684   }
3685
3686   return ret;
3687 }
3688
3689 static gboolean
3690 create_muxer (GstSplitMuxSink * splitmux)
3691 {
3692   /* Create internal elements */
3693   if (splitmux->muxer == NULL) {
3694     GstElement *provided_muxer = NULL;
3695
3696     GST_OBJECT_LOCK (splitmux);
3697     if (splitmux->provided_muxer != NULL)
3698       provided_muxer = gst_object_ref (splitmux->provided_muxer);
3699     GST_OBJECT_UNLOCK (splitmux);
3700
3701     if ((!splitmux->async_finalize && provided_muxer == NULL) ||
3702         (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
3703       if ((splitmux->muxer =
3704               create_element (splitmux,
3705                   splitmux->muxer_factory ? splitmux->
3706                   muxer_factory : DEFAULT_MUXER, "muxer", FALSE)) == NULL)
3707         goto fail;
3708     } else if (splitmux->async_finalize) {
3709       if ((splitmux->muxer =
3710               create_element (splitmux, splitmux->muxer_factory, "muxer",
3711                   FALSE)) == NULL)
3712         goto fail;
3713       if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
3714         gst_preset_load_preset (GST_PRESET (splitmux->muxer),
3715             splitmux->muxer_preset);
3716       if (splitmux->muxer_properties)
3717         gst_structure_foreach (splitmux->muxer_properties,
3718             _set_property_from_structure, splitmux->muxer);
3719     } else {
3720       /* Ensure it's not in locked state (we might be reusing an old element) */
3721       gst_element_set_locked_state (provided_muxer, FALSE);
3722       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
3723         g_warning ("Could not add muxer element - splitmuxsink will not work");
3724         gst_object_unref (provided_muxer);
3725         goto fail;
3726       }
3727
3728       splitmux->muxer = provided_muxer;
3729       gst_object_unref (provided_muxer);
3730     }
3731
3732     if (splitmux->use_robust_muxing) {
3733       update_muxer_properties (splitmux);
3734     }
3735   }
3736
3737   return TRUE;
3738 fail:
3739   return FALSE;
3740 }
3741
3742 static GstElement *
3743 find_sink (GstElement * e)
3744 {
3745   GstElement *res = NULL;
3746   GstIterator *iter;
3747   gboolean done = FALSE;
3748   GValue data = { 0, };
3749
3750   if (!GST_IS_BIN (e))
3751     return e;
3752
3753   if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
3754     return e;
3755
3756   iter = gst_bin_iterate_sinks (GST_BIN (e));
3757   while (!done) {
3758     switch (gst_iterator_next (iter, &data)) {
3759       case GST_ITERATOR_OK:
3760       {
3761         GstElement *child = g_value_get_object (&data);
3762         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
3763                 "location") != NULL) {
3764           res = child;
3765           done = TRUE;
3766         }
3767         g_value_reset (&data);
3768         break;
3769       }
3770       case GST_ITERATOR_RESYNC:
3771         gst_iterator_resync (iter);
3772         break;
3773       case GST_ITERATOR_DONE:
3774         done = TRUE;
3775         break;
3776       case GST_ITERATOR_ERROR:
3777         g_assert_not_reached ();
3778         break;
3779     }
3780   }
3781   g_value_unset (&data);
3782   gst_iterator_free (iter);
3783
3784   return res;
3785 }
3786
3787 static gboolean
3788 create_sink (GstSplitMuxSink * splitmux)
3789 {
3790   GstElement *provided_sink = NULL;
3791
3792   if (splitmux->active_sink == NULL) {
3793
3794     GST_OBJECT_LOCK (splitmux);
3795     if (splitmux->provided_sink != NULL)
3796       provided_sink = gst_object_ref (splitmux->provided_sink);
3797     GST_OBJECT_UNLOCK (splitmux);
3798
3799     if ((!splitmux->async_finalize && provided_sink == NULL) ||
3800         (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
3801       if ((splitmux->sink =
3802               create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
3803         goto fail;
3804       splitmux->active_sink = splitmux->sink;
3805     } else if (splitmux->async_finalize) {
3806       if ((splitmux->sink =
3807               create_element (splitmux, splitmux->sink_factory, "sink",
3808                   TRUE)) == NULL)
3809         goto fail;
3810       if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
3811         gst_preset_load_preset (GST_PRESET (splitmux->sink),
3812             splitmux->sink_preset);
3813       if (splitmux->sink_properties)
3814         gst_structure_foreach (splitmux->sink_properties,
3815             _set_property_from_structure, splitmux->sink);
3816       splitmux->active_sink = splitmux->sink;
3817     } else {
3818       /* Ensure the sink starts in locked state and NULL - it will be changed
3819        * by the filename setting code */
3820       gst_element_set_locked_state (provided_sink, TRUE);
3821       gst_element_set_state (provided_sink, GST_STATE_NULL);
3822       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
3823         g_warning ("Could not add sink elements - splitmuxsink will not work");
3824         gst_object_unref (provided_sink);
3825         goto fail;
3826       }
3827
3828       splitmux->active_sink = provided_sink;
3829
3830       /* The bin holds a ref now, we can drop our tmp ref */
3831       gst_object_unref (provided_sink);
3832
3833       /* Find the sink element */
3834       splitmux->sink = find_sink (splitmux->active_sink);
3835       if (splitmux->sink == NULL) {
3836         g_warning
3837             ("Could not locate sink element in provided sink - splitmuxsink will not work");
3838         goto fail;
3839       }
3840     }
3841
3842 #if 1
3843     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3844             "async") != NULL) {
3845       /* async child elements are causing state change races and weird
3846        * failures, so let's try and turn that off */
3847       g_object_set (splitmux->sink, "async", FALSE, NULL);
3848     }
3849 #endif
3850
3851     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
3852       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
3853       goto fail;
3854     }
3855   }
3856
3857   return TRUE;
3858 fail:
3859   return FALSE;
3860 }
3861
3862 #ifdef __GNUC__
3863 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
3864 #endif
3865 static void
3866 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
3867 {
3868   gchar *fname = NULL;
3869   GstSample *sample;
3870   GstCaps *caps;
3871
3872   gst_splitmux_sink_ensure_max_files (splitmux);
3873
3874   if (ctx->cur_out_buffer == NULL) {
3875     GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
3876   }
3877
3878   caps = gst_pad_get_current_caps (ctx->srcpad);
3879   sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3880   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3881       splitmux->fragment_id, sample, &fname);
3882   gst_sample_unref (sample);
3883   if (caps)
3884     gst_caps_unref (caps);
3885
3886   if (fname == NULL) {
3887     /* Fallback to the old signal if the new one returned nothing */
3888     g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3889         splitmux->fragment_id, &fname);
3890   }
3891
3892   if (!fname)
3893     fname = splitmux->location ?
3894         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3895
3896   if (fname) {
3897     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3898     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3899             "location") != NULL)
3900       g_object_set (splitmux->sink, "location", fname, NULL);
3901     g_free (fname);
3902   }
3903
3904   splitmux->fragment_id++;
3905 }
3906
3907 /* called with GST_SPLITMUX_LOCK */
3908 static void
3909 do_async_start (GstSplitMuxSink * splitmux)
3910 {
3911   GstMessage *message;
3912
3913   if (!splitmux->need_async_start) {
3914     GST_INFO_OBJECT (splitmux, "no async_start needed");
3915     return;
3916   }
3917
3918   splitmux->async_pending = TRUE;
3919
3920   GST_INFO_OBJECT (splitmux, "Sending async_start message");
3921   message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
3922
3923   GST_SPLITMUX_UNLOCK (splitmux);
3924   GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3925       (splitmux), message);
3926   GST_SPLITMUX_LOCK (splitmux);
3927 }
3928
3929 /* called with GST_SPLITMUX_LOCK */
3930 static void
3931 do_async_done (GstSplitMuxSink * splitmux)
3932 {
3933   GstMessage *message;
3934
3935   if (splitmux->async_pending) {
3936     GST_INFO_OBJECT (splitmux, "Sending async_done message");
3937     splitmux->async_pending = FALSE;
3938     GST_SPLITMUX_UNLOCK (splitmux);
3939
3940     message =
3941         gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3942         GST_CLOCK_TIME_NONE);
3943     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3944         (splitmux), message);
3945     GST_SPLITMUX_LOCK (splitmux);
3946   }
3947
3948   splitmux->need_async_start = FALSE;
3949 }
3950
3951 static void
3952 gst_splitmux_sink_reset (GstSplitMuxSink * splitmux)
3953 {
3954   splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3955   splitmux->max_in_running_time_dts = GST_CLOCK_STIME_NONE;
3956
3957   splitmux->fragment_start_time = GST_CLOCK_STIME_NONE;
3958   splitmux->fragment_start_time_pts = GST_CLOCK_STIME_NONE;
3959   g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
3960
3961   g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
3962   g_queue_clear (&splitmux->pending_input_gops);
3963
3964   splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
3965   splitmux->fragment_total_bytes = 0;
3966   splitmux->fragment_reference_bytes = 0;
3967   splitmux->muxed_out_bytes = 0;
3968   splitmux->ready_for_output = FALSE;
3969
3970   g_atomic_int_set (&(splitmux->split_requested), FALSE);
3971   g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3972
3973   splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
3974   gst_queue_array_clear (splitmux->times_to_split);
3975
3976   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_reset, NULL);
3977   splitmux->queued_keyframes = 0;
3978
3979   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
3980   g_queue_clear (&splitmux->out_cmd_q);
3981 }
3982
3983 static GstStateChangeReturn
3984 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
3985 {
3986   GstStateChangeReturn ret;
3987   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3988
3989   switch (transition) {
3990     case GST_STATE_CHANGE_NULL_TO_READY:{
3991       GST_SPLITMUX_LOCK (splitmux);
3992       if (!create_muxer (splitmux) || !create_sink (splitmux)) {
3993         ret = GST_STATE_CHANGE_FAILURE;
3994         GST_SPLITMUX_UNLOCK (splitmux);
3995         goto beach;
3996       }
3997       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3998       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
3999       GST_SPLITMUX_UNLOCK (splitmux);
4000       splitmux->fragment_id = splitmux->start_index;
4001       break;
4002     }
4003     case GST_STATE_CHANGE_READY_TO_PAUSED:{
4004       GST_SPLITMUX_LOCK (splitmux);
4005       /* Make sure contexts and tracking times are cleared, in case we're being reused */
4006       gst_splitmux_sink_reset (splitmux);
4007       /* Start by collecting one input on each pad */
4008       splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
4009       splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
4010
4011       GST_SPLITMUX_UNLOCK (splitmux);
4012
4013       GST_SPLITMUX_STATE_LOCK (splitmux);
4014       splitmux->shutdown = FALSE;
4015       GST_SPLITMUX_STATE_UNLOCK (splitmux);
4016       break;
4017     }
4018     case GST_STATE_CHANGE_PAUSED_TO_READY:
4019     case GST_STATE_CHANGE_READY_TO_READY:
4020       g_atomic_int_set (&(splitmux->split_requested), FALSE);
4021       g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
4022       /* Fall through */
4023     case GST_STATE_CHANGE_READY_TO_NULL:
4024       GST_SPLITMUX_STATE_LOCK (splitmux);
4025       splitmux->shutdown = TRUE;
4026       GST_SPLITMUX_STATE_UNLOCK (splitmux);
4027
4028       GST_SPLITMUX_LOCK (splitmux);
4029       gst_splitmux_sink_reset (splitmux);
4030       splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
4031       splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
4032       /* Wake up any blocked threads */
4033       GST_LOG_OBJECT (splitmux,
4034           "State change -> NULL or READY. Waking threads");
4035       GST_SPLITMUX_BROADCAST_INPUT (splitmux);
4036       GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
4037       GST_SPLITMUX_UNLOCK (splitmux);
4038       break;
4039     default:
4040       break;
4041   }
4042
4043   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4044   if (ret == GST_STATE_CHANGE_FAILURE)
4045     goto beach;
4046
4047   switch (transition) {
4048     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4049       splitmux->need_async_start = TRUE;
4050       break;
4051     case GST_STATE_CHANGE_READY_TO_PAUSED:{
4052       /* Change state async, because our child sink might not
4053        * be ready to do that for us yet if it's state is still locked */
4054
4055       splitmux->need_async_start = TRUE;
4056       /* we want to go async to PAUSED until we managed to configure and add the
4057        * sink */
4058       GST_SPLITMUX_LOCK (splitmux);
4059       do_async_start (splitmux);
4060       GST_SPLITMUX_UNLOCK (splitmux);
4061       ret = GST_STATE_CHANGE_ASYNC;
4062       break;
4063     }
4064     case GST_STATE_CHANGE_READY_TO_NULL:
4065       GST_SPLITMUX_LOCK (splitmux);
4066       splitmux->fragment_id = 0;
4067       /* Reset internal elements only if no pad contexts are using them */
4068       if (splitmux->contexts == NULL)
4069         gst_splitmux_reset_elements (splitmux);
4070       do_async_done (splitmux);
4071       GST_SPLITMUX_UNLOCK (splitmux);
4072       break;
4073     default:
4074       break;
4075   }
4076
4077   return ret;
4078
4079 beach:
4080   if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
4081     /* Cleanup elements on failed transition out of NULL */
4082     gst_splitmux_reset_elements (splitmux);
4083     GST_SPLITMUX_LOCK (splitmux);
4084     do_async_done (splitmux);
4085     GST_SPLITMUX_UNLOCK (splitmux);
4086   }
4087   if (transition == GST_STATE_CHANGE_READY_TO_READY) {
4088     /* READY to READY transition only happens when we're already
4089      * in READY state, but a child element is in NULL, which
4090      * happens when there's an error changing the state of the sink.
4091      * We need to make sure not to fail the state transition, or
4092      * the core won't transition us back to NULL successfully */
4093     ret = GST_STATE_CHANGE_SUCCESS;
4094   }
4095   return ret;
4096 }
4097
4098 static void
4099 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
4100 {
4101   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
4102     splitmux->fragment_id = 0;
4103   }
4104 }
4105
4106 static void
4107 split_now (GstSplitMuxSink * splitmux)
4108 {
4109   g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
4110 }
4111
4112 static void
4113 split_after (GstSplitMuxSink * splitmux)
4114 {
4115   g_atomic_int_set (&(splitmux->split_requested), TRUE);
4116 }
4117
4118 static void
4119 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
4120 {
4121   gboolean send_keyframe_requests;
4122
4123   GST_SPLITMUX_LOCK (splitmux);
4124   gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
4125   send_keyframe_requests = splitmux->send_keyframe_requests;
4126   GST_SPLITMUX_UNLOCK (splitmux);
4127
4128   if (send_keyframe_requests) {
4129     GstEvent *ev =
4130         gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
4131     GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
4132         GST_TIME_ARGS (split_time));
4133     if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
4134       GST_WARNING_OBJECT (splitmux,
4135           "Could not request keyframe at %" GST_TIME_FORMAT,
4136           GST_TIME_ARGS (split_time));
4137     }
4138   }
4139 }