splitmuxsink: Avoid deadlock on release, harder
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014-2019> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @title: splitmuxsink
23  * @short_description: Muxer wrapper for splitting output stream by size or time
24  *
25  * This element wraps a muxer and a sink, and starts a new file when the mux
26  * contents are about to cross a threshold of maximum size of maximum time,
27  * splitting at video keyframe boundaries. Exactly one input video stream
28  * can be muxed, with as many accompanying audio and subtitle streams as
29  * desired.
30  *
31  * By default, it uses mp4mux and filesink, but they can be changed via
32  * the 'muxer' and 'sink' properties.
33  *
34  * The minimum file size is 1 GOP, however - so limits may be overrun if the
35  * distance between any 2 keyframes is larger than the limits.
36  *
37  * If a video stream is available, the splitting process is driven by the video
38  * stream contents, and the video stream must contain closed GOPs for the output
39  * file parts to be played individually correctly. In the absence of a video
40  * stream, the first available stream is used as reference for synchronization.
41  *
42  * In the async-finalize mode, when the threshold is crossed, the old muxer
43  * and sink is disconnected from the pipeline and left to finish the file
44  * asynchronously, and a new muxer and sink is created to continue with the
45  * next fragment. For that reason, instead of muxer and sink objects, the
46  * muxer-factory and sink-factory properties are used to construct the new
47  * objects, together with muxer-properties and sink-properties.
48  *
49  * ## Example pipelines
50  * |[
51  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
52  * ]|
53  * Records a video stream captured from a v4l2 device and muxes it into
54  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55  * and 1MB maximum size.
56  *
57  * |[
58  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
59  * ]|
60  * Records a video stream captured from a v4l2 device and muxer it into
61  * streamable Matroska files, splitting as needed to limit size/duration to 10
62  * seconds. Each file will finalize asynchronously.
63  *
64  * |[
65  * gst-launch-1.0 videotestsrc num-buffers=10 ! jpegenc ! .video splitmuxsink muxer=qtmux muxer-pad-map=x-pad-map,video=video_1 location=test%05d.mp4 -v
66  * ]|
67  * Records 10 frames to an mp4 file, using a muxer-pad-map to make explicit mappings between the splitmuxsink sink pad and the corresponding muxer pad
68  * it will deliver to.
69  */
70
71 #ifdef HAVE_CONFIG_H
72 #include "config.h"
73 #endif
74
75 #include <string.h>
76 #include <glib/gstdio.h>
77 #include <gst/video/video.h>
78 #include "gstsplitmuxsink.h"
79
80 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
81 #define GST_CAT_DEFAULT splitmux_debug
82
83 #define GST_SPLITMUX_STATE_LOCK(s) g_mutex_lock(&(s)->state_lock)
84 #define GST_SPLITMUX_STATE_UNLOCK(s) g_mutex_unlock(&(s)->state_lock)
85
86 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
87 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
88 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
89 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
90
91 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
92 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
93
94 static void split_now (GstSplitMuxSink * splitmux);
95 static void split_after (GstSplitMuxSink * splitmux);
96 static void split_at_running_time (GstSplitMuxSink * splitmux,
97     GstClockTime split_time);
98
99 enum
100 {
101   PROP_0,
102   PROP_LOCATION,
103   PROP_START_INDEX,
104   PROP_MAX_SIZE_TIME,
105   PROP_MAX_SIZE_BYTES,
106   PROP_MAX_SIZE_TIMECODE,
107   PROP_SEND_KEYFRAME_REQUESTS,
108   PROP_MAX_FILES,
109   PROP_MUXER_OVERHEAD,
110   PROP_USE_ROBUST_MUXING,
111   PROP_ALIGNMENT_THRESHOLD,
112   PROP_MUXER,
113   PROP_SINK,
114   PROP_RESET_MUXER,
115   PROP_ASYNC_FINALIZE,
116   PROP_MUXER_FACTORY,
117   PROP_MUXER_PRESET,
118   PROP_MUXER_PROPERTIES,
119   PROP_SINK_FACTORY,
120   PROP_SINK_PRESET,
121   PROP_SINK_PROPERTIES,
122   PROP_MUXERPAD_MAP
123 };
124
125 #define DEFAULT_MAX_SIZE_TIME       0
126 #define DEFAULT_MAX_SIZE_BYTES      0
127 #define DEFAULT_MAX_FILES           0
128 #define DEFAULT_MUXER_OVERHEAD      0.02
129 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
130 #define DEFAULT_ALIGNMENT_THRESHOLD 0
131 #define DEFAULT_MUXER "mp4mux"
132 #define DEFAULT_SINK "filesink"
133 #define DEFAULT_USE_ROBUST_MUXING FALSE
134 #define DEFAULT_RESET_MUXER TRUE
135 #define DEFAULT_ASYNC_FINALIZE FALSE
136 #define DEFAULT_START_INDEX 0
137
138 typedef struct _AsyncEosHelper
139 {
140   MqStreamCtx *ctx;
141   GstPad *pad;
142 } AsyncEosHelper;
143
144 enum
145 {
146   SIGNAL_FORMAT_LOCATION,
147   SIGNAL_FORMAT_LOCATION_FULL,
148   SIGNAL_SPLIT_NOW,
149   SIGNAL_SPLIT_AFTER,
150   SIGNAL_SPLIT_AT_RUNNING_TIME,
151   SIGNAL_MUXER_ADDED,
152   SIGNAL_SINK_ADDED,
153   SIGNAL_LAST
154 };
155
156 static guint signals[SIGNAL_LAST];
157
158 static GstStaticPadTemplate video_sink_template =
159 GST_STATIC_PAD_TEMPLATE ("video",
160     GST_PAD_SINK,
161     GST_PAD_REQUEST,
162     GST_STATIC_CAPS_ANY);
163 static GstStaticPadTemplate video_aux_sink_template =
164 GST_STATIC_PAD_TEMPLATE ("video_aux_%u",
165     GST_PAD_SINK,
166     GST_PAD_REQUEST,
167     GST_STATIC_CAPS_ANY);
168 static GstStaticPadTemplate audio_sink_template =
169 GST_STATIC_PAD_TEMPLATE ("audio_%u",
170     GST_PAD_SINK,
171     GST_PAD_REQUEST,
172     GST_STATIC_CAPS_ANY);
173 static GstStaticPadTemplate subtitle_sink_template =
174 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
175     GST_PAD_SINK,
176     GST_PAD_REQUEST,
177     GST_STATIC_CAPS_ANY);
178 static GstStaticPadTemplate caption_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("caption_%u",
180     GST_PAD_SINK,
181     GST_PAD_REQUEST,
182     GST_STATIC_CAPS_ANY);
183
184 static GQuark PAD_CONTEXT;
185 static GQuark EOS_FROM_US;
186 static GQuark RUNNING_TIME;
187 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
188  * to forward an incoming EOS message, but we cannot rely on the state of the
189  * splitmux anymore, so we set this qdata on the sink instead.
190  * The muxer and sink must be destroyed after both of these things have
191  * finished:
192  * 1) The EOS message has been sent when the fragment is ending
193  * 2) The muxer has been unlinked and relinked
194  * Therefore, EOS_FROM_US can have these two values:
195  * 0: EOS was not requested from us. Forward the message. The muxer and the
196  * sink will be destroyed together with the rest of the bin.
197  * 1: EOS was requested from us, but the other of the two tasks hasn't
198  * finished. Set EOS_FROM_US to 2 and do your stuff.
199  * 2: EOS was requested from us and the other of the two tasks has finished.
200  * Now we can destroy the muxer and the sink.
201  */
202
203 static void
204 _do_init (void)
205 {
206   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
207   EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
208   RUNNING_TIME = g_quark_from_static_string ("running-time");
209   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
210       "Split File Muxing Sink");
211 }
212
213 #define gst_splitmux_sink_parent_class parent_class
214 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
215     _do_init ());
216 GST_ELEMENT_REGISTER_DEFINE (splitmuxsink, "splitmuxsink", GST_RANK_NONE,
217     GST_TYPE_SPLITMUX_SINK);
218
219 static gboolean create_muxer (GstSplitMuxSink * splitmux);
220 static gboolean create_sink (GstSplitMuxSink * splitmux);
221 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
222     const GValue * value, GParamSpec * pspec);
223 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
224     GValue * value, GParamSpec * pspec);
225 static void gst_splitmux_sink_dispose (GObject * object);
226 static void gst_splitmux_sink_finalize (GObject * object);
227
228 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
229     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
230 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
231
232 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
233     element, GstStateChange transition);
234
235 static void bus_handler (GstBin * bin, GstMessage * msg);
236 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
237 static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux,
238     MqStreamCtx * ctx);
239 static void mq_stream_ctx_free (MqStreamCtx * ctx);
240 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
241
242 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
243 static GstElement *create_element (GstSplitMuxSink * splitmux,
244     const gchar * factory, const gchar * name, gboolean locked);
245
246 static void do_async_done (GstSplitMuxSink * splitmux);
247
248 static GstClockTime calculate_next_max_timecode (GstSplitMuxSink * splitmux,
249     const GstVideoTimeCode * cur_tc, GstClockTime running_time,
250     GstVideoTimeCode ** next_tc);
251
252 static MqStreamBuf *
253 mq_stream_buf_new (void)
254 {
255   return g_slice_new0 (MqStreamBuf);
256 }
257
258 static void
259 mq_stream_buf_free (MqStreamBuf * data)
260 {
261   g_slice_free (MqStreamBuf, data);
262 }
263
264 static SplitMuxOutputCommand *
265 out_cmd_buf_new (void)
266 {
267   return g_slice_new0 (SplitMuxOutputCommand);
268 }
269
270 static void
271 out_cmd_buf_free (SplitMuxOutputCommand * data)
272 {
273   g_slice_free (SplitMuxOutputCommand, data);
274 }
275
276 static void
277 input_gop_free (InputGop * gop)
278 {
279   g_clear_pointer (&gop->start_tc, gst_video_time_code_free);
280   g_slice_free (InputGop, gop);
281 }
282
283 static void
284 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
285 {
286   GObjectClass *gobject_class = (GObjectClass *) klass;
287   GstElementClass *gstelement_class = (GstElementClass *) klass;
288   GstBinClass *gstbin_class = (GstBinClass *) klass;
289
290   gobject_class->set_property = gst_splitmux_sink_set_property;
291   gobject_class->get_property = gst_splitmux_sink_get_property;
292   gobject_class->dispose = gst_splitmux_sink_dispose;
293   gobject_class->finalize = gst_splitmux_sink_finalize;
294
295   gst_element_class_set_static_metadata (gstelement_class,
296       "Split Muxing Bin", "Generic/Bin/Muxer",
297       "Convenience bin that muxes incoming streams into multiple time/size limited files",
298       "Jan Schmidt <jan@centricular.com>");
299
300   gst_element_class_add_static_pad_template (gstelement_class,
301       &video_sink_template);
302   gst_element_class_add_static_pad_template (gstelement_class,
303       &video_aux_sink_template);
304   gst_element_class_add_static_pad_template (gstelement_class,
305       &audio_sink_template);
306   gst_element_class_add_static_pad_template (gstelement_class,
307       &subtitle_sink_template);
308   gst_element_class_add_static_pad_template (gstelement_class,
309       &caption_sink_template);
310
311   gstelement_class->change_state =
312       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
313   gstelement_class->request_new_pad =
314       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
315   gstelement_class->release_pad =
316       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
317
318   gstbin_class->handle_message = bus_handler;
319
320   g_object_class_install_property (gobject_class, PROP_LOCATION,
321       g_param_spec_string ("location", "File Output Pattern",
322           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
323           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
324   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
325       g_param_spec_double ("mux-overhead", "Muxing Overhead",
326           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
327           DEFAULT_MUXER_OVERHEAD,
328           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
329
330   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
331       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
332           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
333           DEFAULT_MAX_SIZE_TIME,
334           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
335           G_PARAM_STATIC_STRINGS));
336   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
337       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
338           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
339           DEFAULT_MAX_SIZE_BYTES,
340           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
341           G_PARAM_STATIC_STRINGS));
342   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
343       g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
344           "Maximum difference in timecode between first and last frame. "
345           "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
346           "Will only be effective if a timecode track is present.", NULL,
347           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
348           G_PARAM_STATIC_STRINGS));
349   g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
350       g_param_spec_boolean ("send-keyframe-requests",
351           "Request keyframes at max-size-time",
352           "Request a keyframe every max-size-time ns to try splitting at that point. "
353           "Needs max-size-bytes to be 0 in order to be effective.",
354           DEFAULT_SEND_KEYFRAME_REQUESTS,
355           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
356           G_PARAM_STATIC_STRINGS));
357   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
358       g_param_spec_uint ("max-files", "Max files",
359           "Maximum number of files to keep on disk. Once the maximum is reached,"
360           "old files start to be deleted to make room for new ones.", 0,
361           G_MAXUINT, DEFAULT_MAX_FILES,
362           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
363   g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
364       g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
365           "Allow non-reference streams to be that many ns before the reference"
366           " stream", 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
367           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
368           G_PARAM_STATIC_STRINGS));
369
370   g_object_class_install_property (gobject_class, PROP_MUXER,
371       g_param_spec_object ("muxer", "Muxer",
372           "The muxer element to use (NULL = default mp4mux). "
373           "Valid only for async-finalize = FALSE",
374           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375   g_object_class_install_property (gobject_class, PROP_SINK,
376       g_param_spec_object ("sink", "Sink",
377           "The sink element (or element chain) to use (NULL = default filesink). "
378           "Valid only for async-finalize = FALSE",
379           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
380
381   g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
382       g_param_spec_boolean ("use-robust-muxing",
383           "Support robust-muxing mode of some muxers",
384           "Check if muxers support robust muxing via the reserved-max-duration and "
385           "reserved-duration-remaining properties and use them if so. "
386           "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
387           " create new fragments if the reserved header space is about to overflow. "
388           "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
389           "manually by the app to a non-zero value for robust muxing to have an effect.",
390           DEFAULT_USE_ROBUST_MUXING,
391           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
392
393   g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
394       g_param_spec_boolean ("reset-muxer",
395           "Reset Muxer",
396           "Reset the muxer after each segment. Disabling this will not work for most muxers.",
397           DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
398
399   g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
400       g_param_spec_boolean ("async-finalize",
401           "Finalize fragments asynchronously",
402           "Finalize each fragment asynchronously and start a new one",
403           DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
404   g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
405       g_param_spec_string ("muxer-factory", "Muxer factory",
406           "The muxer element factory to use (default = mp4mux). "
407           "Valid only for async-finalize = TRUE",
408           "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
409   /**
410    * GstSplitMuxSink:muxer-preset
411    *
412    * An optional #GstPreset name to use for the muxer. This only has an effect
413    * in `async-finalize=TRUE` mode.
414    *
415    * Since: 1.18
416    */
417   g_object_class_install_property (gobject_class, PROP_MUXER_PRESET,
418       g_param_spec_string ("muxer-preset", "Muxer preset",
419           "The muxer preset to use. "
420           "Valid only for async-finalize = TRUE",
421           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
422   g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
423       g_param_spec_boxed ("muxer-properties", "Muxer properties",
424           "The muxer element properties to use. "
425           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
426           "Valid only for async-finalize = TRUE",
427           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
428   g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
429       g_param_spec_string ("sink-factory", "Sink factory",
430           "The sink element factory to use (default = filesink). "
431           "Valid only for async-finalize = TRUE",
432           "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
433   /**
434    * GstSplitMuxSink:sink-preset
435    *
436    * An optional #GstPreset name to use for the sink. This only has an effect
437    * in `async-finalize=TRUE` mode.
438    *
439    * Since: 1.18
440    */
441   g_object_class_install_property (gobject_class, PROP_SINK_PRESET,
442       g_param_spec_string ("sink-preset", "Sink preset",
443           "The sink preset to use. "
444           "Valid only for async-finalize = TRUE",
445           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
446   g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
447       g_param_spec_boxed ("sink-properties", "Sink properties",
448           "The sink element properties to use. "
449           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
450           "Valid only for async-finalize = TRUE",
451           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
452   g_object_class_install_property (gobject_class, PROP_START_INDEX,
453       g_param_spec_int ("start-index", "Start Index",
454           "Start value of fragment index.",
455           0, G_MAXINT, DEFAULT_START_INDEX,
456           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
457
458   /**
459    * GstSplitMuxSink::muxer-pad-map
460    *
461    * An optional GstStructure that provides a map from splitmuxsink sinkpad
462    * names to muxer pad names they should feed. Splitmuxsink has some default
463    * mapping behaviour to link video to video pads and audio to audio pads
464    * that usually works fine. This property is useful if you need to ensure
465    * a particular mapping to muxed streams.
466    *
467    * The GstStructure contains string fields like so:
468    *   splitmuxsink muxer-pad-map=x-pad-map,video=video_1
469    *
470    * Since: 1.18
471    */
472   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_MUXERPAD_MAP,
473       g_param_spec_boxed ("muxer-pad-map", "Muxer pad map",
474           "A GstStructure specifies the mapping from splitmuxsink sink pads to muxer pads",
475           GST_TYPE_STRUCTURE,
476           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
477
478   /**
479    * GstSplitMuxSink::format-location:
480    * @splitmux: the #GstSplitMuxSink
481    * @fragment_id: the sequence number of the file to be created
482    *
483    * Returns: the location to be used for the next output file. This must be
484    *    a newly-allocated string which will be freed with g_free() by the
485    *    splitmuxsink element when it no longer needs it, so use g_strdup() or
486    *    g_strdup_printf() or similar functions to allocate it.
487    */
488   signals[SIGNAL_FORMAT_LOCATION] =
489       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
490       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
491
492   /**
493    * GstSplitMuxSink::format-location-full:
494    * @splitmux: the #GstSplitMuxSink
495    * @fragment_id: the sequence number of the file to be created
496    * @first_sample: A #GstSample containing the first buffer
497    *   from the reference stream in the new file
498    *
499    * Returns: the location to be used for the next output file. This must be
500    *    a newly-allocated string which will be freed with g_free() by the
501    *    splitmuxsink element when it no longer needs it, so use g_strdup() or
502    *    g_strdup_printf() or similar functions to allocate it.
503    *
504    * Since: 1.12
505    */
506   signals[SIGNAL_FORMAT_LOCATION_FULL] =
507       g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
508       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
509       GST_TYPE_SAMPLE);
510
511   /**
512    * GstSplitMuxSink::split-now:
513    * @splitmux: the #GstSplitMuxSink
514    *
515    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
516    * The current GOP will be output to the new file.
517    *
518    * Since: 1.14
519    */
520   signals[SIGNAL_SPLIT_NOW] =
521       g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
522       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
523       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_now), NULL, NULL, NULL,
524       G_TYPE_NONE, 0);
525
526   /**
527    * GstSplitMuxSink::split-after:
528    * @splitmux: the #GstSplitMuxSink
529    *
530    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
531    * Unlike the 'split-now' signal, with 'split-after', the current GOP will be output to the old file.
532    *
533    * Since: 1.16
534    */
535   signals[SIGNAL_SPLIT_AFTER] =
536       g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
537       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
538       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_after), NULL, NULL, NULL,
539       G_TYPE_NONE, 0);
540
541   /**
542    * GstSplitMuxSink::split-at-running-time:
543    * @splitmux: the #GstSplitMuxSink
544    *
545    * When called by the user, this action signal splits the video file (and
546    * begins a new one) as soon as the given running time is reached. If this
547    * action signal is called multiple times, running times are queued up and
548    * processed in the order they were given.
549    *
550    * Note that this is prone to race conditions, where said running time is
551    * reached and surpassed before we had a chance to split. The file will
552    * still split immediately, but in order to make sure that the split doesn't
553    * happen too late, it is recommended to call this action signal from
554    * something that will prevent further buffers from flowing into
555    * splitmuxsink before the split is completed, such as a pad probe before
556    * splitmuxsink.
557    *
558    *
559    * Since: 1.16
560    */
561   signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
562       g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
563       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
564       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_at_running_time), NULL, NULL,
565       NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
566
567   /**
568    * GstSplitMuxSink::muxer-added:
569    * @splitmux: the #GstSplitMuxSink
570    * @muxer: the newly added muxer element
571    *
572    * Since: 1.14
573    */
574   signals[SIGNAL_MUXER_ADDED] =
575       g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
576       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
577
578   /**
579    * GstSplitMuxSink::sink-added:
580    * @splitmux: the #GstSplitMuxSink
581    * @sink: the newly added sink element
582    *
583    * Since: 1.14
584    */
585   signals[SIGNAL_SINK_ADDED] =
586       g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
587       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
588
589   klass->split_now = split_now;
590   klass->split_after = split_after;
591   klass->split_at_running_time = split_at_running_time;
592 }
593
594 static void
595 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
596 {
597   g_mutex_init (&splitmux->lock);
598   g_mutex_init (&splitmux->state_lock);
599   g_cond_init (&splitmux->input_cond);
600   g_cond_init (&splitmux->output_cond);
601   g_queue_init (&splitmux->out_cmd_q);
602
603   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
604   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
605   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
606   splitmux->max_files = DEFAULT_MAX_FILES;
607   splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
608   splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
609   splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
610   splitmux->reset_muxer = DEFAULT_RESET_MUXER;
611
612   splitmux->threshold_timecode_str = NULL;
613
614   splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
615   splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
616   splitmux->muxer_properties = NULL;
617   splitmux->sink_factory = g_strdup (DEFAULT_SINK);
618   splitmux->sink_properties = NULL;
619
620   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
621   splitmux->split_requested = FALSE;
622   splitmux->do_split_next_gop = FALSE;
623   splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
624   splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
625
626   g_queue_init (&splitmux->pending_input_gops);
627 }
628
629 static void
630 gst_splitmux_reset_elements (GstSplitMuxSink * splitmux)
631 {
632   if (splitmux->muxer) {
633     gst_element_set_locked_state (splitmux->muxer, TRUE);
634     gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
635     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
636   }
637   if (splitmux->active_sink) {
638     gst_element_set_locked_state (splitmux->active_sink, TRUE);
639     gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
640     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
641   }
642
643   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
644 }
645
646 static void
647 gst_splitmux_sink_dispose (GObject * object)
648 {
649   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
650
651   /* Calling parent dispose invalidates all child pointers */
652   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
653
654   G_OBJECT_CLASS (parent_class)->dispose (object);
655 }
656
657 static void
658 gst_splitmux_sink_finalize (GObject * object)
659 {
660   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
661
662   g_cond_clear (&splitmux->input_cond);
663   g_cond_clear (&splitmux->output_cond);
664   g_mutex_clear (&splitmux->lock);
665   g_mutex_clear (&splitmux->state_lock);
666   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
667   g_queue_clear (&splitmux->out_cmd_q);
668   g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
669   g_queue_clear (&splitmux->pending_input_gops);
670
671   g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
672
673   if (splitmux->muxerpad_map)
674     gst_structure_free (splitmux->muxerpad_map);
675
676   if (splitmux->provided_sink)
677     gst_object_unref (splitmux->provided_sink);
678   if (splitmux->provided_muxer)
679     gst_object_unref (splitmux->provided_muxer);
680
681   if (splitmux->muxer_factory)
682     g_free (splitmux->muxer_factory);
683   if (splitmux->muxer_preset)
684     g_free (splitmux->muxer_preset);
685   if (splitmux->muxer_properties)
686     gst_structure_free (splitmux->muxer_properties);
687   if (splitmux->sink_factory)
688     g_free (splitmux->sink_factory);
689   if (splitmux->sink_preset)
690     g_free (splitmux->sink_preset);
691   if (splitmux->sink_properties)
692     gst_structure_free (splitmux->sink_properties);
693
694   if (splitmux->threshold_timecode_str)
695     g_free (splitmux->threshold_timecode_str);
696   if (splitmux->tc_interval)
697     gst_video_time_code_interval_free (splitmux->tc_interval);
698
699   if (splitmux->times_to_split)
700     gst_queue_array_free (splitmux->times_to_split);
701
702   g_free (splitmux->location);
703
704   /* Make sure to free any un-released contexts. There should not be any,
705    * because the dispose will have freed all request pads though */
706   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
707   g_list_free (splitmux->contexts);
708
709   G_OBJECT_CLASS (parent_class)->finalize (object);
710 }
711
712 /*
713  * Set any time threshold to the muxer, if it has
714  * reserved-max-duration and reserved-duration-remaining
715  * properties. Called when creating/claiming the muxer
716  * in create_elements() */
717 static void
718 update_muxer_properties (GstSplitMuxSink * sink)
719 {
720   GObjectClass *klass;
721   GstClockTime threshold_time;
722
723   sink->muxer_has_reserved_props = FALSE;
724   if (sink->muxer == NULL)
725     return;
726   klass = G_OBJECT_GET_CLASS (sink->muxer);
727   if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
728     return;
729   if (g_object_class_find_property (klass,
730           "reserved-duration-remaining") == NULL)
731     return;
732   sink->muxer_has_reserved_props = TRUE;
733
734   GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
735       GST_TIME_ARGS (sink->threshold_time));
736   GST_OBJECT_LOCK (sink);
737   threshold_time = sink->threshold_time;
738   GST_OBJECT_UNLOCK (sink);
739
740   if (threshold_time > 0) {
741     /* Tell the muxer how much space to reserve */
742     GstClockTime muxer_threshold = threshold_time;
743     g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
744   }
745 }
746
747 static void
748 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
749     const GValue * value, GParamSpec * pspec)
750 {
751   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
752
753   switch (prop_id) {
754     case PROP_LOCATION:{
755       GST_OBJECT_LOCK (splitmux);
756       g_free (splitmux->location);
757       splitmux->location = g_value_dup_string (value);
758       GST_OBJECT_UNLOCK (splitmux);
759       break;
760     }
761     case PROP_START_INDEX:
762       GST_OBJECT_LOCK (splitmux);
763       splitmux->start_index = g_value_get_int (value);
764       GST_OBJECT_UNLOCK (splitmux);
765       break;
766     case PROP_MAX_SIZE_BYTES:
767       GST_OBJECT_LOCK (splitmux);
768       splitmux->threshold_bytes = g_value_get_uint64 (value);
769       GST_OBJECT_UNLOCK (splitmux);
770       break;
771     case PROP_MAX_SIZE_TIME:
772       GST_OBJECT_LOCK (splitmux);
773       splitmux->threshold_time = g_value_get_uint64 (value);
774       GST_OBJECT_UNLOCK (splitmux);
775       break;
776     case PROP_MAX_SIZE_TIMECODE:
777       GST_OBJECT_LOCK (splitmux);
778       g_free (splitmux->threshold_timecode_str);
779       /* will be calculated later */
780       g_clear_pointer (&splitmux->tc_interval,
781           gst_video_time_code_interval_free);
782
783       splitmux->threshold_timecode_str = g_value_dup_string (value);
784       if (splitmux->threshold_timecode_str) {
785         splitmux->tc_interval =
786             gst_video_time_code_interval_new_from_string
787             (splitmux->threshold_timecode_str);
788         if (!splitmux->tc_interval) {
789           g_warning ("Wrong timecode string %s",
790               splitmux->threshold_timecode_str);
791           g_free (splitmux->threshold_timecode_str);
792           splitmux->threshold_timecode_str = NULL;
793         }
794       }
795       splitmux->next_fragment_start_tc_time =
796           calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
797           splitmux->fragment_start_time, NULL);
798       if (splitmux->tc_interval && splitmux->fragment_start_tc
799           && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
800         GST_WARNING_OBJECT (splitmux,
801             "Couldn't calculate next fragment start time for timecode mode");
802       }
803       GST_OBJECT_UNLOCK (splitmux);
804       break;
805     case PROP_SEND_KEYFRAME_REQUESTS:
806       GST_OBJECT_LOCK (splitmux);
807       splitmux->send_keyframe_requests = g_value_get_boolean (value);
808       GST_OBJECT_UNLOCK (splitmux);
809       break;
810     case PROP_MAX_FILES:
811       GST_OBJECT_LOCK (splitmux);
812       splitmux->max_files = g_value_get_uint (value);
813       GST_OBJECT_UNLOCK (splitmux);
814       break;
815     case PROP_MUXER_OVERHEAD:
816       GST_OBJECT_LOCK (splitmux);
817       splitmux->mux_overhead = g_value_get_double (value);
818       GST_OBJECT_UNLOCK (splitmux);
819       break;
820     case PROP_USE_ROBUST_MUXING:
821       GST_OBJECT_LOCK (splitmux);
822       splitmux->use_robust_muxing = g_value_get_boolean (value);
823       GST_OBJECT_UNLOCK (splitmux);
824       if (splitmux->use_robust_muxing)
825         update_muxer_properties (splitmux);
826       break;
827     case PROP_ALIGNMENT_THRESHOLD:
828       GST_OBJECT_LOCK (splitmux);
829       splitmux->alignment_threshold = g_value_get_uint64 (value);
830       GST_OBJECT_UNLOCK (splitmux);
831       break;
832     case PROP_SINK:
833       GST_OBJECT_LOCK (splitmux);
834       gst_clear_object (&splitmux->provided_sink);
835       splitmux->provided_sink = g_value_get_object (value);
836       if (splitmux->provided_sink)
837         gst_object_ref_sink (splitmux->provided_sink);
838       GST_OBJECT_UNLOCK (splitmux);
839       break;
840     case PROP_MUXER:
841       GST_OBJECT_LOCK (splitmux);
842       gst_clear_object (&splitmux->provided_muxer);
843       splitmux->provided_muxer = g_value_get_object (value);
844       if (splitmux->provided_muxer)
845         gst_object_ref_sink (splitmux->provided_muxer);
846       GST_OBJECT_UNLOCK (splitmux);
847       break;
848     case PROP_RESET_MUXER:
849       GST_OBJECT_LOCK (splitmux);
850       splitmux->reset_muxer = g_value_get_boolean (value);
851       GST_OBJECT_UNLOCK (splitmux);
852       break;
853     case PROP_ASYNC_FINALIZE:
854       GST_OBJECT_LOCK (splitmux);
855       splitmux->async_finalize = g_value_get_boolean (value);
856       GST_OBJECT_UNLOCK (splitmux);
857       break;
858     case PROP_MUXER_FACTORY:
859       GST_OBJECT_LOCK (splitmux);
860       if (splitmux->muxer_factory)
861         g_free (splitmux->muxer_factory);
862       splitmux->muxer_factory = g_value_dup_string (value);
863       GST_OBJECT_UNLOCK (splitmux);
864       break;
865     case PROP_MUXER_PRESET:
866       GST_OBJECT_LOCK (splitmux);
867       if (splitmux->muxer_preset)
868         g_free (splitmux->muxer_preset);
869       splitmux->muxer_preset = g_value_dup_string (value);
870       GST_OBJECT_UNLOCK (splitmux);
871       break;
872     case PROP_MUXER_PROPERTIES:
873       GST_OBJECT_LOCK (splitmux);
874       if (splitmux->muxer_properties)
875         gst_structure_free (splitmux->muxer_properties);
876       if (gst_value_get_structure (value))
877         splitmux->muxer_properties =
878             gst_structure_copy (gst_value_get_structure (value));
879       else
880         splitmux->muxer_properties = NULL;
881       GST_OBJECT_UNLOCK (splitmux);
882       break;
883     case PROP_SINK_FACTORY:
884       GST_OBJECT_LOCK (splitmux);
885       if (splitmux->sink_factory)
886         g_free (splitmux->sink_factory);
887       splitmux->sink_factory = g_value_dup_string (value);
888       GST_OBJECT_UNLOCK (splitmux);
889       break;
890     case PROP_SINK_PRESET:
891       GST_OBJECT_LOCK (splitmux);
892       if (splitmux->sink_preset)
893         g_free (splitmux->sink_preset);
894       splitmux->sink_preset = g_value_dup_string (value);
895       GST_OBJECT_UNLOCK (splitmux);
896       break;
897     case PROP_SINK_PROPERTIES:
898       GST_OBJECT_LOCK (splitmux);
899       if (splitmux->sink_properties)
900         gst_structure_free (splitmux->sink_properties);
901       if (gst_value_get_structure (value))
902         splitmux->sink_properties =
903             gst_structure_copy (gst_value_get_structure (value));
904       else
905         splitmux->sink_properties = NULL;
906       GST_OBJECT_UNLOCK (splitmux);
907       break;
908     case PROP_MUXERPAD_MAP:
909     {
910       const GstStructure *s = gst_value_get_structure (value);
911       GST_SPLITMUX_LOCK (splitmux);
912       if (splitmux->muxerpad_map) {
913         gst_structure_free (splitmux->muxerpad_map);
914       }
915       if (s)
916         splitmux->muxerpad_map = gst_structure_copy (s);
917       else
918         splitmux->muxerpad_map = NULL;
919       GST_SPLITMUX_UNLOCK (splitmux);
920       break;
921     }
922     default:
923       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
924       break;
925   }
926 }
927
928 static void
929 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
930     GValue * value, GParamSpec * pspec)
931 {
932   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
933
934   switch (prop_id) {
935     case PROP_LOCATION:
936       GST_OBJECT_LOCK (splitmux);
937       g_value_set_string (value, splitmux->location);
938       GST_OBJECT_UNLOCK (splitmux);
939       break;
940     case PROP_START_INDEX:
941       GST_OBJECT_LOCK (splitmux);
942       g_value_set_int (value, splitmux->start_index);
943       GST_OBJECT_UNLOCK (splitmux);
944       break;
945     case PROP_MAX_SIZE_BYTES:
946       GST_OBJECT_LOCK (splitmux);
947       g_value_set_uint64 (value, splitmux->threshold_bytes);
948       GST_OBJECT_UNLOCK (splitmux);
949       break;
950     case PROP_MAX_SIZE_TIME:
951       GST_OBJECT_LOCK (splitmux);
952       g_value_set_uint64 (value, splitmux->threshold_time);
953       GST_OBJECT_UNLOCK (splitmux);
954       break;
955     case PROP_MAX_SIZE_TIMECODE:
956       GST_OBJECT_LOCK (splitmux);
957       g_value_set_string (value, splitmux->threshold_timecode_str);
958       GST_OBJECT_UNLOCK (splitmux);
959       break;
960     case PROP_SEND_KEYFRAME_REQUESTS:
961       GST_OBJECT_LOCK (splitmux);
962       g_value_set_boolean (value, splitmux->send_keyframe_requests);
963       GST_OBJECT_UNLOCK (splitmux);
964       break;
965     case PROP_MAX_FILES:
966       GST_OBJECT_LOCK (splitmux);
967       g_value_set_uint (value, splitmux->max_files);
968       GST_OBJECT_UNLOCK (splitmux);
969       break;
970     case PROP_MUXER_OVERHEAD:
971       GST_OBJECT_LOCK (splitmux);
972       g_value_set_double (value, splitmux->mux_overhead);
973       GST_OBJECT_UNLOCK (splitmux);
974       break;
975     case PROP_USE_ROBUST_MUXING:
976       GST_OBJECT_LOCK (splitmux);
977       g_value_set_boolean (value, splitmux->use_robust_muxing);
978       GST_OBJECT_UNLOCK (splitmux);
979       break;
980     case PROP_ALIGNMENT_THRESHOLD:
981       GST_OBJECT_LOCK (splitmux);
982       g_value_set_uint64 (value, splitmux->alignment_threshold);
983       GST_OBJECT_UNLOCK (splitmux);
984       break;
985     case PROP_SINK:
986       GST_OBJECT_LOCK (splitmux);
987       g_value_set_object (value, splitmux->provided_sink);
988       GST_OBJECT_UNLOCK (splitmux);
989       break;
990     case PROP_MUXER:
991       GST_OBJECT_LOCK (splitmux);
992       g_value_set_object (value, splitmux->provided_muxer);
993       GST_OBJECT_UNLOCK (splitmux);
994       break;
995     case PROP_RESET_MUXER:
996       GST_OBJECT_LOCK (splitmux);
997       g_value_set_boolean (value, splitmux->reset_muxer);
998       GST_OBJECT_UNLOCK (splitmux);
999       break;
1000     case PROP_ASYNC_FINALIZE:
1001       GST_OBJECT_LOCK (splitmux);
1002       g_value_set_boolean (value, splitmux->async_finalize);
1003       GST_OBJECT_UNLOCK (splitmux);
1004       break;
1005     case PROP_MUXER_FACTORY:
1006       GST_OBJECT_LOCK (splitmux);
1007       g_value_set_string (value, splitmux->muxer_factory);
1008       GST_OBJECT_UNLOCK (splitmux);
1009       break;
1010     case PROP_MUXER_PRESET:
1011       GST_OBJECT_LOCK (splitmux);
1012       g_value_set_string (value, splitmux->muxer_preset);
1013       GST_OBJECT_UNLOCK (splitmux);
1014       break;
1015     case PROP_MUXER_PROPERTIES:
1016       GST_OBJECT_LOCK (splitmux);
1017       gst_value_set_structure (value, splitmux->muxer_properties);
1018       GST_OBJECT_UNLOCK (splitmux);
1019       break;
1020     case PROP_SINK_FACTORY:
1021       GST_OBJECT_LOCK (splitmux);
1022       g_value_set_string (value, splitmux->sink_factory);
1023       GST_OBJECT_UNLOCK (splitmux);
1024       break;
1025     case PROP_SINK_PRESET:
1026       GST_OBJECT_LOCK (splitmux);
1027       g_value_set_string (value, splitmux->sink_preset);
1028       GST_OBJECT_UNLOCK (splitmux);
1029       break;
1030     case PROP_SINK_PROPERTIES:
1031       GST_OBJECT_LOCK (splitmux);
1032       gst_value_set_structure (value, splitmux->sink_properties);
1033       GST_OBJECT_UNLOCK (splitmux);
1034       break;
1035     case PROP_MUXERPAD_MAP:
1036       GST_SPLITMUX_LOCK (splitmux);
1037       gst_value_set_structure (value, splitmux->muxerpad_map);
1038       GST_SPLITMUX_UNLOCK (splitmux);
1039       break;
1040     default:
1041       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1042       break;
1043   }
1044 }
1045
1046 /* Convenience function */
1047 static inline GstClockTimeDiff
1048 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
1049 {
1050   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
1051
1052   if (GST_CLOCK_TIME_IS_VALID (val)) {
1053     gboolean sign =
1054         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
1055     if (sign > 0)
1056       res = val;
1057     else if (sign < 0)
1058       res = -val;
1059   }
1060   return res;
1061 }
1062
1063 static void
1064 mq_stream_ctx_reset (MqStreamCtx * ctx)
1065 {
1066   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1067   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1068   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
1069   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1070   g_queue_clear (&ctx->queued_bufs);
1071 }
1072
1073 static MqStreamCtx *
1074 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
1075 {
1076   MqStreamCtx *ctx;
1077
1078   ctx = g_new0 (MqStreamCtx, 1);
1079   ctx->splitmux = splitmux;
1080   g_queue_init (&ctx->queued_bufs);
1081   mq_stream_ctx_reset (ctx);
1082
1083   return ctx;
1084 }
1085
1086 static void
1087 mq_stream_ctx_free (MqStreamCtx * ctx)
1088 {
1089   if (ctx->q) {
1090     GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
1091
1092     g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
1093
1094     if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
1095       gst_element_set_locked_state (ctx->q, TRUE);
1096       gst_element_set_state (ctx->q, GST_STATE_NULL);
1097       gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
1098       gst_object_unref (parent);
1099     }
1100     gst_object_unref (ctx->q);
1101   }
1102   gst_object_unref (ctx->sinkpad);
1103   gst_object_unref (ctx->srcpad);
1104   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1105   g_queue_clear (&ctx->queued_bufs);
1106   g_free (ctx);
1107 }
1108
1109 static void
1110 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
1111     GstElement * sink)
1112 {
1113   gchar *location = NULL;
1114   GstMessage *msg;
1115   const gchar *msg_name = opened ?
1116       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
1117   GstClockTime running_time = splitmux->reference_ctx->out_running_time;
1118
1119   if (!opened) {
1120     GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
1121     if (rtime)
1122       running_time = *rtime;
1123   }
1124
1125   if (g_object_class_find_property (G_OBJECT_GET_CLASS (sink),
1126           "location") != NULL)
1127     g_object_get (sink, "location", &location, NULL);
1128
1129   GST_DEBUG_OBJECT (splitmux,
1130       "Sending %s message. Running time %" GST_TIME_FORMAT " location %s",
1131       msg_name, GST_TIME_ARGS (running_time), GST_STR_NULL (location));
1132
1133   /* If it's in the middle of a teardown, the reference_ctc might have become
1134    * NULL */
1135   if (splitmux->reference_ctx) {
1136     msg = gst_message_new_element (GST_OBJECT (splitmux),
1137         gst_structure_new (msg_name,
1138             "location", G_TYPE_STRING, location,
1139             "running-time", GST_TYPE_CLOCK_TIME, running_time,
1140             "sink", GST_TYPE_ELEMENT, sink, NULL));
1141     gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
1142   }
1143
1144   g_free (location);
1145 }
1146
1147 static void
1148 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
1149 {
1150   GstEvent *eos;
1151   GstPad *pad;
1152   MqStreamCtx *ctx;
1153
1154   eos = gst_event_new_eos ();
1155   pad = helper->pad;
1156   ctx = helper->ctx;
1157
1158   GST_SPLITMUX_LOCK (splitmux);
1159   if (!pad)
1160     pad = gst_pad_get_peer (ctx->srcpad);
1161   GST_SPLITMUX_UNLOCK (splitmux);
1162
1163   gst_pad_send_event (pad, eos);
1164   GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
1165
1166   gst_object_unref (pad);
1167   g_free (helper);
1168 }
1169
1170 /* Called with lock held, drops the lock to send EOS to the
1171  * pad
1172  */
1173 static void
1174 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1175 {
1176   GstEvent *eos;
1177   GstPad *pad;
1178
1179   eos = gst_event_new_eos ();
1180   pad = gst_pad_get_peer (ctx->srcpad);
1181
1182   ctx->out_eos = TRUE;
1183
1184   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
1185   GST_SPLITMUX_UNLOCK (splitmux);
1186   gst_pad_send_event (pad, eos);
1187   GST_SPLITMUX_LOCK (splitmux);
1188
1189   gst_object_unref (pad);
1190 }
1191
1192 /* Called with lock held. Schedules an EOS event to the ctx pad
1193  * to happen in another thread */
1194 static void
1195 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1196 {
1197   AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
1198   GstPad *srcpad, *sinkpad;
1199
1200   srcpad = ctx->srcpad;
1201   sinkpad = gst_pad_get_peer (srcpad);
1202
1203   helper->ctx = ctx;
1204   helper->pad = sinkpad;        /* Takes the reference */
1205
1206   ctx->out_eos_async_done = TRUE;
1207
1208   /* There used to be a bug here, where we had to explicitly remove
1209    * the SINK flag so that GstBin would ignore it for EOS purposes.
1210    * That fixed a race where if splitmuxsink really reaches EOS
1211    * before an asynchronous background element has finished, then
1212    * the bin wouldn't actually send EOS to the pipeline. Even after
1213    * finishing and removing the old element, the bin didn't re-check
1214    * EOS status on removing a SINK element. That bug was fixed
1215    * in core. */
1216   GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1217       sinkpad, ctx);
1218
1219   g_assert_nonnull (helper->pad);
1220   gst_element_call_async (GST_ELEMENT (splitmux),
1221       (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
1222 }
1223
1224 /* Called with lock held. TRUE iff all contexts have a
1225  * pending (or delivered) async eos event */
1226 static gboolean
1227 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1228 {
1229   gboolean ret = TRUE;
1230   GList *item;
1231
1232   for (item = splitmux->contexts; item; item = item->next) {
1233     MqStreamCtx *ctx = item->data;
1234     ret &= ctx->out_eos_async_done;
1235   }
1236   return ret;
1237 }
1238
1239 /* Called with splitmux lock held to check if this output
1240  * context needs to sleep to wait for the release of the
1241  * next GOP, or to send EOS to close out the current file
1242  */
1243 static GstFlowReturn
1244 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1245 {
1246   if (ctx->caps_change)
1247     return GST_FLOW_OK;
1248
1249   do {
1250     /* When first starting up, the reference stream has to output
1251      * the first buffer to prepare the muxer and sink */
1252     gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
1253     GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
1254
1255     if (my_max_out_running_time != GST_CLOCK_STIME_NONE
1256         && my_max_out_running_time != G_MAXINT64) {
1257       my_max_out_running_time -= splitmux->alignment_threshold;
1258       GST_LOG_OBJECT (ctx->srcpad,
1259           "Max out running time currently %" GST_STIME_FORMAT
1260           ", with threshold applied it is %" GST_STIME_FORMAT,
1261           GST_STIME_ARGS (splitmux->max_out_running_time),
1262           GST_STIME_ARGS (my_max_out_running_time));
1263     }
1264
1265     if (ctx->flushing
1266         || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1267       return GST_FLOW_FLUSHING;
1268
1269     GST_LOG_OBJECT (ctx->srcpad,
1270         "Checking running time %" GST_STIME_FORMAT " against max %"
1271         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1272         GST_STIME_ARGS (my_max_out_running_time));
1273
1274     if (can_output) {
1275       if (splitmux->max_out_running_time != GST_CLOCK_STIME_NONE &&
1276           ctx->out_running_time < my_max_out_running_time) {
1277         return GST_FLOW_OK;
1278       }
1279
1280       switch (splitmux->output_state) {
1281         case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1282           /* We only get here if we've finished outputting a GOP and need to know
1283            * what to do next */
1284           splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1285           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1286           continue;
1287
1288         case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1289         case SPLITMUX_OUTPUT_STATE_ENDING_STREAM:
1290           /* We've reached the max out running_time to get here, so end this file now */
1291           if (ctx->out_eos == FALSE) {
1292             if (splitmux->async_finalize) {
1293               /* We must set EOS asynchronously at this point. We cannot defer
1294                * it, because we need all contexts to wake up, for the
1295                * reference context to eventually give us something at
1296                * START_NEXT_FILE. Otherwise, collectpads might choose another
1297                * context to give us the first buffer, and format-location-full
1298                * will not contain a valid sample. */
1299               g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1300                   GINT_TO_POINTER (1));
1301               eos_context_async (ctx, splitmux);
1302               if (all_contexts_are_async_eos (splitmux)) {
1303                 GST_INFO_OBJECT (splitmux,
1304                     "All contexts are async_eos. Moving to the next file.");
1305                 /* We can start the next file once we've asked each pad to go EOS */
1306                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1307                 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1308                 continue;
1309               }
1310             } else {
1311               send_eos (splitmux, ctx);
1312               continue;
1313             }
1314           } else {
1315             GST_INFO_OBJECT (splitmux,
1316                 "At end-of-file state, but context %p is already EOS", ctx);
1317           }
1318           break;
1319         case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1320           if (ctx->is_reference) {
1321             GstFlowReturn ret = GST_FLOW_OK;
1322
1323             /* Special handling on the reference ctx to start new fragments
1324              * and collect commands from the command queue */
1325             /* drops the splitmux lock briefly: */
1326             /* We must have reference ctx in order for format-location-full to
1327              * have a sample */
1328             ret = start_next_fragment (splitmux, ctx);
1329             if (ret != GST_FLOW_OK)
1330               return ret;
1331
1332             continue;
1333           }
1334           break;
1335         case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1336           do {
1337             SplitMuxOutputCommand *cmd =
1338                 g_queue_pop_tail (&splitmux->out_cmd_q);
1339             if (cmd != NULL) {
1340               /* If we pop the last command, we need to make our queues bigger */
1341               if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1342                 grow_blocked_queues (splitmux);
1343
1344               if (cmd->start_new_fragment) {
1345                 if (splitmux->muxed_out_bytes > 0) {
1346                   GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1347                   splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1348                 } else {
1349                   GST_DEBUG_OBJECT (splitmux,
1350                       "Got cmd to start new fragment, but fragment is empty - ignoring.");
1351                 }
1352               } else {
1353                 GST_DEBUG_OBJECT (splitmux,
1354                     "Got new output cmd for time %" GST_STIME_FORMAT,
1355                     GST_STIME_ARGS (cmd->max_output_ts));
1356
1357                 /* Extend the output range immediately */
1358                 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE
1359                     || cmd->max_output_ts > splitmux->max_out_running_time)
1360                   splitmux->max_out_running_time = cmd->max_output_ts;
1361                 GST_DEBUG_OBJECT (splitmux,
1362                     "Max out running time now %" GST_STIME_FORMAT,
1363                     GST_STIME_ARGS (splitmux->max_out_running_time));
1364                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1365               }
1366               GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1367
1368               out_cmd_buf_free (cmd);
1369               break;
1370             } else {
1371               GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1372             }
1373           } while (!ctx->flushing && splitmux->output_state ==
1374               SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1375           /* loop and re-check the state */
1376           continue;
1377         }
1378         case SPLITMUX_OUTPUT_STATE_STOPPED:
1379           return GST_FLOW_FLUSHING;
1380       }
1381     } else {
1382       GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output");
1383     }
1384
1385     GST_INFO_OBJECT (ctx->srcpad,
1386         "Sleeping for running time %"
1387         GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1388         GST_STIME_ARGS (ctx->out_running_time),
1389         GST_STIME_ARGS (splitmux->max_out_running_time));
1390     GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1391     GST_INFO_OBJECT (ctx->srcpad,
1392         "Woken for new max running time %" GST_STIME_FORMAT,
1393         GST_STIME_ARGS (splitmux->max_out_running_time));
1394   }
1395   while (1);
1396
1397   return GST_FLOW_OK;
1398 }
1399
1400 static GstClockTime
1401 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1402     const GstVideoTimeCode * cur_tc, GstClockTime running_time,
1403     GstVideoTimeCode ** next_tc)
1404 {
1405   GstVideoTimeCode *target_tc;
1406   GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1407
1408   if (cur_tc == NULL || splitmux->tc_interval == NULL)
1409     return GST_CLOCK_TIME_NONE;
1410
1411   target_tc = gst_video_time_code_add_interval (cur_tc, splitmux->tc_interval);
1412   if (!target_tc) {
1413     GST_ELEMENT_ERROR (splitmux,
1414         STREAM, FAILED, (NULL), ("Couldn't calculate target timecode"));
1415     return GST_CLOCK_TIME_NONE;
1416   }
1417
1418   /* Convert to ns */
1419   target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1420   cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1421
1422   /* Add running_time, accounting for wraparound. */
1423   if (target_tc_time >= cur_tc_time) {
1424     next_max_tc_time = target_tc_time - cur_tc_time + running_time;
1425   } else {
1426     GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1427
1428     if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1429         (cur_tc->config.fps_d == 1001)) {
1430       /* Checking fps_d is probably unneeded, but better safe than sorry
1431        * (e.g. someone accidentally set a flag) */
1432       GstVideoTimeCode *tc_for_offset;
1433
1434       /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1435        * but slightly less. Calculate that duration from a fake timecode. The
1436        * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1437        * is to add one frame to 23:59:59;29 */
1438       tc_for_offset =
1439           gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1440           NULL, cur_tc->config.flags, 23, 59, 59,
1441           cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
1442       day_in_ns =
1443           gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1444           gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1445           cur_tc->config.fps_n);
1446       gst_video_time_code_free (tc_for_offset);
1447     }
1448     next_max_tc_time = day_in_ns - cur_tc_time + target_tc_time + running_time;
1449   }
1450
1451 #ifndef GST_DISABLE_GST_DEBUG
1452   {
1453     gchar *next_max_tc_str, *cur_tc_str;
1454
1455     cur_tc_str = gst_video_time_code_to_string (cur_tc);
1456     next_max_tc_str = gst_video_time_code_to_string (target_tc);
1457
1458     GST_INFO_OBJECT (splitmux, "Next max timecode %s time: %" GST_TIME_FORMAT
1459         " from ref timecode %s time: %" GST_TIME_FORMAT,
1460         next_max_tc_str,
1461         GST_TIME_ARGS (next_max_tc_time),
1462         cur_tc_str, GST_TIME_ARGS (cur_tc_time));
1463
1464     g_free (next_max_tc_str);
1465     g_free (cur_tc_str);
1466   }
1467 #endif
1468
1469   if (next_tc)
1470     *next_tc = target_tc;
1471   else
1472     gst_video_time_code_free (target_tc);
1473
1474   return next_max_tc_time;
1475 }
1476
1477 static gboolean
1478 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer,
1479     GstClockTimeDiff running_time_dts)
1480 {
1481   GstEvent *ev;
1482   GstClockTime target_time;
1483   gboolean timecode_based = FALSE;
1484   GstClockTime max_tc_time = GST_CLOCK_TIME_NONE;
1485   GstClockTime next_max_tc_time = GST_CLOCK_TIME_NONE;
1486   GstClockTime next_fku_time = GST_CLOCK_TIME_NONE;
1487   GstClockTime tc_rounding_error = 5 * GST_USECOND;
1488   InputGop *newest_gop = NULL;
1489   GList *l;
1490
1491   if (!splitmux->send_keyframe_requests)
1492     return TRUE;
1493
1494   /* Find the newest GOP where we passed in DTS the start PTS */
1495   for (l = splitmux->pending_input_gops.tail; l; l = l->prev) {
1496     InputGop *tmp = l->data;
1497
1498     GST_TRACE_OBJECT (splitmux,
1499         "Having pending input GOP with start PTS %" GST_STIME_FORMAT
1500         " and start time %" GST_STIME_FORMAT,
1501         GST_STIME_ARGS (tmp->start_time_pts), GST_STIME_ARGS (tmp->start_time));
1502
1503     if (tmp->sent_fku) {
1504       GST_DEBUG_OBJECT (splitmux,
1505           "Already checked for a keyframe request for this GOP");
1506       return TRUE;
1507     }
1508
1509     if (running_time_dts == GST_CLOCK_STIME_NONE ||
1510         tmp->start_time_pts == GST_CLOCK_STIME_NONE ||
1511         running_time_dts >= tmp->start_time_pts) {
1512       GST_DEBUG_OBJECT (splitmux,
1513           "Using GOP with start PTS %" GST_STIME_FORMAT " and start time %"
1514           GST_STIME_FORMAT, GST_STIME_ARGS (tmp->start_time_pts),
1515           GST_STIME_ARGS (tmp->start_time));
1516       newest_gop = tmp;
1517       break;
1518     }
1519   }
1520
1521   if (!newest_gop) {
1522     GST_DEBUG_OBJECT (splitmux, "Have no complete enough pending input GOP");
1523     return TRUE;
1524   }
1525
1526   if (splitmux->tc_interval) {
1527     if (newest_gop->start_tc
1528         && gst_video_time_code_is_valid (newest_gop->start_tc)) {
1529       GstVideoTimeCode *next_tc = NULL;
1530       max_tc_time =
1531           calculate_next_max_timecode (splitmux, newest_gop->start_tc,
1532           newest_gop->start_time, &next_tc);
1533
1534       /* calculate the next expected keyframe time to prevent too early fku
1535        * event */
1536       if (GST_CLOCK_TIME_IS_VALID (max_tc_time) && next_tc) {
1537         next_max_tc_time =
1538             calculate_next_max_timecode (splitmux, next_tc, max_tc_time, NULL);
1539       }
1540       if (next_tc)
1541         gst_video_time_code_free (next_tc);
1542
1543       timecode_based = GST_CLOCK_TIME_IS_VALID (max_tc_time) &&
1544           GST_CLOCK_TIME_IS_VALID (next_max_tc_time);
1545
1546       if (!timecode_based) {
1547         GST_WARNING_OBJECT (splitmux,
1548             "Couldn't calculate maximum fragment time for timecode mode");
1549       }
1550     } else {
1551       /* This can happen in the presence of GAP events that trigger
1552        * a new fragment start */
1553       GST_WARNING_OBJECT (splitmux,
1554           "No buffer available to calculate next timecode");
1555     }
1556   }
1557
1558   if ((splitmux->threshold_time == 0 && !timecode_based)
1559       || splitmux->threshold_bytes != 0)
1560     return TRUE;
1561
1562   if (timecode_based) {
1563     /* We might have rounding errors: aim slightly earlier */
1564     if (max_tc_time >= tc_rounding_error) {
1565       target_time = max_tc_time - tc_rounding_error;
1566     } else {
1567       /* unreliable target time */
1568       GST_DEBUG_OBJECT (splitmux, "tc time %" GST_TIME_FORMAT
1569           " is smaller than allowed rounding error, set it to zero",
1570           GST_TIME_ARGS (max_tc_time));
1571       target_time = 0;
1572     }
1573
1574     if (next_max_tc_time >= tc_rounding_error) {
1575       next_fku_time = next_max_tc_time - tc_rounding_error;
1576     } else {
1577       /* unreliable target time */
1578       GST_DEBUG_OBJECT (splitmux, "next tc time %" GST_TIME_FORMAT
1579           " is smaller than allowed rounding error, set it to zero",
1580           GST_TIME_ARGS (next_max_tc_time));
1581       next_fku_time = 0;
1582     }
1583   } else {
1584     target_time = newest_gop->start_time + splitmux->threshold_time;
1585   }
1586
1587   if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fku_time)) {
1588     GstClockTime allowed_time = splitmux->next_fku_time;
1589
1590     if (timecode_based) {
1591       if (allowed_time >= tc_rounding_error) {
1592         allowed_time -= tc_rounding_error;
1593       } else {
1594         /* unreliable next force key unit time */
1595         GST_DEBUG_OBJECT (splitmux, "expected next force key unit time %"
1596             GST_TIME_FORMAT
1597             " is smaller than allowed rounding error, set it to zero",
1598             GST_TIME_ARGS (splitmux->next_fku_time));
1599         allowed_time = 0;
1600       }
1601     }
1602
1603     if (target_time < allowed_time) {
1604       GST_LOG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1605           " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1606           ", rounding error compensated next keyframe time %" GST_TIME_FORMAT,
1607           GST_TIME_ARGS (target_time),
1608           GST_TIME_ARGS (splitmux->next_fku_time),
1609           GST_TIME_ARGS (allowed_time));
1610
1611       return TRUE;
1612     } else if (allowed_time != splitmux->next_fku_time &&
1613         target_time < splitmux->next_fku_time) {
1614       GST_DEBUG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1615           " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1616           ", but the difference is smaller than allowed rounding error",
1617           GST_TIME_ARGS (target_time), GST_TIME_ARGS (splitmux->next_fku_time));
1618     }
1619   }
1620
1621   if (!timecode_based) {
1622     next_fku_time = target_time + splitmux->threshold_time;
1623   }
1624
1625   GST_INFO_OBJECT (splitmux, "Requesting keyframe at %" GST_TIME_FORMAT
1626       ", the next expected keyframe request time is %" GST_TIME_FORMAT,
1627       GST_TIME_ARGS (target_time), GST_TIME_ARGS (next_fku_time));
1628
1629   newest_gop->sent_fku = TRUE;
1630
1631   splitmux->next_fku_time = next_fku_time;
1632   ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1633
1634   return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
1635 }
1636
1637 static GstPadProbeReturn
1638 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1639 {
1640   GstSplitMuxSink *splitmux = ctx->splitmux;
1641   MqStreamBuf *buf_info = NULL;
1642   GstFlowReturn ret = GST_FLOW_OK;
1643
1644   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1645
1646   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1647   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1648     g_warning ("Buffer list handling not implemented");
1649     return GST_PAD_PROBE_DROP;
1650   }
1651   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1652       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1653     GstEvent *event = gst_pad_probe_info_get_event (info);
1654     gboolean locked = FALSE, wait = !ctx->is_reference;
1655
1656     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1657
1658     switch (GST_EVENT_TYPE (event)) {
1659       case GST_EVENT_SEGMENT:
1660         gst_event_copy_segment (event, &ctx->out_segment);
1661         break;
1662       case GST_EVENT_FLUSH_STOP:
1663         GST_SPLITMUX_LOCK (splitmux);
1664         locked = TRUE;
1665         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1666         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1667         g_queue_clear (&ctx->queued_bufs);
1668         g_queue_clear (&ctx->queued_bufs);
1669         /* If this is the reference context, we just threw away any queued keyframes */
1670         if (ctx->is_reference)
1671           splitmux->queued_keyframes = 0;
1672         ctx->flushing = FALSE;
1673         wait = FALSE;
1674         break;
1675       case GST_EVENT_FLUSH_START:
1676         GST_SPLITMUX_LOCK (splitmux);
1677         locked = TRUE;
1678         GST_LOG_OBJECT (pad, "Flush start");
1679         ctx->flushing = TRUE;
1680         GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1681         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1682         break;
1683       case GST_EVENT_EOS:
1684         GST_SPLITMUX_LOCK (splitmux);
1685         locked = TRUE;
1686         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1687           goto beach;
1688         ctx->out_eos = TRUE;
1689
1690         if (ctx == splitmux->reference_ctx) {
1691           splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_STREAM;
1692           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1693         }
1694
1695         GST_INFO_OBJECT (splitmux,
1696             "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1697         break;
1698       case GST_EVENT_GAP:{
1699         GstClockTime gap_ts;
1700         GstClockTimeDiff rtime;
1701
1702         gst_event_parse_gap (event, &gap_ts, NULL);
1703         if (gap_ts == GST_CLOCK_TIME_NONE)
1704           break;
1705
1706         GST_SPLITMUX_LOCK (splitmux);
1707         locked = TRUE;
1708
1709         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1710           goto beach;
1711
1712         /* When we get a gap event on the
1713          * reference stream and we're trying to open a
1714          * new file, we need to store it until we get
1715          * the buffer afterwards
1716          */
1717         if (ctx->is_reference &&
1718             (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1719           GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1720           gst_event_replace (&ctx->pending_gap, event);
1721           GST_SPLITMUX_UNLOCK (splitmux);
1722           return GST_PAD_PROBE_HANDLED;
1723         }
1724
1725         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1726
1727         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1728             GST_STIME_ARGS (rtime));
1729
1730         if (rtime != GST_CLOCK_STIME_NONE) {
1731           ctx->out_running_time = rtime;
1732           complete_or_wait_on_out (splitmux, ctx);
1733         }
1734         break;
1735       }
1736       case GST_EVENT_CUSTOM_DOWNSTREAM:{
1737         const GstStructure *s;
1738         GstClockTimeDiff ts = 0;
1739
1740         s = gst_event_get_structure (event);
1741         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1742           break;
1743
1744         gst_structure_get_int64 (s, "timestamp", &ts);
1745
1746         GST_SPLITMUX_LOCK (splitmux);
1747         locked = TRUE;
1748
1749         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1750           goto beach;
1751         ctx->out_running_time = ts;
1752         if (!ctx->is_reference)
1753           ret = complete_or_wait_on_out (splitmux, ctx);
1754         GST_SPLITMUX_UNLOCK (splitmux);
1755         GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1756         return GST_PAD_PROBE_DROP;
1757       }
1758       case GST_EVENT_CAPS:{
1759         GstPad *peer;
1760
1761         if (!ctx->is_reference)
1762           break;
1763
1764         peer = gst_pad_get_peer (pad);
1765         if (peer) {
1766           gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1767
1768           gst_object_unref (peer);
1769
1770           if (ok)
1771             break;
1772
1773         } else {
1774           break;
1775         }
1776         /* This is in the case the muxer doesn't allow this change of caps */
1777         GST_SPLITMUX_LOCK (splitmux);
1778         locked = TRUE;
1779         ctx->caps_change = TRUE;
1780
1781         if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1782           GST_DEBUG_OBJECT (splitmux,
1783               "New caps were not accepted. Switching output file");
1784           if (ctx->out_eos == FALSE) {
1785             splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1786             GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1787           }
1788         }
1789
1790         /* Lets it fall through, if it fails again, then the muxer just can't
1791          * support this format, but at least we have a closed file.
1792          */
1793         break;
1794       }
1795       default:
1796         break;
1797     }
1798
1799     /* We need to make sure events aren't passed
1800      * until the muxer / sink are ready for it */
1801     if (!locked)
1802       GST_SPLITMUX_LOCK (splitmux);
1803     if (wait)
1804       ret = complete_or_wait_on_out (splitmux, ctx);
1805     GST_SPLITMUX_UNLOCK (splitmux);
1806
1807     /* Don't try to forward sticky events before the next buffer is there
1808      * because it would cause a new file to be created without the first
1809      * buffer being available.
1810      */
1811     GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1812     if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1813       gst_event_unref (event);
1814       return GST_PAD_PROBE_HANDLED;
1815     } else {
1816       return GST_PAD_PROBE_PASS;
1817     }
1818   }
1819
1820   /* Allow everything through until the configured next stopping point */
1821   GST_SPLITMUX_LOCK (splitmux);
1822
1823   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1824   if (buf_info == NULL) {
1825     /* Can only happen due to a poorly timed flush */
1826     ret = GST_FLOW_FLUSHING;
1827     goto beach;
1828   }
1829
1830   /* If we have popped a keyframe, decrement the queued_gop count */
1831   if (buf_info->keyframe && splitmux->queued_keyframes > 0 && ctx->is_reference)
1832     splitmux->queued_keyframes--;
1833
1834   ctx->out_running_time = buf_info->run_ts;
1835   ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1836
1837   GST_LOG_OBJECT (splitmux,
1838       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1839       " size %" G_GUINT64_FORMAT,
1840       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1841
1842   ctx->caps_change = FALSE;
1843
1844   ret = complete_or_wait_on_out (splitmux, ctx);
1845
1846   splitmux->muxed_out_bytes += buf_info->buf_size;
1847
1848 #ifndef GST_DISABLE_GST_DEBUG
1849   {
1850     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1851     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1852         " run ts %" GST_STIME_FORMAT, buf,
1853         GST_STIME_ARGS (ctx->out_running_time));
1854   }
1855 #endif
1856
1857   ctx->cur_out_buffer = NULL;
1858   GST_SPLITMUX_UNLOCK (splitmux);
1859
1860   /* pending_gap is protected by the STREAM lock */
1861   if (ctx->pending_gap) {
1862     /* If we previously stored a gap event, send it now */
1863     GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1864
1865     GST_DEBUG_OBJECT (splitmux,
1866         "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1867
1868     gst_pad_send_event (peer, ctx->pending_gap);
1869     ctx->pending_gap = NULL;
1870
1871     gst_object_unref (peer);
1872   }
1873
1874   mq_stream_buf_free (buf_info);
1875
1876   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1877   return GST_PAD_PROBE_PASS;
1878
1879 beach:
1880   GST_SPLITMUX_UNLOCK (splitmux);
1881   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1882   return GST_PAD_PROBE_DROP;
1883 }
1884
1885 static gboolean
1886 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1887 {
1888   return gst_pad_send_event (peer, gst_event_ref (*event));
1889 }
1890
1891 static void
1892 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1893 {
1894   if (ctx->fragment_block_id > 0) {
1895     gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1896     ctx->fragment_block_id = 0;
1897   }
1898 }
1899
1900 static void
1901 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1902 {
1903   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1904
1905   gst_pad_sticky_events_foreach (ctx->srcpad,
1906       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1907
1908   /* Clear EOS flag if not actually EOS */
1909   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1910   ctx->out_eos_async_done = ctx->out_eos;
1911
1912   gst_object_unref (peer);
1913 }
1914
1915 static void
1916 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1917 {
1918   GstPad *sinkpad, *srcpad, *newpad;
1919   GstPadTemplate *templ;
1920
1921   srcpad = ctx->srcpad;
1922   sinkpad = gst_pad_get_peer (srcpad);
1923
1924   templ = sinkpad->padtemplate;
1925   newpad =
1926       gst_element_request_pad (splitmux->muxer, templ,
1927       GST_PAD_NAME (sinkpad), NULL);
1928
1929   GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1930       newpad);
1931   if (!gst_pad_unlink (srcpad, sinkpad)) {
1932     gst_object_unref (sinkpad);
1933     goto fail;
1934   }
1935   if (gst_pad_link_full (srcpad, newpad,
1936           GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1937     gst_element_release_request_pad (splitmux->muxer, newpad);
1938     gst_object_unref (sinkpad);
1939     gst_object_unref (newpad);
1940     goto fail;
1941   }
1942   gst_object_unref (newpad);
1943   gst_object_unref (sinkpad);
1944   return;
1945
1946 fail:
1947   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1948       ("Could not create the new muxer/sink"), NULL);
1949 }
1950
1951 static GstPadProbeReturn
1952 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1953 {
1954   return GST_PAD_PROBE_OK;
1955 }
1956
1957 static void
1958 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1959 {
1960   ctx->fragment_block_id =
1961       gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1962       NULL, NULL);
1963 }
1964
1965 static gboolean
1966 _set_property_from_structure (GQuark field_id, const GValue * value,
1967     gpointer user_data)
1968 {
1969   const gchar *property_name = g_quark_to_string (field_id);
1970   GObject *element = G_OBJECT (user_data);
1971
1972   g_object_set_property (element, property_name, value);
1973
1974   return TRUE;
1975 }
1976
1977 static void
1978 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1979 {
1980   gst_element_set_locked_state (element, TRUE);
1981   gst_element_set_state (element, GST_STATE_NULL);
1982   GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1983   gst_bin_remove (GST_BIN (splitmux), element);
1984 }
1985
1986
1987 static void
1988 _send_event (const GValue * value, gpointer user_data)
1989 {
1990   GstPad *pad = g_value_get_object (value);
1991   GstEvent *ev = user_data;
1992
1993   gst_pad_send_event (pad, gst_event_ref (ev));
1994 }
1995
1996 /* Called with lock held when a fragment
1997  * reaches EOS and it is time to restart
1998  * a new fragment
1999  */
2000 static GstFlowReturn
2001 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2002 {
2003   GstElement *muxer, *sink;
2004
2005   g_assert (ctx->is_reference);
2006
2007   /* 1 change to new file */
2008   splitmux->switching_fragment = TRUE;
2009
2010   /* We need to drop the splitmux lock to acquire the state lock
2011    * here and ensure there's no racy state change going on elsewhere */
2012   muxer = gst_object_ref (splitmux->muxer);
2013   sink = gst_object_ref (splitmux->active_sink);
2014
2015   GST_SPLITMUX_UNLOCK (splitmux);
2016   GST_SPLITMUX_STATE_LOCK (splitmux);
2017
2018   if (splitmux->shutdown) {
2019     GST_DEBUG_OBJECT (splitmux,
2020         "Shutdown requested. Aborting fragment switch.");
2021     GST_SPLITMUX_LOCK (splitmux);
2022     GST_SPLITMUX_STATE_UNLOCK (splitmux);
2023     gst_object_unref (muxer);
2024     gst_object_unref (sink);
2025     return GST_FLOW_FLUSHING;
2026   }
2027
2028   if (splitmux->async_finalize) {
2029     if (splitmux->muxed_out_bytes > 0
2030         || splitmux->fragment_id != splitmux->start_index) {
2031       gchar *newname;
2032       GstElement *new_sink, *new_muxer;
2033
2034       GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
2035           splitmux->fragment_id);
2036       g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
2037       newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
2038       GST_SPLITMUX_LOCK (splitmux);
2039       if ((splitmux->sink =
2040               create_element (splitmux, splitmux->sink_factory, newname,
2041                   TRUE)) == NULL)
2042         goto fail;
2043       if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
2044         gst_preset_load_preset (GST_PRESET (splitmux->sink),
2045             splitmux->sink_preset);
2046       if (splitmux->sink_properties)
2047         gst_structure_foreach (splitmux->sink_properties,
2048             _set_property_from_structure, splitmux->sink);
2049       splitmux->active_sink = splitmux->sink;
2050       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
2051       g_free (newname);
2052       newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
2053       if ((splitmux->muxer =
2054               create_element (splitmux, splitmux->muxer_factory, newname,
2055                   TRUE)) == NULL)
2056         goto fail;
2057       if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2058               "async") != NULL) {
2059         /* async child elements are causing state change races and weird
2060          * failures, so let's try and turn that off */
2061         g_object_set (splitmux->sink, "async", FALSE, NULL);
2062       }
2063       if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
2064         gst_preset_load_preset (GST_PRESET (splitmux->muxer),
2065             splitmux->muxer_preset);
2066       if (splitmux->muxer_properties)
2067         gst_structure_foreach (splitmux->muxer_properties,
2068             _set_property_from_structure, splitmux->muxer);
2069       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2070       g_free (newname);
2071       new_sink = splitmux->sink;
2072       new_muxer = splitmux->muxer;
2073       GST_SPLITMUX_UNLOCK (splitmux);
2074       g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
2075       gst_element_link (new_muxer, new_sink);
2076
2077       if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2078         if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2079                     EOS_FROM_US)) == 2) {
2080           _lock_and_set_to_null (muxer, splitmux);
2081           _lock_and_set_to_null (sink, splitmux);
2082         } else {
2083           g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2084               GINT_TO_POINTER (2));
2085         }
2086       }
2087       gst_object_unref (muxer);
2088       gst_object_unref (sink);
2089       muxer = new_muxer;
2090       sink = new_sink;
2091       gst_object_ref (muxer);
2092       gst_object_ref (sink);
2093     }
2094   } else {
2095
2096     gst_element_set_locked_state (muxer, TRUE);
2097     gst_element_set_locked_state (sink, TRUE);
2098     gst_element_set_state (sink, GST_STATE_NULL);
2099
2100     if (splitmux->reset_muxer) {
2101       gst_element_set_state (muxer, GST_STATE_NULL);
2102     } else {
2103       GstIterator *it = gst_element_iterate_sink_pads (muxer);
2104       GstEvent *ev;
2105       guint32 seqnum;
2106
2107       ev = gst_event_new_flush_start ();
2108       seqnum = gst_event_get_seqnum (ev);
2109       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2110       gst_event_unref (ev);
2111
2112       gst_iterator_resync (it);
2113
2114       ev = gst_event_new_flush_stop (TRUE);
2115       gst_event_set_seqnum (ev, seqnum);
2116       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2117       gst_event_unref (ev);
2118
2119       gst_iterator_free (it);
2120     }
2121   }
2122
2123   GST_SPLITMUX_LOCK (splitmux);
2124   set_next_filename (splitmux, ctx);
2125   splitmux->muxed_out_bytes = 0;
2126   GST_SPLITMUX_UNLOCK (splitmux);
2127
2128   if (gst_element_set_state (sink,
2129           GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2130     gst_element_set_state (sink, GST_STATE_NULL);
2131     gst_element_set_locked_state (muxer, FALSE);
2132     gst_element_set_locked_state (sink, FALSE);
2133
2134     goto fail_output;
2135   }
2136
2137   if (gst_element_set_state (muxer,
2138           GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2139     gst_element_set_state (muxer, GST_STATE_NULL);
2140     gst_element_set_state (sink, GST_STATE_NULL);
2141     gst_element_set_locked_state (muxer, FALSE);
2142     gst_element_set_locked_state (sink, FALSE);
2143     goto fail_muxer;
2144   }
2145
2146   gst_element_set_locked_state (muxer, FALSE);
2147   gst_element_set_locked_state (sink, FALSE);
2148
2149   gst_object_unref (sink);
2150   gst_object_unref (muxer);
2151
2152   GST_SPLITMUX_LOCK (splitmux);
2153   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2154   splitmux->switching_fragment = FALSE;
2155   do_async_done (splitmux);
2156
2157   splitmux->ready_for_output = TRUE;
2158
2159   g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
2160   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
2161
2162   send_fragment_opened_closed_msg (splitmux, TRUE, sink);
2163
2164   /* FIXME: Is this always the correct next state? */
2165   GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND");
2166   splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
2167   GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2168   return GST_FLOW_OK;
2169
2170 fail:
2171   gst_object_unref (sink);
2172   gst_object_unref (muxer);
2173
2174   GST_SPLITMUX_LOCK (splitmux);
2175   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2176   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2177       ("Could not create the new muxer/sink"), NULL);
2178   return GST_FLOW_ERROR;
2179
2180 fail_output:
2181   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2182       ("Could not start new output sink"), NULL);
2183   gst_object_unref (sink);
2184   gst_object_unref (muxer);
2185
2186   GST_SPLITMUX_LOCK (splitmux);
2187   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2188   splitmux->switching_fragment = FALSE;
2189   return GST_FLOW_ERROR;
2190
2191 fail_muxer:
2192   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2193       ("Could not start new muxer"), NULL);
2194   gst_object_unref (sink);
2195   gst_object_unref (muxer);
2196
2197   GST_SPLITMUX_LOCK (splitmux);
2198   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2199   splitmux->switching_fragment = FALSE;
2200   return GST_FLOW_ERROR;
2201 }
2202
2203 static void
2204 bus_handler (GstBin * bin, GstMessage * message)
2205 {
2206   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
2207
2208   switch (GST_MESSAGE_TYPE (message)) {
2209     case GST_MESSAGE_EOS:{
2210       /* If the state is draining out the current file, drop this EOS */
2211       GstElement *sink;
2212
2213       sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
2214       GST_SPLITMUX_LOCK (splitmux);
2215
2216       send_fragment_opened_closed_msg (splitmux, FALSE, sink);
2217
2218       if (splitmux->async_finalize) {
2219
2220         if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2221           if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2222                       EOS_FROM_US)) == 2) {
2223             GstElement *muxer;
2224             GstPad *sinksink, *muxersrc;
2225
2226             sinksink = gst_element_get_static_pad (sink, "sink");
2227             muxersrc = gst_pad_get_peer (sinksink);
2228             muxer = gst_pad_get_parent_element (muxersrc);
2229             gst_object_unref (sinksink);
2230             gst_object_unref (muxersrc);
2231
2232             gst_element_call_async (muxer,
2233                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2234                 gst_object_ref (splitmux), gst_object_unref);
2235             gst_element_call_async (sink,
2236                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2237                 gst_object_ref (splitmux), gst_object_unref);
2238             gst_object_unref (muxer);
2239           } else {
2240             g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2241                 GINT_TO_POINTER (2));
2242           }
2243           GST_DEBUG_OBJECT (splitmux,
2244               "Caught async EOS from previous muxer+sink. Dropping.");
2245           /* We forward the EOS so that it gets aggregated as normal. If the sink
2246            * finishes and is removed before the end, it will be de-aggregated */
2247           gst_message_unref (message);
2248           GST_SPLITMUX_UNLOCK (splitmux);
2249           return;
2250         }
2251       } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_STREAM) {
2252         GST_DEBUG_OBJECT (splitmux,
2253             "Passing EOS message. Output state %d max_out_running_time %"
2254             GST_STIME_FORMAT, splitmux->output_state,
2255             GST_STIME_ARGS (splitmux->max_out_running_time));
2256       } else {
2257         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
2258         splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2259         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2260
2261         gst_message_unref (message);
2262         GST_SPLITMUX_UNLOCK (splitmux);
2263         return;
2264       }
2265       GST_SPLITMUX_UNLOCK (splitmux);
2266       break;
2267     }
2268     case GST_MESSAGE_ASYNC_START:
2269     case GST_MESSAGE_ASYNC_DONE:
2270       /* Ignore state changes from our children while switching */
2271       GST_SPLITMUX_LOCK (splitmux);
2272       if (splitmux->switching_fragment) {
2273         if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
2274             || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
2275           GST_LOG_OBJECT (splitmux,
2276               "Ignoring state change from child %" GST_PTR_FORMAT
2277               " while switching", GST_MESSAGE_SRC (message));
2278           gst_message_unref (message);
2279           GST_SPLITMUX_UNLOCK (splitmux);
2280           return;
2281         }
2282       }
2283       GST_SPLITMUX_UNLOCK (splitmux);
2284       break;
2285     case GST_MESSAGE_WARNING:
2286     {
2287       GError *gerror = NULL;
2288
2289       gst_message_parse_warning (message, &gerror, NULL);
2290
2291       if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
2292         GList *item;
2293         gboolean caps_change = FALSE;
2294
2295         GST_SPLITMUX_LOCK (splitmux);
2296
2297         for (item = splitmux->contexts; item; item = item->next) {
2298           MqStreamCtx *ctx = item->data;
2299
2300           if (ctx->caps_change) {
2301             caps_change = TRUE;
2302             break;
2303           }
2304         }
2305
2306         GST_SPLITMUX_UNLOCK (splitmux);
2307
2308         if (caps_change) {
2309           GST_LOG_OBJECT (splitmux,
2310               "Ignoring warning change from child %" GST_PTR_FORMAT
2311               " while switching caps", GST_MESSAGE_SRC (message));
2312           gst_message_unref (message);
2313           return;
2314         }
2315       }
2316       break;
2317     }
2318     default:
2319       break;
2320   }
2321
2322   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2323 }
2324
2325 static void
2326 ctx_set_unblock (MqStreamCtx * ctx)
2327 {
2328   ctx->need_unblock = TRUE;
2329 }
2330
2331 static gboolean
2332 need_new_fragment (GstSplitMuxSink * splitmux,
2333     GstClockTime queued_time, GstClockTime queued_gop_time,
2334     guint64 queued_bytes)
2335 {
2336   guint64 thresh_bytes;
2337   GstClockTime thresh_time;
2338   gboolean check_robust_muxing;
2339   GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
2340   GstClockTime *ptr_to_time;
2341   const InputGop *gop, *next_gop;
2342
2343   GST_OBJECT_LOCK (splitmux);
2344   thresh_bytes = splitmux->threshold_bytes;
2345   thresh_time = splitmux->threshold_time;
2346   ptr_to_time = (GstClockTime *)
2347       gst_queue_array_peek_head_struct (splitmux->times_to_split);
2348   if (ptr_to_time)
2349     time_to_split = *ptr_to_time;
2350   check_robust_muxing = splitmux->use_robust_muxing
2351       && splitmux->muxer_has_reserved_props;
2352   GST_OBJECT_UNLOCK (splitmux);
2353
2354   /* Have we muxed at least one thing from the reference
2355    * stream into the file? If not, no other streams can have
2356    * either */
2357   if (splitmux->fragment_reference_bytes <= 0) {
2358     GST_TRACE_OBJECT (splitmux,
2359         "Not ready to split - nothing muxed on the reference stream");
2360     return FALSE;
2361   }
2362
2363   /* User told us to split now */
2364   if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE) {
2365     GST_TRACE_OBJECT (splitmux, "Forcing because split_next_gop is set");
2366     return TRUE;
2367   }
2368
2369   gop = g_queue_peek_head (&splitmux->pending_input_gops);
2370   /* We need a full GOP queued up at this point */
2371   g_assert (gop != NULL);
2372   next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2373   /* And the beginning of the next GOP or otherwise EOS */
2374
2375   /* User told us to split at this running time */
2376   if (gop->start_time >= time_to_split) {
2377     GST_OBJECT_LOCK (splitmux);
2378     /* Dequeue running time */
2379     gst_queue_array_pop_head_struct (splitmux->times_to_split);
2380     /* Empty any running times after this that are past now */
2381     ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2382     while (ptr_to_time) {
2383       time_to_split = *ptr_to_time;
2384       if (gop->start_time < time_to_split) {
2385         break;
2386       }
2387       gst_queue_array_pop_head_struct (splitmux->times_to_split);
2388       ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2389     }
2390     GST_TRACE_OBJECT (splitmux,
2391         "GOP start time %" GST_STIME_FORMAT " is after requested split point %"
2392         GST_STIME_FORMAT, GST_STIME_ARGS (gop->start_time),
2393         GST_STIME_ARGS (time_to_split));
2394     GST_OBJECT_UNLOCK (splitmux);
2395     return TRUE;
2396   }
2397
2398   if (thresh_bytes > 0 && queued_bytes > thresh_bytes) {
2399     GST_TRACE_OBJECT (splitmux,
2400         "queued bytes %" G_GUINT64_FORMAT " overruns byte limit", queued_bytes);
2401     return TRUE;                /* Would overrun byte limit */
2402   }
2403
2404   if (thresh_time > 0 && queued_time > thresh_time) {
2405     GST_TRACE_OBJECT (splitmux,
2406         "queued time %" GST_STIME_FORMAT " overruns time limit",
2407         GST_STIME_ARGS (queued_time));
2408     return TRUE;                /* Would overrun time limit */
2409   }
2410
2411   if (splitmux->tc_interval) {
2412     GstClockTime next_gop_start_time =
2413         next_gop ? next_gop->start_time : splitmux->max_in_running_time;
2414
2415     if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) &&
2416         GST_CLOCK_STIME_IS_VALID (next_gop_start_time) &&
2417         next_gop_start_time >
2418         splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) {
2419       GST_TRACE_OBJECT (splitmux,
2420           "in running time %" GST_STIME_FORMAT " overruns time limit %"
2421           GST_TIME_FORMAT, GST_STIME_ARGS (next_gop_start_time),
2422           GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2423       return TRUE;
2424     }
2425   }
2426
2427   if (check_robust_muxing) {
2428     GstClockTime mux_reserved_remain;
2429
2430     g_object_get (splitmux->muxer,
2431         "reserved-duration-remaining", &mux_reserved_remain, NULL);
2432
2433     GST_LOG_OBJECT (splitmux,
2434         "Muxer robust muxing report - %" G_GUINT64_FORMAT
2435         " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
2436         mux_reserved_remain, queued_gop_time);
2437
2438     if (queued_gop_time >= mux_reserved_remain) {
2439       GST_INFO_OBJECT (splitmux,
2440           "File is about to run out of header room - %" G_GUINT64_FORMAT
2441           " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
2442           ". Switching to new file", mux_reserved_remain, queued_gop_time);
2443       return TRUE;
2444     }
2445   }
2446
2447   /* Continue and mux this GOP */
2448   return FALSE;
2449 }
2450
2451 /* probably we want to add this API? */
2452 static void
2453 video_time_code_replace (GstVideoTimeCode ** old_tc, GstVideoTimeCode * new_tc)
2454 {
2455   GstVideoTimeCode *timecode = NULL;
2456
2457   g_return_if_fail (old_tc != NULL);
2458
2459   if (*old_tc == new_tc)
2460     return;
2461
2462   if (new_tc)
2463     timecode = gst_video_time_code_copy (new_tc);
2464
2465   if (*old_tc)
2466     gst_video_time_code_free (*old_tc);
2467
2468   *old_tc = timecode;
2469 }
2470
2471 /* Called with splitmux lock held */
2472 /* Called when entering ProcessingCompleteGop state
2473  * Assess if mq contents overflowed the current file
2474  *   -> If yes, need to switch to new file
2475  *   -> if no, set max_out_running_time to let this GOP in and
2476  *      go to COLLECTING_GOP_START state
2477  */
2478 static void
2479 handle_gathered_gop (GstSplitMuxSink * splitmux, const InputGop * gop,
2480     GstClockTimeDiff next_gop_start_time, GstClockTimeDiff max_out_running_time)
2481 {
2482   guint64 queued_bytes;
2483   GstClockTimeDiff queued_time = 0;
2484   GstClockTimeDiff queued_gop_time = 0;
2485   SplitMuxOutputCommand *cmd;
2486
2487   /* Assess if the multiqueue contents overflowed the current file */
2488   /* When considering if a newly gathered GOP overflows
2489    * the time limit for the file, only consider the running time of the
2490    * reference stream. Other streams might have run ahead a little bit,
2491    * but extra pieces won't be released to the muxer beyond the reference
2492    * stream cut-off anyway - so it forms the limit. */
2493   queued_bytes = splitmux->fragment_total_bytes + gop->total_bytes;
2494   queued_time = next_gop_start_time;
2495   /* queued_gop_time tracks how much unwritten data there is waiting to
2496    * be written to this fragment including this GOP */
2497   if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
2498     queued_gop_time = queued_time - splitmux->reference_ctx->out_running_time;
2499   else
2500     queued_gop_time = queued_time - gop->start_time;
2501
2502   GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
2503   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2504       " bytes %" G_GUINT64_FORMAT " in next gop start time %" GST_STIME_FORMAT
2505       " gop start time %" GST_STIME_FORMAT,
2506       GST_STIME_ARGS (queued_time), queued_bytes,
2507       GST_STIME_ARGS (next_gop_start_time), GST_STIME_ARGS (gop->start_time));
2508
2509   if (queued_gop_time < 0)
2510     goto error_gop_duration;
2511
2512   if (queued_time < splitmux->fragment_start_time)
2513     goto error_queued_time;
2514
2515   queued_time -= splitmux->fragment_start_time;
2516   if (queued_time < queued_gop_time)
2517     queued_gop_time = queued_time;
2518
2519   /* Expand queued bytes estimate by muxer overhead */
2520   queued_bytes += (queued_bytes * splitmux->mux_overhead);
2521
2522   /* Check for overrun - have we output at least one byte and overrun
2523    * either threshold? */
2524   if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
2525     if (splitmux->async_finalize) {
2526       GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2527       *sink_running_time = splitmux->reference_ctx->out_running_time;
2528       g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2529           RUNNING_TIME, sink_running_time, g_free);
2530     }
2531     g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2532     /* Tell the output side to start a new fragment */
2533     GST_INFO_OBJECT (splitmux,
2534         "This GOP (dur %" GST_STIME_FORMAT
2535         ") would overflow the fragment, Sending start_new_fragment cmd",
2536         GST_STIME_ARGS (queued_gop_time));
2537     cmd = out_cmd_buf_new ();
2538     cmd->start_new_fragment = TRUE;
2539     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2540     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2541
2542     splitmux->fragment_start_time = gop->start_time;
2543     splitmux->fragment_start_time_pts = gop->start_time_pts;
2544     splitmux->fragment_total_bytes = 0;
2545     splitmux->fragment_reference_bytes = 0;
2546
2547     video_time_code_replace (&splitmux->fragment_start_tc, gop->start_tc);
2548     splitmux->next_fragment_start_tc_time =
2549         calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
2550         splitmux->fragment_start_time, NULL);
2551     if (splitmux->tc_interval && splitmux->fragment_start_tc
2552         && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
2553       GST_WARNING_OBJECT (splitmux,
2554           "Couldn't calculate next fragment start time for timecode mode");
2555     }
2556   }
2557
2558   /* And set up to collect the next GOP */
2559   if (max_out_running_time != G_MAXINT64) {
2560     splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2561   } else {
2562     /* This is probably already the current state, but just in case: */
2563     splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2564   }
2565
2566   /* And wake all input contexts to send a wake-up event */
2567   g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2568   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2569
2570   /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2571   splitmux->fragment_total_bytes += gop->total_bytes;
2572   splitmux->fragment_reference_bytes += gop->reference_bytes;
2573
2574   if (gop->total_bytes > 0) {
2575     GST_LOG_OBJECT (splitmux,
2576         "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2577         " time %" GST_STIME_FORMAT,
2578         splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2579
2580     /* Send this GOP to the output command queue */
2581     cmd = out_cmd_buf_new ();
2582     cmd->start_new_fragment = FALSE;
2583     cmd->max_output_ts = max_out_running_time;
2584     GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2585         GST_STIME_FORMAT, GST_STIME_ARGS (max_out_running_time));
2586     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2587
2588     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2589   }
2590
2591   return;
2592
2593 error_gop_duration:
2594   GST_ELEMENT_ERROR (splitmux,
2595       STREAM, FAILED, ("Timestamping error on input streams"),
2596       ("Queued GOP time is negative %" GST_STIME_FORMAT,
2597           GST_STIME_ARGS (queued_gop_time)));
2598   return;
2599 error_queued_time:
2600   GST_ELEMENT_ERROR (splitmux,
2601       STREAM, FAILED, ("Timestamping error on input streams"),
2602       ("Queued time is negative. Input went backwards. queued_time - %"
2603           GST_STIME_FORMAT, GST_STIME_ARGS (queued_time)));
2604   return;
2605 }
2606
2607 /* Called with splitmux lock held */
2608 /* Called from each input pad when it is has all the pieces
2609  * for a GOP or EOS, starting with the reference pad which has set the
2610  * splitmux->max_in_running_time
2611  */
2612 static void
2613 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2614 {
2615   GList *cur;
2616   GstEvent *event;
2617
2618   /* On ENDING_FILE, the reference stream sends a command to start a new
2619    * fragment, then releases the GOP for output in the new fragment.
2620    *  If some streams received no buffer during the last GOP that overran,
2621    * because its next buffer has a timestamp bigger than
2622    * ctx->max_in_running_time, its queue is empty. In that case the only
2623    * way to wakeup the output thread is by injecting an event in the
2624    * queue. This usually happen with subtitle streams.
2625    * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2626   if (ctx->need_unblock) {
2627     GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2628     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2629         GST_EVENT_TYPE_SERIALIZED,
2630         gst_structure_new ("splitmuxsink-unblock", "timestamp",
2631             G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2632
2633     GST_SPLITMUX_UNLOCK (splitmux);
2634     gst_pad_send_event (ctx->sinkpad, event);
2635     GST_SPLITMUX_LOCK (splitmux);
2636
2637     ctx->need_unblock = FALSE;
2638     GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2639     /* state may have changed while we were unlocked. Loop again if so */
2640     if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2641       return;
2642   }
2643
2644   do {
2645     GstClockTimeDiff next_gop_start = GST_CLOCK_STIME_NONE;
2646
2647     if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2648       GstClockTimeDiff max_out_running_time;
2649       gboolean ready = TRUE;
2650       InputGop *gop;
2651       const InputGop *next_gop;
2652
2653       gop = g_queue_peek_head (&splitmux->pending_input_gops);
2654       next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2655
2656       /* If we have no GOP or no next GOP here then the reference context is
2657        * at EOS, otherwise use the start time of the next GOP if we're far
2658        * enough in the GOP to know it */
2659       if (gop && next_gop) {
2660         if (!splitmux->reference_ctx->in_eos
2661             && splitmux->max_in_running_time_dts != GST_CLOCK_STIME_NONE
2662             && splitmux->max_in_running_time_dts < next_gop->start_time_pts) {
2663           GST_LOG_OBJECT (splitmux,
2664               "No further GOPs finished collecting, waiting until current DTS %"
2665               GST_STIME_FORMAT " has passed next GOP start PTS %"
2666               GST_STIME_FORMAT,
2667               GST_STIME_ARGS (splitmux->max_in_running_time_dts),
2668               GST_STIME_ARGS (next_gop->start_time_pts));
2669           break;
2670         }
2671
2672         GST_LOG_OBJECT (splitmux,
2673             "Finished collecting GOP with start time %" GST_STIME_FORMAT
2674             ", next GOP start time %" GST_STIME_FORMAT,
2675             GST_STIME_ARGS (gop->start_time),
2676             GST_STIME_ARGS (next_gop->start_time));
2677         next_gop_start = next_gop->start_time;
2678         max_out_running_time =
2679             splitmux->reference_ctx->in_eos ? G_MAXINT64 : next_gop->start_time;
2680       } else if (!next_gop) {
2681         GST_LOG_OBJECT (splitmux, "Reference context is EOS");
2682         next_gop_start = splitmux->max_in_running_time;
2683         max_out_running_time = G_MAXINT64;
2684       } else if (!gop) {
2685         GST_LOG_OBJECT (splitmux, "No further GOPs finished collecting");
2686         break;
2687       } else {
2688         g_assert_not_reached ();
2689       }
2690
2691       g_assert (gop != NULL);
2692
2693       /* Iterate each pad, and check that the input running time is at least
2694        * up to the start running time of the next GOP or EOS, and if so handle
2695        * the collected GOP */
2696       GST_LOG_OBJECT (splitmux, "Checking GOP collected, next GOP start %"
2697           GST_STIME_FORMAT " ctx %p", GST_STIME_ARGS (next_gop_start), ctx);
2698       for (cur = g_list_first (splitmux->contexts); cur != NULL;
2699           cur = g_list_next (cur)) {
2700         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2701
2702         GST_LOG_OBJECT (splitmux,
2703             "Context %p sink pad %" GST_PTR_FORMAT " @ TS %" GST_STIME_FORMAT
2704             " EOS %d", tmpctx, tmpctx->sinkpad,
2705             GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2706
2707         if (next_gop_start != GST_CLOCK_STIME_NONE &&
2708             tmpctx->in_running_time < next_gop_start && !tmpctx->in_eos) {
2709           GST_LOG_OBJECT (splitmux,
2710               "Context %p sink pad %" GST_PTR_FORMAT " not ready. We'll sleep",
2711               tmpctx, tmpctx->sinkpad);
2712           ready = FALSE;
2713           break;
2714         }
2715       }
2716       if (ready) {
2717         GST_DEBUG_OBJECT (splitmux,
2718             "Collected GOP is complete. Processing (ctx %p)", ctx);
2719         /* All pads have a complete GOP, release it into the multiqueue */
2720         handle_gathered_gop (splitmux, gop, next_gop_start,
2721             max_out_running_time);
2722
2723         g_queue_pop_head (&splitmux->pending_input_gops);
2724         input_gop_free (gop);
2725
2726         /* The user has requested a split, we can split now that the previous GOP
2727          * has been collected to the correct location */
2728         if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested),
2729                 TRUE, FALSE)) {
2730           g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
2731         }
2732       }
2733     }
2734
2735     /* If upstream reached EOS we are not expecting more data, no need to wait
2736      * here. */
2737     if (ctx->in_eos)
2738       return;
2739
2740     if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2741         !ctx->flushing &&
2742         ctx->in_running_time >= next_gop_start &&
2743         next_gop_start != GST_CLOCK_STIME_NONE) {
2744       /* Some pad is not yet ready, or GOP is being pushed
2745        * either way, sleep and wait to get woken */
2746       GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2747       GST_SPLITMUX_WAIT_INPUT (splitmux);
2748       GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2749     } else {
2750       /* This pad is not ready or the state changed - break out and get another
2751        * buffer / event */
2752       break;
2753     }
2754   } while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT);
2755 }
2756
2757 static GstPadProbeReturn
2758 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2759 {
2760   GstSplitMuxSink *splitmux = ctx->splitmux;
2761   GstFlowReturn ret = GST_FLOW_OK;
2762   GstBuffer *buf;
2763   MqStreamBuf *buf_info = NULL;
2764   GstClockTime ts, pts, dts;
2765   GstClockTimeDiff running_time, running_time_pts, running_time_dts;
2766   gboolean loop_again;
2767   gboolean keyframe = FALSE;
2768
2769   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2770
2771   /* FIXME: Handle buffer lists, until then make it clear they won't work */
2772   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2773     g_warning ("Buffer list handling not implemented");
2774     return GST_PAD_PROBE_DROP;
2775   }
2776   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2777       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2778     GstEvent *event = gst_pad_probe_info_get_event (info);
2779
2780     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2781
2782     switch (GST_EVENT_TYPE (event)) {
2783       case GST_EVENT_SEGMENT:
2784         gst_event_copy_segment (event, &ctx->in_segment);
2785         break;
2786       case GST_EVENT_FLUSH_STOP:
2787         GST_SPLITMUX_LOCK (splitmux);
2788         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2789         ctx->in_eos = FALSE;
2790         ctx->in_running_time = GST_CLOCK_STIME_NONE;
2791         GST_SPLITMUX_UNLOCK (splitmux);
2792         break;
2793       case GST_EVENT_EOS:
2794         GST_SPLITMUX_LOCK (splitmux);
2795         ctx->in_eos = TRUE;
2796
2797         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2798           ret = GST_FLOW_FLUSHING;
2799           goto beach;
2800         }
2801
2802         if (ctx->is_reference) {
2803           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2804           /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2805           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2806           /* Wake up other input pads to collect this GOP */
2807           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2808           check_completed_gop (splitmux, ctx);
2809         } else if (splitmux->input_state ==
2810             SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2811           /* If we are waiting for a GOP to be completed (ie, for aux
2812            * pads to catch up), then this pad is complete, so check
2813            * if the whole GOP is.
2814            */
2815           check_completed_gop (splitmux, ctx);
2816         }
2817         GST_SPLITMUX_UNLOCK (splitmux);
2818         break;
2819       case GST_EVENT_GAP:{
2820         GstClockTime gap_ts;
2821         GstClockTimeDiff rtime;
2822
2823         gst_event_parse_gap (event, &gap_ts, NULL);
2824         if (gap_ts == GST_CLOCK_TIME_NONE)
2825           break;
2826
2827         GST_SPLITMUX_LOCK (splitmux);
2828
2829         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2830           ret = GST_FLOW_FLUSHING;
2831           goto beach;
2832         }
2833         rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2834
2835         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2836             GST_STIME_ARGS (rtime));
2837
2838         if (ctx->is_reference && GST_CLOCK_STIME_IS_VALID (rtime)) {
2839           /* If this GAP event happens before the first fragment then
2840            * initialize the fragment start time here. */
2841           if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)) {
2842             splitmux->fragment_start_time = rtime;
2843             GST_LOG_OBJECT (splitmux,
2844                 "Fragment start time now %" GST_STIME_FORMAT,
2845                 GST_STIME_ARGS (splitmux->fragment_start_time));
2846
2847             /* Also take this as the first start time when starting up,
2848              * so that we start counting overflow from the first frame */
2849             if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2850               splitmux->max_in_running_time = rtime;
2851             if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
2852               splitmux->max_in_running_time_dts = rtime;
2853           }
2854
2855           /* Similarly take it as fragment start PTS and GOP start time if
2856            * these are not set */
2857           if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time_pts))
2858             splitmux->fragment_start_time_pts = rtime;
2859
2860           if (g_queue_is_empty (&splitmux->pending_input_gops)) {
2861             InputGop *gop = g_slice_new0 (InputGop);
2862
2863             gop->from_gap = TRUE;
2864             gop->start_time = rtime;
2865             gop->start_time_pts = rtime;
2866
2867             g_queue_push_tail (&splitmux->pending_input_gops, gop);
2868           }
2869         }
2870
2871         GST_SPLITMUX_UNLOCK (splitmux);
2872         break;
2873       }
2874       default:
2875         break;
2876     }
2877     return GST_PAD_PROBE_PASS;
2878   } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2879     switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2880       case GST_QUERY_ALLOCATION:
2881         return GST_PAD_PROBE_DROP;
2882       default:
2883         return GST_PAD_PROBE_PASS;
2884     }
2885   }
2886
2887   buf = gst_pad_probe_info_get_buffer (info);
2888   buf_info = mq_stream_buf_new ();
2889
2890   pts = GST_BUFFER_PTS (buf);
2891   dts = GST_BUFFER_DTS (buf);
2892   if (GST_BUFFER_PTS_IS_VALID (buf))
2893     ts = GST_BUFFER_PTS (buf);
2894   else
2895     ts = GST_BUFFER_DTS (buf);
2896
2897   GST_LOG_OBJECT (pad,
2898       "Buffer TS is %" GST_TIME_FORMAT " (PTS %" GST_TIME_FORMAT ", DTS %"
2899       GST_TIME_FORMAT ")", GST_TIME_ARGS (ts), GST_TIME_ARGS (pts),
2900       GST_TIME_ARGS (dts));
2901
2902   GST_SPLITMUX_LOCK (splitmux);
2903
2904   if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2905     ret = GST_FLOW_FLUSHING;
2906     goto beach;
2907   }
2908
2909   /* If this buffer has a timestamp, advance the input timestamp of the
2910    * stream */
2911   if (GST_CLOCK_TIME_IS_VALID (ts)) {
2912     running_time = my_segment_to_running_time (&ctx->in_segment, ts);
2913
2914     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2915         GST_STIME_ARGS (running_time));
2916
2917     /* in running time is always the maximum PTS (or DTS) that was observed so far */
2918     if (GST_CLOCK_STIME_IS_VALID (running_time)
2919         && running_time > ctx->in_running_time)
2920       ctx->in_running_time = running_time;
2921   } else {
2922     running_time = ctx->in_running_time;
2923   }
2924
2925   if (GST_CLOCK_TIME_IS_VALID (pts))
2926     running_time_pts = my_segment_to_running_time (&ctx->in_segment, pts);
2927   else
2928     running_time_pts = GST_CLOCK_STIME_NONE;
2929
2930   if (GST_CLOCK_TIME_IS_VALID (dts))
2931     running_time_dts = my_segment_to_running_time (&ctx->in_segment, dts);
2932   else
2933     running_time_dts = GST_CLOCK_STIME_NONE;
2934
2935   /* Try to make sure we have a valid running time */
2936   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2937     ctx->in_running_time =
2938         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2939   }
2940
2941   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2942       GST_STIME_ARGS (ctx->in_running_time));
2943
2944   buf_info->run_ts = ctx->in_running_time;
2945   buf_info->buf_size = gst_buffer_get_size (buf);
2946   buf_info->duration = GST_BUFFER_DURATION (buf);
2947
2948   if (ctx->is_reference) {
2949     InputGop *gop = NULL;
2950     GstVideoTimeCodeMeta *tc_meta = gst_buffer_get_video_time_code_meta (buf);
2951
2952     /* initialize fragment_start_time if it was not set yet (i.e. for the
2953      * first fragment), or otherwise set it to the minimum observed time */
2954     if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)
2955         || splitmux->fragment_start_time > running_time) {
2956       if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time))
2957         splitmux->fragment_start_time_pts = running_time_pts;
2958       splitmux->fragment_start_time = running_time;
2959
2960       GST_LOG_OBJECT (splitmux,
2961           "Fragment start time now %" GST_STIME_FORMAT " (initial PTS %"
2962           GST_STIME_FORMAT ")", GST_STIME_ARGS (splitmux->fragment_start_time),
2963           GST_STIME_ARGS (splitmux->fragment_start_time_pts));
2964
2965       /* Also take this as the first start time when starting up,
2966        * so that we start counting overflow from the first frame */
2967       if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)
2968           || splitmux->max_in_running_time < splitmux->fragment_start_time)
2969         splitmux->max_in_running_time = splitmux->fragment_start_time;
2970
2971       if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
2972         splitmux->max_in_running_time_dts = running_time_dts;
2973
2974       if (tc_meta) {
2975         video_time_code_replace (&splitmux->fragment_start_tc, &tc_meta->tc);
2976
2977         splitmux->next_fragment_start_tc_time =
2978             calculate_next_max_timecode (splitmux, &tc_meta->tc,
2979             running_time, NULL);
2980         if (splitmux->tc_interval
2981             && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time))
2982         {
2983           GST_WARNING_OBJECT (splitmux,
2984               "Couldn't calculate next fragment start time for timecode mode");
2985         }
2986 #ifndef GST_DISABLE_GST_DEBUG
2987         {
2988           gchar *tc_str;
2989
2990           tc_str = gst_video_time_code_to_string (&tc_meta->tc);
2991           GST_DEBUG_OBJECT (splitmux,
2992               "Initialize fragment start timecode %s, next fragment start timecode time %"
2993               GST_TIME_FORMAT, tc_str,
2994               GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2995           g_free (tc_str);
2996         }
2997 #endif
2998       }
2999     }
3000
3001
3002     /* First check if we're at the very first GOP and the tracking was created
3003      * from a GAP event. In that case don't start a new GOP on keyframes but
3004      * just updated it as needed */
3005     gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3006
3007     if (!gop || (!gop->from_gap
3008             && !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT))) {
3009       gop = g_slice_new0 (InputGop);
3010
3011       gop->start_time = running_time;
3012       gop->start_time_pts = running_time_pts;
3013
3014       GST_LOG_OBJECT (splitmux,
3015           "Next GOP start time now %" GST_STIME_FORMAT " (initial PTS %"
3016           GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3017           GST_STIME_ARGS (gop->start_time_pts));
3018
3019       if (tc_meta) {
3020         video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3021
3022 #ifndef GST_DISABLE_GST_DEBUG
3023         {
3024           gchar *tc_str;
3025
3026           tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3027           GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode %s", tc_str);
3028           g_free (tc_str);
3029         }
3030 #endif
3031       }
3032
3033       g_queue_push_tail (&splitmux->pending_input_gops, gop);
3034     } else {
3035       gop->from_gap = FALSE;
3036
3037       if (!GST_CLOCK_STIME_IS_VALID (gop->start_time)
3038           || gop->start_time > running_time) {
3039         gop->start_time = running_time;
3040
3041         GST_LOG_OBJECT (splitmux,
3042             "GOP start time updated now %" GST_STIME_FORMAT " (initial PTS %"
3043             GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3044             GST_STIME_ARGS (gop->start_time_pts));
3045
3046         if (tc_meta) {
3047           video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3048
3049 #ifndef GST_DISABLE_GST_DEBUG
3050           {
3051             gchar *tc_str;
3052
3053             tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3054             GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode updated %s",
3055                 tc_str);
3056             g_free (tc_str);
3057           }
3058 #endif
3059         }
3060       }
3061     }
3062
3063     /* Check whether we need to request next keyframe depending on
3064      * current running time */
3065     if (request_next_keyframe (splitmux, buf, running_time_dts) == FALSE) {
3066       GST_WARNING_OBJECT (splitmux,
3067           "Could not request a keyframe. Files may not split at the exact location they should");
3068     }
3069   }
3070
3071   {
3072     InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3073
3074     if (gop) {
3075       GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
3076           " total GOP bytes %" G_GUINT64_FORMAT ", total next GOP bytes %"
3077           G_GUINT64_FORMAT, GST_STIME_ARGS (buf_info->run_ts),
3078           gop->total_bytes, gop->total_bytes);
3079     }
3080   }
3081
3082   loop_again = TRUE;
3083   do {
3084     if (ctx->flushing)
3085       break;
3086
3087     switch (splitmux->input_state) {
3088       case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
3089         if (ctx->is_releasing) {
3090           /* The pad belonging to this context is being released */
3091           GST_WARNING_OBJECT (pad, "Pad is being released while the muxer is "
3092               "running. Data might not drain correctly");
3093           loop_again = FALSE;
3094         } else if (ctx->is_reference) {
3095           const InputGop *gop, *next_gop;
3096
3097           /* This is the reference context. If it's a keyframe,
3098            * it marks the start of a new GOP and we should wait in
3099            * check_completed_gop before continuing, but either way
3100            * (keyframe or no, we'll pass this buffer through after
3101            * so set loop_again to FALSE */
3102           loop_again = FALSE;
3103
3104           gop = g_queue_peek_head (&splitmux->pending_input_gops);
3105           g_assert (gop != NULL);
3106           next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
3107
3108           if (ctx->in_running_time > splitmux->max_in_running_time)
3109             splitmux->max_in_running_time = ctx->in_running_time;
3110           if (running_time_dts > splitmux->max_in_running_time_dts)
3111             splitmux->max_in_running_time_dts = running_time_dts;
3112
3113           GST_LOG_OBJECT (splitmux,
3114               "Max in running time now %" GST_STIME_FORMAT ", DTS %"
3115               GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_in_running_time),
3116               GST_STIME_ARGS (splitmux->max_in_running_time_dts));
3117
3118           if (!next_gop) {
3119             GST_DEBUG_OBJECT (pad, "Waiting for end of GOP");
3120             /* Allow other input pads to catch up to here too */
3121             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3122             break;
3123           }
3124
3125           if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
3126             GST_INFO_OBJECT (pad,
3127                 "Have keyframe with running time %" GST_STIME_FORMAT,
3128                 GST_STIME_ARGS (ctx->in_running_time));
3129             keyframe = TRUE;
3130           }
3131
3132           if (running_time_dts != GST_CLOCK_STIME_NONE
3133               && running_time_dts < next_gop->start_time_pts) {
3134             GST_DEBUG_OBJECT (splitmux,
3135                 "Waiting until DTS (%" GST_STIME_FORMAT
3136                 ") has passed next GOP start PTS (%" GST_STIME_FORMAT ")",
3137                 GST_STIME_ARGS (running_time_dts),
3138                 GST_STIME_ARGS (next_gop->start_time_pts));
3139             /* Allow other input pads to catch up to here too */
3140             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3141             break;
3142           }
3143
3144           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
3145           /* Wake up other input pads to collect this GOP */
3146           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3147           check_completed_gop (splitmux, ctx);
3148         } else {
3149           /* Pass this buffer if the reference ctx is far enough ahead */
3150           if (ctx->in_running_time < splitmux->max_in_running_time) {
3151             loop_again = FALSE;
3152             break;
3153           }
3154
3155           /* We're still waiting for a keyframe on the reference pad, sleep */
3156           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
3157           GST_SPLITMUX_WAIT_INPUT (splitmux);
3158           GST_LOG_OBJECT (pad,
3159               "Done sleeping for GOP start input state now %d",
3160               splitmux->input_state);
3161         }
3162         break;
3163       case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
3164         /* We're collecting a GOP, this is only ever called for non-reference
3165          * contexts as the reference context would be waiting inside
3166          * check_completed_gop() */
3167
3168         g_assert (!ctx->is_reference);
3169
3170         /* If we overran the target timestamp, it might be time to process
3171          * the GOP, otherwise bail out for more data. */
3172         GST_LOG_OBJECT (pad,
3173             "Checking TS %" GST_STIME_FORMAT " against max %"
3174             GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
3175             GST_STIME_ARGS (splitmux->max_in_running_time));
3176
3177         if (ctx->in_running_time < splitmux->max_in_running_time) {
3178           loop_again = FALSE;
3179           break;
3180         }
3181
3182         GST_LOG_OBJECT (pad,
3183             "Collected last packet of GOP. Checking other pads");
3184         check_completed_gop (splitmux, ctx);
3185         break;
3186       }
3187       case SPLITMUX_INPUT_STATE_FINISHING_UP:
3188         loop_again = FALSE;
3189         break;
3190       default:
3191         loop_again = FALSE;
3192         break;
3193     }
3194   }
3195   while (loop_again);
3196
3197   if (keyframe && ctx->is_reference)
3198     splitmux->queued_keyframes++;
3199   buf_info->keyframe = keyframe;
3200
3201   /* Update total input byte counter for overflow detect unless we're after
3202    * EOS now */
3203   if (splitmux->input_state != SPLITMUX_INPUT_STATE_FINISHING_UP
3204       && splitmux->input_state != SPLITMUX_INPUT_STATE_STOPPED) {
3205     InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3206
3207     /* We must have a GOP at this point */
3208     g_assert (gop != NULL);
3209
3210     gop->total_bytes += buf_info->buf_size;
3211     if (ctx->is_reference) {
3212       gop->reference_bytes += buf_info->buf_size;
3213     }
3214   }
3215
3216   /* Now add this buffer to the queue just before returning */
3217   g_queue_push_head (&ctx->queued_bufs, buf_info);
3218
3219   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
3220       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
3221
3222   GST_SPLITMUX_UNLOCK (splitmux);
3223   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3224   return GST_PAD_PROBE_PASS;
3225
3226 beach:
3227   GST_SPLITMUX_UNLOCK (splitmux);
3228   if (buf_info)
3229     mq_stream_buf_free (buf_info);
3230   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3231   return GST_PAD_PROBE_PASS;
3232 }
3233
3234 static void
3235 grow_blocked_queues (GstSplitMuxSink * splitmux)
3236 {
3237   GList *cur;
3238
3239   /* Scan other queues for full-ness and grow them */
3240   for (cur = g_list_first (splitmux->contexts);
3241       cur != NULL; cur = g_list_next (cur)) {
3242     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3243     guint cur_limit;
3244     guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
3245
3246     g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
3247     GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
3248
3249     if (cur_len >= cur_limit) {
3250       cur_limit = cur_len + 1;
3251       GST_DEBUG_OBJECT (tmpctx->q,
3252           "Queue overflowed and needs enlarging. Growing to %u buffers",
3253           cur_limit);
3254       g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
3255     }
3256   }
3257 }
3258
3259 static void
3260 handle_q_underrun (GstElement * q, gpointer user_data)
3261 {
3262   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3263   GstSplitMuxSink *splitmux = ctx->splitmux;
3264
3265   GST_SPLITMUX_LOCK (splitmux);
3266   GST_DEBUG_OBJECT (q,
3267       "Queue reported underrun with %d keyframes and %d cmds enqueued",
3268       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3269   grow_blocked_queues (splitmux);
3270   GST_SPLITMUX_UNLOCK (splitmux);
3271 }
3272
3273 static void
3274 handle_q_overrun (GstElement * q, gpointer user_data)
3275 {
3276   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3277   GstSplitMuxSink *splitmux = ctx->splitmux;
3278   gboolean allow_grow = FALSE;
3279
3280   GST_SPLITMUX_LOCK (splitmux);
3281   GST_DEBUG_OBJECT (q,
3282       "Queue reported overrun with %d keyframes and %d cmds enqueued",
3283       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3284
3285   if (splitmux->queued_keyframes < 2) {
3286     /* Less than a full GOP queued, grow the queue */
3287     allow_grow = TRUE;
3288   } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
3289     allow_grow = TRUE;
3290   } else {
3291     /* If another queue is starved, grow */
3292     GList *cur;
3293     for (cur = g_list_first (splitmux->contexts);
3294         cur != NULL; cur = g_list_next (cur)) {
3295       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3296       if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
3297         allow_grow = TRUE;
3298       }
3299     }
3300   }
3301   GST_SPLITMUX_UNLOCK (splitmux);
3302
3303   if (allow_grow) {
3304     guint cur_limit;
3305
3306     g_object_get (q, "max-size-buffers", &cur_limit, NULL);
3307     cur_limit++;
3308
3309     GST_DEBUG_OBJECT (q,
3310         "Queue overflowed and needs enlarging. Growing to %u buffers",
3311         cur_limit);
3312
3313     g_object_set (q, "max-size-buffers", cur_limit, NULL);
3314   }
3315 }
3316
3317 /* Called with SPLITMUX lock held */
3318 static const gchar *
3319 lookup_muxer_pad (GstSplitMuxSink * splitmux, const gchar * sinkpad_name)
3320 {
3321   const gchar *ret = NULL;
3322
3323   if (splitmux->muxerpad_map == NULL)
3324     return NULL;
3325
3326   if (sinkpad_name == NULL) {
3327     GST_WARNING_OBJECT (splitmux,
3328         "Can't look up request pad in pad map without providing a pad name");
3329     return NULL;
3330   }
3331
3332   ret = gst_structure_get_string (splitmux->muxerpad_map, sinkpad_name);
3333   if (ret) {
3334     GST_INFO_OBJECT (splitmux, "Sink pad %s maps to muxer pad %s", sinkpad_name,
3335         ret);
3336     return g_strdup (ret);
3337   }
3338
3339   return NULL;
3340 }
3341
3342 static GstPad *
3343 gst_splitmux_sink_request_new_pad (GstElement * element,
3344     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3345 {
3346   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3347   GstPadTemplate *mux_template = NULL;
3348   GstPad *ret = NULL, *muxpad = NULL;
3349   GstElement *q;
3350   GstPad *q_sink = NULL, *q_src = NULL;
3351   gchar *gname, *qname;
3352   gboolean is_primary_video = FALSE, is_video = FALSE,
3353       muxer_is_requestpad = FALSE;
3354   MqStreamCtx *ctx;
3355   const gchar *muxer_padname = NULL;
3356
3357   GST_DEBUG_OBJECT (splitmux, "templ:%s, name:%s", templ->name_template, name);
3358
3359   GST_SPLITMUX_LOCK (splitmux);
3360   if (!create_muxer (splitmux))
3361     goto fail;
3362   g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3363
3364   if (g_str_equal (templ->name_template, "video") ||
3365       g_str_has_prefix (templ->name_template, "video_aux_")) {
3366     is_primary_video = g_str_equal (templ->name_template, "video");
3367     if (is_primary_video && splitmux->have_video)
3368       goto already_have_video;
3369     is_video = TRUE;
3370   }
3371
3372   /* See if there's a pad map and it lists this pad */
3373   muxer_padname = lookup_muxer_pad (splitmux, name);
3374
3375   if (muxer_padname == NULL) {
3376     if (is_video) {
3377       /* FIXME: Look for a pad template with matching caps, rather than by name */
3378       GST_DEBUG_OBJECT (element,
3379           "searching for pad-template with name 'video_%%u'");
3380       mux_template =
3381           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3382           (splitmux->muxer), "video_%u");
3383
3384       /* Fallback to find sink pad templates named 'video' (flvmux) */
3385       if (!mux_template) {
3386         GST_DEBUG_OBJECT (element,
3387             "searching for pad-template with name 'video'");
3388         mux_template =
3389             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3390             (splitmux->muxer), "video");
3391       }
3392       name = NULL;
3393     } else {
3394       GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
3395           templ->name_template);
3396       mux_template =
3397           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3398           (splitmux->muxer), templ->name_template);
3399
3400       /* Fallback to find sink pad templates named 'audio' (flvmux) */
3401       if (!mux_template && g_str_has_prefix (templ->name_template, "audio_")) {
3402         GST_DEBUG_OBJECT (element,
3403             "searching for pad-template with name 'audio'");
3404         mux_template =
3405             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3406             (splitmux->muxer), "audio");
3407         name = NULL;
3408       }
3409     }
3410
3411     if (mux_template == NULL) {
3412       GST_DEBUG_OBJECT (element,
3413           "searching for pad-template with name 'sink_%%d'");
3414       mux_template =
3415           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3416           (splitmux->muxer), "sink_%d");
3417       name = NULL;
3418     }
3419     if (mux_template == NULL) {
3420       GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
3421       mux_template =
3422           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3423           (splitmux->muxer), "sink");
3424       name = NULL;
3425     }
3426
3427     if (mux_template == NULL) {
3428       GST_ERROR_OBJECT (element,
3429           "unable to find a suitable sink pad-template on the muxer");
3430       goto fail;
3431     }
3432     GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
3433         mux_template->name_template);
3434
3435     if (mux_template->presence == GST_PAD_REQUEST) {
3436       GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
3437
3438       muxpad =
3439           gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
3440       muxer_is_requestpad = TRUE;
3441     } else if (mux_template->presence == GST_PAD_ALWAYS) {
3442       GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
3443
3444       muxpad =
3445           gst_element_get_static_pad (splitmux->muxer,
3446           mux_template->name_template);
3447     } else {
3448       GST_ERROR_OBJECT (element,
3449           "unexpected pad presence %d", mux_template->presence);
3450       goto fail;
3451     }
3452   } else {
3453     /* Have a muxer pad name */
3454     if (!(muxpad = gst_element_get_static_pad (splitmux->muxer, muxer_padname))) {
3455       if ((muxpad =
3456               gst_element_request_pad_simple (splitmux->muxer, muxer_padname)))
3457         muxer_is_requestpad = TRUE;
3458     }
3459     g_free ((gchar *) muxer_padname);
3460     muxer_padname = NULL;
3461   }
3462
3463   /* One way or another, we must have a muxer pad by now */
3464   if (muxpad == NULL)
3465     goto fail;
3466
3467   if (is_primary_video)
3468     gname = g_strdup ("video");
3469   else if (name == NULL)
3470     gname = gst_pad_get_name (muxpad);
3471   else
3472     gname = g_strdup (name);
3473
3474   qname = g_strdup_printf ("queue_%s", gname);
3475   if ((q = create_element (splitmux, "queue", qname, FALSE)) == NULL) {
3476     g_free (qname);
3477     goto fail;
3478   }
3479   g_free (qname);
3480
3481   gst_element_set_state (q, GST_STATE_TARGET (splitmux));
3482
3483   g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
3484       "max-size-buffers", 5, NULL);
3485
3486   q_sink = gst_element_get_static_pad (q, "sink");
3487   q_src = gst_element_get_static_pad (q, "src");
3488
3489   if (gst_pad_link (q_src, muxpad) != GST_PAD_LINK_OK) {
3490     if (muxer_is_requestpad)
3491       gst_element_release_request_pad (splitmux->muxer, muxpad);
3492     gst_object_unref (GST_OBJECT (muxpad));
3493     goto fail;
3494   }
3495
3496   gst_object_unref (GST_OBJECT (muxpad));
3497
3498   ctx = mq_stream_ctx_new (splitmux);
3499   /* Context holds a ref: */
3500   ctx->q = gst_object_ref (q);
3501   ctx->srcpad = q_src;
3502   ctx->sinkpad = q_sink;
3503   ctx->q_overrun_id =
3504       g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
3505   g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
3506
3507   ctx->src_pad_block_id =
3508       gst_pad_add_probe (q_src,
3509       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
3510       (GstPadProbeCallback) handle_mq_output, ctx, NULL);
3511   if (is_primary_video && splitmux->reference_ctx != NULL) {
3512     splitmux->reference_ctx->is_reference = FALSE;
3513     splitmux->reference_ctx = NULL;
3514   }
3515   if (splitmux->reference_ctx == NULL) {
3516     splitmux->reference_ctx = ctx;
3517     ctx->is_reference = TRUE;
3518   }
3519
3520   ret = gst_ghost_pad_new_from_template (gname, q_sink, templ);
3521   g_object_set_qdata ((GObject *) (ret), PAD_CONTEXT, ctx);
3522
3523   ctx->sink_pad_block_id =
3524       gst_pad_add_probe (q_sink,
3525       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
3526       GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
3527       (GstPadProbeCallback) handle_mq_input, ctx, NULL);
3528
3529   GST_DEBUG_OBJECT (splitmux, "splitmuxsink pad %" GST_PTR_FORMAT
3530       " feeds queue pad %" GST_PTR_FORMAT, ret, q_sink);
3531
3532   splitmux->contexts = g_list_append (splitmux->contexts, ctx);
3533
3534   g_free (gname);
3535
3536   if (is_primary_video)
3537     splitmux->have_video = TRUE;
3538
3539   gst_pad_set_active (ret, TRUE);
3540   gst_element_add_pad (GST_ELEMENT (splitmux), ret);
3541
3542   GST_SPLITMUX_UNLOCK (splitmux);
3543
3544   return ret;
3545 fail:
3546   GST_SPLITMUX_UNLOCK (splitmux);
3547
3548   if (q_sink)
3549     gst_object_unref (q_sink);
3550   if (q_src)
3551     gst_object_unref (q_src);
3552   return NULL;
3553 already_have_video:
3554   GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
3555   GST_SPLITMUX_UNLOCK (splitmux);
3556   return NULL;
3557 }
3558
3559 static void
3560 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
3561 {
3562   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3563   GstPad *muxpad = NULL;
3564   MqStreamCtx *ctx =
3565       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
3566
3567   GST_SPLITMUX_LOCK (splitmux);
3568
3569   if (splitmux->muxer == NULL)
3570     goto fail;                  /* Elements don't exist yet - nothing to release */
3571
3572   GST_INFO_OBJECT (pad, "releasing request pad");
3573
3574   muxpad = gst_pad_get_peer (ctx->srcpad);
3575
3576   /* Remove the context from our consideration */
3577   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
3578
3579   ctx->is_releasing = TRUE;
3580   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3581
3582   GST_SPLITMUX_UNLOCK (splitmux);
3583
3584   if (ctx->sink_pad_block_id) {
3585     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
3586     gst_pad_send_event (ctx->sinkpad, gst_event_new_flush_start ());
3587   }
3588
3589   if (ctx->src_pad_block_id)
3590     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
3591
3592   /* Wait for the pad to be free */
3593   GST_PAD_STREAM_LOCK (pad);
3594   GST_SPLITMUX_LOCK (splitmux);
3595   GST_PAD_STREAM_UNLOCK (pad);
3596
3597   /* Can release the context now */
3598   mq_stream_ctx_free (ctx);
3599   if (ctx == splitmux->reference_ctx)
3600     splitmux->reference_ctx = NULL;
3601
3602   /* Release and free the muxer input */
3603   if (muxpad) {
3604     gst_element_release_request_pad (splitmux->muxer, muxpad);
3605     gst_object_unref (muxpad);
3606   }
3607
3608   if (GST_PAD_PAD_TEMPLATE (pad) &&
3609       g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
3610               (pad)), "video"))
3611     splitmux->have_video = FALSE;
3612
3613   gst_element_remove_pad (element, pad);
3614
3615   /* Reset the internal elements only after all request pads are released */
3616   if (splitmux->contexts == NULL)
3617     gst_splitmux_reset_elements (splitmux);
3618
3619   /* Wake up other input streams to check if the completion conditions have
3620    * changed */
3621   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3622
3623 fail:
3624   GST_SPLITMUX_UNLOCK (splitmux);
3625 }
3626
3627 static GstElement *
3628 create_element (GstSplitMuxSink * splitmux,
3629     const gchar * factory, const gchar * name, gboolean locked)
3630 {
3631   GstElement *ret = gst_element_factory_make (factory, name);
3632   if (ret == NULL) {
3633     g_warning ("Failed to create %s - splitmuxsink will not work", name);
3634     return NULL;
3635   }
3636
3637   if (locked) {
3638     /* Ensure the sink starts in locked state and NULL - it will be changed
3639      * by the filename setting code */
3640     gst_element_set_locked_state (ret, TRUE);
3641     gst_element_set_state (ret, GST_STATE_NULL);
3642   }
3643
3644   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
3645     g_warning ("Could not add %s element - splitmuxsink will not work", name);
3646     gst_object_unref (ret);
3647     return NULL;
3648   }
3649
3650   return ret;
3651 }
3652
3653 static gboolean
3654 create_muxer (GstSplitMuxSink * splitmux)
3655 {
3656   /* Create internal elements */
3657   if (splitmux->muxer == NULL) {
3658     GstElement *provided_muxer = NULL;
3659
3660     GST_OBJECT_LOCK (splitmux);
3661     if (splitmux->provided_muxer != NULL)
3662       provided_muxer = gst_object_ref (splitmux->provided_muxer);
3663     GST_OBJECT_UNLOCK (splitmux);
3664
3665     if ((!splitmux->async_finalize && provided_muxer == NULL) ||
3666         (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
3667       if ((splitmux->muxer =
3668               create_element (splitmux,
3669                   splitmux->muxer_factory ? splitmux->
3670                   muxer_factory : DEFAULT_MUXER, "muxer", FALSE)) == NULL)
3671         goto fail;
3672     } else if (splitmux->async_finalize) {
3673       if ((splitmux->muxer =
3674               create_element (splitmux, splitmux->muxer_factory, "muxer",
3675                   FALSE)) == NULL)
3676         goto fail;
3677       if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
3678         gst_preset_load_preset (GST_PRESET (splitmux->muxer),
3679             splitmux->muxer_preset);
3680       if (splitmux->muxer_properties)
3681         gst_structure_foreach (splitmux->muxer_properties,
3682             _set_property_from_structure, splitmux->muxer);
3683     } else {
3684       /* Ensure it's not in locked state (we might be reusing an old element) */
3685       gst_element_set_locked_state (provided_muxer, FALSE);
3686       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
3687         g_warning ("Could not add muxer element - splitmuxsink will not work");
3688         gst_object_unref (provided_muxer);
3689         goto fail;
3690       }
3691
3692       splitmux->muxer = provided_muxer;
3693       gst_object_unref (provided_muxer);
3694     }
3695
3696     if (splitmux->use_robust_muxing) {
3697       update_muxer_properties (splitmux);
3698     }
3699   }
3700
3701   return TRUE;
3702 fail:
3703   return FALSE;
3704 }
3705
3706 static GstElement *
3707 find_sink (GstElement * e)
3708 {
3709   GstElement *res = NULL;
3710   GstIterator *iter;
3711   gboolean done = FALSE;
3712   GValue data = { 0, };
3713
3714   if (!GST_IS_BIN (e))
3715     return e;
3716
3717   if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
3718     return e;
3719
3720   iter = gst_bin_iterate_sinks (GST_BIN (e));
3721   while (!done) {
3722     switch (gst_iterator_next (iter, &data)) {
3723       case GST_ITERATOR_OK:
3724       {
3725         GstElement *child = g_value_get_object (&data);
3726         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
3727                 "location") != NULL) {
3728           res = child;
3729           done = TRUE;
3730         }
3731         g_value_reset (&data);
3732         break;
3733       }
3734       case GST_ITERATOR_RESYNC:
3735         gst_iterator_resync (iter);
3736         break;
3737       case GST_ITERATOR_DONE:
3738         done = TRUE;
3739         break;
3740       case GST_ITERATOR_ERROR:
3741         g_assert_not_reached ();
3742         break;
3743     }
3744   }
3745   g_value_unset (&data);
3746   gst_iterator_free (iter);
3747
3748   return res;
3749 }
3750
3751 static gboolean
3752 create_sink (GstSplitMuxSink * splitmux)
3753 {
3754   GstElement *provided_sink = NULL;
3755
3756   if (splitmux->active_sink == NULL) {
3757
3758     GST_OBJECT_LOCK (splitmux);
3759     if (splitmux->provided_sink != NULL)
3760       provided_sink = gst_object_ref (splitmux->provided_sink);
3761     GST_OBJECT_UNLOCK (splitmux);
3762
3763     if ((!splitmux->async_finalize && provided_sink == NULL) ||
3764         (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
3765       if ((splitmux->sink =
3766               create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
3767         goto fail;
3768       splitmux->active_sink = splitmux->sink;
3769     } else if (splitmux->async_finalize) {
3770       if ((splitmux->sink =
3771               create_element (splitmux, splitmux->sink_factory, "sink",
3772                   TRUE)) == NULL)
3773         goto fail;
3774       if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
3775         gst_preset_load_preset (GST_PRESET (splitmux->sink),
3776             splitmux->sink_preset);
3777       if (splitmux->sink_properties)
3778         gst_structure_foreach (splitmux->sink_properties,
3779             _set_property_from_structure, splitmux->sink);
3780       splitmux->active_sink = splitmux->sink;
3781     } else {
3782       /* Ensure the sink starts in locked state and NULL - it will be changed
3783        * by the filename setting code */
3784       gst_element_set_locked_state (provided_sink, TRUE);
3785       gst_element_set_state (provided_sink, GST_STATE_NULL);
3786       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
3787         g_warning ("Could not add sink elements - splitmuxsink will not work");
3788         gst_object_unref (provided_sink);
3789         goto fail;
3790       }
3791
3792       splitmux->active_sink = provided_sink;
3793
3794       /* The bin holds a ref now, we can drop our tmp ref */
3795       gst_object_unref (provided_sink);
3796
3797       /* Find the sink element */
3798       splitmux->sink = find_sink (splitmux->active_sink);
3799       if (splitmux->sink == NULL) {
3800         g_warning
3801             ("Could not locate sink element in provided sink - splitmuxsink will not work");
3802         goto fail;
3803       }
3804     }
3805
3806 #if 1
3807     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3808             "async") != NULL) {
3809       /* async child elements are causing state change races and weird
3810        * failures, so let's try and turn that off */
3811       g_object_set (splitmux->sink, "async", FALSE, NULL);
3812     }
3813 #endif
3814
3815     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
3816       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
3817       goto fail;
3818     }
3819   }
3820
3821   return TRUE;
3822 fail:
3823   return FALSE;
3824 }
3825
3826 #ifdef __GNUC__
3827 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
3828 #endif
3829 static void
3830 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
3831 {
3832   gchar *fname = NULL;
3833   GstSample *sample;
3834   GstCaps *caps;
3835
3836   gst_splitmux_sink_ensure_max_files (splitmux);
3837
3838   if (ctx->cur_out_buffer == NULL) {
3839     GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
3840   }
3841
3842   caps = gst_pad_get_current_caps (ctx->srcpad);
3843   sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3844   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3845       splitmux->fragment_id, sample, &fname);
3846   gst_sample_unref (sample);
3847   if (caps)
3848     gst_caps_unref (caps);
3849
3850   if (fname == NULL) {
3851     /* Fallback to the old signal if the new one returned nothing */
3852     g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3853         splitmux->fragment_id, &fname);
3854   }
3855
3856   if (!fname)
3857     fname = splitmux->location ?
3858         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3859
3860   if (fname) {
3861     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3862     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3863             "location") != NULL)
3864       g_object_set (splitmux->sink, "location", fname, NULL);
3865     g_free (fname);
3866   }
3867
3868   splitmux->fragment_id++;
3869 }
3870
3871 /* called with GST_SPLITMUX_LOCK */
3872 static void
3873 do_async_start (GstSplitMuxSink * splitmux)
3874 {
3875   GstMessage *message;
3876
3877   if (!splitmux->need_async_start) {
3878     GST_INFO_OBJECT (splitmux, "no async_start needed");
3879     return;
3880   }
3881
3882   splitmux->async_pending = TRUE;
3883
3884   GST_INFO_OBJECT (splitmux, "Sending async_start message");
3885   message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
3886
3887   GST_SPLITMUX_UNLOCK (splitmux);
3888   GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3889       (splitmux), message);
3890   GST_SPLITMUX_LOCK (splitmux);
3891 }
3892
3893 /* called with GST_SPLITMUX_LOCK */
3894 static void
3895 do_async_done (GstSplitMuxSink * splitmux)
3896 {
3897   GstMessage *message;
3898
3899   if (splitmux->async_pending) {
3900     GST_INFO_OBJECT (splitmux, "Sending async_done message");
3901     splitmux->async_pending = FALSE;
3902     GST_SPLITMUX_UNLOCK (splitmux);
3903
3904     message =
3905         gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3906         GST_CLOCK_TIME_NONE);
3907     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3908         (splitmux), message);
3909     GST_SPLITMUX_LOCK (splitmux);
3910   }
3911
3912   splitmux->need_async_start = FALSE;
3913 }
3914
3915 static void
3916 gst_splitmux_sink_reset (GstSplitMuxSink * splitmux)
3917 {
3918   splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3919   splitmux->max_in_running_time_dts = GST_CLOCK_STIME_NONE;
3920
3921   splitmux->fragment_start_time = GST_CLOCK_STIME_NONE;
3922   splitmux->fragment_start_time_pts = GST_CLOCK_STIME_NONE;
3923   g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
3924
3925   g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
3926   g_queue_clear (&splitmux->pending_input_gops);
3927
3928   splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
3929   splitmux->fragment_total_bytes = 0;
3930   splitmux->fragment_reference_bytes = 0;
3931   splitmux->muxed_out_bytes = 0;
3932   splitmux->ready_for_output = FALSE;
3933
3934   g_atomic_int_set (&(splitmux->split_requested), FALSE);
3935   g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3936
3937   splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
3938   gst_queue_array_clear (splitmux->times_to_split);
3939
3940   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_reset, NULL);
3941   splitmux->queued_keyframes = 0;
3942
3943   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
3944   g_queue_clear (&splitmux->out_cmd_q);
3945 }
3946
3947 static GstStateChangeReturn
3948 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
3949 {
3950   GstStateChangeReturn ret;
3951   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3952
3953   switch (transition) {
3954     case GST_STATE_CHANGE_NULL_TO_READY:{
3955       GST_SPLITMUX_LOCK (splitmux);
3956       if (!create_muxer (splitmux) || !create_sink (splitmux)) {
3957         ret = GST_STATE_CHANGE_FAILURE;
3958         GST_SPLITMUX_UNLOCK (splitmux);
3959         goto beach;
3960       }
3961       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3962       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
3963       GST_SPLITMUX_UNLOCK (splitmux);
3964       splitmux->fragment_id = splitmux->start_index;
3965       break;
3966     }
3967     case GST_STATE_CHANGE_READY_TO_PAUSED:{
3968       GST_SPLITMUX_LOCK (splitmux);
3969       /* Make sure contexts and tracking times are cleared, in case we're being reused */
3970       gst_splitmux_sink_reset (splitmux);
3971       /* Start by collecting one input on each pad */
3972       splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
3973       splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
3974
3975       GST_SPLITMUX_UNLOCK (splitmux);
3976
3977       GST_SPLITMUX_STATE_LOCK (splitmux);
3978       splitmux->shutdown = FALSE;
3979       GST_SPLITMUX_STATE_UNLOCK (splitmux);
3980       break;
3981     }
3982     case GST_STATE_CHANGE_PAUSED_TO_READY:
3983     case GST_STATE_CHANGE_READY_TO_READY:
3984       g_atomic_int_set (&(splitmux->split_requested), FALSE);
3985       g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3986       /* Fall through */
3987     case GST_STATE_CHANGE_READY_TO_NULL:
3988       GST_SPLITMUX_STATE_LOCK (splitmux);
3989       splitmux->shutdown = TRUE;
3990       GST_SPLITMUX_STATE_UNLOCK (splitmux);
3991
3992       GST_SPLITMUX_LOCK (splitmux);
3993       gst_splitmux_sink_reset (splitmux);
3994       splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
3995       splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
3996       /* Wake up any blocked threads */
3997       GST_LOG_OBJECT (splitmux,
3998           "State change -> NULL or READY. Waking threads");
3999       GST_SPLITMUX_BROADCAST_INPUT (splitmux);
4000       GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
4001       GST_SPLITMUX_UNLOCK (splitmux);
4002       break;
4003     default:
4004       break;
4005   }
4006
4007   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4008   if (ret == GST_STATE_CHANGE_FAILURE)
4009     goto beach;
4010
4011   switch (transition) {
4012     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4013       splitmux->need_async_start = TRUE;
4014       break;
4015     case GST_STATE_CHANGE_READY_TO_PAUSED:{
4016       /* Change state async, because our child sink might not
4017        * be ready to do that for us yet if it's state is still locked */
4018
4019       splitmux->need_async_start = TRUE;
4020       /* we want to go async to PAUSED until we managed to configure and add the
4021        * sink */
4022       GST_SPLITMUX_LOCK (splitmux);
4023       do_async_start (splitmux);
4024       GST_SPLITMUX_UNLOCK (splitmux);
4025       ret = GST_STATE_CHANGE_ASYNC;
4026       break;
4027     }
4028     case GST_STATE_CHANGE_READY_TO_NULL:
4029       GST_SPLITMUX_LOCK (splitmux);
4030       splitmux->fragment_id = 0;
4031       /* Reset internal elements only if no pad contexts are using them */
4032       if (splitmux->contexts == NULL)
4033         gst_splitmux_reset_elements (splitmux);
4034       do_async_done (splitmux);
4035       GST_SPLITMUX_UNLOCK (splitmux);
4036       break;
4037     default:
4038       break;
4039   }
4040
4041   return ret;
4042
4043 beach:
4044   if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
4045     /* Cleanup elements on failed transition out of NULL */
4046     gst_splitmux_reset_elements (splitmux);
4047     GST_SPLITMUX_LOCK (splitmux);
4048     do_async_done (splitmux);
4049     GST_SPLITMUX_UNLOCK (splitmux);
4050   }
4051   if (transition == GST_STATE_CHANGE_READY_TO_READY) {
4052     /* READY to READY transition only happens when we're already
4053      * in READY state, but a child element is in NULL, which
4054      * happens when there's an error changing the state of the sink.
4055      * We need to make sure not to fail the state transition, or
4056      * the core won't transition us back to NULL successfully */
4057     ret = GST_STATE_CHANGE_SUCCESS;
4058   }
4059   return ret;
4060 }
4061
4062 static void
4063 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
4064 {
4065   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
4066     splitmux->fragment_id = 0;
4067   }
4068 }
4069
4070 static void
4071 split_now (GstSplitMuxSink * splitmux)
4072 {
4073   g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
4074 }
4075
4076 static void
4077 split_after (GstSplitMuxSink * splitmux)
4078 {
4079   g_atomic_int_set (&(splitmux->split_requested), TRUE);
4080 }
4081
4082 static void
4083 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
4084 {
4085   gboolean send_keyframe_requests;
4086
4087   GST_SPLITMUX_LOCK (splitmux);
4088   gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
4089   send_keyframe_requests = splitmux->send_keyframe_requests;
4090   GST_SPLITMUX_UNLOCK (splitmux);
4091
4092   if (send_keyframe_requests) {
4093     GstEvent *ev =
4094         gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
4095     GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
4096         GST_TIME_ARGS (split_time));
4097     if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
4098       GST_WARNING_OBJECT (splitmux,
4099           "Could not request keyframe at %" GST_TIME_FORMAT,
4100           GST_TIME_ARGS (split_time));
4101     }
4102   }
4103 }