ad6c3d37827a1a0eb666fcf797d4fa2f3d8ddf60
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014-2019> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @title: splitmuxsink
23  * @short_description: Muxer wrapper for splitting output stream by size or time
24  *
25  * This element wraps a muxer and a sink, and starts a new file when the mux
26  * contents are about to cross a threshold of maximum size of maximum time,
27  * splitting at video keyframe boundaries. Exactly one input video stream
28  * can be muxed, with as many accompanying audio and subtitle streams as
29  * desired.
30  *
31  * By default, it uses mp4mux and filesink, but they can be changed via
32  * the 'muxer' and 'sink' properties.
33  *
34  * The minimum file size is 1 GOP, however - so limits may be overrun if the
35  * distance between any 2 keyframes is larger than the limits.
36  *
37  * If a video stream is available, the splitting process is driven by the video
38  * stream contents, and the video stream must contain closed GOPs for the output
39  * file parts to be played individually correctly. In the absence of a video
40  * stream, the first available stream is used as reference for synchronization.
41  *
42  * In the async-finalize mode, when the threshold is crossed, the old muxer
43  * and sink is disconnected from the pipeline and left to finish the file
44  * asynchronously, and a new muxer and sink is created to continue with the
45  * next fragment. For that reason, instead of muxer and sink objects, the
46  * muxer-factory and sink-factory properties are used to construct the new
47  * objects, together with muxer-properties and sink-properties.
48  *
49  * ## Example pipelines
50  * |[
51  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
52  * ]|
53  * Records a video stream captured from a v4l2 device and muxes it into
54  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55  * and 1MB maximum size.
56  *
57  * |[
58  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
59  * ]|
60  * Records a video stream captured from a v4l2 device and muxer it into
61  * streamable Matroska files, splitting as needed to limit size/duration to 10
62  * seconds. Each file will finalize asynchronously.
63  *
64  * |[
65  * gst-launch-1.0 videotestsrc num-buffers=10 ! jpegenc ! .video splitmuxsink muxer=qtmux muxer-pad-map=x-pad-map,video=video_1 location=test%05d.mp4 -v
66  * ]|
67  * Records 10 frames to an mp4 file, using a muxer-pad-map to make explicit mappings between the splitmuxsink sink pad and the corresponding muxer pad
68  * it will deliver to.
69  */
70
71 #ifdef HAVE_CONFIG_H
72 #include "config.h"
73 #endif
74
75 #include <string.h>
76 #include <glib/gstdio.h>
77 #include <gst/video/video.h>
78 #include "gstsplitmuxsink.h"
79
80 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
81 #define GST_CAT_DEFAULT splitmux_debug
82
83 #define GST_SPLITMUX_STATE_LOCK(s) g_mutex_lock(&(s)->state_lock)
84 #define GST_SPLITMUX_STATE_UNLOCK(s) g_mutex_unlock(&(s)->state_lock)
85
86 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
87 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
88 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
89 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
90
91 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
92 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
93
94 static void split_now (GstSplitMuxSink * splitmux);
95 static void split_after (GstSplitMuxSink * splitmux);
96 static void split_at_running_time (GstSplitMuxSink * splitmux,
97     GstClockTime split_time);
98
99 enum
100 {
101   PROP_0,
102   PROP_LOCATION,
103   PROP_START_INDEX,
104   PROP_MAX_SIZE_TIME,
105   PROP_MAX_SIZE_BYTES,
106   PROP_MAX_SIZE_TIMECODE,
107   PROP_SEND_KEYFRAME_REQUESTS,
108   PROP_MAX_FILES,
109   PROP_MUXER_OVERHEAD,
110   PROP_USE_ROBUST_MUXING,
111   PROP_ALIGNMENT_THRESHOLD,
112   PROP_MUXER,
113   PROP_SINK,
114   PROP_RESET_MUXER,
115   PROP_ASYNC_FINALIZE,
116   PROP_MUXER_FACTORY,
117   PROP_MUXER_PRESET,
118   PROP_MUXER_PROPERTIES,
119   PROP_SINK_FACTORY,
120   PROP_SINK_PRESET,
121   PROP_SINK_PROPERTIES,
122   PROP_MUXERPAD_MAP
123 };
124
125 #define DEFAULT_MAX_SIZE_TIME       0
126 #define DEFAULT_MAX_SIZE_BYTES      0
127 #define DEFAULT_MAX_FILES           0
128 #define DEFAULT_MUXER_OVERHEAD      0.02
129 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
130 #define DEFAULT_ALIGNMENT_THRESHOLD 0
131 #define DEFAULT_MUXER "mp4mux"
132 #define DEFAULT_SINK "filesink"
133 #define DEFAULT_USE_ROBUST_MUXING FALSE
134 #define DEFAULT_RESET_MUXER TRUE
135 #define DEFAULT_ASYNC_FINALIZE FALSE
136 #define DEFAULT_START_INDEX 0
137
138 typedef struct _AsyncEosHelper
139 {
140   MqStreamCtx *ctx;
141   GstPad *pad;
142 } AsyncEosHelper;
143
144 enum
145 {
146   SIGNAL_FORMAT_LOCATION,
147   SIGNAL_FORMAT_LOCATION_FULL,
148   SIGNAL_SPLIT_NOW,
149   SIGNAL_SPLIT_AFTER,
150   SIGNAL_SPLIT_AT_RUNNING_TIME,
151   SIGNAL_MUXER_ADDED,
152   SIGNAL_SINK_ADDED,
153   SIGNAL_LAST
154 };
155
156 static guint signals[SIGNAL_LAST];
157
158 static GstStaticPadTemplate video_sink_template =
159 GST_STATIC_PAD_TEMPLATE ("video",
160     GST_PAD_SINK,
161     GST_PAD_REQUEST,
162     GST_STATIC_CAPS_ANY);
163 static GstStaticPadTemplate video_aux_sink_template =
164 GST_STATIC_PAD_TEMPLATE ("video_aux_%u",
165     GST_PAD_SINK,
166     GST_PAD_REQUEST,
167     GST_STATIC_CAPS_ANY);
168 static GstStaticPadTemplate audio_sink_template =
169 GST_STATIC_PAD_TEMPLATE ("audio_%u",
170     GST_PAD_SINK,
171     GST_PAD_REQUEST,
172     GST_STATIC_CAPS_ANY);
173 static GstStaticPadTemplate subtitle_sink_template =
174 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
175     GST_PAD_SINK,
176     GST_PAD_REQUEST,
177     GST_STATIC_CAPS_ANY);
178 static GstStaticPadTemplate caption_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("caption_%u",
180     GST_PAD_SINK,
181     GST_PAD_REQUEST,
182     GST_STATIC_CAPS_ANY);
183
184 static GQuark PAD_CONTEXT;
185 static GQuark EOS_FROM_US;
186 static GQuark RUNNING_TIME;
187 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
188  * to forward an incoming EOS message, but we cannot rely on the state of the
189  * splitmux anymore, so we set this qdata on the sink instead.
190  * The muxer and sink must be destroyed after both of these things have
191  * finished:
192  * 1) The EOS message has been sent when the fragment is ending
193  * 2) The muxer has been unlinked and relinked
194  * Therefore, EOS_FROM_US can have these two values:
195  * 0: EOS was not requested from us. Forward the message. The muxer and the
196  * sink will be destroyed together with the rest of the bin.
197  * 1: EOS was requested from us, but the other of the two tasks hasn't
198  * finished. Set EOS_FROM_US to 2 and do your stuff.
199  * 2: EOS was requested from us and the other of the two tasks has finished.
200  * Now we can destroy the muxer and the sink.
201  */
202
203 static void
204 _do_init (void)
205 {
206   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
207   EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
208   RUNNING_TIME = g_quark_from_static_string ("running-time");
209   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
210       "Split File Muxing Sink");
211 }
212
213 #define gst_splitmux_sink_parent_class parent_class
214 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
215     _do_init ());
216 GST_ELEMENT_REGISTER_DEFINE (splitmuxsink, "splitmuxsink", GST_RANK_NONE,
217     GST_TYPE_SPLITMUX_SINK);
218
219 static gboolean create_muxer (GstSplitMuxSink * splitmux);
220 static gboolean create_sink (GstSplitMuxSink * splitmux);
221 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
222     const GValue * value, GParamSpec * pspec);
223 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
224     GValue * value, GParamSpec * pspec);
225 static void gst_splitmux_sink_dispose (GObject * object);
226 static void gst_splitmux_sink_finalize (GObject * object);
227
228 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
229     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
230 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
231
232 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
233     element, GstStateChange transition);
234
235 static void bus_handler (GstBin * bin, GstMessage * msg);
236 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
237 static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux,
238     MqStreamCtx * ctx);
239 static void mq_stream_ctx_free (MqStreamCtx * ctx);
240 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
241
242 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
243 static GstElement *create_element (GstSplitMuxSink * splitmux,
244     const gchar * factory, const gchar * name, gboolean locked);
245
246 static void do_async_done (GstSplitMuxSink * splitmux);
247
248 static GstClockTime calculate_next_max_timecode (GstSplitMuxSink * splitmux,
249     const GstVideoTimeCode * cur_tc, GstClockTime running_time,
250     GstVideoTimeCode ** next_tc);
251
252 static MqStreamBuf *
253 mq_stream_buf_new (void)
254 {
255   return g_slice_new0 (MqStreamBuf);
256 }
257
258 static void
259 mq_stream_buf_free (MqStreamBuf * data)
260 {
261   g_slice_free (MqStreamBuf, data);
262 }
263
264 static SplitMuxOutputCommand *
265 out_cmd_buf_new (void)
266 {
267   return g_slice_new0 (SplitMuxOutputCommand);
268 }
269
270 static void
271 out_cmd_buf_free (SplitMuxOutputCommand * data)
272 {
273   g_slice_free (SplitMuxOutputCommand, data);
274 }
275
276 static void
277 input_gop_free (InputGop * gop)
278 {
279   g_clear_pointer (&gop->start_tc, gst_video_time_code_free);
280   g_slice_free (InputGop, gop);
281 }
282
283 static void
284 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
285 {
286   GObjectClass *gobject_class = (GObjectClass *) klass;
287   GstElementClass *gstelement_class = (GstElementClass *) klass;
288   GstBinClass *gstbin_class = (GstBinClass *) klass;
289
290   gobject_class->set_property = gst_splitmux_sink_set_property;
291   gobject_class->get_property = gst_splitmux_sink_get_property;
292   gobject_class->dispose = gst_splitmux_sink_dispose;
293   gobject_class->finalize = gst_splitmux_sink_finalize;
294
295   gst_element_class_set_static_metadata (gstelement_class,
296       "Split Muxing Bin", "Generic/Bin/Muxer",
297       "Convenience bin that muxes incoming streams into multiple time/size limited files",
298       "Jan Schmidt <jan@centricular.com>");
299
300   gst_element_class_add_static_pad_template (gstelement_class,
301       &video_sink_template);
302   gst_element_class_add_static_pad_template (gstelement_class,
303       &video_aux_sink_template);
304   gst_element_class_add_static_pad_template (gstelement_class,
305       &audio_sink_template);
306   gst_element_class_add_static_pad_template (gstelement_class,
307       &subtitle_sink_template);
308   gst_element_class_add_static_pad_template (gstelement_class,
309       &caption_sink_template);
310
311   gstelement_class->change_state =
312       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
313   gstelement_class->request_new_pad =
314       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
315   gstelement_class->release_pad =
316       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
317
318   gstbin_class->handle_message = bus_handler;
319
320   g_object_class_install_property (gobject_class, PROP_LOCATION,
321       g_param_spec_string ("location", "File Output Pattern",
322           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
323           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
324   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
325       g_param_spec_double ("mux-overhead", "Muxing Overhead",
326           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
327           DEFAULT_MUXER_OVERHEAD,
328           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
329
330   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
331       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
332           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
333           DEFAULT_MAX_SIZE_TIME,
334           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
335           G_PARAM_STATIC_STRINGS));
336   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
337       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
338           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
339           DEFAULT_MAX_SIZE_BYTES,
340           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
341           G_PARAM_STATIC_STRINGS));
342   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
343       g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
344           "Maximum difference in timecode between first and last frame. "
345           "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
346           "Will only be effective if a timecode track is present.", NULL,
347           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
348           G_PARAM_STATIC_STRINGS));
349   g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
350       g_param_spec_boolean ("send-keyframe-requests",
351           "Request keyframes at max-size-time",
352           "Request a keyframe every max-size-time ns to try splitting at that point. "
353           "Needs max-size-bytes to be 0 in order to be effective.",
354           DEFAULT_SEND_KEYFRAME_REQUESTS,
355           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
356           G_PARAM_STATIC_STRINGS));
357   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
358       g_param_spec_uint ("max-files", "Max files",
359           "Maximum number of files to keep on disk. Once the maximum is reached,"
360           "old files start to be deleted to make room for new ones.", 0,
361           G_MAXUINT, DEFAULT_MAX_FILES,
362           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
363   g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
364       g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
365           "Allow non-reference streams to be that many ns before the reference"
366           " stream", 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
367           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
368           G_PARAM_STATIC_STRINGS));
369
370   g_object_class_install_property (gobject_class, PROP_MUXER,
371       g_param_spec_object ("muxer", "Muxer",
372           "The muxer element to use (NULL = default mp4mux). "
373           "Valid only for async-finalize = FALSE",
374           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375   g_object_class_install_property (gobject_class, PROP_SINK,
376       g_param_spec_object ("sink", "Sink",
377           "The sink element (or element chain) to use (NULL = default filesink). "
378           "Valid only for async-finalize = FALSE",
379           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
380
381   g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
382       g_param_spec_boolean ("use-robust-muxing",
383           "Support robust-muxing mode of some muxers",
384           "Check if muxers support robust muxing via the reserved-max-duration and "
385           "reserved-duration-remaining properties and use them if so. "
386           "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
387           " create new fragments if the reserved header space is about to overflow. "
388           "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
389           "manually by the app to a non-zero value for robust muxing to have an effect.",
390           DEFAULT_USE_ROBUST_MUXING,
391           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
392
393   g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
394       g_param_spec_boolean ("reset-muxer",
395           "Reset Muxer",
396           "Reset the muxer after each segment. Disabling this will not work for most muxers.",
397           DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
398
399   g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
400       g_param_spec_boolean ("async-finalize",
401           "Finalize fragments asynchronously",
402           "Finalize each fragment asynchronously and start a new one",
403           DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
404   g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
405       g_param_spec_string ("muxer-factory", "Muxer factory",
406           "The muxer element factory to use (default = mp4mux). "
407           "Valid only for async-finalize = TRUE",
408           "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
409   /**
410    * GstSplitMuxSink:muxer-preset
411    *
412    * An optional #GstPreset name to use for the muxer. This only has an effect
413    * in `async-finalize=TRUE` mode.
414    *
415    * Since: 1.18
416    */
417   g_object_class_install_property (gobject_class, PROP_MUXER_PRESET,
418       g_param_spec_string ("muxer-preset", "Muxer preset",
419           "The muxer preset to use. "
420           "Valid only for async-finalize = TRUE",
421           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
422   g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
423       g_param_spec_boxed ("muxer-properties", "Muxer properties",
424           "The muxer element properties to use. "
425           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
426           "Valid only for async-finalize = TRUE",
427           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
428   g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
429       g_param_spec_string ("sink-factory", "Sink factory",
430           "The sink element factory to use (default = filesink). "
431           "Valid only for async-finalize = TRUE",
432           "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
433   /**
434    * GstSplitMuxSink:sink-preset
435    *
436    * An optional #GstPreset name to use for the sink. This only has an effect
437    * in `async-finalize=TRUE` mode.
438    *
439    * Since: 1.18
440    */
441   g_object_class_install_property (gobject_class, PROP_SINK_PRESET,
442       g_param_spec_string ("sink-preset", "Sink preset",
443           "The sink preset to use. "
444           "Valid only for async-finalize = TRUE",
445           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
446   g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
447       g_param_spec_boxed ("sink-properties", "Sink properties",
448           "The sink element properties to use. "
449           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
450           "Valid only for async-finalize = TRUE",
451           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
452   g_object_class_install_property (gobject_class, PROP_START_INDEX,
453       g_param_spec_int ("start-index", "Start Index",
454           "Start value of fragment index.",
455           0, G_MAXINT, DEFAULT_START_INDEX,
456           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
457
458   /**
459    * GstSplitMuxSink::muxer-pad-map
460    *
461    * An optional GstStructure that provides a map from splitmuxsink sinkpad
462    * names to muxer pad names they should feed. Splitmuxsink has some default
463    * mapping behaviour to link video to video pads and audio to audio pads
464    * that usually works fine. This property is useful if you need to ensure
465    * a particular mapping to muxed streams.
466    *
467    * The GstStructure contains string fields like so:
468    *   splitmuxsink muxer-pad-map=x-pad-map,video=video_1
469    *
470    * Since: 1.18
471    */
472   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_MUXERPAD_MAP,
473       g_param_spec_boxed ("muxer-pad-map", "Muxer pad map",
474           "A GstStructure specifies the mapping from splitmuxsink sink pads to muxer pads",
475           GST_TYPE_STRUCTURE,
476           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
477
478   /**
479    * GstSplitMuxSink::format-location:
480    * @splitmux: the #GstSplitMuxSink
481    * @fragment_id: the sequence number of the file to be created
482    *
483    * Returns: the location to be used for the next output file. This must be
484    *    a newly-allocated string which will be freed with g_free() by the
485    *    splitmuxsink element when it no longer needs it, so use g_strdup() or
486    *    g_strdup_printf() or similar functions to allocate it.
487    */
488   signals[SIGNAL_FORMAT_LOCATION] =
489       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
490       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
491
492   /**
493    * GstSplitMuxSink::format-location-full:
494    * @splitmux: the #GstSplitMuxSink
495    * @fragment_id: the sequence number of the file to be created
496    * @first_sample: A #GstSample containing the first buffer
497    *   from the reference stream in the new file
498    *
499    * Returns: the location to be used for the next output file. This must be
500    *    a newly-allocated string which will be freed with g_free() by the
501    *    splitmuxsink element when it no longer needs it, so use g_strdup() or
502    *    g_strdup_printf() or similar functions to allocate it.
503    *
504    * Since: 1.12
505    */
506   signals[SIGNAL_FORMAT_LOCATION_FULL] =
507       g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
508       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
509       GST_TYPE_SAMPLE);
510
511   /**
512    * GstSplitMuxSink::split-now:
513    * @splitmux: the #GstSplitMuxSink
514    *
515    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
516    * The current GOP will be output to the new file.
517    *
518    * Since: 1.14
519    */
520   signals[SIGNAL_SPLIT_NOW] =
521       g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
522       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
523       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_now), NULL, NULL, NULL,
524       G_TYPE_NONE, 0);
525
526   /**
527    * GstSplitMuxSink::split-after:
528    * @splitmux: the #GstSplitMuxSink
529    *
530    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
531    * Unlike the 'split-now' signal, with 'split-after', the current GOP will be output to the old file.
532    *
533    * Since: 1.16
534    */
535   signals[SIGNAL_SPLIT_AFTER] =
536       g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
537       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
538       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_after), NULL, NULL, NULL,
539       G_TYPE_NONE, 0);
540
541   /**
542    * GstSplitMuxSink::split-at-running-time:
543    * @splitmux: the #GstSplitMuxSink
544    *
545    * When called by the user, this action signal splits the video file (and
546    * begins a new one) as soon as the given running time is reached. If this
547    * action signal is called multiple times, running times are queued up and
548    * processed in the order they were given.
549    *
550    * Note that this is prone to race conditions, where said running time is
551    * reached and surpassed before we had a chance to split. The file will
552    * still split immediately, but in order to make sure that the split doesn't
553    * happen too late, it is recommended to call this action signal from
554    * something that will prevent further buffers from flowing into
555    * splitmuxsink before the split is completed, such as a pad probe before
556    * splitmuxsink.
557    *
558    *
559    * Since: 1.16
560    */
561   signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
562       g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
563       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
564       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_at_running_time), NULL, NULL,
565       NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
566
567   /**
568    * GstSplitMuxSink::muxer-added:
569    * @splitmux: the #GstSplitMuxSink
570    * @muxer: the newly added muxer element
571    *
572    * Since: 1.14
573    */
574   signals[SIGNAL_MUXER_ADDED] =
575       g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
576       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
577
578   /**
579    * GstSplitMuxSink::sink-added:
580    * @splitmux: the #GstSplitMuxSink
581    * @sink: the newly added sink element
582    *
583    * Since: 1.14
584    */
585   signals[SIGNAL_SINK_ADDED] =
586       g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
587       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
588
589   klass->split_now = split_now;
590   klass->split_after = split_after;
591   klass->split_at_running_time = split_at_running_time;
592 }
593
594 static void
595 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
596 {
597   g_mutex_init (&splitmux->lock);
598   g_mutex_init (&splitmux->state_lock);
599   g_cond_init (&splitmux->input_cond);
600   g_cond_init (&splitmux->output_cond);
601   g_queue_init (&splitmux->out_cmd_q);
602
603   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
604   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
605   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
606   splitmux->max_files = DEFAULT_MAX_FILES;
607   splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
608   splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
609   splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
610   splitmux->reset_muxer = DEFAULT_RESET_MUXER;
611
612   splitmux->threshold_timecode_str = NULL;
613
614   splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
615   splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
616   splitmux->muxer_properties = NULL;
617   splitmux->sink_factory = g_strdup (DEFAULT_SINK);
618   splitmux->sink_properties = NULL;
619
620   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
621   splitmux->split_requested = FALSE;
622   splitmux->do_split_next_gop = FALSE;
623   splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
624   splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
625
626   g_queue_init (&splitmux->pending_input_gops);
627 }
628
629 static void
630 gst_splitmux_reset_elements (GstSplitMuxSink * splitmux)
631 {
632   if (splitmux->muxer) {
633     gst_element_set_locked_state (splitmux->muxer, TRUE);
634     gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
635     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
636   }
637   if (splitmux->active_sink) {
638     gst_element_set_locked_state (splitmux->active_sink, TRUE);
639     gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
640     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
641   }
642
643   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
644 }
645
646 static void
647 gst_splitmux_sink_dispose (GObject * object)
648 {
649   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
650
651   /* Calling parent dispose invalidates all child pointers */
652   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
653
654   G_OBJECT_CLASS (parent_class)->dispose (object);
655 }
656
657 static void
658 gst_splitmux_sink_finalize (GObject * object)
659 {
660   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
661
662   g_cond_clear (&splitmux->input_cond);
663   g_cond_clear (&splitmux->output_cond);
664   g_mutex_clear (&splitmux->lock);
665   g_mutex_clear (&splitmux->state_lock);
666   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
667   g_queue_clear (&splitmux->out_cmd_q);
668   g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
669   g_queue_clear (&splitmux->pending_input_gops);
670
671   g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
672
673   if (splitmux->muxerpad_map)
674     gst_structure_free (splitmux->muxerpad_map);
675
676   if (splitmux->provided_sink)
677     gst_object_unref (splitmux->provided_sink);
678   if (splitmux->provided_muxer)
679     gst_object_unref (splitmux->provided_muxer);
680
681   if (splitmux->muxer_factory)
682     g_free (splitmux->muxer_factory);
683   if (splitmux->muxer_preset)
684     g_free (splitmux->muxer_preset);
685   if (splitmux->muxer_properties)
686     gst_structure_free (splitmux->muxer_properties);
687   if (splitmux->sink_factory)
688     g_free (splitmux->sink_factory);
689   if (splitmux->sink_preset)
690     g_free (splitmux->sink_preset);
691   if (splitmux->sink_properties)
692     gst_structure_free (splitmux->sink_properties);
693
694   if (splitmux->threshold_timecode_str)
695     g_free (splitmux->threshold_timecode_str);
696   if (splitmux->tc_interval)
697     gst_video_time_code_interval_free (splitmux->tc_interval);
698
699   if (splitmux->times_to_split)
700     gst_queue_array_free (splitmux->times_to_split);
701
702   g_free (splitmux->location);
703
704   /* Make sure to free any un-released contexts. There should not be any,
705    * because the dispose will have freed all request pads though */
706   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
707   g_list_free (splitmux->contexts);
708
709   G_OBJECT_CLASS (parent_class)->finalize (object);
710 }
711
712 /*
713  * Set any time threshold to the muxer, if it has
714  * reserved-max-duration and reserved-duration-remaining
715  * properties. Called when creating/claiming the muxer
716  * in create_elements() */
717 static void
718 update_muxer_properties (GstSplitMuxSink * sink)
719 {
720   GObjectClass *klass;
721   GstClockTime threshold_time;
722
723   sink->muxer_has_reserved_props = FALSE;
724   if (sink->muxer == NULL)
725     return;
726   klass = G_OBJECT_GET_CLASS (sink->muxer);
727   if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
728     return;
729   if (g_object_class_find_property (klass,
730           "reserved-duration-remaining") == NULL)
731     return;
732   sink->muxer_has_reserved_props = TRUE;
733
734   GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
735       GST_TIME_ARGS (sink->threshold_time));
736   GST_OBJECT_LOCK (sink);
737   threshold_time = sink->threshold_time;
738   GST_OBJECT_UNLOCK (sink);
739
740   if (threshold_time > 0) {
741     /* Tell the muxer how much space to reserve */
742     GstClockTime muxer_threshold = threshold_time;
743     g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
744   }
745 }
746
747 static void
748 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
749     const GValue * value, GParamSpec * pspec)
750 {
751   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
752
753   switch (prop_id) {
754     case PROP_LOCATION:{
755       GST_OBJECT_LOCK (splitmux);
756       g_free (splitmux->location);
757       splitmux->location = g_value_dup_string (value);
758       GST_OBJECT_UNLOCK (splitmux);
759       break;
760     }
761     case PROP_START_INDEX:
762       GST_OBJECT_LOCK (splitmux);
763       splitmux->start_index = g_value_get_int (value);
764       GST_OBJECT_UNLOCK (splitmux);
765       break;
766     case PROP_MAX_SIZE_BYTES:
767       GST_OBJECT_LOCK (splitmux);
768       splitmux->threshold_bytes = g_value_get_uint64 (value);
769       GST_OBJECT_UNLOCK (splitmux);
770       break;
771     case PROP_MAX_SIZE_TIME:
772       GST_OBJECT_LOCK (splitmux);
773       splitmux->threshold_time = g_value_get_uint64 (value);
774       GST_OBJECT_UNLOCK (splitmux);
775       break;
776     case PROP_MAX_SIZE_TIMECODE:
777       GST_OBJECT_LOCK (splitmux);
778       g_free (splitmux->threshold_timecode_str);
779       /* will be calculated later */
780       g_clear_pointer (&splitmux->tc_interval,
781           gst_video_time_code_interval_free);
782
783       splitmux->threshold_timecode_str = g_value_dup_string (value);
784       if (splitmux->threshold_timecode_str) {
785         splitmux->tc_interval =
786             gst_video_time_code_interval_new_from_string
787             (splitmux->threshold_timecode_str);
788         if (!splitmux->tc_interval) {
789           g_warning ("Wrong timecode string %s",
790               splitmux->threshold_timecode_str);
791           g_free (splitmux->threshold_timecode_str);
792           splitmux->threshold_timecode_str = NULL;
793         }
794       }
795       splitmux->next_fragment_start_tc_time =
796           calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
797           splitmux->fragment_start_time, NULL);
798       if (splitmux->tc_interval && splitmux->fragment_start_tc
799           && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
800         GST_WARNING_OBJECT (splitmux,
801             "Couldn't calculate next fragment start time for timecode mode");
802       }
803       GST_OBJECT_UNLOCK (splitmux);
804       break;
805     case PROP_SEND_KEYFRAME_REQUESTS:
806       GST_OBJECT_LOCK (splitmux);
807       splitmux->send_keyframe_requests = g_value_get_boolean (value);
808       GST_OBJECT_UNLOCK (splitmux);
809       break;
810     case PROP_MAX_FILES:
811       GST_OBJECT_LOCK (splitmux);
812       splitmux->max_files = g_value_get_uint (value);
813       GST_OBJECT_UNLOCK (splitmux);
814       break;
815     case PROP_MUXER_OVERHEAD:
816       GST_OBJECT_LOCK (splitmux);
817       splitmux->mux_overhead = g_value_get_double (value);
818       GST_OBJECT_UNLOCK (splitmux);
819       break;
820     case PROP_USE_ROBUST_MUXING:
821       GST_OBJECT_LOCK (splitmux);
822       splitmux->use_robust_muxing = g_value_get_boolean (value);
823       GST_OBJECT_UNLOCK (splitmux);
824       if (splitmux->use_robust_muxing)
825         update_muxer_properties (splitmux);
826       break;
827     case PROP_ALIGNMENT_THRESHOLD:
828       GST_OBJECT_LOCK (splitmux);
829       splitmux->alignment_threshold = g_value_get_uint64 (value);
830       GST_OBJECT_UNLOCK (splitmux);
831       break;
832     case PROP_SINK:
833       GST_OBJECT_LOCK (splitmux);
834       gst_clear_object (&splitmux->provided_sink);
835       splitmux->provided_sink = g_value_get_object (value);
836       if (splitmux->provided_sink)
837         gst_object_ref_sink (splitmux->provided_sink);
838       GST_OBJECT_UNLOCK (splitmux);
839       break;
840     case PROP_MUXER:
841       GST_OBJECT_LOCK (splitmux);
842       gst_clear_object (&splitmux->provided_muxer);
843       splitmux->provided_muxer = g_value_get_object (value);
844       if (splitmux->provided_muxer)
845         gst_object_ref_sink (splitmux->provided_muxer);
846       GST_OBJECT_UNLOCK (splitmux);
847       break;
848     case PROP_RESET_MUXER:
849       GST_OBJECT_LOCK (splitmux);
850       splitmux->reset_muxer = g_value_get_boolean (value);
851       GST_OBJECT_UNLOCK (splitmux);
852       break;
853     case PROP_ASYNC_FINALIZE:
854       GST_OBJECT_LOCK (splitmux);
855       splitmux->async_finalize = g_value_get_boolean (value);
856       GST_OBJECT_UNLOCK (splitmux);
857       break;
858     case PROP_MUXER_FACTORY:
859       GST_OBJECT_LOCK (splitmux);
860       if (splitmux->muxer_factory)
861         g_free (splitmux->muxer_factory);
862       splitmux->muxer_factory = g_value_dup_string (value);
863       GST_OBJECT_UNLOCK (splitmux);
864       break;
865     case PROP_MUXER_PRESET:
866       GST_OBJECT_LOCK (splitmux);
867       if (splitmux->muxer_preset)
868         g_free (splitmux->muxer_preset);
869       splitmux->muxer_preset = g_value_dup_string (value);
870       GST_OBJECT_UNLOCK (splitmux);
871       break;
872     case PROP_MUXER_PROPERTIES:
873       GST_OBJECT_LOCK (splitmux);
874       if (splitmux->muxer_properties)
875         gst_structure_free (splitmux->muxer_properties);
876       if (gst_value_get_structure (value))
877         splitmux->muxer_properties =
878             gst_structure_copy (gst_value_get_structure (value));
879       else
880         splitmux->muxer_properties = NULL;
881       GST_OBJECT_UNLOCK (splitmux);
882       break;
883     case PROP_SINK_FACTORY:
884       GST_OBJECT_LOCK (splitmux);
885       if (splitmux->sink_factory)
886         g_free (splitmux->sink_factory);
887       splitmux->sink_factory = g_value_dup_string (value);
888       GST_OBJECT_UNLOCK (splitmux);
889       break;
890     case PROP_SINK_PRESET:
891       GST_OBJECT_LOCK (splitmux);
892       if (splitmux->sink_preset)
893         g_free (splitmux->sink_preset);
894       splitmux->sink_preset = g_value_dup_string (value);
895       GST_OBJECT_UNLOCK (splitmux);
896       break;
897     case PROP_SINK_PROPERTIES:
898       GST_OBJECT_LOCK (splitmux);
899       if (splitmux->sink_properties)
900         gst_structure_free (splitmux->sink_properties);
901       if (gst_value_get_structure (value))
902         splitmux->sink_properties =
903             gst_structure_copy (gst_value_get_structure (value));
904       else
905         splitmux->sink_properties = NULL;
906       GST_OBJECT_UNLOCK (splitmux);
907       break;
908     case PROP_MUXERPAD_MAP:
909     {
910       const GstStructure *s = gst_value_get_structure (value);
911       GST_SPLITMUX_LOCK (splitmux);
912       if (splitmux->muxerpad_map) {
913         gst_structure_free (splitmux->muxerpad_map);
914       }
915       if (s)
916         splitmux->muxerpad_map = gst_structure_copy (s);
917       else
918         splitmux->muxerpad_map = NULL;
919       GST_SPLITMUX_UNLOCK (splitmux);
920       break;
921     }
922     default:
923       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
924       break;
925   }
926 }
927
928 static void
929 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
930     GValue * value, GParamSpec * pspec)
931 {
932   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
933
934   switch (prop_id) {
935     case PROP_LOCATION:
936       GST_OBJECT_LOCK (splitmux);
937       g_value_set_string (value, splitmux->location);
938       GST_OBJECT_UNLOCK (splitmux);
939       break;
940     case PROP_START_INDEX:
941       GST_OBJECT_LOCK (splitmux);
942       g_value_set_int (value, splitmux->start_index);
943       GST_OBJECT_UNLOCK (splitmux);
944       break;
945     case PROP_MAX_SIZE_BYTES:
946       GST_OBJECT_LOCK (splitmux);
947       g_value_set_uint64 (value, splitmux->threshold_bytes);
948       GST_OBJECT_UNLOCK (splitmux);
949       break;
950     case PROP_MAX_SIZE_TIME:
951       GST_OBJECT_LOCK (splitmux);
952       g_value_set_uint64 (value, splitmux->threshold_time);
953       GST_OBJECT_UNLOCK (splitmux);
954       break;
955     case PROP_MAX_SIZE_TIMECODE:
956       GST_OBJECT_LOCK (splitmux);
957       g_value_set_string (value, splitmux->threshold_timecode_str);
958       GST_OBJECT_UNLOCK (splitmux);
959       break;
960     case PROP_SEND_KEYFRAME_REQUESTS:
961       GST_OBJECT_LOCK (splitmux);
962       g_value_set_boolean (value, splitmux->send_keyframe_requests);
963       GST_OBJECT_UNLOCK (splitmux);
964       break;
965     case PROP_MAX_FILES:
966       GST_OBJECT_LOCK (splitmux);
967       g_value_set_uint (value, splitmux->max_files);
968       GST_OBJECT_UNLOCK (splitmux);
969       break;
970     case PROP_MUXER_OVERHEAD:
971       GST_OBJECT_LOCK (splitmux);
972       g_value_set_double (value, splitmux->mux_overhead);
973       GST_OBJECT_UNLOCK (splitmux);
974       break;
975     case PROP_USE_ROBUST_MUXING:
976       GST_OBJECT_LOCK (splitmux);
977       g_value_set_boolean (value, splitmux->use_robust_muxing);
978       GST_OBJECT_UNLOCK (splitmux);
979       break;
980     case PROP_ALIGNMENT_THRESHOLD:
981       GST_OBJECT_LOCK (splitmux);
982       g_value_set_uint64 (value, splitmux->alignment_threshold);
983       GST_OBJECT_UNLOCK (splitmux);
984       break;
985     case PROP_SINK:
986       GST_OBJECT_LOCK (splitmux);
987       g_value_set_object (value, splitmux->provided_sink);
988       GST_OBJECT_UNLOCK (splitmux);
989       break;
990     case PROP_MUXER:
991       GST_OBJECT_LOCK (splitmux);
992       g_value_set_object (value, splitmux->provided_muxer);
993       GST_OBJECT_UNLOCK (splitmux);
994       break;
995     case PROP_RESET_MUXER:
996       GST_OBJECT_LOCK (splitmux);
997       g_value_set_boolean (value, splitmux->reset_muxer);
998       GST_OBJECT_UNLOCK (splitmux);
999       break;
1000     case PROP_ASYNC_FINALIZE:
1001       GST_OBJECT_LOCK (splitmux);
1002       g_value_set_boolean (value, splitmux->async_finalize);
1003       GST_OBJECT_UNLOCK (splitmux);
1004       break;
1005     case PROP_MUXER_FACTORY:
1006       GST_OBJECT_LOCK (splitmux);
1007       g_value_set_string (value, splitmux->muxer_factory);
1008       GST_OBJECT_UNLOCK (splitmux);
1009       break;
1010     case PROP_MUXER_PRESET:
1011       GST_OBJECT_LOCK (splitmux);
1012       g_value_set_string (value, splitmux->muxer_preset);
1013       GST_OBJECT_UNLOCK (splitmux);
1014       break;
1015     case PROP_MUXER_PROPERTIES:
1016       GST_OBJECT_LOCK (splitmux);
1017       gst_value_set_structure (value, splitmux->muxer_properties);
1018       GST_OBJECT_UNLOCK (splitmux);
1019       break;
1020     case PROP_SINK_FACTORY:
1021       GST_OBJECT_LOCK (splitmux);
1022       g_value_set_string (value, splitmux->sink_factory);
1023       GST_OBJECT_UNLOCK (splitmux);
1024       break;
1025     case PROP_SINK_PRESET:
1026       GST_OBJECT_LOCK (splitmux);
1027       g_value_set_string (value, splitmux->sink_preset);
1028       GST_OBJECT_UNLOCK (splitmux);
1029       break;
1030     case PROP_SINK_PROPERTIES:
1031       GST_OBJECT_LOCK (splitmux);
1032       gst_value_set_structure (value, splitmux->sink_properties);
1033       GST_OBJECT_UNLOCK (splitmux);
1034       break;
1035     case PROP_MUXERPAD_MAP:
1036       GST_SPLITMUX_LOCK (splitmux);
1037       gst_value_set_structure (value, splitmux->muxerpad_map);
1038       GST_SPLITMUX_UNLOCK (splitmux);
1039       break;
1040     default:
1041       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1042       break;
1043   }
1044 }
1045
1046 /* Convenience function */
1047 static inline GstClockTimeDiff
1048 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
1049 {
1050   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
1051
1052   if (GST_CLOCK_TIME_IS_VALID (val)) {
1053     gboolean sign =
1054         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
1055     if (sign > 0)
1056       res = val;
1057     else if (sign < 0)
1058       res = -val;
1059   }
1060   return res;
1061 }
1062
1063 static void
1064 mq_stream_ctx_reset (MqStreamCtx * ctx)
1065 {
1066   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1067   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1068   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
1069   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1070   g_queue_clear (&ctx->queued_bufs);
1071 }
1072
1073 static MqStreamCtx *
1074 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
1075 {
1076   MqStreamCtx *ctx;
1077
1078   ctx = g_new0 (MqStreamCtx, 1);
1079   ctx->splitmux = splitmux;
1080   g_queue_init (&ctx->queued_bufs);
1081   mq_stream_ctx_reset (ctx);
1082
1083   return ctx;
1084 }
1085
1086 static void
1087 mq_stream_ctx_free (MqStreamCtx * ctx)
1088 {
1089   if (ctx->q) {
1090     GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
1091
1092     g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
1093
1094     if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
1095       gst_element_set_locked_state (ctx->q, TRUE);
1096       gst_element_set_state (ctx->q, GST_STATE_NULL);
1097       gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
1098       gst_object_unref (parent);
1099     }
1100     gst_object_unref (ctx->q);
1101   }
1102   gst_object_unref (ctx->sinkpad);
1103   gst_object_unref (ctx->srcpad);
1104   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1105   g_queue_clear (&ctx->queued_bufs);
1106   g_free (ctx);
1107 }
1108
1109 static void
1110 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
1111     GstElement * sink)
1112 {
1113   gchar *location = NULL;
1114   GstMessage *msg;
1115   const gchar *msg_name = opened ?
1116       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
1117   GstClockTime running_time = splitmux->reference_ctx->out_running_time;
1118
1119   if (!opened) {
1120     GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
1121     if (rtime)
1122       running_time = *rtime;
1123   }
1124
1125   if (g_object_class_find_property (G_OBJECT_GET_CLASS (sink),
1126           "location") != NULL)
1127     g_object_get (sink, "location", &location, NULL);
1128
1129   GST_DEBUG_OBJECT (splitmux,
1130       "Sending %s message. Running time %" GST_TIME_FORMAT " location %s",
1131       msg_name, GST_TIME_ARGS (running_time), GST_STR_NULL (location));
1132
1133   /* If it's in the middle of a teardown, the reference_ctc might have become
1134    * NULL */
1135   if (splitmux->reference_ctx) {
1136     msg = gst_message_new_element (GST_OBJECT (splitmux),
1137         gst_structure_new (msg_name,
1138             "location", G_TYPE_STRING, location,
1139             "running-time", GST_TYPE_CLOCK_TIME, running_time,
1140             "sink", GST_TYPE_ELEMENT, sink, NULL));
1141     gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
1142   }
1143
1144   g_free (location);
1145 }
1146
1147 static void
1148 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
1149 {
1150   GstEvent *eos;
1151   GstPad *pad;
1152   MqStreamCtx *ctx;
1153
1154   eos = gst_event_new_eos ();
1155   pad = helper->pad;
1156   ctx = helper->ctx;
1157
1158   GST_SPLITMUX_LOCK (splitmux);
1159   if (!pad)
1160     pad = gst_pad_get_peer (ctx->srcpad);
1161   GST_SPLITMUX_UNLOCK (splitmux);
1162
1163   gst_pad_send_event (pad, eos);
1164   GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
1165
1166   gst_object_unref (pad);
1167   g_free (helper);
1168 }
1169
1170 /* Called with lock held, drops the lock to send EOS to the
1171  * pad
1172  */
1173 static void
1174 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1175 {
1176   GstEvent *eos;
1177   GstPad *pad;
1178
1179   eos = gst_event_new_eos ();
1180   pad = gst_pad_get_peer (ctx->srcpad);
1181
1182   ctx->out_eos = TRUE;
1183
1184   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
1185   GST_SPLITMUX_UNLOCK (splitmux);
1186   gst_pad_send_event (pad, eos);
1187   GST_SPLITMUX_LOCK (splitmux);
1188
1189   gst_object_unref (pad);
1190 }
1191
1192 /* Called with lock held. Schedules an EOS event to the ctx pad
1193  * to happen in another thread */
1194 static void
1195 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1196 {
1197   AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
1198   GstPad *srcpad, *sinkpad;
1199
1200   srcpad = ctx->srcpad;
1201   sinkpad = gst_pad_get_peer (srcpad);
1202
1203   helper->ctx = ctx;
1204   helper->pad = sinkpad;        /* Takes the reference */
1205
1206   ctx->out_eos_async_done = TRUE;
1207
1208   /* There used to be a bug here, where we had to explicitly remove
1209    * the SINK flag so that GstBin would ignore it for EOS purposes.
1210    * That fixed a race where if splitmuxsink really reaches EOS
1211    * before an asynchronous background element has finished, then
1212    * the bin wouldn't actually send EOS to the pipeline. Even after
1213    * finishing and removing the old element, the bin didn't re-check
1214    * EOS status on removing a SINK element. That bug was fixed
1215    * in core. */
1216   GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1217       sinkpad, ctx);
1218
1219   g_assert_nonnull (helper->pad);
1220   gst_element_call_async (GST_ELEMENT (splitmux),
1221       (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
1222 }
1223
1224 /* Called with lock held. TRUE iff all contexts have a
1225  * pending (or delivered) async eos event */
1226 static gboolean
1227 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1228 {
1229   gboolean ret = TRUE;
1230   GList *item;
1231
1232   for (item = splitmux->contexts; item; item = item->next) {
1233     MqStreamCtx *ctx = item->data;
1234     ret &= ctx->out_eos_async_done;
1235   }
1236   return ret;
1237 }
1238
1239 /* Called with splitmux lock held to check if this output
1240  * context needs to sleep to wait for the release of the
1241  * next GOP, or to send EOS to close out the current file
1242  */
1243 static GstFlowReturn
1244 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1245 {
1246   if (ctx->caps_change)
1247     return GST_FLOW_OK;
1248
1249   do {
1250     /* When first starting up, the reference stream has to output
1251      * the first buffer to prepare the muxer and sink */
1252     gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
1253     GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
1254
1255     if (my_max_out_running_time != GST_CLOCK_STIME_NONE
1256         && my_max_out_running_time != G_MAXINT64) {
1257       my_max_out_running_time -= splitmux->alignment_threshold;
1258       GST_LOG_OBJECT (ctx->srcpad,
1259           "Max out running time currently %" GST_STIME_FORMAT
1260           ", with threshold applied it is %" GST_STIME_FORMAT,
1261           GST_STIME_ARGS (splitmux->max_out_running_time),
1262           GST_STIME_ARGS (my_max_out_running_time));
1263     }
1264
1265     if (ctx->flushing
1266         || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1267       return GST_FLOW_FLUSHING;
1268
1269     GST_LOG_OBJECT (ctx->srcpad,
1270         "Checking running time %" GST_STIME_FORMAT " against max %"
1271         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1272         GST_STIME_ARGS (my_max_out_running_time));
1273
1274     if (can_output) {
1275       if (splitmux->max_out_running_time != GST_CLOCK_STIME_NONE &&
1276           ctx->out_running_time < my_max_out_running_time) {
1277         return GST_FLOW_OK;
1278       }
1279
1280       switch (splitmux->output_state) {
1281         case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1282           /* We only get here if we've finished outputting a GOP and need to know
1283            * what to do next */
1284           splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1285           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1286           continue;
1287
1288         case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1289         case SPLITMUX_OUTPUT_STATE_ENDING_STREAM:
1290           /* We've reached the max out running_time to get here, so end this file now */
1291           if (ctx->out_eos == FALSE) {
1292             if (splitmux->async_finalize) {
1293               /* We must set EOS asynchronously at this point. We cannot defer
1294                * it, because we need all contexts to wake up, for the
1295                * reference context to eventually give us something at
1296                * START_NEXT_FILE. Otherwise, collectpads might choose another
1297                * context to give us the first buffer, and format-location-full
1298                * will not contain a valid sample. */
1299               g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1300                   GINT_TO_POINTER (1));
1301               eos_context_async (ctx, splitmux);
1302               if (all_contexts_are_async_eos (splitmux)) {
1303                 GST_INFO_OBJECT (splitmux,
1304                     "All contexts are async_eos. Moving to the next file.");
1305                 /* We can start the next file once we've asked each pad to go EOS */
1306                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1307                 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1308                 continue;
1309               }
1310             } else {
1311               send_eos (splitmux, ctx);
1312               continue;
1313             }
1314           } else {
1315             GST_INFO_OBJECT (splitmux,
1316                 "At end-of-file state, but context %p is already EOS", ctx);
1317           }
1318           break;
1319         case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1320           if (ctx->is_reference) {
1321             GstFlowReturn ret = GST_FLOW_OK;
1322
1323             /* Special handling on the reference ctx to start new fragments
1324              * and collect commands from the command queue */
1325             /* drops the splitmux lock briefly: */
1326             /* We must have reference ctx in order for format-location-full to
1327              * have a sample */
1328             ret = start_next_fragment (splitmux, ctx);
1329             if (ret != GST_FLOW_OK)
1330               return ret;
1331
1332             continue;
1333           }
1334           break;
1335         case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1336           do {
1337             SplitMuxOutputCommand *cmd =
1338                 g_queue_pop_tail (&splitmux->out_cmd_q);
1339             if (cmd != NULL) {
1340               /* If we pop the last command, we need to make our queues bigger */
1341               if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1342                 grow_blocked_queues (splitmux);
1343
1344               if (cmd->start_new_fragment) {
1345                 if (splitmux->muxed_out_bytes > 0) {
1346                   GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1347                   splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1348                 } else {
1349                   GST_DEBUG_OBJECT (splitmux,
1350                       "Got cmd to start new fragment, but fragment is empty - ignoring.");
1351                 }
1352               } else {
1353                 GST_DEBUG_OBJECT (splitmux,
1354                     "Got new output cmd for time %" GST_STIME_FORMAT,
1355                     GST_STIME_ARGS (cmd->max_output_ts));
1356
1357                 /* Extend the output range immediately */
1358                 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE
1359                     || cmd->max_output_ts > splitmux->max_out_running_time)
1360                   splitmux->max_out_running_time = cmd->max_output_ts;
1361                 GST_DEBUG_OBJECT (splitmux,
1362                     "Max out running time now %" GST_STIME_FORMAT,
1363                     GST_STIME_ARGS (splitmux->max_out_running_time));
1364                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1365               }
1366               GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1367
1368               out_cmd_buf_free (cmd);
1369               break;
1370             } else {
1371               GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1372             }
1373           } while (!ctx->flushing && splitmux->output_state ==
1374               SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1375           /* loop and re-check the state */
1376           continue;
1377         }
1378         case SPLITMUX_OUTPUT_STATE_STOPPED:
1379           return GST_FLOW_FLUSHING;
1380       }
1381     } else {
1382       GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output");
1383     }
1384
1385     GST_INFO_OBJECT (ctx->srcpad,
1386         "Sleeping for running time %"
1387         GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1388         GST_STIME_ARGS (ctx->out_running_time),
1389         GST_STIME_ARGS (splitmux->max_out_running_time));
1390     GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1391     GST_INFO_OBJECT (ctx->srcpad,
1392         "Woken for new max running time %" GST_STIME_FORMAT,
1393         GST_STIME_ARGS (splitmux->max_out_running_time));
1394   }
1395   while (1);
1396
1397   return GST_FLOW_OK;
1398 }
1399
1400 static GstClockTime
1401 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1402     const GstVideoTimeCode * cur_tc, GstClockTime running_time,
1403     GstVideoTimeCode ** next_tc)
1404 {
1405   GstVideoTimeCode *target_tc;
1406   GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1407
1408   if (cur_tc == NULL || splitmux->tc_interval == NULL)
1409     return GST_CLOCK_TIME_NONE;
1410
1411   target_tc = gst_video_time_code_add_interval (cur_tc, splitmux->tc_interval);
1412   if (!target_tc) {
1413     GST_ELEMENT_ERROR (splitmux,
1414         STREAM, FAILED, (NULL), ("Couldn't calculate target timecode"));
1415     return GST_CLOCK_TIME_NONE;
1416   }
1417
1418   /* Convert to ns */
1419   target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1420   cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1421
1422   /* Add running_time, accounting for wraparound. */
1423   if (target_tc_time >= cur_tc_time) {
1424     next_max_tc_time = target_tc_time - cur_tc_time + running_time;
1425   } else {
1426     GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1427
1428     if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1429         (cur_tc->config.fps_d == 1001)) {
1430       /* Checking fps_d is probably unneeded, but better safe than sorry
1431        * (e.g. someone accidentally set a flag) */
1432       GstVideoTimeCode *tc_for_offset;
1433
1434       /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1435        * but slightly less. Calculate that duration from a fake timecode. The
1436        * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1437        * is to add one frame to 23:59:59;29 */
1438       tc_for_offset =
1439           gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1440           NULL, cur_tc->config.flags, 23, 59, 59,
1441           cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
1442       day_in_ns =
1443           gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1444           gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1445           cur_tc->config.fps_n);
1446       gst_video_time_code_free (tc_for_offset);
1447     }
1448     next_max_tc_time = day_in_ns - cur_tc_time + target_tc_time + running_time;
1449   }
1450
1451 #ifndef GST_DISABLE_GST_DEBUG
1452   {
1453     gchar *next_max_tc_str, *cur_tc_str;
1454
1455     cur_tc_str = gst_video_time_code_to_string (cur_tc);
1456     next_max_tc_str = gst_video_time_code_to_string (target_tc);
1457
1458     GST_INFO_OBJECT (splitmux, "Next max timecode %s time: %" GST_TIME_FORMAT
1459         " from ref timecode %s time: %" GST_TIME_FORMAT,
1460         next_max_tc_str,
1461         GST_TIME_ARGS (next_max_tc_time),
1462         cur_tc_str, GST_TIME_ARGS (cur_tc_time));
1463
1464     g_free (next_max_tc_str);
1465     g_free (cur_tc_str);
1466   }
1467 #endif
1468
1469   if (next_tc)
1470     *next_tc = target_tc;
1471   else
1472     gst_video_time_code_free (target_tc);
1473
1474   return next_max_tc_time;
1475 }
1476
1477 static gboolean
1478 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer,
1479     GstClockTimeDiff running_time_dts)
1480 {
1481   GstEvent *ev;
1482   GstClockTime target_time;
1483   gboolean timecode_based = FALSE;
1484   GstClockTime max_tc_time = GST_CLOCK_TIME_NONE;
1485   GstClockTime next_max_tc_time = GST_CLOCK_TIME_NONE;
1486   GstClockTime next_fku_time = GST_CLOCK_TIME_NONE;
1487   GstClockTime tc_rounding_error = 5 * GST_USECOND;
1488   InputGop *newest_gop = NULL;
1489   GList *l;
1490
1491   if (!splitmux->send_keyframe_requests)
1492     return TRUE;
1493
1494   /* Find the newest GOP where we passed in DTS the start PTS */
1495   for (l = splitmux->pending_input_gops.tail; l; l = l->prev) {
1496     InputGop *tmp = l->data;
1497
1498     GST_TRACE_OBJECT (splitmux,
1499         "Having pending input GOP with start PTS %" GST_STIME_FORMAT
1500         " and start time %" GST_STIME_FORMAT,
1501         GST_STIME_ARGS (tmp->start_time_pts), GST_STIME_ARGS (tmp->start_time));
1502
1503     if (tmp->sent_fku) {
1504       GST_DEBUG_OBJECT (splitmux,
1505           "Already checked for a keyframe request for this GOP");
1506       return TRUE;
1507     }
1508
1509     if (running_time_dts == GST_CLOCK_STIME_NONE ||
1510         tmp->start_time_pts == GST_CLOCK_STIME_NONE ||
1511         running_time_dts >= tmp->start_time_pts) {
1512       GST_DEBUG_OBJECT (splitmux,
1513           "Using GOP with start PTS %" GST_STIME_FORMAT " and start time %"
1514           GST_STIME_FORMAT, GST_STIME_ARGS (tmp->start_time_pts),
1515           GST_STIME_ARGS (tmp->start_time));
1516       newest_gop = tmp;
1517       break;
1518     }
1519   }
1520
1521   if (!newest_gop) {
1522     GST_DEBUG_OBJECT (splitmux, "Have no complete enough pending input GOP");
1523     return TRUE;
1524   }
1525
1526   if (splitmux->tc_interval) {
1527     if (newest_gop->start_tc
1528         && gst_video_time_code_is_valid (newest_gop->start_tc)) {
1529       GstVideoTimeCode *next_tc = NULL;
1530       max_tc_time =
1531           calculate_next_max_timecode (splitmux, newest_gop->start_tc,
1532           newest_gop->start_time, &next_tc);
1533
1534       /* calculate the next expected keyframe time to prevent too early fku
1535        * event */
1536       if (GST_CLOCK_TIME_IS_VALID (max_tc_time) && next_tc) {
1537         next_max_tc_time =
1538             calculate_next_max_timecode (splitmux, next_tc, max_tc_time, NULL);
1539       }
1540       if (next_tc)
1541         gst_video_time_code_free (next_tc);
1542
1543       timecode_based = GST_CLOCK_TIME_IS_VALID (max_tc_time) &&
1544           GST_CLOCK_TIME_IS_VALID (next_max_tc_time);
1545
1546       if (!timecode_based) {
1547         GST_WARNING_OBJECT (splitmux,
1548             "Couldn't calculate maximum fragment time for timecode mode");
1549       }
1550     } else {
1551       /* This can happen in the presence of GAP events that trigger
1552        * a new fragment start */
1553       GST_WARNING_OBJECT (splitmux,
1554           "No buffer available to calculate next timecode");
1555     }
1556   }
1557
1558   if ((splitmux->threshold_time == 0 && !timecode_based)
1559       || splitmux->threshold_bytes != 0)
1560     return TRUE;
1561
1562   if (timecode_based) {
1563     /* We might have rounding errors: aim slightly earlier */
1564     if (max_tc_time >= tc_rounding_error) {
1565       target_time = max_tc_time - tc_rounding_error;
1566     } else {
1567       /* unreliable target time */
1568       GST_DEBUG_OBJECT (splitmux, "tc time %" GST_TIME_FORMAT
1569           " is smaller than allowed rounding error, set it to zero",
1570           GST_TIME_ARGS (max_tc_time));
1571       target_time = 0;
1572     }
1573
1574     if (next_max_tc_time >= tc_rounding_error) {
1575       next_fku_time = next_max_tc_time - tc_rounding_error;
1576     } else {
1577       /* unreliable target time */
1578       GST_DEBUG_OBJECT (splitmux, "next tc time %" GST_TIME_FORMAT
1579           " is smaller than allowed rounding error, set it to zero",
1580           GST_TIME_ARGS (next_max_tc_time));
1581       next_fku_time = 0;
1582     }
1583   } else {
1584     target_time = newest_gop->start_time + splitmux->threshold_time;
1585   }
1586
1587   if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fku_time)) {
1588     GstClockTime allowed_time = splitmux->next_fku_time;
1589
1590     if (timecode_based) {
1591       if (allowed_time >= tc_rounding_error) {
1592         allowed_time -= tc_rounding_error;
1593       } else {
1594         /* unreliable next force key unit time */
1595         GST_DEBUG_OBJECT (splitmux, "expected next force key unit time %"
1596             GST_TIME_FORMAT
1597             " is smaller than allowed rounding error, set it to zero",
1598             GST_TIME_ARGS (splitmux->next_fku_time));
1599         allowed_time = 0;
1600       }
1601     }
1602
1603     if (target_time < allowed_time) {
1604       GST_LOG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1605           " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1606           ", rounding error compensated next keyframe time %" GST_TIME_FORMAT,
1607           GST_TIME_ARGS (target_time),
1608           GST_TIME_ARGS (splitmux->next_fku_time),
1609           GST_TIME_ARGS (allowed_time));
1610
1611       return TRUE;
1612     } else if (allowed_time != splitmux->next_fku_time &&
1613         target_time < splitmux->next_fku_time) {
1614       GST_DEBUG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1615           " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1616           ", but the difference is smaller than allowed rounding error",
1617           GST_TIME_ARGS (target_time), GST_TIME_ARGS (splitmux->next_fku_time));
1618     }
1619   }
1620
1621   if (!timecode_based) {
1622     next_fku_time = target_time + splitmux->threshold_time;
1623   }
1624
1625   GST_INFO_OBJECT (splitmux, "Requesting keyframe at %" GST_TIME_FORMAT
1626       ", the next expected keyframe request time is %" GST_TIME_FORMAT,
1627       GST_TIME_ARGS (target_time), GST_TIME_ARGS (next_fku_time));
1628
1629   newest_gop->sent_fku = TRUE;
1630
1631   splitmux->next_fku_time = next_fku_time;
1632   ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1633
1634   return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
1635 }
1636
1637 static GstPadProbeReturn
1638 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1639 {
1640   GstSplitMuxSink *splitmux = ctx->splitmux;
1641   MqStreamBuf *buf_info = NULL;
1642   GstFlowReturn ret = GST_FLOW_OK;
1643
1644   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1645
1646   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1647   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1648     g_warning ("Buffer list handling not implemented");
1649     return GST_PAD_PROBE_DROP;
1650   }
1651   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1652       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1653     GstEvent *event = gst_pad_probe_info_get_event (info);
1654     gboolean locked = FALSE, wait = !ctx->is_reference;
1655
1656     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1657
1658     switch (GST_EVENT_TYPE (event)) {
1659       case GST_EVENT_SEGMENT:
1660         gst_event_copy_segment (event, &ctx->out_segment);
1661         break;
1662       case GST_EVENT_FLUSH_STOP:
1663         GST_SPLITMUX_LOCK (splitmux);
1664         locked = TRUE;
1665         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1666         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1667         g_queue_clear (&ctx->queued_bufs);
1668         g_queue_clear (&ctx->queued_bufs);
1669         /* If this is the reference context, we just threw away any queued keyframes */
1670         if (ctx->is_reference)
1671           splitmux->queued_keyframes = 0;
1672         ctx->flushing = FALSE;
1673         wait = FALSE;
1674         break;
1675       case GST_EVENT_FLUSH_START:
1676         GST_SPLITMUX_LOCK (splitmux);
1677         locked = TRUE;
1678         GST_LOG_OBJECT (pad, "Flush start");
1679         ctx->flushing = TRUE;
1680         GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1681         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1682         break;
1683       case GST_EVENT_EOS:
1684         GST_SPLITMUX_LOCK (splitmux);
1685         locked = TRUE;
1686         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1687           goto beach;
1688         ctx->out_eos = TRUE;
1689
1690         if (ctx == splitmux->reference_ctx) {
1691           splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_STREAM;
1692           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1693         }
1694
1695         GST_INFO_OBJECT (splitmux,
1696             "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1697         break;
1698       case GST_EVENT_GAP:{
1699         GstClockTime gap_ts;
1700         GstClockTimeDiff rtime;
1701
1702         gst_event_parse_gap (event, &gap_ts, NULL);
1703         if (gap_ts == GST_CLOCK_TIME_NONE)
1704           break;
1705
1706         GST_SPLITMUX_LOCK (splitmux);
1707         locked = TRUE;
1708
1709         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1710           goto beach;
1711
1712         /* When we get a gap event on the
1713          * reference stream and we're trying to open a
1714          * new file, we need to store it until we get
1715          * the buffer afterwards
1716          */
1717         if (ctx->is_reference &&
1718             (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1719           GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1720           gst_event_replace (&ctx->pending_gap, event);
1721           GST_SPLITMUX_UNLOCK (splitmux);
1722           return GST_PAD_PROBE_HANDLED;
1723         }
1724
1725         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1726
1727         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1728             GST_STIME_ARGS (rtime));
1729
1730         if (rtime != GST_CLOCK_STIME_NONE) {
1731           ctx->out_running_time = rtime;
1732           complete_or_wait_on_out (splitmux, ctx);
1733         }
1734         break;
1735       }
1736       case GST_EVENT_CUSTOM_DOWNSTREAM:{
1737         const GstStructure *s;
1738         GstClockTimeDiff ts = 0;
1739
1740         s = gst_event_get_structure (event);
1741         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1742           break;
1743
1744         gst_structure_get_int64 (s, "timestamp", &ts);
1745
1746         GST_SPLITMUX_LOCK (splitmux);
1747         locked = TRUE;
1748
1749         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1750           goto beach;
1751         ctx->out_running_time = ts;
1752         if (!ctx->is_reference)
1753           ret = complete_or_wait_on_out (splitmux, ctx);
1754         GST_SPLITMUX_UNLOCK (splitmux);
1755         GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1756         return GST_PAD_PROBE_DROP;
1757       }
1758       case GST_EVENT_CAPS:{
1759         GstPad *peer;
1760
1761         if (!ctx->is_reference)
1762           break;
1763
1764         peer = gst_pad_get_peer (pad);
1765         if (peer) {
1766           gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1767
1768           gst_object_unref (peer);
1769
1770           if (ok)
1771             break;
1772
1773         } else {
1774           break;
1775         }
1776         /* This is in the case the muxer doesn't allow this change of caps */
1777         GST_SPLITMUX_LOCK (splitmux);
1778         locked = TRUE;
1779         ctx->caps_change = TRUE;
1780
1781         if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1782           GST_DEBUG_OBJECT (splitmux,
1783               "New caps were not accepted. Switching output file");
1784           if (ctx->out_eos == FALSE) {
1785             splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1786             GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1787           }
1788         }
1789
1790         /* Lets it fall through, if it fails again, then the muxer just can't
1791          * support this format, but at least we have a closed file.
1792          */
1793         break;
1794       }
1795       default:
1796         break;
1797     }
1798
1799     /* We need to make sure events aren't passed
1800      * until the muxer / sink are ready for it */
1801     if (!locked)
1802       GST_SPLITMUX_LOCK (splitmux);
1803     if (wait)
1804       ret = complete_or_wait_on_out (splitmux, ctx);
1805     GST_SPLITMUX_UNLOCK (splitmux);
1806
1807     /* Don't try to forward sticky events before the next buffer is there
1808      * because it would cause a new file to be created without the first
1809      * buffer being available.
1810      */
1811     GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1812     if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1813       gst_event_unref (event);
1814       return GST_PAD_PROBE_HANDLED;
1815     } else {
1816       return GST_PAD_PROBE_PASS;
1817     }
1818   }
1819
1820   /* Allow everything through until the configured next stopping point */
1821   GST_SPLITMUX_LOCK (splitmux);
1822
1823   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1824   if (buf_info == NULL) {
1825     /* Can only happen due to a poorly timed flush */
1826     ret = GST_FLOW_FLUSHING;
1827     goto beach;
1828   }
1829
1830   /* If we have popped a keyframe, decrement the queued_gop count */
1831   if (buf_info->keyframe && splitmux->queued_keyframes > 0 && ctx->is_reference)
1832     splitmux->queued_keyframes--;
1833
1834   ctx->out_running_time = buf_info->run_ts;
1835   ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1836
1837   GST_LOG_OBJECT (splitmux,
1838       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1839       " size %" G_GUINT64_FORMAT,
1840       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1841
1842   ctx->caps_change = FALSE;
1843
1844   ret = complete_or_wait_on_out (splitmux, ctx);
1845
1846   splitmux->muxed_out_bytes += buf_info->buf_size;
1847
1848 #ifndef GST_DISABLE_GST_DEBUG
1849   {
1850     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1851     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1852         " run ts %" GST_STIME_FORMAT, buf,
1853         GST_STIME_ARGS (ctx->out_running_time));
1854   }
1855 #endif
1856
1857   ctx->cur_out_buffer = NULL;
1858   GST_SPLITMUX_UNLOCK (splitmux);
1859
1860   /* pending_gap is protected by the STREAM lock */
1861   if (ctx->pending_gap) {
1862     /* If we previously stored a gap event, send it now */
1863     GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1864
1865     GST_DEBUG_OBJECT (splitmux,
1866         "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1867
1868     gst_pad_send_event (peer, ctx->pending_gap);
1869     ctx->pending_gap = NULL;
1870
1871     gst_object_unref (peer);
1872   }
1873
1874   mq_stream_buf_free (buf_info);
1875
1876   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1877   return GST_PAD_PROBE_PASS;
1878
1879 beach:
1880   GST_SPLITMUX_UNLOCK (splitmux);
1881   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1882   return GST_PAD_PROBE_DROP;
1883 }
1884
1885 static gboolean
1886 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1887 {
1888   return gst_pad_send_event (peer, gst_event_ref (*event));
1889 }
1890
1891 static void
1892 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1893 {
1894   if (ctx->fragment_block_id > 0) {
1895     gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1896     ctx->fragment_block_id = 0;
1897   }
1898 }
1899
1900 static void
1901 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1902 {
1903   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1904
1905   gst_pad_sticky_events_foreach (ctx->srcpad,
1906       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1907
1908   /* Clear EOS flag if not actually EOS */
1909   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1910   ctx->out_eos_async_done = ctx->out_eos;
1911
1912   gst_object_unref (peer);
1913 }
1914
1915 static void
1916 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1917 {
1918   GstPad *sinkpad, *srcpad, *newpad;
1919   GstPadTemplate *templ;
1920
1921   srcpad = ctx->srcpad;
1922   sinkpad = gst_pad_get_peer (srcpad);
1923
1924   templ = sinkpad->padtemplate;
1925   newpad =
1926       gst_element_request_pad (splitmux->muxer, templ,
1927       GST_PAD_NAME (sinkpad), NULL);
1928
1929   GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1930       newpad);
1931   if (!gst_pad_unlink (srcpad, sinkpad)) {
1932     gst_object_unref (sinkpad);
1933     goto fail;
1934   }
1935   if (gst_pad_link_full (srcpad, newpad,
1936           GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1937     gst_element_release_request_pad (splitmux->muxer, newpad);
1938     gst_object_unref (sinkpad);
1939     gst_object_unref (newpad);
1940     goto fail;
1941   }
1942   gst_object_unref (newpad);
1943   gst_object_unref (sinkpad);
1944   return;
1945
1946 fail:
1947   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1948       ("Could not create the new muxer/sink"), NULL);
1949 }
1950
1951 static GstPadProbeReturn
1952 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1953 {
1954   return GST_PAD_PROBE_OK;
1955 }
1956
1957 static void
1958 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1959 {
1960   ctx->fragment_block_id =
1961       gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1962       NULL, NULL);
1963 }
1964
1965 static gboolean
1966 _set_property_from_structure (GQuark field_id, const GValue * value,
1967     gpointer user_data)
1968 {
1969   const gchar *property_name = g_quark_to_string (field_id);
1970   GObject *element = G_OBJECT (user_data);
1971
1972   g_object_set_property (element, property_name, value);
1973
1974   return TRUE;
1975 }
1976
1977 static void
1978 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1979 {
1980   gst_element_set_locked_state (element, TRUE);
1981   gst_element_set_state (element, GST_STATE_NULL);
1982   GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1983   gst_bin_remove (GST_BIN (splitmux), element);
1984 }
1985
1986
1987 static void
1988 _send_event (const GValue * value, gpointer user_data)
1989 {
1990   GstPad *pad = g_value_get_object (value);
1991   GstEvent *ev = user_data;
1992
1993   gst_pad_send_event (pad, gst_event_ref (ev));
1994 }
1995
1996 /* Called with lock held when a fragment
1997  * reaches EOS and it is time to restart
1998  * a new fragment
1999  */
2000 static GstFlowReturn
2001 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2002 {
2003   GstElement *muxer, *sink;
2004
2005   g_assert (ctx->is_reference);
2006
2007   /* 1 change to new file */
2008   splitmux->switching_fragment = TRUE;
2009
2010   /* We need to drop the splitmux lock to acquire the state lock
2011    * here and ensure there's no racy state change going on elsewhere */
2012   muxer = gst_object_ref (splitmux->muxer);
2013   sink = gst_object_ref (splitmux->active_sink);
2014
2015   GST_SPLITMUX_UNLOCK (splitmux);
2016   GST_SPLITMUX_STATE_LOCK (splitmux);
2017
2018   if (splitmux->shutdown) {
2019     GST_DEBUG_OBJECT (splitmux,
2020         "Shutdown requested. Aborting fragment switch.");
2021     GST_SPLITMUX_LOCK (splitmux);
2022     GST_SPLITMUX_STATE_UNLOCK (splitmux);
2023     gst_object_unref (muxer);
2024     gst_object_unref (sink);
2025     return GST_FLOW_FLUSHING;
2026   }
2027
2028   if (splitmux->async_finalize) {
2029     if (splitmux->muxed_out_bytes > 0
2030         || splitmux->fragment_id != splitmux->start_index) {
2031       gchar *newname;
2032       GstElement *new_sink, *new_muxer;
2033
2034       GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
2035           splitmux->fragment_id);
2036       g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
2037       newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
2038       GST_SPLITMUX_LOCK (splitmux);
2039       if ((splitmux->sink =
2040               create_element (splitmux, splitmux->sink_factory, newname,
2041                   TRUE)) == NULL)
2042         goto fail;
2043       if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
2044         gst_preset_load_preset (GST_PRESET (splitmux->sink),
2045             splitmux->sink_preset);
2046       if (splitmux->sink_properties)
2047         gst_structure_foreach (splitmux->sink_properties,
2048             _set_property_from_structure, splitmux->sink);
2049       splitmux->active_sink = splitmux->sink;
2050       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
2051       g_free (newname);
2052       newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
2053       if ((splitmux->muxer =
2054               create_element (splitmux, splitmux->muxer_factory, newname,
2055                   TRUE)) == NULL)
2056         goto fail;
2057       if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2058               "async") != NULL) {
2059         /* async child elements are causing state change races and weird
2060          * failures, so let's try and turn that off */
2061         g_object_set (splitmux->sink, "async", FALSE, NULL);
2062       }
2063       if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
2064         gst_preset_load_preset (GST_PRESET (splitmux->muxer),
2065             splitmux->muxer_preset);
2066       if (splitmux->muxer_properties)
2067         gst_structure_foreach (splitmux->muxer_properties,
2068             _set_property_from_structure, splitmux->muxer);
2069       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2070       g_free (newname);
2071       new_sink = splitmux->sink;
2072       new_muxer = splitmux->muxer;
2073       GST_SPLITMUX_UNLOCK (splitmux);
2074       g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
2075       gst_element_link (new_muxer, new_sink);
2076
2077       if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2078         if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2079                     EOS_FROM_US)) == 2) {
2080           _lock_and_set_to_null (muxer, splitmux);
2081           _lock_and_set_to_null (sink, splitmux);
2082         } else {
2083           g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2084               GINT_TO_POINTER (2));
2085         }
2086       }
2087       gst_object_unref (muxer);
2088       gst_object_unref (sink);
2089       muxer = new_muxer;
2090       sink = new_sink;
2091       gst_object_ref (muxer);
2092       gst_object_ref (sink);
2093     }
2094   } else {
2095
2096     gst_element_set_locked_state (muxer, TRUE);
2097     gst_element_set_locked_state (sink, TRUE);
2098     gst_element_set_state (sink, GST_STATE_NULL);
2099
2100     if (splitmux->reset_muxer) {
2101       gst_element_set_state (muxer, GST_STATE_NULL);
2102     } else {
2103       GstIterator *it = gst_element_iterate_sink_pads (muxer);
2104       GstEvent *ev;
2105       guint32 seqnum;
2106
2107       ev = gst_event_new_flush_start ();
2108       seqnum = gst_event_get_seqnum (ev);
2109       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2110       gst_event_unref (ev);
2111
2112       gst_iterator_resync (it);
2113
2114       ev = gst_event_new_flush_stop (TRUE);
2115       gst_event_set_seqnum (ev, seqnum);
2116       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2117       gst_event_unref (ev);
2118
2119       gst_iterator_free (it);
2120     }
2121   }
2122
2123   GST_SPLITMUX_LOCK (splitmux);
2124   set_next_filename (splitmux, ctx);
2125   splitmux->muxed_out_bytes = 0;
2126   GST_SPLITMUX_UNLOCK (splitmux);
2127
2128   if (gst_element_set_state (sink,
2129           GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2130     gst_element_set_state (sink, GST_STATE_NULL);
2131     gst_element_set_locked_state (muxer, FALSE);
2132     gst_element_set_locked_state (sink, FALSE);
2133
2134     goto fail_output;
2135   }
2136
2137   if (gst_element_set_state (muxer,
2138           GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2139     gst_element_set_state (muxer, GST_STATE_NULL);
2140     gst_element_set_state (sink, GST_STATE_NULL);
2141     gst_element_set_locked_state (muxer, FALSE);
2142     gst_element_set_locked_state (sink, FALSE);
2143     goto fail_muxer;
2144   }
2145
2146   gst_element_set_locked_state (muxer, FALSE);
2147   gst_element_set_locked_state (sink, FALSE);
2148
2149   gst_object_unref (sink);
2150   gst_object_unref (muxer);
2151
2152   GST_SPLITMUX_LOCK (splitmux);
2153   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2154   splitmux->switching_fragment = FALSE;
2155   do_async_done (splitmux);
2156
2157   splitmux->ready_for_output = TRUE;
2158
2159   g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
2160   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
2161
2162   send_fragment_opened_closed_msg (splitmux, TRUE, sink);
2163
2164   /* FIXME: Is this always the correct next state? */
2165   GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND");
2166   splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
2167   GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2168   return GST_FLOW_OK;
2169
2170 fail:
2171   gst_object_unref (sink);
2172   gst_object_unref (muxer);
2173
2174   GST_SPLITMUX_LOCK (splitmux);
2175   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2176   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2177       ("Could not create the new muxer/sink"), NULL);
2178   return GST_FLOW_ERROR;
2179
2180 fail_output:
2181   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2182       ("Could not start new output sink"), NULL);
2183   gst_object_unref (sink);
2184   gst_object_unref (muxer);
2185
2186   GST_SPLITMUX_LOCK (splitmux);
2187   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2188   splitmux->switching_fragment = FALSE;
2189   return GST_FLOW_ERROR;
2190
2191 fail_muxer:
2192   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2193       ("Could not start new muxer"), NULL);
2194   gst_object_unref (sink);
2195   gst_object_unref (muxer);
2196
2197   GST_SPLITMUX_LOCK (splitmux);
2198   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2199   splitmux->switching_fragment = FALSE;
2200   return GST_FLOW_ERROR;
2201 }
2202
2203 static void
2204 bus_handler (GstBin * bin, GstMessage * message)
2205 {
2206   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
2207
2208   switch (GST_MESSAGE_TYPE (message)) {
2209     case GST_MESSAGE_EOS:{
2210       /* If the state is draining out the current file, drop this EOS */
2211       GstElement *sink;
2212
2213       sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
2214       GST_SPLITMUX_LOCK (splitmux);
2215
2216       send_fragment_opened_closed_msg (splitmux, FALSE, sink);
2217
2218       if (splitmux->async_finalize) {
2219
2220         if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2221           if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2222                       EOS_FROM_US)) == 2) {
2223             GstElement *muxer;
2224             GstPad *sinksink, *muxersrc;
2225
2226             sinksink = gst_element_get_static_pad (sink, "sink");
2227             muxersrc = gst_pad_get_peer (sinksink);
2228             muxer = gst_pad_get_parent_element (muxersrc);
2229             gst_object_unref (sinksink);
2230             gst_object_unref (muxersrc);
2231
2232             gst_element_call_async (muxer,
2233                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2234                 gst_object_ref (splitmux), gst_object_unref);
2235             gst_element_call_async (sink,
2236                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2237                 gst_object_ref (splitmux), gst_object_unref);
2238             gst_object_unref (muxer);
2239           } else {
2240             g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2241                 GINT_TO_POINTER (2));
2242           }
2243           GST_DEBUG_OBJECT (splitmux,
2244               "Caught async EOS from previous muxer+sink. Dropping.");
2245           /* We forward the EOS so that it gets aggregated as normal. If the sink
2246            * finishes and is removed before the end, it will be de-aggregated */
2247           gst_message_unref (message);
2248           GST_SPLITMUX_UNLOCK (splitmux);
2249           return;
2250         }
2251       } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_STREAM) {
2252         GST_DEBUG_OBJECT (splitmux,
2253             "Passing EOS message. Output state %d max_out_running_time %"
2254             GST_STIME_FORMAT, splitmux->output_state,
2255             GST_STIME_ARGS (splitmux->max_out_running_time));
2256       } else {
2257         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
2258         splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2259         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2260
2261         gst_message_unref (message);
2262         GST_SPLITMUX_UNLOCK (splitmux);
2263         return;
2264       }
2265       GST_SPLITMUX_UNLOCK (splitmux);
2266       break;
2267     }
2268     case GST_MESSAGE_ASYNC_START:
2269     case GST_MESSAGE_ASYNC_DONE:
2270       /* Ignore state changes from our children while switching */
2271       GST_SPLITMUX_LOCK (splitmux);
2272       if (splitmux->switching_fragment) {
2273         if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
2274             || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
2275           GST_LOG_OBJECT (splitmux,
2276               "Ignoring state change from child %" GST_PTR_FORMAT
2277               " while switching", GST_MESSAGE_SRC (message));
2278           gst_message_unref (message);
2279           GST_SPLITMUX_UNLOCK (splitmux);
2280           return;
2281         }
2282       }
2283       GST_SPLITMUX_UNLOCK (splitmux);
2284       break;
2285     case GST_MESSAGE_WARNING:
2286     {
2287       GError *gerror = NULL;
2288
2289       gst_message_parse_warning (message, &gerror, NULL);
2290
2291       if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
2292         GList *item;
2293         gboolean caps_change = FALSE;
2294
2295         GST_SPLITMUX_LOCK (splitmux);
2296
2297         for (item = splitmux->contexts; item; item = item->next) {
2298           MqStreamCtx *ctx = item->data;
2299
2300           if (ctx->caps_change) {
2301             caps_change = TRUE;
2302             break;
2303           }
2304         }
2305
2306         GST_SPLITMUX_UNLOCK (splitmux);
2307
2308         if (caps_change) {
2309           GST_LOG_OBJECT (splitmux,
2310               "Ignoring warning change from child %" GST_PTR_FORMAT
2311               " while switching caps", GST_MESSAGE_SRC (message));
2312           gst_message_unref (message);
2313           return;
2314         }
2315       }
2316       break;
2317     }
2318     default:
2319       break;
2320   }
2321
2322   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2323 }
2324
2325 static void
2326 ctx_set_unblock (MqStreamCtx * ctx)
2327 {
2328   ctx->need_unblock = TRUE;
2329 }
2330
2331 static gboolean
2332 need_new_fragment (GstSplitMuxSink * splitmux,
2333     GstClockTime queued_time, GstClockTime queued_gop_time,
2334     guint64 queued_bytes)
2335 {
2336   guint64 thresh_bytes;
2337   GstClockTime thresh_time;
2338   gboolean check_robust_muxing;
2339   GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
2340   GstClockTime *ptr_to_time;
2341   const InputGop *gop, *next_gop;
2342
2343   GST_OBJECT_LOCK (splitmux);
2344   thresh_bytes = splitmux->threshold_bytes;
2345   thresh_time = splitmux->threshold_time;
2346   ptr_to_time = (GstClockTime *)
2347       gst_queue_array_peek_head_struct (splitmux->times_to_split);
2348   if (ptr_to_time)
2349     time_to_split = *ptr_to_time;
2350   check_robust_muxing = splitmux->use_robust_muxing
2351       && splitmux->muxer_has_reserved_props;
2352   GST_OBJECT_UNLOCK (splitmux);
2353
2354   /* Have we muxed at least one thing from the reference
2355    * stream into the file? If not, no other streams can have
2356    * either */
2357   if (splitmux->fragment_reference_bytes <= 0) {
2358     GST_TRACE_OBJECT (splitmux,
2359         "Not ready to split - nothing muxed on the reference stream");
2360     return FALSE;
2361   }
2362
2363   /* User told us to split now */
2364   if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE) {
2365     GST_TRACE_OBJECT (splitmux, "Forcing because split_next_gop is set");
2366     return TRUE;
2367   }
2368
2369   gop = g_queue_peek_head (&splitmux->pending_input_gops);
2370   /* We need a full GOP queued up at this point */
2371   g_assert (gop != NULL);
2372   next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2373   /* And the beginning of the next GOP or otherwise EOS */
2374
2375   /* User told us to split at this running time */
2376   if (gop->start_time >= time_to_split) {
2377     GST_OBJECT_LOCK (splitmux);
2378     /* Dequeue running time */
2379     gst_queue_array_pop_head_struct (splitmux->times_to_split);
2380     /* Empty any running times after this that are past now */
2381     ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2382     while (ptr_to_time) {
2383       time_to_split = *ptr_to_time;
2384       if (gop->start_time < time_to_split) {
2385         break;
2386       }
2387       gst_queue_array_pop_head_struct (splitmux->times_to_split);
2388       ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2389     }
2390     GST_TRACE_OBJECT (splitmux,
2391         "GOP start time %" GST_STIME_FORMAT " is after requested split point %"
2392         GST_STIME_FORMAT, GST_STIME_ARGS (gop->start_time),
2393         GST_STIME_ARGS (time_to_split));
2394     GST_OBJECT_UNLOCK (splitmux);
2395     return TRUE;
2396   }
2397
2398   if (thresh_bytes > 0 && queued_bytes > thresh_bytes) {
2399     GST_TRACE_OBJECT (splitmux,
2400         "queued bytes %" G_GUINT64_FORMAT " overruns byte limit", queued_bytes);
2401     return TRUE;                /* Would overrun byte limit */
2402   }
2403
2404   if (thresh_time > 0 && queued_time > thresh_time) {
2405     GST_TRACE_OBJECT (splitmux,
2406         "queued time %" GST_STIME_FORMAT " overruns time limit",
2407         GST_STIME_ARGS (queued_time));
2408     return TRUE;                /* Would overrun time limit */
2409   }
2410
2411   if (splitmux->tc_interval) {
2412     GstClockTime next_gop_start_time =
2413         next_gop ? next_gop->start_time : splitmux->max_in_running_time;
2414
2415     if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) &&
2416         GST_CLOCK_STIME_IS_VALID (next_gop_start_time) &&
2417         next_gop_start_time >
2418         splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) {
2419       GST_TRACE_OBJECT (splitmux,
2420           "in running time %" GST_STIME_FORMAT " overruns time limit %"
2421           GST_TIME_FORMAT, GST_STIME_ARGS (next_gop_start_time),
2422           GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2423       return TRUE;
2424     }
2425   }
2426
2427   if (check_robust_muxing) {
2428     GstClockTime mux_reserved_remain;
2429
2430     g_object_get (splitmux->muxer,
2431         "reserved-duration-remaining", &mux_reserved_remain, NULL);
2432
2433     GST_LOG_OBJECT (splitmux,
2434         "Muxer robust muxing report - %" G_GUINT64_FORMAT
2435         " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
2436         mux_reserved_remain, queued_gop_time);
2437
2438     if (queued_gop_time >= mux_reserved_remain) {
2439       GST_INFO_OBJECT (splitmux,
2440           "File is about to run out of header room - %" G_GUINT64_FORMAT
2441           " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
2442           ". Switching to new file", mux_reserved_remain, queued_gop_time);
2443       return TRUE;
2444     }
2445   }
2446
2447   /* Continue and mux this GOP */
2448   return FALSE;
2449 }
2450
2451 /* probably we want to add this API? */
2452 static void
2453 video_time_code_replace (GstVideoTimeCode ** old_tc, GstVideoTimeCode * new_tc)
2454 {
2455   GstVideoTimeCode *timecode = NULL;
2456
2457   g_return_if_fail (old_tc != NULL);
2458
2459   if (*old_tc == new_tc)
2460     return;
2461
2462   if (new_tc)
2463     timecode = gst_video_time_code_copy (new_tc);
2464
2465   if (*old_tc)
2466     gst_video_time_code_free (*old_tc);
2467
2468   *old_tc = timecode;
2469 }
2470
2471 /* Called with splitmux lock held */
2472 /* Called when entering ProcessingCompleteGop state
2473  * Assess if mq contents overflowed the current file
2474  *   -> If yes, need to switch to new file
2475  *   -> if no, set max_out_running_time to let this GOP in and
2476  *      go to COLLECTING_GOP_START state
2477  */
2478 static void
2479 handle_gathered_gop (GstSplitMuxSink * splitmux, const InputGop * gop,
2480     GstClockTimeDiff next_gop_start_time, GstClockTimeDiff max_out_running_time)
2481 {
2482   guint64 queued_bytes;
2483   GstClockTimeDiff queued_time = 0;
2484   GstClockTimeDiff queued_gop_time = 0;
2485   SplitMuxOutputCommand *cmd;
2486
2487   /* Assess if the multiqueue contents overflowed the current file */
2488   /* When considering if a newly gathered GOP overflows
2489    * the time limit for the file, only consider the running time of the
2490    * reference stream. Other streams might have run ahead a little bit,
2491    * but extra pieces won't be released to the muxer beyond the reference
2492    * stream cut-off anyway - so it forms the limit. */
2493   queued_bytes = splitmux->fragment_total_bytes + gop->total_bytes;
2494   queued_time = next_gop_start_time;
2495   /* queued_gop_time tracks how much unwritten data there is waiting to
2496    * be written to this fragment including this GOP */
2497   if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
2498     queued_gop_time = queued_time - splitmux->reference_ctx->out_running_time;
2499   else
2500     queued_gop_time = queued_time - gop->start_time;
2501
2502   GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
2503   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2504       " bytes %" G_GUINT64_FORMAT " in next gop start time %" GST_STIME_FORMAT
2505       " gop start time %" GST_STIME_FORMAT,
2506       GST_STIME_ARGS (queued_time), queued_bytes,
2507       GST_STIME_ARGS (next_gop_start_time), GST_STIME_ARGS (gop->start_time));
2508
2509   if (queued_gop_time < 0)
2510     goto error_gop_duration;
2511
2512   if (queued_time < splitmux->fragment_start_time)
2513     goto error_queued_time;
2514
2515   queued_time -= splitmux->fragment_start_time;
2516   if (queued_time < queued_gop_time)
2517     queued_gop_time = queued_time;
2518
2519   /* Expand queued bytes estimate by muxer overhead */
2520   queued_bytes += (queued_bytes * splitmux->mux_overhead);
2521
2522   /* Check for overrun - have we output at least one byte and overrun
2523    * either threshold? */
2524   if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
2525     if (splitmux->async_finalize) {
2526       GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2527       *sink_running_time = splitmux->reference_ctx->out_running_time;
2528       g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2529           RUNNING_TIME, sink_running_time, g_free);
2530     }
2531     g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2532     /* Tell the output side to start a new fragment */
2533     GST_INFO_OBJECT (splitmux,
2534         "This GOP (dur %" GST_STIME_FORMAT
2535         ") would overflow the fragment, Sending start_new_fragment cmd",
2536         GST_STIME_ARGS (queued_gop_time));
2537     cmd = out_cmd_buf_new ();
2538     cmd->start_new_fragment = TRUE;
2539     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2540     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2541
2542     splitmux->fragment_start_time = gop->start_time;
2543     splitmux->fragment_start_time_pts = gop->start_time_pts;
2544     splitmux->fragment_total_bytes = 0;
2545     splitmux->fragment_reference_bytes = 0;
2546
2547     video_time_code_replace (&splitmux->fragment_start_tc, gop->start_tc);
2548     splitmux->next_fragment_start_tc_time =
2549         calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
2550         splitmux->fragment_start_time, NULL);
2551     if (splitmux->tc_interval && splitmux->fragment_start_tc
2552         && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
2553       GST_WARNING_OBJECT (splitmux,
2554           "Couldn't calculate next fragment start time for timecode mode");
2555     }
2556   }
2557
2558   /* And set up to collect the next GOP */
2559   if (max_out_running_time != G_MAXINT64) {
2560     splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2561   } else {
2562     /* This is probably already the current state, but just in case: */
2563     splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2564   }
2565
2566   /* And wake all input contexts to send a wake-up event */
2567   g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2568   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2569
2570   /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2571   splitmux->fragment_total_bytes += gop->total_bytes;
2572   splitmux->fragment_reference_bytes += gop->reference_bytes;
2573
2574   if (gop->total_bytes > 0) {
2575     GST_LOG_OBJECT (splitmux,
2576         "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2577         " time %" GST_STIME_FORMAT,
2578         splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2579
2580     /* Send this GOP to the output command queue */
2581     cmd = out_cmd_buf_new ();
2582     cmd->start_new_fragment = FALSE;
2583     cmd->max_output_ts = max_out_running_time;
2584     GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2585         GST_STIME_FORMAT, GST_STIME_ARGS (max_out_running_time));
2586     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2587
2588     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2589   }
2590
2591   return;
2592
2593 error_gop_duration:
2594   GST_ELEMENT_ERROR (splitmux,
2595       STREAM, FAILED, ("Timestamping error on input streams"),
2596       ("Queued GOP time is negative %" GST_STIME_FORMAT,
2597           GST_STIME_ARGS (queued_gop_time)));
2598   return;
2599 error_queued_time:
2600   GST_ELEMENT_ERROR (splitmux,
2601       STREAM, FAILED, ("Timestamping error on input streams"),
2602       ("Queued time is negative. Input went backwards. queued_time - %"
2603           GST_STIME_FORMAT, GST_STIME_ARGS (queued_time)));
2604   return;
2605 }
2606
2607 /* Called with splitmux lock held */
2608 /* Called from each input pad when it is has all the pieces
2609  * for a GOP or EOS, starting with the reference pad which has set the
2610  * splitmux->max_in_running_time
2611  */
2612 static void
2613 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2614 {
2615   GList *cur;
2616   GstEvent *event;
2617
2618   /* On ENDING_FILE, the reference stream sends a command to start a new
2619    * fragment, then releases the GOP for output in the new fragment.
2620    *  If some streams received no buffer during the last GOP that overran,
2621    * because its next buffer has a timestamp bigger than
2622    * ctx->max_in_running_time, its queue is empty. In that case the only
2623    * way to wakeup the output thread is by injecting an event in the
2624    * queue. This usually happen with subtitle streams.
2625    * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2626   if (ctx->need_unblock) {
2627     GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2628     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2629         GST_EVENT_TYPE_SERIALIZED,
2630         gst_structure_new ("splitmuxsink-unblock", "timestamp",
2631             G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2632
2633     GST_SPLITMUX_UNLOCK (splitmux);
2634     gst_pad_send_event (ctx->sinkpad, event);
2635     GST_SPLITMUX_LOCK (splitmux);
2636
2637     ctx->need_unblock = FALSE;
2638     GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2639     /* state may have changed while we were unlocked. Loop again if so */
2640     if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2641       return;
2642   }
2643
2644   do {
2645     GstClockTimeDiff next_gop_start = GST_CLOCK_STIME_NONE;
2646
2647     if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2648       GstClockTimeDiff max_out_running_time;
2649       gboolean ready = TRUE;
2650       InputGop *gop;
2651       const InputGop *next_gop;
2652
2653       gop = g_queue_peek_head (&splitmux->pending_input_gops);
2654       next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2655
2656       /* If we have no GOP or no next GOP here then the reference context is
2657        * at EOS, otherwise use the start time of the next GOP if we're far
2658        * enough in the GOP to know it */
2659       if (gop && next_gop) {
2660         if (!splitmux->reference_ctx->in_eos
2661             && splitmux->max_in_running_time_dts != GST_CLOCK_STIME_NONE
2662             && splitmux->max_in_running_time_dts < next_gop->start_time_pts) {
2663           GST_LOG_OBJECT (splitmux,
2664               "No further GOPs finished collecting, waiting until current DTS %"
2665               GST_STIME_FORMAT " has passed next GOP start PTS %"
2666               GST_STIME_FORMAT,
2667               GST_STIME_ARGS (splitmux->max_in_running_time_dts),
2668               GST_STIME_ARGS (next_gop->start_time_pts));
2669           break;
2670         }
2671
2672         GST_LOG_OBJECT (splitmux,
2673             "Finished collecting GOP with start time %" GST_STIME_FORMAT
2674             ", next GOP start time %" GST_STIME_FORMAT,
2675             GST_STIME_ARGS (gop->start_time),
2676             GST_STIME_ARGS (next_gop->start_time));
2677         next_gop_start = next_gop->start_time;
2678         max_out_running_time =
2679             splitmux->reference_ctx->in_eos ? G_MAXINT64 : next_gop->start_time;
2680       } else if (!next_gop) {
2681         GST_LOG_OBJECT (splitmux, "Reference context is EOS");
2682         next_gop_start = splitmux->max_in_running_time;
2683         max_out_running_time = G_MAXINT64;
2684       } else if (!gop) {
2685         GST_LOG_OBJECT (splitmux, "No further GOPs finished collecting");
2686         break;
2687       } else {
2688         g_assert_not_reached ();
2689       }
2690
2691       g_assert (gop != NULL);
2692
2693       /* Iterate each pad, and check that the input running time is at least
2694        * up to the start running time of the next GOP or EOS, and if so handle
2695        * the collected GOP */
2696       GST_LOG_OBJECT (splitmux, "Checking GOP collected, next GOP start %"
2697           GST_STIME_FORMAT " ctx %p", GST_STIME_ARGS (next_gop_start), ctx);
2698       for (cur = g_list_first (splitmux->contexts); cur != NULL;
2699           cur = g_list_next (cur)) {
2700         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2701
2702         GST_LOG_OBJECT (splitmux,
2703             "Context %p sink pad %" GST_PTR_FORMAT " @ TS %" GST_STIME_FORMAT
2704             " EOS %d", tmpctx, tmpctx->sinkpad,
2705             GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2706
2707         if (next_gop_start != GST_CLOCK_STIME_NONE &&
2708             tmpctx->in_running_time < next_gop_start && !tmpctx->in_eos) {
2709           GST_LOG_OBJECT (splitmux,
2710               "Context %p sink pad %" GST_PTR_FORMAT " not ready. We'll sleep",
2711               tmpctx, tmpctx->sinkpad);
2712           ready = FALSE;
2713           break;
2714         }
2715       }
2716       if (ready) {
2717         GST_DEBUG_OBJECT (splitmux,
2718             "Collected GOP is complete. Processing (ctx %p)", ctx);
2719         /* All pads have a complete GOP, release it into the multiqueue */
2720         handle_gathered_gop (splitmux, gop, next_gop_start,
2721             max_out_running_time);
2722
2723         g_queue_pop_head (&splitmux->pending_input_gops);
2724         input_gop_free (gop);
2725
2726         /* The user has requested a split, we can split now that the previous GOP
2727          * has been collected to the correct location */
2728         if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested),
2729                 TRUE, FALSE)) {
2730           g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
2731         }
2732       }
2733     }
2734
2735     /* If upstream reached EOS we are not expecting more data, no need to wait
2736      * here. */
2737     if (ctx->in_eos)
2738       return;
2739
2740     if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2741         !ctx->flushing &&
2742         ctx->in_running_time >= next_gop_start &&
2743         next_gop_start != GST_CLOCK_STIME_NONE) {
2744       /* Some pad is not yet ready, or GOP is being pushed
2745        * either way, sleep and wait to get woken */
2746       GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2747       GST_SPLITMUX_WAIT_INPUT (splitmux);
2748       GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2749     } else {
2750       /* This pad is not ready or the state changed - break out and get another
2751        * buffer / event */
2752       break;
2753     }
2754   } while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT);
2755 }
2756
2757 static GstPadProbeReturn
2758 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2759 {
2760   GstSplitMuxSink *splitmux = ctx->splitmux;
2761   GstFlowReturn ret = GST_FLOW_OK;
2762   GstBuffer *buf;
2763   MqStreamBuf *buf_info = NULL;
2764   GstClockTime ts, pts, dts;
2765   GstClockTimeDiff running_time, running_time_pts, running_time_dts;
2766   gboolean loop_again;
2767   gboolean keyframe = FALSE;
2768
2769   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2770
2771   /* FIXME: Handle buffer lists, until then make it clear they won't work */
2772   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2773     g_warning ("Buffer list handling not implemented");
2774     return GST_PAD_PROBE_DROP;
2775   }
2776   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2777       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2778     GstEvent *event = gst_pad_probe_info_get_event (info);
2779
2780     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2781
2782     switch (GST_EVENT_TYPE (event)) {
2783       case GST_EVENT_SEGMENT:
2784         gst_event_copy_segment (event, &ctx->in_segment);
2785         break;
2786       case GST_EVENT_FLUSH_STOP:
2787         GST_SPLITMUX_LOCK (splitmux);
2788         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2789         ctx->in_eos = FALSE;
2790         ctx->in_running_time = GST_CLOCK_STIME_NONE;
2791         GST_SPLITMUX_UNLOCK (splitmux);
2792         break;
2793       case GST_EVENT_EOS:
2794         GST_SPLITMUX_LOCK (splitmux);
2795         ctx->in_eos = TRUE;
2796
2797         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2798           ret = GST_FLOW_FLUSHING;
2799           goto beach;
2800         }
2801
2802         if (ctx->is_reference) {
2803           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2804           /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2805           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2806           /* Wake up other input pads to collect this GOP */
2807           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2808           if (g_queue_is_empty (&splitmux->pending_input_gops)) {
2809             GST_WARNING_OBJECT (splitmux,
2810                 "EOS with no buffers received on the reference pad");
2811
2812             /* - child muxer and sink might be still locked state
2813              *   (see gst_splitmux_reset_elements()) so should be unlocked
2814              *   for state change of splitmuxsink to be applied to child
2815              * - would need to post async done message
2816              * - location on sink element is still null then it will post
2817              *   error message on bus (muxer will produce something, header
2818              *   data for example)
2819              *
2820              * Calls start_next_fragment() here, the method will address
2821              * everything the above mentioned one */
2822             ret = start_next_fragment (splitmux, ctx);
2823             if (ret != GST_FLOW_OK)
2824               goto beach;
2825           } else {
2826             check_completed_gop (splitmux, ctx);
2827           }
2828         } else if (splitmux->input_state ==
2829             SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2830           /* If we are waiting for a GOP to be completed (ie, for aux
2831            * pads to catch up), then this pad is complete, so check
2832            * if the whole GOP is.
2833            */
2834           if (!g_queue_is_empty (&splitmux->pending_input_gops))
2835             check_completed_gop (splitmux, ctx);
2836         }
2837         GST_SPLITMUX_UNLOCK (splitmux);
2838         break;
2839       case GST_EVENT_GAP:{
2840         GstClockTime gap_ts;
2841         GstClockTimeDiff rtime;
2842
2843         gst_event_parse_gap (event, &gap_ts, NULL);
2844         if (gap_ts == GST_CLOCK_TIME_NONE)
2845           break;
2846
2847         GST_SPLITMUX_LOCK (splitmux);
2848
2849         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2850           ret = GST_FLOW_FLUSHING;
2851           goto beach;
2852         }
2853         rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2854
2855         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2856             GST_STIME_ARGS (rtime));
2857
2858         if (ctx->is_reference && GST_CLOCK_STIME_IS_VALID (rtime)) {
2859           /* If this GAP event happens before the first fragment then
2860            * initialize the fragment start time here. */
2861           if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)) {
2862             splitmux->fragment_start_time = rtime;
2863             GST_LOG_OBJECT (splitmux,
2864                 "Fragment start time now %" GST_STIME_FORMAT,
2865                 GST_STIME_ARGS (splitmux->fragment_start_time));
2866
2867             /* Also take this as the first start time when starting up,
2868              * so that we start counting overflow from the first frame */
2869             if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2870               splitmux->max_in_running_time = rtime;
2871             if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
2872               splitmux->max_in_running_time_dts = rtime;
2873           }
2874
2875           /* Similarly take it as fragment start PTS and GOP start time if
2876            * these are not set */
2877           if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time_pts))
2878             splitmux->fragment_start_time_pts = rtime;
2879
2880           if (g_queue_is_empty (&splitmux->pending_input_gops)) {
2881             InputGop *gop = g_slice_new0 (InputGop);
2882
2883             gop->from_gap = TRUE;
2884             gop->start_time = rtime;
2885             gop->start_time_pts = rtime;
2886
2887             g_queue_push_tail (&splitmux->pending_input_gops, gop);
2888           }
2889         }
2890
2891         GST_SPLITMUX_UNLOCK (splitmux);
2892         break;
2893       }
2894       default:
2895         break;
2896     }
2897     return GST_PAD_PROBE_PASS;
2898   } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2899     switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2900       case GST_QUERY_ALLOCATION:
2901         return GST_PAD_PROBE_DROP;
2902       default:
2903         return GST_PAD_PROBE_PASS;
2904     }
2905   }
2906
2907   buf = gst_pad_probe_info_get_buffer (info);
2908   buf_info = mq_stream_buf_new ();
2909
2910   pts = GST_BUFFER_PTS (buf);
2911   dts = GST_BUFFER_DTS (buf);
2912   if (GST_BUFFER_PTS_IS_VALID (buf))
2913     ts = GST_BUFFER_PTS (buf);
2914   else
2915     ts = GST_BUFFER_DTS (buf);
2916
2917   GST_LOG_OBJECT (pad,
2918       "Buffer TS is %" GST_TIME_FORMAT " (PTS %" GST_TIME_FORMAT ", DTS %"
2919       GST_TIME_FORMAT ")", GST_TIME_ARGS (ts), GST_TIME_ARGS (pts),
2920       GST_TIME_ARGS (dts));
2921
2922   GST_SPLITMUX_LOCK (splitmux);
2923
2924   if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2925     ret = GST_FLOW_FLUSHING;
2926     goto beach;
2927   }
2928
2929   /* If this buffer has a timestamp, advance the input timestamp of the
2930    * stream */
2931   if (GST_CLOCK_TIME_IS_VALID (ts)) {
2932     running_time = my_segment_to_running_time (&ctx->in_segment, ts);
2933
2934     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2935         GST_STIME_ARGS (running_time));
2936
2937     /* in running time is always the maximum PTS (or DTS) that was observed so far */
2938     if (GST_CLOCK_STIME_IS_VALID (running_time)
2939         && running_time > ctx->in_running_time)
2940       ctx->in_running_time = running_time;
2941   } else {
2942     running_time = ctx->in_running_time;
2943   }
2944
2945   if (GST_CLOCK_TIME_IS_VALID (pts))
2946     running_time_pts = my_segment_to_running_time (&ctx->in_segment, pts);
2947   else
2948     running_time_pts = GST_CLOCK_STIME_NONE;
2949
2950   if (GST_CLOCK_TIME_IS_VALID (dts))
2951     running_time_dts = my_segment_to_running_time (&ctx->in_segment, dts);
2952   else
2953     running_time_dts = GST_CLOCK_STIME_NONE;
2954
2955   /* Try to make sure we have a valid running time */
2956   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2957     ctx->in_running_time =
2958         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2959   }
2960
2961   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2962       GST_STIME_ARGS (ctx->in_running_time));
2963
2964   buf_info->run_ts = ctx->in_running_time;
2965   buf_info->buf_size = gst_buffer_get_size (buf);
2966   buf_info->duration = GST_BUFFER_DURATION (buf);
2967
2968   if (ctx->is_reference) {
2969     InputGop *gop = NULL;
2970     GstVideoTimeCodeMeta *tc_meta = gst_buffer_get_video_time_code_meta (buf);
2971
2972     /* initialize fragment_start_time if it was not set yet (i.e. for the
2973      * first fragment), or otherwise set it to the minimum observed time */
2974     if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)
2975         || splitmux->fragment_start_time > running_time) {
2976       if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time))
2977         splitmux->fragment_start_time_pts = running_time_pts;
2978       splitmux->fragment_start_time = running_time;
2979
2980       GST_LOG_OBJECT (splitmux,
2981           "Fragment start time now %" GST_STIME_FORMAT " (initial PTS %"
2982           GST_STIME_FORMAT ")", GST_STIME_ARGS (splitmux->fragment_start_time),
2983           GST_STIME_ARGS (splitmux->fragment_start_time_pts));
2984
2985       /* Also take this as the first start time when starting up,
2986        * so that we start counting overflow from the first frame */
2987       if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)
2988           || splitmux->max_in_running_time < splitmux->fragment_start_time)
2989         splitmux->max_in_running_time = splitmux->fragment_start_time;
2990
2991       if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
2992         splitmux->max_in_running_time_dts = running_time_dts;
2993
2994       if (tc_meta) {
2995         video_time_code_replace (&splitmux->fragment_start_tc, &tc_meta->tc);
2996
2997         splitmux->next_fragment_start_tc_time =
2998             calculate_next_max_timecode (splitmux, &tc_meta->tc,
2999             running_time, NULL);
3000         if (splitmux->tc_interval
3001             && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time))
3002         {
3003           GST_WARNING_OBJECT (splitmux,
3004               "Couldn't calculate next fragment start time for timecode mode");
3005         }
3006 #ifndef GST_DISABLE_GST_DEBUG
3007         {
3008           gchar *tc_str;
3009
3010           tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3011           GST_DEBUG_OBJECT (splitmux,
3012               "Initialize fragment start timecode %s, next fragment start timecode time %"
3013               GST_TIME_FORMAT, tc_str,
3014               GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
3015           g_free (tc_str);
3016         }
3017 #endif
3018       }
3019     }
3020
3021
3022     /* First check if we're at the very first GOP and the tracking was created
3023      * from a GAP event. In that case don't start a new GOP on keyframes but
3024      * just updated it as needed */
3025     gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3026
3027     if (!gop || (!gop->from_gap
3028             && !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT))) {
3029       gop = g_slice_new0 (InputGop);
3030
3031       gop->start_time = running_time;
3032       gop->start_time_pts = running_time_pts;
3033
3034       GST_LOG_OBJECT (splitmux,
3035           "Next GOP start time now %" GST_STIME_FORMAT " (initial PTS %"
3036           GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3037           GST_STIME_ARGS (gop->start_time_pts));
3038
3039       if (tc_meta) {
3040         video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3041
3042 #ifndef GST_DISABLE_GST_DEBUG
3043         {
3044           gchar *tc_str;
3045
3046           tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3047           GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode %s", tc_str);
3048           g_free (tc_str);
3049         }
3050 #endif
3051       }
3052
3053       g_queue_push_tail (&splitmux->pending_input_gops, gop);
3054     } else {
3055       gop->from_gap = FALSE;
3056
3057       if (!GST_CLOCK_STIME_IS_VALID (gop->start_time)
3058           || gop->start_time > running_time) {
3059         gop->start_time = running_time;
3060
3061         GST_LOG_OBJECT (splitmux,
3062             "GOP start time updated now %" GST_STIME_FORMAT " (initial PTS %"
3063             GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3064             GST_STIME_ARGS (gop->start_time_pts));
3065
3066         if (tc_meta) {
3067           video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3068
3069 #ifndef GST_DISABLE_GST_DEBUG
3070           {
3071             gchar *tc_str;
3072
3073             tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3074             GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode updated %s",
3075                 tc_str);
3076             g_free (tc_str);
3077           }
3078 #endif
3079         }
3080       }
3081     }
3082
3083     /* Check whether we need to request next keyframe depending on
3084      * current running time */
3085     if (request_next_keyframe (splitmux, buf, running_time_dts) == FALSE) {
3086       GST_WARNING_OBJECT (splitmux,
3087           "Could not request a keyframe. Files may not split at the exact location they should");
3088     }
3089   }
3090
3091   {
3092     InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3093
3094     if (gop) {
3095       GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
3096           " total GOP bytes %" G_GUINT64_FORMAT ", total next GOP bytes %"
3097           G_GUINT64_FORMAT, GST_STIME_ARGS (buf_info->run_ts),
3098           gop->total_bytes, gop->total_bytes);
3099     }
3100   }
3101
3102   loop_again = TRUE;
3103   do {
3104     if (ctx->flushing) {
3105       ret = GST_FLOW_FLUSHING;
3106       goto beach;
3107     }
3108
3109     switch (splitmux->input_state) {
3110       case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
3111         if (ctx->is_reference) {
3112           const InputGop *gop, *next_gop;
3113
3114           /* This is the reference context. If it's a keyframe,
3115            * it marks the start of a new GOP and we should wait in
3116            * check_completed_gop before continuing, but either way
3117            * (keyframe or no, we'll pass this buffer through after
3118            * so set loop_again to FALSE */
3119           loop_again = FALSE;
3120
3121           gop = g_queue_peek_head (&splitmux->pending_input_gops);
3122           g_assert (gop != NULL);
3123           next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
3124
3125           if (ctx->in_running_time > splitmux->max_in_running_time)
3126             splitmux->max_in_running_time = ctx->in_running_time;
3127           if (running_time_dts > splitmux->max_in_running_time_dts)
3128             splitmux->max_in_running_time_dts = running_time_dts;
3129
3130           GST_LOG_OBJECT (splitmux,
3131               "Max in running time now %" GST_STIME_FORMAT ", DTS %"
3132               GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_in_running_time),
3133               GST_STIME_ARGS (splitmux->max_in_running_time_dts));
3134
3135           if (!next_gop) {
3136             GST_DEBUG_OBJECT (pad, "Waiting for end of GOP");
3137             /* Allow other input pads to catch up to here too */
3138             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3139             break;
3140           }
3141
3142           if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
3143             GST_INFO_OBJECT (pad,
3144                 "Have keyframe with running time %" GST_STIME_FORMAT,
3145                 GST_STIME_ARGS (ctx->in_running_time));
3146             keyframe = TRUE;
3147           }
3148
3149           if (running_time_dts != GST_CLOCK_STIME_NONE
3150               && running_time_dts < next_gop->start_time_pts) {
3151             GST_DEBUG_OBJECT (splitmux,
3152                 "Waiting until DTS (%" GST_STIME_FORMAT
3153                 ") has passed next GOP start PTS (%" GST_STIME_FORMAT ")",
3154                 GST_STIME_ARGS (running_time_dts),
3155                 GST_STIME_ARGS (next_gop->start_time_pts));
3156             /* Allow other input pads to catch up to here too */
3157             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3158             break;
3159           }
3160
3161           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
3162           /* Wake up other input pads to collect this GOP */
3163           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3164           check_completed_gop (splitmux, ctx);
3165         } else {
3166           /* Pass this buffer if the reference ctx is far enough ahead */
3167           if (ctx->in_running_time < splitmux->max_in_running_time) {
3168             loop_again = FALSE;
3169             break;
3170           }
3171
3172           /* We're still waiting for a keyframe on the reference pad, sleep */
3173           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
3174           GST_SPLITMUX_WAIT_INPUT (splitmux);
3175           GST_LOG_OBJECT (pad,
3176               "Done sleeping for GOP start input state now %d",
3177               splitmux->input_state);
3178         }
3179         break;
3180       case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
3181         /* We're collecting a GOP, this is normally only called for non-reference
3182          * contexts as the reference context would be waiting inside
3183          * check_completed_gop() */
3184         if (G_UNLIKELY (ctx->is_reference)) {
3185           check_completed_gop (splitmux, ctx);
3186           break;
3187         }
3188
3189         /* If we overran the target timestamp, it might be time to process
3190          * the GOP, otherwise bail out for more data. */
3191         GST_LOG_OBJECT (pad,
3192             "Checking TS %" GST_STIME_FORMAT " against max %"
3193             GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
3194             GST_STIME_ARGS (splitmux->max_in_running_time));
3195
3196         if (ctx->in_running_time < splitmux->max_in_running_time) {
3197           loop_again = FALSE;
3198           break;
3199         }
3200
3201         GST_LOG_OBJECT (pad,
3202             "Collected last packet of GOP. Checking other pads");
3203
3204         if (g_queue_is_empty (&splitmux->pending_input_gops)) {
3205           GST_WARNING_OBJECT (pad,
3206               "Reference was closed without GOP, dropping");
3207           goto drop;
3208         }
3209
3210         check_completed_gop (splitmux, ctx);
3211         break;
3212       }
3213       case SPLITMUX_INPUT_STATE_FINISHING_UP:
3214         loop_again = FALSE;
3215         break;
3216       default:
3217         loop_again = FALSE;
3218         break;
3219     }
3220   }
3221   while (loop_again);
3222
3223   if (keyframe && ctx->is_reference)
3224     splitmux->queued_keyframes++;
3225   buf_info->keyframe = keyframe;
3226
3227   /* Update total input byte counter for overflow detect unless we're after
3228    * EOS now */
3229   if (splitmux->input_state != SPLITMUX_INPUT_STATE_FINISHING_UP
3230       && splitmux->input_state != SPLITMUX_INPUT_STATE_STOPPED) {
3231     InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3232
3233     /* We must have a GOP at this point */
3234     g_assert (gop != NULL);
3235
3236     gop->total_bytes += buf_info->buf_size;
3237     if (ctx->is_reference) {
3238       gop->reference_bytes += buf_info->buf_size;
3239     }
3240   }
3241
3242   /* Now add this buffer to the queue just before returning */
3243   g_queue_push_head (&ctx->queued_bufs, buf_info);
3244
3245   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
3246       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
3247
3248   GST_SPLITMUX_UNLOCK (splitmux);
3249   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3250   return GST_PAD_PROBE_PASS;
3251
3252 beach:
3253   GST_SPLITMUX_UNLOCK (splitmux);
3254   if (buf_info)
3255     mq_stream_buf_free (buf_info);
3256   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3257   return GST_PAD_PROBE_PASS;
3258 drop:
3259   GST_SPLITMUX_UNLOCK (splitmux);
3260   if (buf_info)
3261     mq_stream_buf_free (buf_info);
3262   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = GST_FLOW_EOS;
3263   return GST_PAD_PROBE_DROP;
3264 }
3265
3266 static void
3267 grow_blocked_queues (GstSplitMuxSink * splitmux)
3268 {
3269   GList *cur;
3270
3271   /* Scan other queues for full-ness and grow them */
3272   for (cur = g_list_first (splitmux->contexts);
3273       cur != NULL; cur = g_list_next (cur)) {
3274     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3275     guint cur_limit;
3276     guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
3277
3278     g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
3279     GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
3280
3281     if (cur_len >= cur_limit) {
3282       cur_limit = cur_len + 1;
3283       GST_DEBUG_OBJECT (tmpctx->q,
3284           "Queue overflowed and needs enlarging. Growing to %u buffers",
3285           cur_limit);
3286       g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
3287     }
3288   }
3289 }
3290
3291 static void
3292 handle_q_underrun (GstElement * q, gpointer user_data)
3293 {
3294   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3295   GstSplitMuxSink *splitmux = ctx->splitmux;
3296
3297   GST_SPLITMUX_LOCK (splitmux);
3298   GST_DEBUG_OBJECT (q,
3299       "Queue reported underrun with %d keyframes and %d cmds enqueued",
3300       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3301   grow_blocked_queues (splitmux);
3302   GST_SPLITMUX_UNLOCK (splitmux);
3303 }
3304
3305 static void
3306 handle_q_overrun (GstElement * q, gpointer user_data)
3307 {
3308   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3309   GstSplitMuxSink *splitmux = ctx->splitmux;
3310   gboolean allow_grow = FALSE;
3311
3312   GST_SPLITMUX_LOCK (splitmux);
3313   GST_DEBUG_OBJECT (q,
3314       "Queue reported overrun with %d keyframes and %d cmds enqueued",
3315       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3316
3317   if (splitmux->queued_keyframes < 2) {
3318     /* Less than a full GOP queued, grow the queue */
3319     allow_grow = TRUE;
3320   } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
3321     allow_grow = TRUE;
3322   } else {
3323     /* If another queue is starved, grow */
3324     GList *cur;
3325     for (cur = g_list_first (splitmux->contexts);
3326         cur != NULL; cur = g_list_next (cur)) {
3327       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3328       if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
3329         allow_grow = TRUE;
3330       }
3331     }
3332   }
3333   GST_SPLITMUX_UNLOCK (splitmux);
3334
3335   if (allow_grow) {
3336     guint cur_limit;
3337
3338     g_object_get (q, "max-size-buffers", &cur_limit, NULL);
3339     cur_limit++;
3340
3341     GST_DEBUG_OBJECT (q,
3342         "Queue overflowed and needs enlarging. Growing to %u buffers",
3343         cur_limit);
3344
3345     g_object_set (q, "max-size-buffers", cur_limit, NULL);
3346   }
3347 }
3348
3349 /* Called with SPLITMUX lock held */
3350 static const gchar *
3351 lookup_muxer_pad (GstSplitMuxSink * splitmux, const gchar * sinkpad_name)
3352 {
3353   const gchar *ret = NULL;
3354
3355   if (splitmux->muxerpad_map == NULL)
3356     return NULL;
3357
3358   if (sinkpad_name == NULL) {
3359     GST_WARNING_OBJECT (splitmux,
3360         "Can't look up request pad in pad map without providing a pad name");
3361     return NULL;
3362   }
3363
3364   ret = gst_structure_get_string (splitmux->muxerpad_map, sinkpad_name);
3365   if (ret) {
3366     GST_INFO_OBJECT (splitmux, "Sink pad %s maps to muxer pad %s", sinkpad_name,
3367         ret);
3368     return g_strdup (ret);
3369   }
3370
3371   return NULL;
3372 }
3373
3374 static GstPad *
3375 gst_splitmux_sink_request_new_pad (GstElement * element,
3376     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3377 {
3378   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3379   GstPadTemplate *mux_template = NULL;
3380   GstPad *ret = NULL, *muxpad = NULL;
3381   GstElement *q;
3382   GstPad *q_sink = NULL, *q_src = NULL;
3383   gchar *gname, *qname;
3384   gboolean is_primary_video = FALSE, is_video = FALSE,
3385       muxer_is_requestpad = FALSE;
3386   MqStreamCtx *ctx;
3387   const gchar *muxer_padname = NULL;
3388
3389   GST_DEBUG_OBJECT (splitmux, "templ:%s, name:%s", templ->name_template, name);
3390
3391   GST_SPLITMUX_LOCK (splitmux);
3392   if (!create_muxer (splitmux))
3393     goto fail;
3394   g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3395
3396   if (g_str_equal (templ->name_template, "video") ||
3397       g_str_has_prefix (templ->name_template, "video_aux_")) {
3398     is_primary_video = g_str_equal (templ->name_template, "video");
3399     if (is_primary_video && splitmux->have_video)
3400       goto already_have_video;
3401     is_video = TRUE;
3402   }
3403
3404   /* See if there's a pad map and it lists this pad */
3405   muxer_padname = lookup_muxer_pad (splitmux, name);
3406
3407   if (muxer_padname == NULL) {
3408     if (is_video) {
3409       /* FIXME: Look for a pad template with matching caps, rather than by name */
3410       GST_DEBUG_OBJECT (element,
3411           "searching for pad-template with name 'video_%%u'");
3412       mux_template =
3413           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3414           (splitmux->muxer), "video_%u");
3415
3416       /* Fallback to find sink pad templates named 'video' (flvmux) */
3417       if (!mux_template) {
3418         GST_DEBUG_OBJECT (element,
3419             "searching for pad-template with name 'video'");
3420         mux_template =
3421             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3422             (splitmux->muxer), "video");
3423       }
3424       name = NULL;
3425     } else {
3426       GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
3427           templ->name_template);
3428       mux_template =
3429           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3430           (splitmux->muxer), templ->name_template);
3431
3432       /* Fallback to find sink pad templates named 'audio' (flvmux) */
3433       if (!mux_template && g_str_has_prefix (templ->name_template, "audio_")) {
3434         GST_DEBUG_OBJECT (element,
3435             "searching for pad-template with name 'audio'");
3436         mux_template =
3437             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3438             (splitmux->muxer), "audio");
3439         name = NULL;
3440       }
3441     }
3442
3443     if (mux_template == NULL) {
3444       GST_DEBUG_OBJECT (element,
3445           "searching for pad-template with name 'sink_%%d'");
3446       mux_template =
3447           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3448           (splitmux->muxer), "sink_%d");
3449       name = NULL;
3450     }
3451     if (mux_template == NULL) {
3452       GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
3453       mux_template =
3454           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3455           (splitmux->muxer), "sink");
3456       name = NULL;
3457     }
3458
3459     if (mux_template == NULL) {
3460       GST_ERROR_OBJECT (element,
3461           "unable to find a suitable sink pad-template on the muxer");
3462       goto fail;
3463     }
3464     GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
3465         mux_template->name_template);
3466
3467     if (mux_template->presence == GST_PAD_REQUEST) {
3468       GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
3469
3470       muxpad =
3471           gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
3472       muxer_is_requestpad = TRUE;
3473     } else if (mux_template->presence == GST_PAD_ALWAYS) {
3474       GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
3475
3476       muxpad =
3477           gst_element_get_static_pad (splitmux->muxer,
3478           mux_template->name_template);
3479     } else {
3480       GST_ERROR_OBJECT (element,
3481           "unexpected pad presence %d", mux_template->presence);
3482       goto fail;
3483     }
3484   } else {
3485     /* Have a muxer pad name */
3486     if (!(muxpad = gst_element_get_static_pad (splitmux->muxer, muxer_padname))) {
3487       if ((muxpad =
3488               gst_element_request_pad_simple (splitmux->muxer, muxer_padname)))
3489         muxer_is_requestpad = TRUE;
3490     }
3491     g_free ((gchar *) muxer_padname);
3492     muxer_padname = NULL;
3493   }
3494
3495   /* One way or another, we must have a muxer pad by now */
3496   if (muxpad == NULL)
3497     goto fail;
3498
3499   if (is_primary_video)
3500     gname = g_strdup ("video");
3501   else if (name == NULL)
3502     gname = gst_pad_get_name (muxpad);
3503   else
3504     gname = g_strdup (name);
3505
3506   qname = g_strdup_printf ("queue_%s", gname);
3507   if ((q = create_element (splitmux, "queue", qname, FALSE)) == NULL) {
3508     g_free (qname);
3509     goto fail;
3510   }
3511   g_free (qname);
3512
3513   gst_element_set_state (q, GST_STATE_TARGET (splitmux));
3514
3515   g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
3516       "max-size-buffers", 5, NULL);
3517
3518   q_sink = gst_element_get_static_pad (q, "sink");
3519   q_src = gst_element_get_static_pad (q, "src");
3520
3521   if (gst_pad_link (q_src, muxpad) != GST_PAD_LINK_OK) {
3522     if (muxer_is_requestpad)
3523       gst_element_release_request_pad (splitmux->muxer, muxpad);
3524     gst_object_unref (GST_OBJECT (muxpad));
3525     goto fail;
3526   }
3527
3528   gst_object_unref (GST_OBJECT (muxpad));
3529
3530   ctx = mq_stream_ctx_new (splitmux);
3531   /* Context holds a ref: */
3532   ctx->q = gst_object_ref (q);
3533   ctx->srcpad = q_src;
3534   ctx->sinkpad = q_sink;
3535   ctx->q_overrun_id =
3536       g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
3537   g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
3538
3539   ctx->src_pad_block_id =
3540       gst_pad_add_probe (q_src,
3541       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
3542       (GstPadProbeCallback) handle_mq_output, ctx, NULL);
3543   if (is_primary_video && splitmux->reference_ctx != NULL) {
3544     splitmux->reference_ctx->is_reference = FALSE;
3545     splitmux->reference_ctx = NULL;
3546   }
3547   if (splitmux->reference_ctx == NULL) {
3548     splitmux->reference_ctx = ctx;
3549     ctx->is_reference = TRUE;
3550   }
3551
3552   ret = gst_ghost_pad_new_from_template (gname, q_sink, templ);
3553   g_object_set_qdata ((GObject *) (ret), PAD_CONTEXT, ctx);
3554
3555   ctx->sink_pad_block_id =
3556       gst_pad_add_probe (q_sink,
3557       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
3558       GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
3559       (GstPadProbeCallback) handle_mq_input, ctx, NULL);
3560
3561   GST_DEBUG_OBJECT (splitmux, "splitmuxsink pad %" GST_PTR_FORMAT
3562       " feeds queue pad %" GST_PTR_FORMAT, ret, q_sink);
3563
3564   splitmux->contexts = g_list_append (splitmux->contexts, ctx);
3565
3566   g_free (gname);
3567
3568   if (is_primary_video)
3569     splitmux->have_video = TRUE;
3570
3571   gst_pad_set_active (ret, TRUE);
3572   gst_element_add_pad (GST_ELEMENT (splitmux), ret);
3573
3574   GST_SPLITMUX_UNLOCK (splitmux);
3575
3576   return ret;
3577 fail:
3578   GST_SPLITMUX_UNLOCK (splitmux);
3579
3580   if (q_sink)
3581     gst_object_unref (q_sink);
3582   if (q_src)
3583     gst_object_unref (q_src);
3584   return NULL;
3585 already_have_video:
3586   GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
3587   GST_SPLITMUX_UNLOCK (splitmux);
3588   return NULL;
3589 }
3590
3591 static void
3592 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
3593 {
3594   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3595   GstPad *muxpad = NULL;
3596   MqStreamCtx *ctx =
3597       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
3598
3599   GST_SPLITMUX_LOCK (splitmux);
3600
3601   if (splitmux->muxer == NULL)
3602     goto fail;                  /* Elements don't exist yet - nothing to release */
3603
3604   GST_INFO_OBJECT (pad, "releasing request pad");
3605
3606   muxpad = gst_pad_get_peer (ctx->srcpad);
3607
3608   /* Remove the context from our consideration */
3609   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
3610
3611   ctx->flushing = TRUE;
3612   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3613
3614   GST_SPLITMUX_UNLOCK (splitmux);
3615
3616   if (ctx->sink_pad_block_id) {
3617     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
3618     gst_pad_send_event (ctx->sinkpad, gst_event_new_flush_start ());
3619   }
3620
3621   if (ctx->src_pad_block_id)
3622     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
3623
3624   /* Wait for the pad to be free */
3625   GST_PAD_STREAM_LOCK (pad);
3626   GST_SPLITMUX_LOCK (splitmux);
3627   GST_PAD_STREAM_UNLOCK (pad);
3628
3629   /* Can release the context now */
3630   mq_stream_ctx_free (ctx);
3631   if (ctx == splitmux->reference_ctx)
3632     splitmux->reference_ctx = NULL;
3633
3634   /* Release and free the muxer input */
3635   if (muxpad) {
3636     gst_element_release_request_pad (splitmux->muxer, muxpad);
3637     gst_object_unref (muxpad);
3638   }
3639
3640   if (GST_PAD_PAD_TEMPLATE (pad) &&
3641       g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
3642               (pad)), "video"))
3643     splitmux->have_video = FALSE;
3644
3645   gst_element_remove_pad (element, pad);
3646
3647   /* Reset the internal elements only after all request pads are released */
3648   if (splitmux->contexts == NULL)
3649     gst_splitmux_reset_elements (splitmux);
3650
3651   /* Wake up other input streams to check if the completion conditions have
3652    * changed */
3653   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3654
3655 fail:
3656   GST_SPLITMUX_UNLOCK (splitmux);
3657 }
3658
3659 static GstElement *
3660 create_element (GstSplitMuxSink * splitmux,
3661     const gchar * factory, const gchar * name, gboolean locked)
3662 {
3663   GstElement *ret = gst_element_factory_make (factory, name);
3664   if (ret == NULL) {
3665     g_warning ("Failed to create %s - splitmuxsink will not work", name);
3666     return NULL;
3667   }
3668
3669   if (locked) {
3670     /* Ensure the sink starts in locked state and NULL - it will be changed
3671      * by the filename setting code */
3672     gst_element_set_locked_state (ret, TRUE);
3673     gst_element_set_state (ret, GST_STATE_NULL);
3674   }
3675
3676   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
3677     g_warning ("Could not add %s element - splitmuxsink will not work", name);
3678     gst_object_unref (ret);
3679     return NULL;
3680   }
3681
3682   return ret;
3683 }
3684
3685 static gboolean
3686 create_muxer (GstSplitMuxSink * splitmux)
3687 {
3688   /* Create internal elements */
3689   if (splitmux->muxer == NULL) {
3690     GstElement *provided_muxer = NULL;
3691
3692     GST_OBJECT_LOCK (splitmux);
3693     if (splitmux->provided_muxer != NULL)
3694       provided_muxer = gst_object_ref (splitmux->provided_muxer);
3695     GST_OBJECT_UNLOCK (splitmux);
3696
3697     if ((!splitmux->async_finalize && provided_muxer == NULL) ||
3698         (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
3699       if ((splitmux->muxer =
3700               create_element (splitmux,
3701                   splitmux->muxer_factory ? splitmux->
3702                   muxer_factory : DEFAULT_MUXER, "muxer", FALSE)) == NULL)
3703         goto fail;
3704     } else if (splitmux->async_finalize) {
3705       if ((splitmux->muxer =
3706               create_element (splitmux, splitmux->muxer_factory, "muxer",
3707                   FALSE)) == NULL)
3708         goto fail;
3709       if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
3710         gst_preset_load_preset (GST_PRESET (splitmux->muxer),
3711             splitmux->muxer_preset);
3712       if (splitmux->muxer_properties)
3713         gst_structure_foreach (splitmux->muxer_properties,
3714             _set_property_from_structure, splitmux->muxer);
3715     } else {
3716       /* Ensure it's not in locked state (we might be reusing an old element) */
3717       gst_element_set_locked_state (provided_muxer, FALSE);
3718       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
3719         g_warning ("Could not add muxer element - splitmuxsink will not work");
3720         gst_object_unref (provided_muxer);
3721         goto fail;
3722       }
3723
3724       splitmux->muxer = provided_muxer;
3725       gst_object_unref (provided_muxer);
3726     }
3727
3728     if (splitmux->use_robust_muxing) {
3729       update_muxer_properties (splitmux);
3730     }
3731   }
3732
3733   return TRUE;
3734 fail:
3735   return FALSE;
3736 }
3737
3738 static GstElement *
3739 find_sink (GstElement * e)
3740 {
3741   GstElement *res = NULL;
3742   GstIterator *iter;
3743   gboolean done = FALSE;
3744   GValue data = { 0, };
3745
3746   if (!GST_IS_BIN (e))
3747     return e;
3748
3749   if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
3750     return e;
3751
3752   iter = gst_bin_iterate_sinks (GST_BIN (e));
3753   while (!done) {
3754     switch (gst_iterator_next (iter, &data)) {
3755       case GST_ITERATOR_OK:
3756       {
3757         GstElement *child = g_value_get_object (&data);
3758         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
3759                 "location") != NULL) {
3760           res = child;
3761           done = TRUE;
3762         }
3763         g_value_reset (&data);
3764         break;
3765       }
3766       case GST_ITERATOR_RESYNC:
3767         gst_iterator_resync (iter);
3768         break;
3769       case GST_ITERATOR_DONE:
3770         done = TRUE;
3771         break;
3772       case GST_ITERATOR_ERROR:
3773         g_assert_not_reached ();
3774         break;
3775     }
3776   }
3777   g_value_unset (&data);
3778   gst_iterator_free (iter);
3779
3780   return res;
3781 }
3782
3783 static gboolean
3784 create_sink (GstSplitMuxSink * splitmux)
3785 {
3786   GstElement *provided_sink = NULL;
3787
3788   if (splitmux->active_sink == NULL) {
3789
3790     GST_OBJECT_LOCK (splitmux);
3791     if (splitmux->provided_sink != NULL)
3792       provided_sink = gst_object_ref (splitmux->provided_sink);
3793     GST_OBJECT_UNLOCK (splitmux);
3794
3795     if ((!splitmux->async_finalize && provided_sink == NULL) ||
3796         (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
3797       if ((splitmux->sink =
3798               create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
3799         goto fail;
3800       splitmux->active_sink = splitmux->sink;
3801     } else if (splitmux->async_finalize) {
3802       if ((splitmux->sink =
3803               create_element (splitmux, splitmux->sink_factory, "sink",
3804                   TRUE)) == NULL)
3805         goto fail;
3806       if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
3807         gst_preset_load_preset (GST_PRESET (splitmux->sink),
3808             splitmux->sink_preset);
3809       if (splitmux->sink_properties)
3810         gst_structure_foreach (splitmux->sink_properties,
3811             _set_property_from_structure, splitmux->sink);
3812       splitmux->active_sink = splitmux->sink;
3813     } else {
3814       /* Ensure the sink starts in locked state and NULL - it will be changed
3815        * by the filename setting code */
3816       gst_element_set_locked_state (provided_sink, TRUE);
3817       gst_element_set_state (provided_sink, GST_STATE_NULL);
3818       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
3819         g_warning ("Could not add sink elements - splitmuxsink will not work");
3820         gst_object_unref (provided_sink);
3821         goto fail;
3822       }
3823
3824       splitmux->active_sink = provided_sink;
3825
3826       /* The bin holds a ref now, we can drop our tmp ref */
3827       gst_object_unref (provided_sink);
3828
3829       /* Find the sink element */
3830       splitmux->sink = find_sink (splitmux->active_sink);
3831       if (splitmux->sink == NULL) {
3832         g_warning
3833             ("Could not locate sink element in provided sink - splitmuxsink will not work");
3834         goto fail;
3835       }
3836     }
3837
3838 #if 1
3839     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3840             "async") != NULL) {
3841       /* async child elements are causing state change races and weird
3842        * failures, so let's try and turn that off */
3843       g_object_set (splitmux->sink, "async", FALSE, NULL);
3844     }
3845 #endif
3846
3847     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
3848       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
3849       goto fail;
3850     }
3851   }
3852
3853   return TRUE;
3854 fail:
3855   return FALSE;
3856 }
3857
3858 #ifdef __GNUC__
3859 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
3860 #endif
3861 static void
3862 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
3863 {
3864   gchar *fname = NULL;
3865   GstSample *sample;
3866   GstCaps *caps;
3867
3868   gst_splitmux_sink_ensure_max_files (splitmux);
3869
3870   if (ctx->cur_out_buffer == NULL) {
3871     GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
3872   }
3873
3874   caps = gst_pad_get_current_caps (ctx->srcpad);
3875   sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3876   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3877       splitmux->fragment_id, sample, &fname);
3878   gst_sample_unref (sample);
3879   if (caps)
3880     gst_caps_unref (caps);
3881
3882   if (fname == NULL) {
3883     /* Fallback to the old signal if the new one returned nothing */
3884     g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3885         splitmux->fragment_id, &fname);
3886   }
3887
3888   if (!fname)
3889     fname = splitmux->location ?
3890         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3891
3892   if (fname) {
3893     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3894     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3895             "location") != NULL)
3896       g_object_set (splitmux->sink, "location", fname, NULL);
3897     g_free (fname);
3898   }
3899
3900   splitmux->fragment_id++;
3901 }
3902
3903 /* called with GST_SPLITMUX_LOCK */
3904 static void
3905 do_async_start (GstSplitMuxSink * splitmux)
3906 {
3907   GstMessage *message;
3908
3909   if (!splitmux->need_async_start) {
3910     GST_INFO_OBJECT (splitmux, "no async_start needed");
3911     return;
3912   }
3913
3914   splitmux->async_pending = TRUE;
3915
3916   GST_INFO_OBJECT (splitmux, "Sending async_start message");
3917   message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
3918
3919   GST_SPLITMUX_UNLOCK (splitmux);
3920   GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3921       (splitmux), message);
3922   GST_SPLITMUX_LOCK (splitmux);
3923 }
3924
3925 /* called with GST_SPLITMUX_LOCK */
3926 static void
3927 do_async_done (GstSplitMuxSink * splitmux)
3928 {
3929   GstMessage *message;
3930
3931   if (splitmux->async_pending) {
3932     GST_INFO_OBJECT (splitmux, "Sending async_done message");
3933     splitmux->async_pending = FALSE;
3934     GST_SPLITMUX_UNLOCK (splitmux);
3935
3936     message =
3937         gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3938         GST_CLOCK_TIME_NONE);
3939     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3940         (splitmux), message);
3941     GST_SPLITMUX_LOCK (splitmux);
3942   }
3943
3944   splitmux->need_async_start = FALSE;
3945 }
3946
3947 static void
3948 gst_splitmux_sink_reset (GstSplitMuxSink * splitmux)
3949 {
3950   splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3951   splitmux->max_in_running_time_dts = GST_CLOCK_STIME_NONE;
3952
3953   splitmux->fragment_start_time = GST_CLOCK_STIME_NONE;
3954   splitmux->fragment_start_time_pts = GST_CLOCK_STIME_NONE;
3955   g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
3956
3957   g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
3958   g_queue_clear (&splitmux->pending_input_gops);
3959
3960   splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
3961   splitmux->fragment_total_bytes = 0;
3962   splitmux->fragment_reference_bytes = 0;
3963   splitmux->muxed_out_bytes = 0;
3964   splitmux->ready_for_output = FALSE;
3965
3966   g_atomic_int_set (&(splitmux->split_requested), FALSE);
3967   g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3968
3969   splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
3970   gst_queue_array_clear (splitmux->times_to_split);
3971
3972   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_reset, NULL);
3973   splitmux->queued_keyframes = 0;
3974
3975   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
3976   g_queue_clear (&splitmux->out_cmd_q);
3977 }
3978
3979 static GstStateChangeReturn
3980 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
3981 {
3982   GstStateChangeReturn ret;
3983   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3984
3985   switch (transition) {
3986     case GST_STATE_CHANGE_NULL_TO_READY:{
3987       GST_SPLITMUX_LOCK (splitmux);
3988       if (!create_muxer (splitmux) || !create_sink (splitmux)) {
3989         ret = GST_STATE_CHANGE_FAILURE;
3990         GST_SPLITMUX_UNLOCK (splitmux);
3991         goto beach;
3992       }
3993       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3994       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
3995       GST_SPLITMUX_UNLOCK (splitmux);
3996       splitmux->fragment_id = splitmux->start_index;
3997       break;
3998     }
3999     case GST_STATE_CHANGE_READY_TO_PAUSED:{
4000       GST_SPLITMUX_LOCK (splitmux);
4001       /* Make sure contexts and tracking times are cleared, in case we're being reused */
4002       gst_splitmux_sink_reset (splitmux);
4003       /* Start by collecting one input on each pad */
4004       splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
4005       splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
4006
4007       GST_SPLITMUX_UNLOCK (splitmux);
4008
4009       GST_SPLITMUX_STATE_LOCK (splitmux);
4010       splitmux->shutdown = FALSE;
4011       GST_SPLITMUX_STATE_UNLOCK (splitmux);
4012       break;
4013     }
4014     case GST_STATE_CHANGE_PAUSED_TO_READY:
4015     case GST_STATE_CHANGE_READY_TO_READY:
4016       g_atomic_int_set (&(splitmux->split_requested), FALSE);
4017       g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
4018       /* Fall through */
4019     case GST_STATE_CHANGE_READY_TO_NULL:
4020       GST_SPLITMUX_STATE_LOCK (splitmux);
4021       splitmux->shutdown = TRUE;
4022       GST_SPLITMUX_STATE_UNLOCK (splitmux);
4023
4024       GST_SPLITMUX_LOCK (splitmux);
4025       gst_splitmux_sink_reset (splitmux);
4026       splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
4027       splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
4028       /* Wake up any blocked threads */
4029       GST_LOG_OBJECT (splitmux,
4030           "State change -> NULL or READY. Waking threads");
4031       GST_SPLITMUX_BROADCAST_INPUT (splitmux);
4032       GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
4033       GST_SPLITMUX_UNLOCK (splitmux);
4034       break;
4035     default:
4036       break;
4037   }
4038
4039   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4040   if (ret == GST_STATE_CHANGE_FAILURE)
4041     goto beach;
4042
4043   switch (transition) {
4044     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4045       splitmux->need_async_start = TRUE;
4046       break;
4047     case GST_STATE_CHANGE_READY_TO_PAUSED:{
4048       /* Change state async, because our child sink might not
4049        * be ready to do that for us yet if it's state is still locked */
4050
4051       splitmux->need_async_start = TRUE;
4052       /* we want to go async to PAUSED until we managed to configure and add the
4053        * sink */
4054       GST_SPLITMUX_LOCK (splitmux);
4055       do_async_start (splitmux);
4056       GST_SPLITMUX_UNLOCK (splitmux);
4057       ret = GST_STATE_CHANGE_ASYNC;
4058       break;
4059     }
4060     case GST_STATE_CHANGE_READY_TO_NULL:
4061       GST_SPLITMUX_LOCK (splitmux);
4062       splitmux->fragment_id = 0;
4063       /* Reset internal elements only if no pad contexts are using them */
4064       if (splitmux->contexts == NULL)
4065         gst_splitmux_reset_elements (splitmux);
4066       do_async_done (splitmux);
4067       GST_SPLITMUX_UNLOCK (splitmux);
4068       break;
4069     default:
4070       break;
4071   }
4072
4073   return ret;
4074
4075 beach:
4076   if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
4077     /* Cleanup elements on failed transition out of NULL */
4078     gst_splitmux_reset_elements (splitmux);
4079     GST_SPLITMUX_LOCK (splitmux);
4080     do_async_done (splitmux);
4081     GST_SPLITMUX_UNLOCK (splitmux);
4082   }
4083   if (transition == GST_STATE_CHANGE_READY_TO_READY) {
4084     /* READY to READY transition only happens when we're already
4085      * in READY state, but a child element is in NULL, which
4086      * happens when there's an error changing the state of the sink.
4087      * We need to make sure not to fail the state transition, or
4088      * the core won't transition us back to NULL successfully */
4089     ret = GST_STATE_CHANGE_SUCCESS;
4090   }
4091   return ret;
4092 }
4093
4094 static void
4095 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
4096 {
4097   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
4098     splitmux->fragment_id = 0;
4099   }
4100 }
4101
4102 static void
4103 split_now (GstSplitMuxSink * splitmux)
4104 {
4105   g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
4106 }
4107
4108 static void
4109 split_after (GstSplitMuxSink * splitmux)
4110 {
4111   g_atomic_int_set (&(splitmux->split_requested), TRUE);
4112 }
4113
4114 static void
4115 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
4116 {
4117   gboolean send_keyframe_requests;
4118
4119   GST_SPLITMUX_LOCK (splitmux);
4120   gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
4121   send_keyframe_requests = splitmux->send_keyframe_requests;
4122   GST_SPLITMUX_UNLOCK (splitmux);
4123
4124   if (send_keyframe_requests) {
4125     GstEvent *ev =
4126         gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
4127     GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
4128         GST_TIME_ARGS (split_time));
4129     if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
4130       GST_WARNING_OBJECT (splitmux,
4131           "Could not request keyframe at %" GST_TIME_FORMAT,
4132           GST_TIME_ARGS (split_time));
4133     }
4134   }
4135 }